Skip to content

Commit

Permalink
some fix/improve (sofastack#489)
Browse files Browse the repository at this point in the history
* typo

* [feat] impl Describer for RouteTable

* [feat] impl Describer for RegionEngine

* [feat] impl Describer for StoreEngine

* [feat] impl Describer for StoreEngine

* [rheakv] avoid sending repeatedly

* [rheakv] avoid sending repeatedly

* lock free improve

* format

* upgrade rocksdb(v5.18.4) for aarch64 facebook/rocksdb#6497

* upgrade bolt to 1.6.2

* read-index ut

* let append-entry fail-fast

* typo

* rocksdb:5.18.3

* add weak read for `isLeader()`

* improve `checkDeadNodes`

* improve `checkDeadNodes`

* fail-fast on `addReplicator`

* format

* add `LongHeldDetectingReadWriteLock`

* add node-lock-blocked metric

* add UT: LongHeldDetectingReadWriteLockTest

* by CR

* by CR
  • Loading branch information
fengjiachun authored Jul 29, 2020
1 parent 406d34d commit 136cf10
Show file tree
Hide file tree
Showing 29 changed files with 609 additions and 67 deletions.
6 changes: 6 additions & 0 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ public interface Node extends Lifecycle<NodeOptions>, Describer {
*/
boolean isLeader();

/**
* Returns true when the node is the leader.
* @param blocking if true, the call blocks until the node finishes its state change
*/
boolean isLeader(final boolean blocking);

/**
* Shutdown local replica node.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ public interface ReplicatorGroup extends Describer {
* @param peer target peer
* @return true on success
*/
boolean addReplicator(final PeerId peer);
default boolean addReplicator(final PeerId peer) {
    // Default to the Follower replicator type when the caller does not specify one.
    final ReplicatorType defaultType = ReplicatorType.Follower;
    return addReplicator(peer, defaultType);
}

/**
* Add a replicator attached with |peer|
Expand All @@ -67,7 +69,24 @@ public interface ReplicatorGroup extends Describer {
* @param replicatorType replicator type
* @return true on success
*/
boolean addReplicator(final PeerId peer, ReplicatorType replicatorType);
default boolean addReplicator(final PeerId peer, ReplicatorType replicatorType) {
    // Delegate to the three-argument overload with the sync flag set.
    final boolean sync = true;
    return addReplicator(peer, replicatorType, sync);
}

/**
* Try to add a replicator attached with |peer|; there
* will be a notification when the replicator catches up according to the
* arguments.
* NOTE: when calling this function, the replicator starts to work
* immediately and might call Node#stepDown, which might race with
* the caller; you should deal with this situation.
*
* @param peer target peer
* @param replicatorType replicator type
* @param sync synchronous; when false, an implementation may fail fast
*             if there is no active connection to the peer
* @return true on success
*/
boolean addReplicator(final PeerId peer, ReplicatorType replicatorType, boolean sync);

/**
* Send heartbeat to a peer.
Expand Down
20 changes: 19 additions & 1 deletion jraft-core/src/main/java/com/alipay/sofa/jraft/RouteTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.alipay.sofa.jraft.rpc.CliClientService;
import com.alipay.sofa.jraft.rpc.CliRequests;
import com.alipay.sofa.jraft.rpc.RpcRequests;
import com.alipay.sofa.jraft.util.Describer;
import com.alipay.sofa.jraft.util.Requires;
import com.google.protobuf.Message;

Expand All @@ -44,7 +45,7 @@
*
* 2018-Apr-09 10:41:21 AM
*/
public class RouteTable {
public class RouteTable implements Describer {

private static final Logger LOG = LoggerFactory.getLogger(RouteTable.class);

Expand Down Expand Up @@ -353,14 +354,31 @@ public boolean removeGroup(final String groupId) {
return this.groupConfTable.remove(groupId) != null;
}

/**
 * Renders this route table as {@code RouteTable{groupConfTable=...}}.
 */
@Override
public String toString() {
    final StringBuilder buf = new StringBuilder("RouteTable{");
    buf.append("groupConfTable=").append(this.groupConfTable);
    return buf.append('}').toString();
}

private RouteTable() {
}

/**
* Writes a description of this route table to the given printer: a
* "RouteTable:" header line, then a single space followed by the
* {@link #toString()} representation on the next line.
*
* @param out the printer that receives the description
*/
@Override
public void describe(final Printer out) {
out.println("RouteTable:") //
.print(" ") //
.println(toString());
}

/**
 * Holder for one group's configuration and leader.
 * NOTE(review): stampedLock presumably guards conf and leader; confirm against the callers.
 */
private static class GroupConf {

    private final StampedLock stampedLock = new StampedLock();

    private Configuration     conf;
    private PeerId            leader;

    /**
     * Renders as {@code GroupConf{conf=..., leader=...}}; fields are read without taking the lock.
     */
    @Override
    public String toString() {
        final StringBuilder buf = new StringBuilder("GroupConf{");
        buf.append("conf=").append(this.conf);
        buf.append(", leader=").append(this.leader);
        buf.append('}');
        return buf.toString();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public abstract class ReadIndexClosure implements Closure {
private long index = INVALID_LOG_INDEX;
private byte[] requestContext;

private volatile int state;
private volatile int state = PENDING;

public ReadIndexClosure() {
this(DEFAULT_TIMEOUT);
Expand All @@ -75,7 +75,6 @@ public ReadIndexClosure() {
* @param timeoutMs timeout millis
*/
public ReadIndexClosure(long timeoutMs) {
this.state = PENDING;
if (timeoutMs >= 0) {
// Lazy to init the timer
TimeoutScanner.TIMER.newTimeout(new TimeoutTask(this), timeoutMs, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -130,7 +129,7 @@ public void run(final Status status) {

try {
run(status, this.index, this.requestContext);
} catch (Throwable t) {
} catch (final Throwable t) {
LOG.error("Fail to run ReadIndexClosure with status: {}.", status, t);
}
}
Expand Down
92 changes: 79 additions & 13 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -114,9 +113,11 @@
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.RpcFactoryHelper;
import com.alipay.sofa.jraft.util.SignalHelper;
import com.alipay.sofa.jraft.util.SystemPropertyUtil;
import com.alipay.sofa.jraft.util.ThreadHelper;
import com.alipay.sofa.jraft.util.ThreadId;
import com.alipay.sofa.jraft.util.Utils;
import com.alipay.sofa.jraft.util.concurrent.LongHeldDetectingReadWriteLock;
import com.alipay.sofa.jraft.util.timer.RaftTimerFactory;
import com.google.protobuf.Message;
import com.lmax.disruptor.BlockingWaitStrategy;
Expand Down Expand Up @@ -164,7 +165,8 @@ public class NodeImpl implements Node, RaftServerService {
0);

/** Internal states */
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final ReadWriteLock readWriteLock = new NodeReadWriteLock(
this);
protected final Lock writeLock = this.readWriteLock
.writeLock();
protected final Lock readLock = this.readWriteLock
Expand Down Expand Up @@ -210,7 +212,7 @@ public class NodeImpl implements Node, RaftServerService {
private Disruptor<LogEntryAndClosure> applyDisruptor;
private RingBuffer<LogEntryAndClosure> applyQueue;

/** Metrics*/
/** Metrics */
private NodeMetrics metrics;

private NodeId nodeId;
Expand All @@ -223,6 +225,33 @@ public class NodeImpl implements Node, RaftServerService {
/** The number of elections time out for current node */
private volatile int electionTimeoutCounter;

/**
 * Read-write lock used for the node's internal state. Each {@link #report} call
 * logs a warning and records a "node-lock-blocked" latency on the node's metrics.
 * NOTE(review): report is presumably invoked by LongHeldDetectingReadWriteLock when an
 * acquisition has been blocked longer than the configured threshold; confirm there.
 */
private static class NodeReadWriteLock extends LongHeldDetectingReadWriteLock {

    /**
     * Value passed to the superclass constructor, in milliseconds; read from the system
     * property "jraft.node.detecting.lock.max_blocking_ms_to_report", defaulting to -1.
     */
    static final long  MAX_BLOCKING_MS_TO_REPORT = SystemPropertyUtil.getLong(
                                                      "jraft.node.detecting.lock.max_blocking_ms_to_report", -1);

    /** Node whose metrics receive the blocked-time latency. */
    private final Node node;

    public NodeReadWriteLock(Node node) {
        super(MAX_BLOCKING_MS_TO_REPORT, TimeUnit.MILLISECONDS);
        this.node = node;
    }

    /**
     * Logs the blocked acquisition and records its duration in milliseconds.
     *
     * @param acquireMode   mode of the acquisition being reported
     * @param heldThread    thread reported as holding the lock
     * @param queuedThreads threads reported as queued on the lock
     * @param blockedNanos  blocked time in nanoseconds
     */
    @Override
    public void report(final AcquireMode acquireMode, final Thread heldThread,
                       final Collection<Thread> queuedThreads, final long blockedNanos) {
        final long blockedMs = TimeUnit.MILLISECONDS.convert(blockedNanos, TimeUnit.NANOSECONDS);
        final Thread reporter = Thread.currentThread();
        LOG.warn(
            "Raft-Node-Lock report: currentThread={}, acquireMode={}, heldThread={}, queuedThreads={}, blockedMs={}.",
            reporter, acquireMode, heldThread, queuedThreads, blockedMs);

        // Skip the metric when the node exposes no metrics object.
        final NodeMetrics nodeMetrics = this.node.getNodeMetrics();
        if (nodeMetrics == null) {
            return;
        }
        nodeMetrics.recordLatency("node-lock-blocked", blockedMs);
    }
}

/**
* Node service event.
*
Expand Down Expand Up @@ -2123,22 +2152,27 @@ private void onCaughtUp(final PeerId peer, final long term, final long version,
}
}

private void checkDeadNodes(final Configuration conf, final long monotonicNowMs) {
private boolean checkDeadNodes(final Configuration conf, final long monotonicNowMs,
final boolean stepDownOnCheckFail) {
// Check learner replicators at first.
for (PeerId peer : conf.getLearners()) {
for (final PeerId peer : conf.getLearners()) {
checkReplicator(peer);
}
// Ensure quorum nodes alive.
final List<PeerId> peers = conf.listPeers();
final Configuration deadNodes = new Configuration();
if (checkDeadNodes0(peers, monotonicNowMs, true, deadNodes)) {
return;
return true;
}
LOG.warn("Node {} steps down when alive nodes don't satisfy quorum, term={}, deadNodes={}, conf={}.",
getNodeId(), this.currTerm, deadNodes, conf);
final Status status = new Status();
status.setError(RaftError.ERAFTTIMEDOUT, "Majority of the group dies: %d/%d", deadNodes.size(), peers.size());
stepDown(this.currTerm, false, status);
if (stepDownOnCheckFail) {
LOG.warn("Node {} steps down when alive nodes don't satisfy quorum, term={}, deadNodes={}, conf={}.",
getNodeId(), this.currTerm, deadNodes, conf);
final Status status = new Status();
status.setError(RaftError.ERAFTTIMEDOUT, "Majority of the group dies: %d/%d", deadNodes.size(),
peers.size());
stepDown(this.currTerm, false, status);
}
return false;
}

private boolean checkDeadNodes0(final List<PeerId> peers, final long monotonicNowMs, final boolean checkReplicator,
Expand Down Expand Up @@ -2189,17 +2223,41 @@ private List<PeerId> getAliveNodes(final Collection<PeerId> peers, final long mo
return alivePeers;
}

@SuppressWarnings({ "LoopStatementThatDoesntLoop", "ConstantConditions" })
private void handleStepDownTimeout() {
do {
this.readLock.lock();
try {
if (this.state.compareTo(State.STATE_TRANSFERRING) > 0) {
LOG.debug("Node {} stop step-down timer, term={}, state={}.", getNodeId(), this.currTerm,
this.state);
return;
}
final long monotonicNowMs = Utils.monotonicMs();
if (!checkDeadNodes(this.conf.getConf(), monotonicNowMs, false)) {
break;
}
if (!this.conf.getOldConf().isEmpty()) {
if (!checkDeadNodes(this.conf.getOldConf(), monotonicNowMs, false)) {
break;
}
}
return;
} finally {
this.readLock.unlock();
}
} while (false);

this.writeLock.lock();
try {
if (this.state.compareTo(State.STATE_TRANSFERRING) > 0) {
LOG.debug("Node {} stop step-down timer, term={}, state={}.", getNodeId(), this.currTerm, this.state);
return;
}
final long monotonicNowMs = Utils.monotonicMs();
checkDeadNodes(this.conf.getConf(), monotonicNowMs);
checkDeadNodes(this.conf.getConf(), monotonicNowMs, true);
if (!this.conf.getOldConf().isEmpty()) {
checkDeadNodes(this.conf.getOldConf(), monotonicNowMs);
checkDeadNodes(this.conf.getOldConf(), monotonicNowMs, true);
}
} finally {
this.writeLock.unlock();
Expand Down Expand Up @@ -2636,6 +2694,14 @@ private void handleVoteTimeout() {

@Override
public boolean isLeader() {
return isLeader(true);
}

@Override
public boolean isLeader(final boolean blocking) {
if (!blocking) {
return this.state == State.STATE_LEADER;
}
this.readLock.lock();
try {
return this.state == State.STATE_LEADER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.alipay.sofa.jraft.option.RaftOptions;
import com.alipay.sofa.jraft.option.ReplicatorGroupOptions;
import com.alipay.sofa.jraft.option.ReplicatorOptions;
import com.alipay.sofa.jraft.rpc.RaftClientService;
import com.alipay.sofa.jraft.rpc.RpcRequests.AppendEntriesResponse;
import com.alipay.sofa.jraft.rpc.RpcResponseClosure;
import com.alipay.sofa.jraft.util.OnlyForTest;
Expand Down Expand Up @@ -109,21 +110,23 @@ public ThreadId getReplicator(final PeerId peer) {
}

@Override
public boolean addReplicator(final PeerId peer) {
return addReplicator(peer, ReplicatorType.Follower);
}

@Override
public boolean addReplicator(final PeerId peer, final ReplicatorType replicatorType) {
public boolean addReplicator(final PeerId peer, final ReplicatorType replicatorType, final boolean sync) {
Requires.requireTrue(this.commonOptions.getTerm() != 0);
this.failureReplicators.remove(peer);
if (this.replicatorMap.containsKey(peer)) {
return true;
}
final ReplicatorOptions opts = this.commonOptions == null ? new ReplicatorOptions() : this.commonOptions.copy();

opts.setReplicatorType(replicatorType);
opts.setPeerId(peer);
if (!sync) {
final RaftClientService client = opts.getRaftRpcService();
if (client != null && !client.checkConnection(peer.getEndpoint(), true)) {
LOG.error("Fail to check replicator connection to peer={}, replicatorType={}.", peer, replicatorType);
this.failureReplicators.put(peer, replicatorType);
return false;
}
}
final ThreadId rid = Replicator.start(opts, this.raftOptions);
if (rid == null) {
LOG.error("Fail to start replicator to peer={}, replicatorType={}.", peer, replicatorType);
Expand Down Expand Up @@ -181,7 +184,7 @@ public void checkReplicator(final PeerId peer, final boolean lockNode) {
try {
if (node.isLeader()) {
final ReplicatorType rType = this.failureReplicators.get(peer);
if (rType != null && addReplicator(peer, rType)) {
if (rType != null && addReplicator(peer, rType, false)) {
this.failureReplicators.remove(peer, rType);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
public interface Checksum {

/**
* Caculate a checksum value for this entity.
* Calculate a checksum value for this entity.
* @return checksum value
*/
long checksum();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ public interface ClientService extends Lifecycle<RpcOptions> {
*/
boolean connect(final Endpoint endpoint);

/**
* Check the connection for the given address, and asynchronously create a new one if there is no connection.
* @param endpoint target address
* @param createIfAbsent create a new one if there is no connection
* @return true if there is a connection and the connection is active and writable.
*/
boolean checkConnection(final Endpoint endpoint, final boolean createIfAbsent);

/**
* Disconnect from endpoint.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,18 @@ public interface RpcClient extends Lifecycle<RpcOptions> {
* Check connection for given address.
*
* @param endpoint target address
* @return true if there is a connection adn the connection is active adn writable.
* @return true if there is a connection and the connection is active and writable.
*/
boolean checkConnection(final Endpoint endpoint);

/**
* Check the connection for the given address, and asynchronously create a new one if there is no connection.
* @param endpoint target address
* @param createIfAbsent create a new one if there is no connection
* @return true if there is a connection and the connection is active and writable.
*/
boolean checkConnection(final Endpoint endpoint, final boolean createIfAbsent);

/**
* Close all connections of a address.
*
Expand Down
Loading

0 comments on commit 136cf10

Please sign in to comment.