Skip to content

Commit

Permalink
Feat/pd mode improve (sofastack#486)
Browse files Browse the repository at this point in the history
* Allows regions to configure their own serverList
  • Loading branch information
fengjiachun authored Jul 10, 2020
1 parent 8dc7729 commit a5ce313
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

import com.alipay.sofa.jraft.Lifecycle;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.rhea.client.pd.HeartbeatSender;
Expand Down Expand Up @@ -154,7 +156,10 @@ public synchronized boolean init(final StoreEngineOptions opts) {
for (final RegionEngineOptions rOpts : rOptsList) {
rOpts.setRaftGroupId(JRaftHelper.getJRaftGroupId(clusterName, rOpts.getRegionId()));
rOpts.setServerAddress(serverAddress);
rOpts.setInitialServerList(opts.getInitialServerList());
if (Strings.isBlank(rOpts.getInitialServerList())) {
// if blank, extends parent's value
rOpts.setInitialServerList(opts.getInitialServerList());
}
if (rOpts.getNodeOptions() == null) {
// copy common node options
rOpts.setNodeOptions(opts.getCommonNodeOptions() == null ? new NodeOptions() : opts
Expand Down Expand Up @@ -663,6 +668,9 @@ private boolean initAllRegionEngine(final StoreEngineOptions opts, final Store s
Requires.requireTrue(rOptsList.size() == regionList.size());
for (int i = 0; i < rOptsList.size(); i++) {
final RegionEngineOptions rOpts = rOptsList.get(i);
if (!inConfiguration(rOpts.getServerAddress().toString(), rOpts.getInitialServerList())) {
continue;
}
final Region region = regionList.get(i);
if (Strings.isBlank(rOpts.getRaftDataPath())) {
final String childPath = "raft_data_region_" + region.getId() + "_" + serverAddress.getPort();
Expand All @@ -682,6 +690,18 @@ private boolean initAllRegionEngine(final StoreEngineOptions opts, final Store s
return true;
}

private boolean inConfiguration(final String curr, final String all) {
final PeerId currPeer = new PeerId();
if (!currPeer.parse(curr)) {
return false;
}
final Configuration allConf = new Configuration();
if (!allConf.parse(all)) {
return false;
}
return allConf.contains(currPeer) || allConf.getLearners().contains(currPeer);
}

private void registerRegionKVService(final RegionKVService regionKVService) {
final RegionKVService preService = this.regionKVServiceTable.putIfAbsent(regionKVService.getRegionId(),
regionKVService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

import org.junit.Assert;

Expand Down Expand Up @@ -60,7 +61,7 @@ public static void main(String[] args) throws Exception {
}

private void putAndGetValue() {
final RheaKVStore store = getLeaderStore(-1);
final RheaKVStore store = getLeaderStore(ThreadLocalRandom.current().nextInt(1, 2));
final String key = UUID.randomUUID().toString();
final byte[] value = makeValue(UUID.randomUUID().toString());
store.bPut(key, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.commons.io.FileUtils;
Expand All @@ -27,6 +29,7 @@
import com.alipay.sofa.jraft.rhea.client.RheaKVStore;
import com.alipay.sofa.jraft.rhea.client.pd.PlacementDriverClient;
import com.alipay.sofa.jraft.rhea.errors.NotLeaderException;
import com.alipay.sofa.jraft.rhea.options.RegionEngineOptions;
import com.alipay.sofa.jraft.rhea.options.RheaKVStoreOptions;
import com.alipay.sofa.jraft.util.Endpoint;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -38,8 +41,10 @@
*/
public class RheaKVTestCluster {

private static final String[] CONF = { "/pd_conf/rhea_pd_test_1.yaml",
"/pd_conf/rhea_pd_test_2.yaml", "/pd_conf/rhea_pd_test_3.yaml" };
private static final String[] CONF = { "/pd_conf/rhea_pd_test_1.yaml", //
"/pd_conf/rhea_pd_test_2.yaml", //
"/pd_conf/rhea_pd_test_3.yaml" //
};

private volatile String tempDbPath;
private volatile String tempRaftPath;
Expand All @@ -65,10 +70,15 @@ protected void start() throws IOException, InterruptedException {
this.tempRaftPath = file.getAbsolutePath();
System.out.println("make dir: " + this.tempRaftPath);
}

final Set<Long> regionIds = new HashSet<>();
for (final String c : CONF) {
final ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
final InputStream in = RheaKVTestCluster.class.getResourceAsStream(c);
final RheaKVStoreOptions opts = mapper.readValue(in, RheaKVStoreOptions.class);
for (final RegionEngineOptions rOpts : opts.getStoreEngineOptions().getRegionEngineOptionsList()) {
regionIds.add(rOpts.getRegionId());
}
final RheaKVStore rheaKVStore = new DefaultRheaKVStore();
if (rheaKVStore.init(opts)) {
stores.add(rheaKVStore);
Expand All @@ -77,8 +87,10 @@ protected void start() throws IOException, InterruptedException {
}
}
final PlacementDriverClient pdClient = stores.get(0).getPlacementDriverClient();
final Endpoint leader1 = pdClient.getLeader(-1, true, 10000);
System.out.println("The region -1 leader is: " + leader1);
for (final Long regionId : regionIds) {
final Endpoint leader = pdClient.getLeader(regionId, true, 10000);
System.out.println("The region " + regionId + " leader is: " + leader);
}
}

protected void shutdown() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ storeEngineOptions:
serverAddress:
ip: 127.0.0.1
port: 18181
regionEngineOptionsList:
- { regionId: 1, endKey: g, initialServerList: "127.0.0.1:18181,127.0.0.1:18182,127.0.0.1:18183/learner"}
- { regionId: 2, startKey: g}
leastKeysOnSplit: 10

initialServerList: 127.0.0.1:18181,127.0.0.1:18182,127.0.0.1:18183
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ storeEngineOptions:
serverAddress:
ip: 127.0.0.1
port: 18182
regionEngineOptionsList:
- { regionId: 1, endKey: g, initialServerList: "127.0.0.1:18181,127.0.0.1:18182,127.0.0.1:18183/learner"}
- { regionId: 2, startKey: g}
leastKeysOnSplit: 10

initialServerList: 127.0.0.1:18181,127.0.0.1:18182,127.0.0.1:18183
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ storeEngineOptions:
serverAddress:
ip: 127.0.0.1
port: 18183
regionEngineOptionsList:
- { regionId: 1, endKey: g, initialServerList: "127.0.0.1:18181,127.0.0.1:18182,127.0.0.1:18183/learner"}
- { regionId: 2, startKey: g}
leastKeysOnSplit: 10

initialServerList: 127.0.0.1:18181,127.0.0.1:18182,127.0.0.1:18183

0 comments on commit a5ce313

Please sign in to comment.