Skip to content

Commit

Permalink
feat/Examples about priority-based semi-deterministic leader election. (
Browse files Browse the repository at this point in the history
sofastack#417)

* feat/Examples about priority-based semi-deterministic leader election for users.

* feat/fix some codes conflicts.

* feat/fix typo.

* feat/fix typo.
  • Loading branch information
zongtanghu authored Apr 13, 2020
1 parent 5d47497 commit a851e80
Show file tree
Hide file tree
Showing 7 changed files with 420 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package com.alipay.sofa.jraft.example.election;

import com.alipay.sofa.jraft.entity.PeerId;

/**
*
* @author jiachun.fjc
Expand Down Expand Up @@ -46,8 +48,13 @@ public static void main(final String[] args) {
final ElectionNode node = new ElectionNode();
node.addLeaderStateListener(new LeaderStateListener() {

PeerId serverId = node.getNode().getLeaderId();
String ip = serverId.getIp();
int port = serverId.getPort();

@Override
public void onLeaderStart(long leaderTerm) {
System.out.println("[ElectionBootstrap] Leader's ip is: " + ip + ", port: " + port);
System.out.println("[ElectionBootstrap] Leader start on term: " + leaderTerm);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class ElectionNode implements Lifecycle<ElectionNodeOptions> {
@Override
public boolean init(final ElectionNodeOptions opts) {
if (this.started) {
LOG.info("[ElectionNode: {}] already started.");
LOG.info("[ElectionNode: {}] already started.", opts.getServerAddress());
return true;
}
// node options
Expand Down Expand Up @@ -112,7 +112,7 @@ public void shutdown() {
}
}
this.started = false;
LOG.info("[RegionEngine] shutdown successfully: {}.", this);
LOG.info("[ElectionNode] shutdown successfully: {}.", this);
}

public Node getNode() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft.example.priorityelection;

/**
* @author zongtanghu
*/
public interface LeaderStateListener {

/**
* Called when current node becomes leader
*/
void onLeaderStart(final long leaderTerm);

/**
* Called when current node loses leadership.
*/
void onLeaderStop(final long leaderTerm);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft.example.priorityelection;

import com.alipay.sofa.jraft.entity.PeerId;

/**
*
* @author zongtanghu
*/
public class PriorityElectionBootstrap {

// Start elections by 3 instance. Note that if multiple instances are started on the same machine,
// the first parameter `dataPath` should not be the same,
// the second parameter `groupId` should be set the same value, eg: election_test,
// the third parameter `serverId` should be set ip and port with priority value, eg: 127.0.0.1:8081::100, and middle postion can be empty string,
// the fourth parameter `initialConfStr` should be set the all of endpoints in raft cluster, eg : 127.0.0.1:8081::100,127.0.0.1:8082::40,127.0.0.1:8083::40.

public static void main(final String[] args) {
if (args.length < 4) {
System.out
.println("Useage : java com.alipay.sofa.jraft.example.priorityelection.PriorityElectionBootstrap {dataPath} {groupId} {serverId} {initConf}");
System.out
.println("Example: java com.alipay.sofa.jraft.example.priorityelection.PriorityElectionBootstrap /tmp/server1 election_test 127.0.0.1:8081::100 127.0.0.1:8081::100,127.0.0.1:8082::40,127.0.0.1:8083::40");
System.exit(1);
}
final String dataPath = args[0];
final String groupId = args[1];
final String serverIdStr = args[2];
final String initialConfStr = args[3];

final PriorityElectionNodeOptions priorityElectionOpts = new PriorityElectionNodeOptions();
priorityElectionOpts.setDataPath(dataPath);
priorityElectionOpts.setGroupId(groupId);
priorityElectionOpts.setServerAddress(serverIdStr);
priorityElectionOpts.setInitialServerAddressList(initialConfStr);

final PriorityElectionNode node = new PriorityElectionNode();
node.addLeaderStateListener(new LeaderStateListener() {

@Override
public void onLeaderStart(long leaderTerm) {

PeerId serverId = node.getNode().getLeaderId();
int priority = serverId.getPriority();
String ip = serverId.getIp();
int port = serverId.getPort();

System.out.println("[PriorityElectionBootstrap] Leader's ip is: " + ip + ", port: " + port
+ ", priority: " + priority);
System.out.println("[PriorityElectionBootstrap] Leader start on term: " + leaderTerm);
}

@Override
public void onLeaderStop(long leaderTerm) {
System.out.println("[PriorityElectionBootstrap] Leader stop on term: " + leaderTerm);
}
});
node.init(priorityElectionOpts);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft.example.priorityelection;

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alipay.sofa.jraft.Lifecycle;
import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.RaftGroupService;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
import com.alipay.sofa.jraft.rpc.RpcServer;
import com.alipay.sofa.jraft.util.internal.ThrowUtil;

/**
* @author zongtanghu
*/
public class PriorityElectionNode implements Lifecycle<PriorityElectionNodeOptions> {

private static final Logger LOG = LoggerFactory.getLogger(PriorityElectionNode.class);

private final List<LeaderStateListener> listeners = new CopyOnWriteArrayList<>();
private RaftGroupService raftGroupService;
private Node node;
private PriorityElectionOnlyStateMachine fsm;

private boolean started;

@Override
public boolean init(final PriorityElectionNodeOptions opts) {
if (this.started) {
LOG.info("[PriorityElectionNode: {}] already started.", opts.getServerAddress());
return true;
}
// node options
NodeOptions nodeOpts = opts.getNodeOptions();
if (nodeOpts == null) {
nodeOpts = new NodeOptions();
}
this.fsm = new PriorityElectionOnlyStateMachine(this.listeners);
// Set the initial PriorityElectionOnlyStateMachine
nodeOpts.setFsm(this.fsm);
final Configuration initialConf = new Configuration();
if (!initialConf.parse(opts.getInitialServerAddressList())) {
throw new IllegalArgumentException("Fail to parse initConf: " + opts.getInitialServerAddressList());
}
// Set the initial cluster configuration
nodeOpts.setInitialConf(initialConf);
final String dataPath = opts.getDataPath();
try {
FileUtils.forceMkdir(new File(dataPath));
} catch (final IOException e) {
LOG.error("Fail to make dir for dataPath {}.", dataPath);
return false;
}
// Set the data path
// Log, required
nodeOpts.setLogUri(Paths.get(dataPath, "log").toString());
// Metadata, required
nodeOpts.setRaftMetaUri(Paths.get(dataPath, "meta").toString());
// nodeOpts.setSnapshotUri(Paths.get(dataPath, "snapshot").toString());

final String groupId = opts.getGroupId();
final PeerId serverId = new PeerId();
if (!serverId.parse(opts.getServerAddress())) {
throw new IllegalArgumentException("Fail to parse serverId: " + opts.getServerAddress());
}
// Set priority value, required
nodeOpts.setElectionPriority(serverId.getPriority());

final RpcServer rpcServer = RaftRpcServerFactory.createRaftRpcServer(serverId.getEndpoint());
this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOpts, rpcServer);
this.node = this.raftGroupService.start();
if (this.node != null) {
this.started = true;
}
return this.started;
}

@Override
public void shutdown() {
if (!this.started) {
return;
}
if (this.raftGroupService != null) {
this.raftGroupService.shutdown();
try {
this.raftGroupService.join();
} catch (final InterruptedException e) {
ThrowUtil.throwException(e);
}
}
this.started = false;
LOG.info("[ElectionNode] shutdown successfully: {}.", this);
}

public Node getNode() {
return node;
}

public PriorityElectionOnlyStateMachine getFsm() {
return fsm;
}

public boolean isStarted() {
return started;
}

public boolean isLeader() {
return this.fsm.isLeader();
}

public void addLeaderStateListener(final LeaderStateListener listener) {
this.listeners.add(listener);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft.example.priorityelection;

import com.alipay.sofa.jraft.option.NodeOptions;

/**
* @author zongtanghu
*/
public class PriorityElectionNodeOptions {

private String dataPath;
// raft group id
private String groupId;
// ip:port::priority
private String serverAddress;
// ip:port::priority,ip:port::priority,ip:port::priority
private String initialServerAddressList;
// raft node options
private NodeOptions nodeOptions;

public String getDataPath() {
return dataPath;
}

public void setDataPath(String dataPath) {
this.dataPath = dataPath;
}

public String getGroupId() {
return groupId;
}

public void setGroupId(String groupId) {
this.groupId = groupId;
}

public String getServerAddress() {
return serverAddress;
}

public void setServerAddress(String serverAddress) {
this.serverAddress = serverAddress;
}

public String getInitialServerAddressList() {
return initialServerAddressList;
}

public void setInitialServerAddressList(String initialServerAddressList) {
this.initialServerAddressList = initialServerAddressList;
}

public NodeOptions getNodeOptions() {
return nodeOptions;
}

public void setNodeOptions(NodeOptions nodeOptions) {
this.nodeOptions = nodeOptions;
}

@Override
public String toString() {
return "PriorityElectionNodeOptions{" + "dataPath='" + dataPath + '\'' + ", groupId='" + groupId + '\''
+ ", serverAddress='" + serverAddress + '\'' + ", initialServerAddressList='" + initialServerAddressList
+ '\'' + ", nodeOptions=" + nodeOptions + '}';
}
}
Loading

0 comments on commit a851e80

Please sign in to comment.