Skip to content

Commit

Permalink
Minor changes for replicator log and state (sofastack#21)
Browse files Browse the repository at this point in the history
* (fix) Should use clock time in replicator block method

* (feat) Tweak replicator log and state

* (fix) typo

* (fix) rename MaxReplicatorInflightMsgs to maxReplicatorInflightMsgs in log
  • Loading branch information
killme2008 authored and fengjiachun committed Mar 8, 2019
1 parent 5d055ef commit 5ab7341
Showing 1 changed file with 13 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -442,12 +442,15 @@ private void startHeartbeatTimer(long startMs) {

void installSnapshot() {
if (this.state == State.Snapshot) {
LOG.warn("Replicator {} is installing snapshot, ignore the new request.", this.options.getPeerId());
id.unlock();
return;
}
boolean doUnlock = true;
try {
Requires.requireTrue(this.reader == null);
Requires.requireTrue(this.reader == null,
"Replicator %s already has a snapshot reader, current state is %s", this.options.getPeerId(),
this.state);
reader = options.getSnapshotStorage().open();
if (reader == null) {
final NodeImpl node = options.getNode();
Expand Down Expand Up @@ -553,9 +556,9 @@ static boolean onInstallSnapshotReturned(ThreadId id, Replicator r, Status statu
// We don't retry installing the snapshot explicitly.
// id is unlock in sendEntries
if (!success) {
r.state = State.Probe;
//should reset states
r.resetInflights();
r.state = State.Probe;
r.block(Utils.nowMs(), status.getCode());
return false;
}
Expand Down Expand Up @@ -923,17 +926,17 @@ private static void onTimeout(ThreadId id) {
void destroy() {
final ThreadId savedId = this.id;
LOG.info("Replicator {} is going to quit", savedId);
this.state = State.Destroyed;
this.id = null;
if (reader != null) {
Utils.closeQuietly(reader);
this.reader = null;
}
// Unregister replicator metric set
if (this.options.getNode().getNodeMetrics().getMetricRegistry() != null) {
options.getNode().getNodeMetrics().getMetricRegistry().remove(getReplicatorMetricName(this.options));
}
this.state = State.Destroyed;
savedId.unlockAndDestroy();
if (reader != null) {
Utils.closeQuietly(reader);
this.reader = null;
}
}

static void onHeartbeatReturned(ThreadId id, Status status, AppendEntriesRequest request,
Expand Down Expand Up @@ -1021,6 +1024,8 @@ static void onRpcReturned(ThreadId id, RequestType reqType, Status status, Messa
holdingQueue.add(new RpcResponse(reqType, seq, status, request, response, rpcSendTime));

if (holdingQueue.size() > r.raftOptions.getMaxReplicatorInflightMsgs()) {
LOG.warn("Too many pending responses {} for replicator {}, maxReplicatorInflightMsgs={}",
holdingQueue.size(), r.options.getPeerId(), r.raftOptions.getMaxReplicatorInflightMsgs());
r.resetInflights();
r.state = State.Probe;
r.sendEmptyEntries(false);
Expand Down Expand Up @@ -1166,12 +1171,12 @@ private static boolean onAppendEntriesReturned(ThreadId id, Inflight inflight, S
sb.append(" fail, sleep.");
LOG.debug(sb.toString());
}
r.state = State.Probe;
if (++r.consecutiveErrorTimes % 10 == 0) {
LOG.warn("Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}", r.options.getPeerId(),
r.consecutiveErrorTimes, status);
}
r.resetInflights();
r.state = State.Probe;
//unlock in in block
r.block(startTimeMs, status.getCode());
return false;
Expand Down

0 comments on commit 5ab7341

Please sign in to comment.