Skip to content

Commit

Permalink
fix/state listener (sofastack#285)
Browse files Browse the repository at this point in the history
* (feat) code format

* (feat) sofastack#284

* (feat) sofastack#284
  • Loading branch information
fengjiachun authored Oct 10, 2019
1 parent 5b0ad62 commit 2fde182
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public synchronized Node getRaftNode() {
* Starts the raft group service, returns the raft node.
*/
public synchronized Node start() {
return this.start(true);
return start(true);
}

/**
Expand Down
32 changes: 16 additions & 16 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/entity/Ballot.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,17 @@ public UnfoundPeerId(PeerId peerId, int index, boolean found) {
/**
* Init the ballot with current conf and old conf.
*
* @param conf current configuration
* @param oldConf old configuration
* @param conf current configuration
* @param oldConf old configuration
* @return true if init success
*/
public boolean init(Configuration conf, Configuration oldConf) {
public boolean init(final Configuration conf, final Configuration oldConf) {
this.peers.clear();
this.oldPeers.clear();
quorum = oldQuorum = 0;
this.quorum = this.oldQuorum = 0;
int index = 0;
if (conf != null) {
for (PeerId peer : conf) {
for (final PeerId peer : conf) {
this.peers.add(new UnfoundPeerId(peer, index++, false));
}
}
Expand All @@ -76,17 +76,17 @@ public boolean init(Configuration conf, Configuration oldConf) {
return true;
}
index = 0;
for (PeerId peer : oldConf) {
for (final PeerId peer : oldConf) {
this.oldPeers.add(new UnfoundPeerId(peer, index++, false));
}

this.oldQuorum = this.oldPeers.size() / 2 + 1;
return true;
}

private UnfoundPeerId findPeer(PeerId peerId, List<UnfoundPeerId> peers, int posHint) {
private UnfoundPeerId findPeer(final PeerId peerId, final List<UnfoundPeerId> peers, final int posHint) {
if (posHint < 0 || posHint >= peers.size() || !peers.get(posHint).peerId.equals(peerId)) {
for (UnfoundPeerId ufp : peers) {
for (final UnfoundPeerId ufp : peers) {
if (ufp.peerId.equals(peerId)) {
return ufp;
}
Expand All @@ -97,8 +97,8 @@ private UnfoundPeerId findPeer(PeerId peerId, List<UnfoundPeerId> peers, int pos
return peers.get(posHint);
}

public PosHint grant(PeerId peerId, PosHint hint) {
UnfoundPeerId peer = findPeer(peerId, peers, hint.pos0);
public PosHint grant(final PeerId peerId, final PosHint hint) {
UnfoundPeerId peer = findPeer(peerId, this.peers, hint.pos0);
if (peer != null) {
if (!peer.found) {
peer.found = true;
Expand All @@ -108,15 +108,15 @@ public PosHint grant(PeerId peerId, PosHint hint) {
} else {
hint.pos0 = -1;
}
if (oldPeers.isEmpty()) {
if (this.oldPeers.isEmpty()) {
hint.pos1 = -1;
return hint;
}
peer = findPeer(peerId, oldPeers, hint.pos1);
peer = findPeer(peerId, this.oldPeers, hint.pos1);
if (peer != null) {
if (!peer.found) {
peer.found = true;
oldQuorum--;
this.oldQuorum--;
}
hint.pos1 = peer.index;
} else {
Expand All @@ -126,8 +126,8 @@ public PosHint grant(PeerId peerId, PosHint hint) {
return hint;
}

public void grant(PeerId peerId) {
this.grant(peerId, new PosHint());
public void grant(final PeerId peerId) {
grant(peerId, new PosHint());
}

/**
Expand All @@ -136,6 +136,6 @@ public void grant(PeerId peerId) {
* @return true if the ballot is granted
*/
public boolean isGranted() {
return this.quorum <= 0 && oldQuorum <= 0;
return this.quorum <= 0 && this.oldQuorum <= 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ public static void main(final String[] args) {
electionOpts.setInitialServerAddressList(initialConfStr);

final ElectionNode node = new ElectionNode();
node.init(electionOpts);
node.addLeaderStateListener(new LeaderStateListener() {

@Override
Expand All @@ -56,5 +55,6 @@ public void onLeaderStop(long leaderTerm) {
System.out.println("[ElectionBootstrap] Leader stop on term: " + leaderTerm);
}
});
node.init(electionOpts);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
Expand All @@ -40,13 +42,14 @@
*/
public class ElectionNode implements Lifecycle<ElectionNodeOptions> {

private static final Logger LOG = LoggerFactory.getLogger(ElectionNode.class);
private static final Logger LOG = LoggerFactory.getLogger(ElectionNode.class);

private RaftGroupService raftGroupService;
private Node node;
private ElectionOnlyStateMachine fsm;
private final List<LeaderStateListener> listeners = new CopyOnWriteArrayList<>();
private RaftGroupService raftGroupService;
private Node node;
private ElectionOnlyStateMachine fsm;

private boolean started;
private boolean started;

@Override
public boolean init(final ElectionNodeOptions opts) {
Expand All @@ -59,13 +62,13 @@ public boolean init(final ElectionNodeOptions opts) {
if (nodeOpts == null) {
nodeOpts = new NodeOptions();
}
this.fsm = new ElectionOnlyStateMachine();
this.fsm = new ElectionOnlyStateMachine(this.listeners);
nodeOpts.setFsm(this.fsm);
final Configuration initialConf = new Configuration();
if (!initialConf.parse(opts.getInitialServerAddressList())) {
throw new IllegalArgumentException("Fail to parse initConf: " + opts.getInitialServerAddressList());
}
// 设置初始集群配置
// Set the initial cluster configuration
nodeOpts.setInitialConf(initialConf);
final String dataPath = opts.getDataPath();
try {
Expand All @@ -74,12 +77,11 @@ public boolean init(final ElectionNodeOptions opts) {
LOG.error("Fail to make dir for dataPath {}.", dataPath);
return false;
}
// 设置存储路径
// 日志, 必须
// Set the data path
// Log, required
nodeOpts.setLogUri(Paths.get(dataPath, "log").toString());
// 元信息, 必须
// Metadata, required
nodeOpts.setRaftMetaUri(Paths.get(dataPath, "meta").toString());
// 纯选举场景不需要设置 snapshot, 不设置可避免启动 snapshot timer
// nodeOpts.setSnapshotUri(Paths.get(dataPath, "snapshot").toString());

final String groupId = opts.getGroupId();
Expand All @@ -88,11 +90,8 @@ public boolean init(final ElectionNodeOptions opts) {
throw new IllegalArgumentException("Fail to parse serverId: " + opts.getServerAddress());
}
final RpcServer rpcServer = new RpcServer(serverId.getPort());
// 注册 raft 处理器
RaftRpcServerFactory.addRaftRequestProcessors(rpcServer);
// 初始化 raft group 服务框架
this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOpts, rpcServer);
// 启动
this.node = this.raftGroupService.start();
if (this.node != null) {
this.started = true;
Expand Down Expand Up @@ -134,6 +133,6 @@ public boolean isLeader() {
}

public void addLeaderStateListener(final LeaderStateListener listener) {
this.fsm.addLeaderStateListener(listener);
this.listeners.add(listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.alipay.sofa.jraft.example.election;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
Expand All @@ -36,7 +35,11 @@ public class ElectionOnlyStateMachine extends StateMachineAdapter {
private static final Logger LOG = LoggerFactory.getLogger(ElectionOnlyStateMachine.class);

private final AtomicLong leaderTerm = new AtomicLong(-1L);
private final List<LeaderStateListener> listeners = new CopyOnWriteArrayList<>();
private final List<LeaderStateListener> listeners;

public ElectionOnlyStateMachine(List<LeaderStateListener> listeners) {
this.listeners = listeners;
}

@Override
public void onApply(final Iterator it) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft.rhea;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;

import com.alipay.sofa.jraft.rhea.util.Maps;

/**
* The container of raft state listener, each key(id) corresponds to a listener group.
*
* @author jiachun.fjc
*/
public class StateListenerContainer<K> {

private final ConcurrentMap<K, List<StateListener>> stateListeners = Maps.newConcurrentMap();

public boolean addStateListener(final K id, final StateListener listener) {
List<StateListener> group = this.stateListeners.get(id);
if (group == null) {
final List<StateListener> newGroup = new CopyOnWriteArrayList<>();
group = this.stateListeners.putIfAbsent(id, newGroup);
if (group == null) {
group = newGroup;
}
}
return group.add(listener);
}

public List<StateListener> getStateListenerGroup(final K id) {
final List<StateListener> group = this.stateListeners.get(id);
return group == null ? Collections.emptyList() : group;
}

public boolean removeStateListener(final K id, final StateListener listener) {
final List<StateListener> group = this.stateListeners.get(id);
if (group == null) {
return false;
}
return group.remove(listener);
}

public void clear() {
this.stateListeners.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public class StoreEngine implements Lifecycle<StoreEngineOptions> {

private final ConcurrentMap<Long, RegionKVService> regionKVServiceTable = Maps.newConcurrentMapLong();
private final ConcurrentMap<Long, RegionEngine> regionEngineTable = Maps.newConcurrentMapLong();
private final StateListenerContainer<Long> stateListenerContainer;
private final PlacementDriverClient pdClient;
private final long clusterId;

Expand Down Expand Up @@ -117,9 +118,10 @@ public class StoreEngine implements Lifecycle<StoreEngineOptions> {

private boolean started;

public StoreEngine(PlacementDriverClient pdClient) {
this.pdClient = pdClient;
public StoreEngine(PlacementDriverClient pdClient, StateListenerContainer<Long> stateListenerContainer) {
this.pdClient = Requires.requireNonNull(pdClient, "pdClient");
this.clusterId = pdClient.getClusterId();
this.stateListenerContainer = Requires.requireNonNull(stateListenerContainer, "stateListenerContainer");
}

@Override
Expand Down Expand Up @@ -412,6 +414,10 @@ public boolean removeAndStopRegionEngine(final long regionId) {
return false;
}

public StateListenerContainer<Long> getStateListenerContainer() {
return stateListenerContainer;
}

public List<Long> getLeaderRegionIds() {
final List<Long> regionIds = Lists.newArrayListWithCapacity(this.regionEngineTable.size());
for (final RegionEngine regionEngine : this.regionEngineTable.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.alipay.sofa.jraft.rhea.LeaderStateListener;
import com.alipay.sofa.jraft.rhea.RegionEngine;
import com.alipay.sofa.jraft.rhea.StateListener;
import com.alipay.sofa.jraft.rhea.StateListenerContainer;
import com.alipay.sofa.jraft.rhea.StoreEngine;
import com.alipay.sofa.jraft.rhea.client.failover.FailoverClosure;
import com.alipay.sofa.jraft.rhea.client.failover.ListRetryCallable;
Expand Down Expand Up @@ -184,26 +185,28 @@
*/
public class DefaultRheaKVStore implements RheaKVStore {

private static final Logger LOG = LoggerFactory.getLogger(DefaultRheaKVStore.class);
private static final Logger LOG = LoggerFactory
.getLogger(DefaultRheaKVStore.class);

static {
ExtSerializerSupports.init();
}

private StoreEngine storeEngine;
private PlacementDriverClient pdClient;
private RheaKVRpcService rheaKVRpcService;
private RheaKVStoreOptions opts;
private int failoverRetries;
private long futureTimeoutMillis;
private boolean onlyLeaderRead;
private Dispatcher kvDispatcher;
private BatchingOptions batchingOpts;
private GetBatching getBatching;
private GetBatching getBatchingOnlySafe;
private PutBatching putBatching;
private final StateListenerContainer<Long> stateListenerContainer = new StateListenerContainer<>();
private StoreEngine storeEngine;
private PlacementDriverClient pdClient;
private RheaKVRpcService rheaKVRpcService;
private RheaKVStoreOptions opts;
private int failoverRetries;
private long futureTimeoutMillis;
private boolean onlyLeaderRead;
private Dispatcher kvDispatcher;
private BatchingOptions batchingOpts;
private GetBatching getBatching;
private GetBatching getBatchingOnlySafe;
private PutBatching putBatching;

private volatile boolean started;
private volatile boolean started;

@Override
public synchronized boolean init(final RheaKVStoreOptions opts) {
Expand Down Expand Up @@ -234,7 +237,7 @@ public synchronized boolean init(final RheaKVStoreOptions opts) {
final StoreEngineOptions stOpts = opts.getStoreEngineOptions();
if (stOpts != null) {
stOpts.setInitialServerList(opts.getInitialServerList());
this.storeEngine = new StoreEngine(this.pdClient);
this.storeEngine = new StoreEngine(this.pdClient, this.stateListenerContainer);
if (!this.storeEngine.init(stOpts)) {
LOG.error("Fail to init [StoreEngine].");
return false;
Expand Down Expand Up @@ -309,6 +312,7 @@ public synchronized void shutdown() {
if (this.putBatching != null) {
this.putBatching.shutdown();
}
this.stateListenerContainer.clear();
LOG.info("[DefaultRheaKVStore] shutdown successfully.");
}

Expand Down Expand Up @@ -1408,15 +1412,7 @@ public void addFollowerStateListener(final long regionId, final FollowerStateLis

@Override
public void addStateListener(final long regionId, final StateListener listener) {
checkState();
if (this.storeEngine == null) {
throw new IllegalStateException("current node do not have store engine");
}
final RegionEngine regionEngine = this.storeEngine.getRegionEngine(regionId);
if (regionEngine == null) {
throw new IllegalStateException("current node do not have this region engine[" + regionId + "]");
}
regionEngine.getFsm().addStateListener(listener);
this.stateListenerContainer.addStateListener(regionId, listener);
}

public long getClusterId() {
Expand Down
Loading

0 comments on commit 2fde182

Please sign in to comment.