Skip to content

Commit

Permalink
feat: grpc pipeline support (sofastack#456)
Browse files Browse the repository at this point in the history
* feat: grpc pipeline support

* by CR
  • Loading branch information
fengjiachun authored May 27, 2020
1 parent baa3b69 commit 779ae80
Show file tree
Hide file tree
Showing 16 changed files with 533 additions and 369 deletions.
67 changes: 45 additions & 22 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java
Original file line number Diff line number Diff line change
Expand Up @@ -1083,16 +1083,21 @@ static void onHeartbeatReturned(final ThreadId id, final Status status, final Ap
final boolean isLogDebugEnabled = LOG.isDebugEnabled();
StringBuilder sb = null;
if (isLogDebugEnabled) {
sb = new StringBuilder("Node "). //
append(r.options.getGroupId()).append(":").append(r.options.getServerId()). //
append(" received HeartbeatResponse from "). //
append(r.options.getPeerId()). //
append(" prevLogIndex=").append(request.getPrevLogIndex()). //
append(" prevLogTerm=").append(request.getPrevLogTerm());
sb = new StringBuilder("Node ") //
.append(r.options.getGroupId()) //
.append(':') //
.append(r.options.getServerId()) //
.append(" received HeartbeatResponse from ") //
.append(r.options.getPeerId()) //
.append(" prevLogIndex=") //
.append(request.getPrevLogIndex()) //
.append(" prevLogTerm=") //
.append(request.getPrevLogTerm());
}
if (!status.isOk()) {
if (isLogDebugEnabled) {
sb.append(" fail, sleep.");
sb.append(" fail, sleep, status=") //
.append(status);
LOG.debug(sb.toString());
}
r.state = State.Probe;
Expand All @@ -1107,7 +1112,9 @@ static void onHeartbeatReturned(final ThreadId id, final Status status, final Ap
r.consecutiveErrorTimes = 0;
if (response.getTerm() > r.options.getTerm()) {
if (isLogDebugEnabled) {
sb.append(" fail, greater term ").append(response.getTerm()).append(" expect term ")
sb.append(" fail, greater term ") //
.append(response.getTerm()) //
.append(" expect term ") //
.append(r.options.getTerm());
LOG.debug(sb.toString());
}
Expand All @@ -1120,7 +1127,9 @@ static void onHeartbeatReturned(final ThreadId id, final Status status, final Ap
}
if (!response.getSuccess() && response.hasLastLogIndex()) {
if (isLogDebugEnabled) {
sb.append(" fail, response term ").append(response.getTerm()).append(" lastLogIndex ")
sb.append(" fail, response term ") //
.append(response.getTerm()) //
.append(" lastLogIndex ") //
.append(response.getLastLogIndex());
LOG.debug(sb.toString());
}
Expand Down Expand Up @@ -1181,7 +1190,9 @@ static void onRpcReturned(final ThreadId id, final RequestType reqType, final St
final boolean isLogDebugEnabled = LOG.isDebugEnabled();
StringBuilder sb = null;
if (isLogDebugEnabled) {
sb = new StringBuilder("Replicator ").append(r).append(" is processing RPC responses,");
sb = new StringBuilder("Replicator ") //
.append(r) //
.append(" is processing RPC responses, ");
}
try {
int processed = 0;
Expand All @@ -1192,7 +1203,9 @@ static void onRpcReturned(final ThreadId id, final RequestType reqType, final St
if (queuedPipelinedResponse.seq != r.requiredNextSeq) {
if (processed > 0) {
if (isLogDebugEnabled) {
sb.append("has processed ").append(processed).append(" responses,");
sb.append("has processed ") //
.append(processed) //
.append(" responses, ");
}
break;
} else {
Expand All @@ -1208,7 +1221,8 @@ static void onRpcReturned(final ThreadId id, final RequestType reqType, final St
if (inflight == null) {
// The previous in-flight requests were cleared.
if (isLogDebugEnabled) {
sb.append("ignore response because request not found:").append(queuedPipelinedResponse)
sb.append("ignore response because request not found: ") //
.append(queuedPipelinedResponse) //
.append(",\n");
}
continue;
Expand Down Expand Up @@ -1249,7 +1263,8 @@ static void onRpcReturned(final ThreadId id, final RequestType reqType, final St
}
} finally {
if (isLogDebugEnabled) {
sb.append(", after processed, continue to send entries: ").append(continueSendEntries);
sb.append("after processed, continue to send entries: ") //
.append(continueSendEntries);
LOG.debug(sb.toString());
}
if (continueSendEntries) {
Expand Down Expand Up @@ -1296,21 +1311,27 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight
final boolean isLogDebugEnabled = LOG.isDebugEnabled();
StringBuilder sb = null;
if (isLogDebugEnabled) {
sb = new StringBuilder("Node "). //
append(r.options.getGroupId()).append(":").append(r.options.getServerId()). //
append(" received AppendEntriesResponse from "). //
append(r.options.getPeerId()). //
append(" prevLogIndex=").append(request.getPrevLogIndex()). //
append(" prevLogTerm=").append(request.getPrevLogTerm()). //
append(" count=").append(request.getEntriesCount());
sb = new StringBuilder("Node ") //
.append(r.options.getGroupId()) //
.append(':') //
.append(r.options.getServerId()) //
.append(" received AppendEntriesResponse from ") //
.append(r.options.getPeerId()) //
.append(" prevLogIndex=") //
.append(request.getPrevLogIndex()) //
.append(" prevLogTerm=") //
.append(request.getPrevLogTerm()) //
.append(" count=") //
.append(request.getEntriesCount());
}
if (!status.isOk()) {
// If the follower crashes, any RPC to the follower fails immediately,
// so we need to block the follower for a while instead of looping until
// it comes back or be removed
// dummy_id is unlock in block
if (isLogDebugEnabled) {
sb.append(" fail, sleep.");
sb.append(" fail, sleep, status=") //
.append(status);
LOG.debug(sb.toString());
}
notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status);
Expand All @@ -1328,7 +1349,9 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight
if (!response.getSuccess()) {
if (response.getTerm() > r.options.getTerm()) {
if (isLogDebugEnabled) {
sb.append(" fail, greater term ").append(response.getTerm()).append(" expect term ")
sb.append(" fail, greater term ") //
.append(response.getTerm()) //
.append(" expect term ") //
.append(r.options.getTerm());
LOG.debug(sb.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public ThreadId getReplicator(final PeerId peer) {

@Override
public boolean addReplicator(final PeerId peer) {
return this.addReplicator(peer, ReplicatorType.Follower);
return addReplicator(peer, ReplicatorType.Follower);
}

@Override
Expand Down
Loading

0 comments on commit 779ae80

Please sign in to comment.