From a851e8024bedfe85240f9987f330e18b796a9676 Mon Sep 17 00:00:00 2001 From: Hu Zongtang Date: Mon, 13 Apr 2020 11:58:44 +0800 Subject: [PATCH] feat/Examples about priority-based semi-deterministic leader election. (#417) * feat/Examples about priority-based semi-deterministic leader election for users. * feat/fix some codes conflicts. * feat/fix typo. * feat/fix typo. --- .../example/election/ElectionBootstrap.java | 7 + .../jraft/example/election/ElectionNode.java | 4 +- .../priorityelection/LeaderStateListener.java | 33 ++++ .../PriorityElectionBootstrap.java | 76 ++++++++++ .../PriorityElectionNode.java | 141 ++++++++++++++++++ .../PriorityElectionNodeOptions.java | 82 ++++++++++ .../PriorityElectionOnlyStateMachine.java | 79 ++++++++++ 7 files changed, 420 insertions(+), 2 deletions(-) create mode 100644 jraft-example/src/main/java/com/alipay/sofa/jraft/example/priorityelection/LeaderStateListener.java create mode 100644 jraft-example/src/main/java/com/alipay/sofa/jraft/example/priorityelection/PriorityElectionBootstrap.java create mode 100644 jraft-example/src/main/java/com/alipay/sofa/jraft/example/priorityelection/PriorityElectionNode.java create mode 100644 jraft-example/src/main/java/com/alipay/sofa/jraft/example/priorityelection/PriorityElectionNodeOptions.java create mode 100644 jraft-example/src/main/java/com/alipay/sofa/jraft/example/priorityelection/PriorityElectionOnlyStateMachine.java diff --git a/jraft-example/src/main/java/com/alipay/sofa/jraft/example/election/ElectionBootstrap.java b/jraft-example/src/main/java/com/alipay/sofa/jraft/example/election/ElectionBootstrap.java index f0b0a94fc..15977fc43 100644 --- a/jraft-example/src/main/java/com/alipay/sofa/jraft/example/election/ElectionBootstrap.java +++ b/jraft-example/src/main/java/com/alipay/sofa/jraft/example/election/ElectionBootstrap.java @@ -16,6 +16,8 @@ */ package com.alipay.sofa.jraft.example.election; +import com.alipay.sofa.jraft.entity.PeerId; + /** * * @author jiachun.fjc @@ -46,8 +48,13 @@ public static void main(final String[] args) { final ElectionNode node = new ElectionNode(); node.addLeaderStateListener(new LeaderStateListener() { + PeerId serverId = node.getNode().getLeaderId(); + String ip = serverId.getIp(); + int port = serverId.getPort(); + @Override public void onLeaderStart(long leaderTerm) { + System.out.println("[ElectionBootstrap] Leader's ip is: " + ip + ", port: " + port); System.out.println("[ElectionBootstrap] Leader start on term: " + leaderTerm); } diff --git a/jraft-example/src/main/java/com/alipay/sofa/jraft/example/election/ElectionNode.java b/jraft-example/src/main/java/com/alipay/sofa/jraft/example/election/ElectionNode.java index 447f1efec..e80ba3698 100644 --- a/jraft-example/src/main/java/com/alipay/sofa/jraft/example/election/ElectionNode.java +++ b/jraft-example/src/main/java/com/alipay/sofa/jraft/example/election/ElectionNode.java @@ -54,7 +54,7 @@ public class ElectionNode implements Lifecycle { @Override public boolean init(final ElectionNodeOptions opts) { if (this.started) { - LOG.info("[ElectionNode: {}] already started."); + LOG.info("[ElectionNode: {}] already started.", opts.getServerAddress()); return true; } // node options @@ -112,7 +112,7 @@ public void shutdown() { } } this.started = false; - LOG.info("[RegionEngine] shutdown successfully: {}.", this); + LOG.info("[ElectionNode] shutdown successfully: {}.", this); } public Node getNode() { diff --git a/jraft-example/src/main/java/com/alipay/sofa/jraft/example/priorityelection/LeaderStateListener.java b/jraft-example/src/main/java/com/alipay/sofa/jraft/example/priorityelection/LeaderStateListener.java new file mode 100644 index 000000000..0fc2b1750 --- /dev/null +++ b/jraft-example/src/main/java/com/alipay/sofa/jraft/example/priorityelection/LeaderStateListener.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.example.priorityelection; + +/** + * @author zongtanghu + */ +public interface LeaderStateListener { + + /** + * Called when current node becomes leader + */ + void onLeaderStart(final long leaderTerm); + + /** + * Called when current node loses leadership. + */ + void onLeaderStop(final long leaderTerm); +} diff --git a/jraft-example/src/main/java/com/alipay/sofa/jraft/example/priorityelection/PriorityElectionBootstrap.java b/jraft-example/src/main/java/com/alipay/sofa/jraft/example/priorityelection/PriorityElectionBootstrap.java new file mode 100644 index 000000000..cb6a5661d --- /dev/null +++ b/jraft-example/src/main/java/com/alipay/sofa/jraft/example/priorityelection/PriorityElectionBootstrap.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.example.priorityelection; + +import com.alipay.sofa.jraft.entity.PeerId; + +/** + * + * @author zongtanghu + */ +public class PriorityElectionBootstrap { + + // Start elections by 3 instance. Note that if multiple instances are started on the same machine, + // the first parameter `dataPath` should not be the same, + // the second parameter `groupId` should be set the same value, eg: election_test, + // the third parameter `serverId` should be set ip and port with priority value, eg: 127.0.0.1:8081::100, and middle postion can be empty string, + // the fourth parameter `initialConfStr` should be set the all of endpoints in raft cluster, eg : 127.0.0.1:8081::100,127.0.0.1:8082::40,127.0.0.1:8083::40. + + public static void main(final String[] args) { + if (args.length < 4) { + System.out + .println("Useage : java com.alipay.sofa.jraft.example.priorityelection.PriorityElectionBootstrap {dataPath} {groupId} {serverId} {initConf}"); + System.out + .println("Example: java com.alipay.sofa.jraft.example.priorityelection.PriorityElectionBootstrap /tmp/server1 election_test 127.0.0.1:8081::100 127.0.0.1:8081::100,127.0.0.1:8082::40,127.0.0.1:8083::40"); + System.exit(1); + } + final String dataPath = args[0]; + final String groupId = args[1]; + final String serverIdStr = args[2]; + final String initialConfStr = args[3]; + + final PriorityElectionNodeOptions priorityElectionOpts = new PriorityElectionNodeOptions(); + priorityElectionOpts.setDataPath(dataPath); + priorityElectionOpts.setGroupId(groupId); + priorityElectionOpts.setServerAddress(serverIdStr); + priorityElectionOpts.setInitialServerAddressList(initialConfStr); + + final PriorityElectionNode node = new PriorityElectionNode(); + node.addLeaderStateListener(new LeaderStateListener() { + + @Override + public void onLeaderStart(long leaderTerm) { + + PeerId serverId = node.getNode().getLeaderId(); + int priority = serverId.getPriority(); + String ip = serverId.getIp(); + int port = serverId.getPort(); + + System.out.println("[PriorityElectionBootstrap] Leader's ip is: " + ip + ", port: " + port + + ", priority: " + priority); + System.out.println("[PriorityElectionBootstrap] Leader start on term: " + leaderTerm); + } + + @Override + public void onLeaderStop(long leaderTerm) { + System.out.println("[PriorityElectionBootstrap] Leader stop on term: " + leaderTerm); + } + }); + node.init(priorityElectionOpts); + } + +} diff --git a/jraft-example/src/main/java/com/alipay/sofa/jraft/example/priorityelection/PriorityElectionNode.java b/jraft-example/src/main/java/com/alipay/sofa/jraft/example/priorityelection/PriorityElectionNode.java new file mode 100644 index 000000000..98e51fb41 --- /dev/null +++ b/jraft-example/src/main/java/com/alipay/sofa/jraft/example/priorityelection/PriorityElectionNode.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.example.priorityelection; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.alipay.sofa.jraft.Lifecycle; +import com.alipay.sofa.jraft.Node; +import com.alipay.sofa.jraft.RaftGroupService; +import com.alipay.sofa.jraft.conf.Configuration; +import com.alipay.sofa.jraft.entity.PeerId; +import com.alipay.sofa.jraft.option.NodeOptions; +import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory; +import com.alipay.sofa.jraft.rpc.RpcServer; +import com.alipay.sofa.jraft.util.internal.ThrowUtil; + +/** + * @author zongtanghu + */ +public class PriorityElectionNode implements Lifecycle { + + private static final Logger LOG = LoggerFactory.getLogger(PriorityElectionNode.class); + + private final List listeners = new CopyOnWriteArrayList<>(); + private RaftGroupService raftGroupService; + private Node node; + private PriorityElectionOnlyStateMachine fsm; + + private boolean started; + + @Override + public boolean init(final PriorityElectionNodeOptions opts) { + if (this.started) { + LOG.info("[PriorityElectionNode: {}] already started.", opts.getServerAddress()); + return true; + } + // node options + NodeOptions nodeOpts = opts.getNodeOptions(); + if (nodeOpts == null) { + nodeOpts = new NodeOptions(); + } + this.fsm = new PriorityElectionOnlyStateMachine(this.listeners); + // Set the initial PriorityElectionOnlyStateMachine + nodeOpts.setFsm(this.fsm); + final Configuration initialConf = new Configuration(); + if (!initialConf.parse(opts.getInitialServerAddressList())) { + throw new IllegalArgumentException("Fail to parse initConf: " + opts.getInitialServerAddressList()); + } + // Set the initial cluster configuration + nodeOpts.setInitialConf(initialConf); + final String dataPath = opts.getDataPath(); + try { + FileUtils.forceMkdir(new File(dataPath)); + } catch (final IOException e) { + LOG.error("Fail to make dir for dataPath {}.", dataPath); + return false; + } + // Set the data path + // Log, required + nodeOpts.setLogUri(Paths.get(dataPath, "log").toString()); + // Metadata, required + nodeOpts.setRaftMetaUri(Paths.get(dataPath, "meta").toString()); + // nodeOpts.setSnapshotUri(Paths.get(dataPath, "snapshot").toString()); + + final String groupId = opts.getGroupId(); + final PeerId serverId = new PeerId(); + if (!serverId.parse(opts.getServerAddress())) { + throw new IllegalArgumentException("Fail to parse serverId: " + opts.getServerAddress()); + } + // Set priority value, required + nodeOpts.setElectionPriority(serverId.getPriority()); + + final RpcServer rpcServer = RaftRpcServerFactory.createRaftRpcServer(serverId.getEndpoint()); + this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOpts, rpcServer); + this.node = this.raftGroupService.start(); + if (this.node != null) { + this.started = true; + } + return this.started; + } + + @Override + public void shutdown() { + if (!this.started) { + return; + } + if (this.raftGroupService != null) { + this.raftGroupService.shutdown(); + try { + this.raftGroupService.join(); + } catch (final InterruptedException e) { + ThrowUtil.throwException(e); + } + } + this.started = false; + LOG.info("[ElectionNode] shutdown successfully: {}.", this); + } + + public Node getNode() { + return node; + } + + public PriorityElectionOnlyStateMachine getFsm() { + return fsm; + } + + public boolean isStarted() { + return started; + } + + public boolean isLeader() { + return this.fsm.isLeader(); + } + + public void addLeaderStateListener(final LeaderStateListener listener) { + this.listeners.add(listener); + } + +} diff --git a/jraft-example/src/main/java/com/alipay/sofa/jraft/example/priorityelection/PriorityElectionNodeOptions.java b/jraft-example/src/main/java/com/alipay/sofa/jraft/example/priorityelection/PriorityElectionNodeOptions.java new file mode 100644 index 000000000..9f563096f --- /dev/null +++ b/jraft-example/src/main/java/com/alipay/sofa/jraft/example/priorityelection/PriorityElectionNodeOptions.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.example.priorityelection; + +import com.alipay.sofa.jraft.option.NodeOptions; + +/** + * @author zongtanghu + */ +public class PriorityElectionNodeOptions { + + private String dataPath; + // raft group id + private String groupId; + // ip:port::priority + private String serverAddress; + // ip:port::priority,ip:port::priority,ip:port::priority + private String initialServerAddressList; + // raft node options + private NodeOptions nodeOptions; + + public String getDataPath() { + return dataPath; + } + + public void setDataPath(String dataPath) { + this.dataPath = dataPath; + } + + public String getGroupId() { + return groupId; + } + + public void setGroupId(String groupId) { + this.groupId = groupId; + } + + public String getServerAddress() { + return serverAddress; + } + + public void setServerAddress(String serverAddress) { + this.serverAddress = serverAddress; + } + + public String getInitialServerAddressList() { + return initialServerAddressList; + } + + public void setInitialServerAddressList(String initialServerAddressList) { + this.initialServerAddressList = initialServerAddressList; + } + + public NodeOptions getNodeOptions() { + return nodeOptions; + } + + public void setNodeOptions(NodeOptions nodeOptions) { + this.nodeOptions = nodeOptions; + } + + @Override + public String toString() { + return "PriorityElectionNodeOptions{" + "dataPath='" + dataPath + '\'' + ", groupId='" + groupId + '\'' + + ", serverAddress='" + serverAddress + '\'' + ", initialServerAddressList='" + initialServerAddressList + + '\'' + ", nodeOptions=" + nodeOptions + '}'; + } +} diff --git a/jraft-example/src/main/java/com/alipay/sofa/jraft/example/priorityelection/PriorityElectionOnlyStateMachine.java b/jraft-example/src/main/java/com/alipay/sofa/jraft/example/priorityelection/PriorityElectionOnlyStateMachine.java new file mode 100644 index 000000000..d8c200c64 --- /dev/null +++ b/jraft-example/src/main/java/com/alipay/sofa/jraft/example/priorityelection/PriorityElectionOnlyStateMachine.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.example.priorityelection; + +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.alipay.sofa.jraft.Iterator; +import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.core.StateMachineAdapter; + +/** + * @author zongtanghu + */ +public class PriorityElectionOnlyStateMachine extends StateMachineAdapter { + + private static final Logger LOG = LoggerFactory + .getLogger(PriorityElectionOnlyStateMachine.class); + + private final AtomicLong leaderTerm = new AtomicLong(-1L); + private final List listeners; + + public PriorityElectionOnlyStateMachine(List listeners) { + this.listeners = listeners; + } + + @Override + public void onApply(final Iterator it) { + // election only, do nothing + while (it.hasNext()) { + LOG.info("On apply with term: {} and index: {}. ", it.getTerm(), it.getIndex()); + it.next(); + } + } + + @Override + public void onLeaderStart(final long term) { + super.onLeaderStart(term); + this.leaderTerm.set(term); + for (final LeaderStateListener listener : this.listeners) { // iterator the snapshot + listener.onLeaderStart(term); + } + } + + @Override + public void onLeaderStop(final Status status) { + super.onLeaderStop(status); + final long oldTerm = leaderTerm.get(); + this.leaderTerm.set(-1L); + for (final LeaderStateListener listener : this.listeners) { // iterator the snapshot + listener.onLeaderStop(oldTerm); + } + } + + public boolean isLeader() { + return this.leaderTerm.get() > 0; + } + + public void addLeaderStateListener(final LeaderStateListener listener) { + this.listeners.add(listener); + } +}