diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java index 0de843eae..3237ee744 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java @@ -1083,16 +1083,21 @@ static void onHeartbeatReturned(final ThreadId id, final Status status, final Ap final boolean isLogDebugEnabled = LOG.isDebugEnabled(); StringBuilder sb = null; if (isLogDebugEnabled) { - sb = new StringBuilder("Node "). // - append(r.options.getGroupId()).append(":").append(r.options.getServerId()). // - append(" received HeartbeatResponse from "). // - append(r.options.getPeerId()). // - append(" prevLogIndex=").append(request.getPrevLogIndex()). // - append(" prevLogTerm=").append(request.getPrevLogTerm()); + sb = new StringBuilder("Node ") // + .append(r.options.getGroupId()) // + .append(':') // + .append(r.options.getServerId()) // + .append(" received HeartbeatResponse from ") // + .append(r.options.getPeerId()) // + .append(" prevLogIndex=") // + .append(request.getPrevLogIndex()) // + .append(" prevLogTerm=") // + .append(request.getPrevLogTerm()); } if (!status.isOk()) { if (isLogDebugEnabled) { - sb.append(" fail, sleep."); + sb.append(" fail, sleep, status=") // + .append(status); LOG.debug(sb.toString()); } r.state = State.Probe; @@ -1107,7 +1112,9 @@ static void onHeartbeatReturned(final ThreadId id, final Status status, final Ap r.consecutiveErrorTimes = 0; if (response.getTerm() > r.options.getTerm()) { if (isLogDebugEnabled) { - sb.append(" fail, greater term ").append(response.getTerm()).append(" expect term ") + sb.append(" fail, greater term ") // + .append(response.getTerm()) // + .append(" expect term ") // .append(r.options.getTerm()); LOG.debug(sb.toString()); } @@ -1120,7 +1127,9 @@ static void onHeartbeatReturned(final ThreadId id, final Status status, final Ap } if (!response.getSuccess() && response.hasLastLogIndex()) { if (isLogDebugEnabled) { - sb.append(" fail, response term ").append(response.getTerm()).append(" lastLogIndex ") + sb.append(" fail, response term ") // + .append(response.getTerm()) // + .append(" lastLogIndex ") // .append(response.getLastLogIndex()); LOG.debug(sb.toString()); } @@ -1181,7 +1190,9 @@ static void onRpcReturned(final ThreadId id, final RequestType reqType, final St final boolean isLogDebugEnabled = LOG.isDebugEnabled(); StringBuilder sb = null; if (isLogDebugEnabled) { - sb = new StringBuilder("Replicator ").append(r).append(" is processing RPC responses,"); + sb = new StringBuilder("Replicator ") // + .append(r) // + .append(" is processing RPC responses, "); } try { int processed = 0; @@ -1192,7 +1203,9 @@ static void onRpcReturned(final ThreadId id, final RequestType reqType, final St if (queuedPipelinedResponse.seq != r.requiredNextSeq) { if (processed > 0) { if (isLogDebugEnabled) { - sb.append("has processed ").append(processed).append(" responses,"); + sb.append("has processed ") // + .append(processed) // + .append(" responses, "); } break; } else { @@ -1208,7 +1221,8 @@ static void onRpcReturned(final ThreadId id, final RequestType reqType, final St if (inflight == null) { // The previous in-flight requests were cleared. if (isLogDebugEnabled) { - sb.append("ignore response because request not found:").append(queuedPipelinedResponse) + sb.append("ignore response because request not found: ") // + .append(queuedPipelinedResponse) // .append(",\n"); } continue; @@ -1249,7 +1263,8 @@ static void onRpcReturned(final ThreadId id, final RequestType reqType, final St } } finally { if (isLogDebugEnabled) { - sb.append(", after processed, continue to send entries: ").append(continueSendEntries); + sb.append("after processed, continue to send entries: ") // + .append(continueSendEntries); LOG.debug(sb.toString()); } if (continueSendEntries) { @@ -1296,13 +1311,18 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight final boolean isLogDebugEnabled = LOG.isDebugEnabled(); StringBuilder sb = null; if (isLogDebugEnabled) { - sb = new StringBuilder("Node "). // - append(r.options.getGroupId()).append(":").append(r.options.getServerId()). // - append(" received AppendEntriesResponse from "). // - append(r.options.getPeerId()). // - append(" prevLogIndex=").append(request.getPrevLogIndex()). // - append(" prevLogTerm=").append(request.getPrevLogTerm()). // - append(" count=").append(request.getEntriesCount()); + sb = new StringBuilder("Node ") // + .append(r.options.getGroupId()) // + .append(':') // + .append(r.options.getServerId()) // + .append(" received AppendEntriesResponse from ") // + .append(r.options.getPeerId()) // + .append(" prevLogIndex=") // + .append(request.getPrevLogIndex()) // + .append(" prevLogTerm=") // + .append(request.getPrevLogTerm()) // + .append(" count=") // + .append(request.getEntriesCount()); } if (!status.isOk()) { // If the follower crashes, any RPC to the follower fails immediately, @@ -1310,7 +1330,8 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight // it comes back or be removed // dummy_id is unlock in block if (isLogDebugEnabled) { - sb.append(" fail, sleep."); + sb.append(" fail, sleep, status=") // + .append(status); LOG.debug(sb.toString()); } notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status); @@ -1328,7 +1349,9 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight if (!response.getSuccess()) { if (response.getTerm() > r.options.getTerm()) { if (isLogDebugEnabled) { - sb.append(" fail, greater term ").append(response.getTerm()).append(" expect term ") + sb.append(" fail, greater term ") // + .append(response.getTerm()) // + .append(" expect term ") // .append(r.options.getTerm()); LOG.debug(sb.toString()); } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/ReplicatorGroupImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/ReplicatorGroupImpl.java index e24ea962c..7e79f363d 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/ReplicatorGroupImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/ReplicatorGroupImpl.java @@ -110,7 +110,7 @@ public ThreadId getReplicator(final PeerId peer) { @Override public boolean addReplicator(final PeerId peer) { - return this.addReplicator(peer, ReplicatorType.Follower); + return addReplicator(peer, ReplicatorType.Follower); } @Override diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/RpcRequests.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/RpcRequests.java index d17c277d0..8d20d56ef 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/RpcRequests.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/RpcRequests.java @@ -7365,21 +7365,6 @@ public interface AppendEntriesRequestHeaderOrBuilder extends * required string peer_id = 3; */ com.google.protobuf.ByteString getPeerIdBytes(); - - /** - * optional .jraft.ErrorResponse errorResponse = 99; - */ - boolean hasErrorResponse(); - - /** - * optional .jraft.ErrorResponse errorResponse = 99; - */ - com.alipay.sofa.jraft.rpc.RpcRequests.ErrorResponse getErrorResponse(); - - /** - * optional .jraft.ErrorResponse errorResponse = 99; - */ - com.alipay.sofa.jraft.rpc.RpcRequests.ErrorResponseOrBuilder getErrorResponseOrBuilder(); } /** @@ -7448,20 +7433,6 @@ private AppendEntriesRequestHeader(com.google.protobuf.CodedInputStream input, peerId_ = bs; break; } - case 794: { - com.alipay.sofa.jraft.rpc.RpcRequests.ErrorResponse.Builder subBuilder = null; - if (((bitField0_ & 0x00000008) == 0x00000008)) { - subBuilder = errorResponse_.toBuilder(); - } - errorResponse_ = input.readMessage( - com.alipay.sofa.jraft.rpc.RpcRequests.ErrorResponse.PARSER, extensionRegistry); - if (subBuilder != null) { - subBuilder.mergeFrom(errorResponse_); - errorResponse_ = subBuilder.buildPartial(); - } - bitField0_ |= 0x00000008; - break; - } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -7609,32 +7580,6 @@ public com.google.protobuf.ByteString getPeerIdBytes() { } } - public static final int ERRORRESPONSE_FIELD_NUMBER = 99; - private com.alipay.sofa.jraft.rpc.RpcRequests.ErrorResponse errorResponse_; - - /** - * optional .jraft.ErrorResponse errorResponse = 99; - */ - public boolean hasErrorResponse() { - return ((bitField0_ & 0x00000008) == 0x00000008); - } - - /** - * optional .jraft.ErrorResponse errorResponse = 99; - */ - public com.alipay.sofa.jraft.rpc.RpcRequests.ErrorResponse getErrorResponse() { - return errorResponse_ == null ? com.alipay.sofa.jraft.rpc.RpcRequests.ErrorResponse.getDefaultInstance() - : errorResponse_; - } - - /** - * optional .jraft.ErrorResponse errorResponse = 99; - */ - public com.alipay.sofa.jraft.rpc.RpcRequests.ErrorResponseOrBuilder getErrorResponseOrBuilder() { - return errorResponse_ == null ? com.alipay.sofa.jraft.rpc.RpcRequests.ErrorResponse.getDefaultInstance() - : errorResponse_; - } - private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -7656,12 +7601,6 @@ public final boolean isInitialized() { memoizedIsInitialized = 0; return false; } - if (hasErrorResponse()) { - if (!getErrorResponse().isInitialized()) { - memoizedIsInitialized = 0; - return false; - } - } memoizedIsInitialized = 1; return true; } @@ -7676,9 +7615,6 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) throws java.io if (((bitField0_ & 0x00000004) == 0x00000004)) { com.google.protobuf.GeneratedMessageV3.writeString(output, 3, peerId_); } - if (((bitField0_ & 0x00000008) == 0x00000008)) { - output.writeMessage(99, getErrorResponse()); - } unknownFields.writeTo(output); } @@ -7697,9 +7633,6 @@ public int getSerializedSize() { if (((bitField0_ & 0x00000004) == 0x00000004)) { size += com.google.protobuf.GeneratedMessageV3.computeStringSize(3, peerId_); } - if (((bitField0_ & 0x00000008) == 0x00000008)) { - size += com.google.protobuf.CodedOutputStream.computeMessageSize(99, getErrorResponse()); - } size += unknownFields.getSerializedSize(); memoizedSize = size; return size; @@ -7728,10 +7661,6 @@ public boolean equals(final java.lang.Object obj) { if (hasPeerId()) { result = result && getPeerId().equals(other.getPeerId()); } - result = result && (hasErrorResponse() == other.hasErrorResponse()); - if (hasErrorResponse()) { - result = result && getErrorResponse().equals(other.getErrorResponse()); - } result = result && unknownFields.equals(other.unknownFields); return result; } @@ -7755,10 +7684,6 @@ public int hashCode() { hash = (37 * hash) + PEER_ID_FIELD_NUMBER; hash = (53 * hash) + getPeerId().hashCode(); } - if (hasErrorResponse()) { - hash = (37 * hash) + ERRORRESPONSE_FIELD_NUMBER; - hash = (53 * hash) + getErrorResponse().hashCode(); - } hash = (29 * hash) + unknownFields.hashCode(); memoizedHashCode = hash; return hash; @@ -7883,7 +7808,6 @@ private Builder(com.google.protobuf.GeneratedMessageV3.BuilderParent parent) { private void maybeForceBuilderInitialization() { if (com.google.protobuf.GeneratedMessageV3.alwaysUseFieldBuilders) { - getErrorResponseFieldBuilder(); } } @@ -7895,12 +7819,6 @@ public Builder clear() { bitField0_ = (bitField0_ & ~0x00000002); peerId_ = ""; bitField0_ = (bitField0_ & ~0x00000004); - if (errorResponseBuilder_ == null) { - errorResponse_ = null; - } else { - errorResponseBuilder_.clear(); - } - bitField0_ = (bitField0_ & ~0x00000008); return this; } @@ -7937,14 +7855,6 @@ public com.alipay.sofa.jraft.rpc.RpcRequests.AppendEntriesRequestHeader buildPar to_bitField0_ |= 0x00000004; } result.peerId_ = peerId_; - if (((from_bitField0_ & 0x00000008) == 0x00000008)) { - to_bitField0_ |= 0x00000008; - } - if (errorResponseBuilder_ == null) { - result.errorResponse_ = errorResponse_; - } else { - result.errorResponse_ = errorResponseBuilder_.build(); - } result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -8003,9 +7913,6 @@ public Builder mergeFrom(com.alipay.sofa.jraft.rpc.RpcRequests.AppendEntriesRequ peerId_ = other.peerId_; onChanged(); } - if (other.hasErrorResponse()) { - mergeErrorResponse(other.getErrorResponse()); - } this.mergeUnknownFields(other.unknownFields); onChanged(); return this; @@ -8021,11 +7928,6 @@ public final boolean isInitialized() { if (!hasPeerId()) { return false; } - if (hasErrorResponse()) { - if (!getErrorResponse().isInitialized()) { - return false; - } - } return true; } @@ -8280,126 +8182,6 @@ public Builder setPeerIdBytes(com.google.protobuf.ByteString value) { return this; } - private com.alipay.sofa.jraft.rpc.RpcRequests.ErrorResponse errorResponse_ = null; - private com.google.protobuf.SingleFieldBuilderV3 errorResponseBuilder_; - - /** - * optional .jraft.ErrorResponse errorResponse = 99; - */ - public boolean hasErrorResponse() { - return ((bitField0_ & 0x00000008) == 0x00000008); - } - - /** - * optional .jraft.ErrorResponse errorResponse = 99; - */ - public com.alipay.sofa.jraft.rpc.RpcRequests.ErrorResponse getErrorResponse() { - if (errorResponseBuilder_ == null) { - return errorResponse_ == null ? com.alipay.sofa.jraft.rpc.RpcRequests.ErrorResponse - .getDefaultInstance() : errorResponse_; - } else { - return errorResponseBuilder_.getMessage(); - } - } - - /** - * optional .jraft.ErrorResponse errorResponse = 99; - */ - public Builder setErrorResponse(com.alipay.sofa.jraft.rpc.RpcRequests.ErrorResponse value) { - if (errorResponseBuilder_ == null) { - if (value == null) { - throw new NullPointerException(); - } - errorResponse_ = value; - onChanged(); - } else { - errorResponseBuilder_.setMessage(value); - } - bitField0_ |= 0x00000008; - return this; - } - - /** - * optional .jraft.ErrorResponse errorResponse = 99; - */ - public Builder setErrorResponse(com.alipay.sofa.jraft.rpc.RpcRequests.ErrorResponse.Builder builderForValue) { - if (errorResponseBuilder_ == null) { - errorResponse_ = builderForValue.build(); - onChanged(); - } else { - errorResponseBuilder_.setMessage(builderForValue.build()); - } - bitField0_ |= 0x00000008; - return this; - } - - /** - * optional .jraft.ErrorResponse errorResponse = 99; - */ - public Builder mergeErrorResponse(com.alipay.sofa.jraft.rpc.RpcRequests.ErrorResponse value) { - if (errorResponseBuilder_ == null) { - if (((bitField0_ & 0x00000008) == 0x00000008) && errorResponse_ != null - && errorResponse_ != com.alipay.sofa.jraft.rpc.RpcRequests.ErrorResponse.getDefaultInstance()) { - errorResponse_ = com.alipay.sofa.jraft.rpc.RpcRequests.ErrorResponse.newBuilder(errorResponse_) - .mergeFrom(value).buildPartial(); - } else { - errorResponse_ = value; - } - onChanged(); - } else { - errorResponseBuilder_.mergeFrom(value); - } - bitField0_ |= 0x00000008; - return this; - } - - /** - * optional .jraft.ErrorResponse errorResponse = 99; - */ - public Builder clearErrorResponse() { - if (errorResponseBuilder_ == null) { - errorResponse_ = null; - onChanged(); - } else { - errorResponseBuilder_.clear(); - } - bitField0_ = (bitField0_ & ~0x00000008); - return this; - } - - /** - * optional .jraft.ErrorResponse errorResponse = 99; - */ - public com.alipay.sofa.jraft.rpc.RpcRequests.ErrorResponse.Builder getErrorResponseBuilder() { - bitField0_ |= 0x00000008; - onChanged(); - return getErrorResponseFieldBuilder().getBuilder(); - } - - /** - * optional .jraft.ErrorResponse errorResponse = 99; - */ - public com.alipay.sofa.jraft.rpc.RpcRequests.ErrorResponseOrBuilder getErrorResponseOrBuilder() { - if (errorResponseBuilder_ != null) { - return errorResponseBuilder_.getMessageOrBuilder(); - } else { - return errorResponse_ == null ? com.alipay.sofa.jraft.rpc.RpcRequests.ErrorResponse - .getDefaultInstance() : errorResponse_; - } - } - - /** - * optional .jraft.ErrorResponse errorResponse = 99; - */ - private com.google.protobuf.SingleFieldBuilderV3 getErrorResponseFieldBuilder() { - if (errorResponseBuilder_ == null) { - errorResponseBuilder_ = new com.google.protobuf.SingleFieldBuilderV3( - getErrorResponse(), getParentForChildren(), isClean()); - errorResponse_ = null; - } - return errorResponseBuilder_; - } - public final Builder setUnknownFields(final com.google.protobuf.UnknownFieldSet unknownFields) { return super.setUnknownFields(unknownFields); } @@ -14946,29 +14728,28 @@ public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() { + "og_index\030\006 \002(\003\022\020\n\010pre_vote\030\007 \002(\010\"a\n\023Requ" + "estVoteResponse\022\014\n\004term\030\001 \002(\003\022\017\n\007granted" + "\030\002 \002(\010\022+\n\rerrorResponse\030c \001(\0132\024.jraft.Er" - + "rorResponse\"\177\n\032AppendEntriesRequestHeade" + + "rorResponse\"R\n\032AppendEntriesRequestHeade" + "r\022\020\n\010group_id\030\001 \002(\t\022\021\n\tserver_id\030\002 \002(\t\022\017" - + "\n\007peer_id\030\003 \002(\t\022+\n\rerrorResponse\030c \001(\0132\024" - + ".jraft.ErrorResponse\"\323\001\n\024AppendEntriesRe" - + "quest\022\020\n\010group_id\030\001 \002(\t\022\021\n\tserver_id\030\002 \002" - + "(\t\022\017\n\007peer_id\030\003 \002(\t\022\014\n\004term\030\004 \002(\003\022\025\n\rpre" - + "v_log_term\030\005 \002(\003\022\026\n\016prev_log_index\030\006 \002(\003" - + "\022!\n\007entries\030\007 \003(\0132\020.jraft.EntryMeta\022\027\n\017c" - + "ommitted_index\030\010 \002(\003\022\014\n\004data\030\t \001(\014\"{\n\025Ap" - + "pendEntriesResponse\022\014\n\004term\030\001 \002(\003\022\017\n\007suc" - + "cess\030\002 \002(\010\022\026\n\016last_log_index\030\003 \001(\003\022+\n\rer" - + "rorResponse\030c \001(\0132\024.jraft.ErrorResponse\"" - + "i\n\016GetFileRequest\022\021\n\treader_id\030\001 \002(\003\022\020\n\010" - + "filename\030\002 \002(\t\022\r\n\005count\030\003 \002(\003\022\016\n\006offset\030" - + "\004 \002(\003\022\023\n\013read_partly\030\005 \001(\010\"l\n\017GetFileRes" - + "ponse\022\013\n\003eof\030\001 \002(\010\022\014\n\004data\030\002 \002(\014\022\021\n\tread" - + "_size\030\003 \001(\003\022+\n\rerrorResponse\030c \001(\0132\024.jra" - + "ft.ErrorResponse\"Y\n\020ReadIndexRequest\022\020\n\010" - + "group_id\030\001 \002(\t\022\021\n\tserver_id\030\002 \002(\t\022\017\n\007ent" - + "ries\030\003 \003(\014\022\017\n\007peer_id\030\004 \001(\t\"`\n\021ReadIndex" - + "Response\022\r\n\005index\030\001 \002(\003\022\017\n\007success\030\002 \002(\010" - + "\022+\n\rerrorResponse\030c \001(\0132\024.jraft.ErrorRes" - + "ponseB(\n\031com.alipay.sofa.jraft.rpcB\013RpcR" + "equests" }; + + "\n\007peer_id\030\003 \002(\t\"\323\001\n\024AppendEntriesRequest" + + "\022\020\n\010group_id\030\001 \002(\t\022\021\n\tserver_id\030\002 \002(\t\022\017\n" + + "\007peer_id\030\003 \002(\t\022\014\n\004term\030\004 \002(\003\022\025\n\rprev_log" + + "_term\030\005 \002(\003\022\026\n\016prev_log_index\030\006 \002(\003\022!\n\007e" + + "ntries\030\007 \003(\0132\020.jraft.EntryMeta\022\027\n\017commit" + + "ted_index\030\010 \002(\003\022\014\n\004data\030\t \001(\014\"{\n\025AppendE" + + "ntriesResponse\022\014\n\004term\030\001 \002(\003\022\017\n\007success\030" + + "\002 \002(\010\022\026\n\016last_log_index\030\003 \001(\003\022+\n\rerrorRe" + + "sponse\030c \001(\0132\024.jraft.ErrorResponse\"i\n\016Ge" + + "tFileRequest\022\021\n\treader_id\030\001 \002(\003\022\020\n\010filen" + + "ame\030\002 \002(\t\022\r\n\005count\030\003 \002(\003\022\016\n\006offset\030\004 \002(\003" + + "\022\023\n\013read_partly\030\005 \001(\010\"l\n\017GetFileResponse" + + "\022\013\n\003eof\030\001 \002(\010\022\014\n\004data\030\002 \002(\014\022\021\n\tread_size" + + "\030\003 \001(\003\022+\n\rerrorResponse\030c \001(\0132\024.jraft.Er" + + "rorResponse\"Y\n\020ReadIndexRequest\022\020\n\010group" + + "_id\030\001 \002(\t\022\021\n\tserver_id\030\002 \002(\t\022\017\n\007entries\030" + + "\003 \003(\014\022\017\n\007peer_id\030\004 \001(\t\"`\n\021ReadIndexRespo" + + "nse\022\r\n\005index\030\001 \002(\003\022\017\n\007success\030\002 \002(\010\022+\n\re" + + "rrorResponse\030c \001(\0132\024.jraft.ErrorResponse" + + "B(\n\031com.alipay.sofa.jraft.rpcB\013RpcReques" + "ts" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { public com.google.protobuf.ExtensionRegistry assignDescriptors(com.google.protobuf.Descriptors.FileDescriptor root) { descriptor = root; @@ -15011,7 +14792,7 @@ public com.google.protobuf.ExtensionRegistry assignDescriptors(com.google.protob internal_static_jraft_AppendEntriesRequestHeader_descriptor = getDescriptor().getMessageTypes().get(8); internal_static_jraft_AppendEntriesRequestHeader_fieldAccessorTable = new com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( internal_static_jraft_AppendEntriesRequestHeader_descriptor, new java.lang.String[] { "GroupId", - "ServerId", "PeerId", "ErrorResponse", }); + "ServerId", "PeerId", }); internal_static_jraft_AppendEntriesRequest_descriptor = getDescriptor().getMessageTypes().get(9); internal_static_jraft_AppendEntriesRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( internal_static_jraft_AppendEntriesRequest_descriptor, new java.lang.String[] { "GroupId", "ServerId", diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/BoltRpcServer.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/BoltRpcServer.java index bba557f6f..55389a3c0 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/BoltRpcServer.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/BoltRpcServer.java @@ -16,6 +16,8 @@ */ package com.alipay.sofa.jraft.rpc.impl; +import java.util.concurrent.Executor; + import com.alipay.remoting.AsyncContext; import com.alipay.remoting.BizContext; import com.alipay.remoting.ConnectionEventType; @@ -129,6 +131,11 @@ public ExecutorSelector getExecutorSelector() { } return realSelector::select; } + + @Override + public Executor getExecutor() { + return processor.executor(); + } }); } diff --git a/jraft-core/src/main/resources/raft.desc b/jraft-core/src/main/resources/raft.desc index 547a72432..6b0a544c0 100644 Binary files a/jraft-core/src/main/resources/raft.desc and b/jraft-core/src/main/resources/raft.desc differ diff --git a/jraft-core/src/main/resources/rpc.proto b/jraft-core/src/main/resources/rpc.proto index 86274f2fb..87544ac55 100644 --- a/jraft-core/src/main/resources/rpc.proto +++ b/jraft-core/src/main/resources/rpc.proto @@ -63,7 +63,6 @@ message AppendEntriesRequestHeader { required string group_id = 1; required string server_id = 2; required string peer_id = 3; - optional ErrorResponse errorResponse = 99; }; message AppendEntriesRequest { diff --git a/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/ConnectionInterceptor.java b/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/ConnectionInterceptor.java new file mode 100644 index 000000000..cfae0db60 --- /dev/null +++ b/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/ConnectionInterceptor.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.rpc.impl; + +import java.util.List; + +import io.grpc.Context; +import io.grpc.Contexts; +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.internal.ServerStream; +import io.grpc.internal.ServerStreamHelper; +import io.grpc.netty.shaded.io.grpc.netty.NettyConnectionHelper; + +import com.alipay.sofa.jraft.rpc.Connection; + +/** + * Intercepting incoming calls to get {@link Connection} and attach to current {@link Context} + * before that are dispatched by {@link ServerCallHandler}. + * + * @author jiachun.fjc + */ +public class ConnectionInterceptor implements ServerInterceptor { + + static final Context.Key STREAM = Context.key("current-stream"); + + @Override + public ServerCall.Listener interceptCall(final ServerCall call, + final Metadata headers, + final ServerCallHandler next) { + Context ctx = Context.current(); + final ServerStream stream = ServerStreamHelper.getServerStream(call); + if (stream != null) { + ctx = ctx.withValue(STREAM, stream); + } + return Contexts.interceptCall(ctx, call, headers, next); + } + + public static Connection getCurrentConnection(final List listeners) { + final ServerStream stream = ConnectionInterceptor.STREAM.get(); + if (stream != null) { + return NettyConnectionHelper.getOrCreateConnection(stream, listeners); + } + return null; + } +} diff --git a/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/GrpcClient.java b/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/GrpcClient.java index 14cb4df06..3895e02af 100644 --- a/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/GrpcClient.java +++ b/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/GrpcClient.java @@ -17,7 +17,11 @@ package com.alipay.sofa.jraft.rpc.impl; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import io.grpc.CallOptions; import io.grpc.Channel; @@ -34,6 +38,7 @@ import com.alipay.sofa.jraft.ReplicatorGroup; import com.alipay.sofa.jraft.entity.PeerId; +import com.alipay.sofa.jraft.error.InvokeTimeoutException; import com.alipay.sofa.jraft.error.RemotingException; import com.alipay.sofa.jraft.option.RpcOptions; import com.alipay.sofa.jraft.rpc.InvokeCallback; @@ -41,6 +46,7 @@ import com.alipay.sofa.jraft.rpc.RpcClient; import com.alipay.sofa.jraft.util.Endpoint; import com.alipay.sofa.jraft.util.Requires; +import com.alipay.sofa.jraft.util.Utils; import com.google.protobuf.Message; /** @@ -51,9 +57,9 @@ */ public class GrpcClient implements RpcClient { - private static final Logger LOG = LoggerFactory.getLogger(GrpcClient.class); + private static final Logger LOG = LoggerFactory.getLogger(GrpcClient.class); - private final Map managedChannels = new ConcurrentHashMap<>(); + private final Map managedChannelPool = new ConcurrentHashMap<>(); private final Map parserClasses; private final MarshallerRegistry marshallerRegistry; private volatile ReplicatorGroup replicatorGroup; @@ -71,32 +77,19 @@ public boolean init(final RpcOptions opts) { @Override public void shutdown() { - for (final Map.Entry entry : this.managedChannels.entrySet()) { - final ManagedChannel ch = entry.getValue(); - LOG.info("Shutdown managed channel: {}, {}.", entry.getKey(), ch); - ManagedChannelHelper.shutdownAndAwaitTermination(ch); - } + closeAllChannels(); } @Override public boolean checkConnection(final Endpoint endpoint) { Requires.requireNonNull(endpoint, "endpoint"); - final ManagedChannel ch = this.managedChannels.get(endpoint); - if (ch == null) { - return false; - } - final ConnectivityState st = ch.getState(true); - return st == ConnectivityState.CONNECTING || st == ConnectivityState.READY || st == ConnectivityState.IDLE; + return checkChannel(endpoint); } @Override public void closeConnection(final Endpoint endpoint) { Requires.requireNonNull(endpoint, "endpoint"); - final ManagedChannel ch = this.managedChannels.remove(endpoint); - LOG.info("Close connection: {}, {}.", endpoint, ch); - if (ch != null) { - ManagedChannelHelper.shutdownAndAwaitTermination(ch); - } + closeChannel(endpoint); } @Override @@ -107,13 +100,31 @@ public void registerConnectEventListener(final ReplicatorGroup replicatorGroup) @Override public Object invokeSync(final Endpoint endpoint, final Object request, final InvokeContext ctx, final long timeoutMs) throws RemotingException { - Requires.requireNonNull(endpoint, "endpoint"); - Requires.requireNonNull(request, "request"); - final Channel ch = getChannel(endpoint); - final MethodDescriptor method = getCallMethod(request); + final CompletableFuture future = new CompletableFuture<>(); + invokeAsync(endpoint, request, ctx, new InvokeCallback() { + + @Override + public void complete(final Object result, final Throwable err) { + if (err == null) { + future.complete(result); + } else { + future.completeExceptionally(err); + } + } + + @Override + public Executor executor() { + return null; + } + }, timeoutMs); + try { - return ClientCalls.blockingUnaryCall(ch, method, CallOptions.DEFAULT, (Message) request); + return future.get(timeoutMs, TimeUnit.MILLISECONDS); + } catch (final TimeoutException e) { + future.cancel(true); + throw new InvokeTimeoutException(e); } catch (final Throwable t) { + future.cancel(true); throw new RemotingException(t); } } @@ -126,24 +137,34 @@ public void invokeAsync(final Endpoint endpoint, final Object request, final Inv final Channel ch = getChannel(endpoint); final MethodDescriptor method = getCallMethod(request); - ClientCalls.asyncUnaryCall(ch.newCall(method, CallOptions.DEFAULT), (Message) request, - new StreamObserver() { + final CallOptions callOpts = CallOptions.DEFAULT.withDeadlineAfter(timeoutMs, TimeUnit.MILLISECONDS); + final Executor executor = callback.executor(); - @Override - public void onNext(final Message value) { + ClientCalls.asyncUnaryCall(ch.newCall(method, callOpts), (Message) request, new StreamObserver() { + + @Override + public void onNext(final Message value) { + if (executor == null) { callback.complete(value, null); + } else { + executor.execute(() -> callback.complete(value, null)); } + } - @Override - public void onError(final Throwable throwable) { + @Override + public void onError(final Throwable throwable) { + if (executor == null) { callback.complete(null, throwable); + } else { + executor.execute(() -> callback.complete(null, throwable)); } + } - @Override - public void onCompleted() { - // TODO ? - } - }); + @Override + public void onCompleted() { + // NO-OP + } + }); } private MethodDescriptor getCallMethod(final Object request) { @@ -152,8 +173,7 @@ private MethodDescriptor getCallMethod(final Object request) { + interest); return MethodDescriptor // . newBuilder() // - .setType(MethodDescriptor.MethodType.UNARY) - // + .setType(MethodDescriptor.MethodType.UNARY) // .setFullMethodName(MethodDescriptor.generateFullMethodName(interest, GrpcRaftRpcFactory.FIXED_METHOD_NAME)) // .setRequestMarshaller(ProtoUtils.marshaller(reqIns)) // .setResponseMarshaller( @@ -162,32 +182,53 @@ private MethodDescriptor getCallMethod(final Object request) { } private Channel getChannel(final Endpoint endpoint) { - ManagedChannel ch = this.managedChannels.get(endpoint); - if (ch == null) { - final ManagedChannel newCh = ManagedChannelBuilder.forTarget(endpoint.toString()) // - .usePlaintext() // - .build(); - ch = this.managedChannels.putIfAbsent(endpoint, newCh); - if (ch == null) { - ch = newCh; - // channel connection event - ch.notifyWhenStateChanged(ConnectivityState.READY, () -> { - final ReplicatorGroup rpGroup = replicatorGroup; - if (rpGroup != null) { + return this.managedChannelPool.computeIfAbsent(endpoint, ep -> { + final ManagedChannel ch = ManagedChannelBuilder.forAddress(ep.getIp(), ep.getPort()) // + .usePlaintext() // + .directExecutor() // + .build(); + // channel connection event + ch.notifyWhenStateChanged(ConnectivityState.READY, () -> { + final ReplicatorGroup rpGroup = replicatorGroup; + if (rpGroup != null) { + Utils.runInThread(() -> { final PeerId peer = new PeerId(); - if (peer.parse(endpoint.toString())) { + if (peer.parse(ep.toString())) { LOG.info("Peer {} is connected.", peer); rpGroup.checkReplicator(peer, true); } else { - LOG.error("Fail to parse peer: {}.", endpoint); + LOG.error("Fail to parse peer: {}.", ep); } - } - }); - } else { - ManagedChannelHelper.shutdownAndAwaitTermination(newCh, 100); - } + }); + } + }); + + return ch; + }); + } + + private void closeAllChannels() { + for (final Map.Entry entry : this.managedChannelPool.entrySet()) { + final ManagedChannel ch = entry.getValue(); + LOG.info("Shutdown managed channel: {}, {}.", entry.getKey(), ch); + ManagedChannelHelper.shutdownAndAwaitTermination(ch); } + } + + private void closeChannel(final Endpoint endpoint) { + final ManagedChannel ch = this.managedChannelPool.remove(endpoint); + LOG.info("Close connection: {}, {}.", endpoint, ch); + if (ch != null) { + ManagedChannelHelper.shutdownAndAwaitTermination(ch); + } + } - return ch; + private boolean checkChannel(final Endpoint endpoint) { + final ManagedChannel ch = this.managedChannelPool.get(endpoint); + if (ch == null) { + return false; + } + final ConnectivityState st = ch.getState(true); + return st == ConnectivityState.CONNECTING || st == ConnectivityState.READY || st == ConnectivityState.IDLE; } } diff --git a/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/GrpcRaftRpcFactory.java b/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/GrpcRaftRpcFactory.java index 28989a303..9934dbb34 100644 --- a/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/GrpcRaftRpcFactory.java +++ b/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/GrpcRaftRpcFactory.java @@ -16,20 +16,21 @@ */ package com.alipay.sofa.jraft.rpc.impl; -import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import io.grpc.Server; -import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; +import io.grpc.ServerBuilder; import io.grpc.util.MutableHandlerRegistry; import com.alipay.sofa.jraft.rpc.RaftRpcFactory; import com.alipay.sofa.jraft.rpc.RpcClient; +import com.alipay.sofa.jraft.rpc.RpcResponseFactory; import com.alipay.sofa.jraft.rpc.RpcServer; import com.alipay.sofa.jraft.util.Endpoint; import com.alipay.sofa.jraft.util.Requires; import com.alipay.sofa.jraft.util.SPI; +import com.alipay.sofa.jraft.util.SystemPropertyUtil; import com.google.protobuf.Message; /** @@ -39,24 +40,29 @@ @SPI(priority = 1) public class GrpcRaftRpcFactory implements RaftRpcFactory { - public static String FIXED_METHOD_NAME = "_call"; + static final String FIXED_METHOD_NAME = "_call"; + static final int RPC_SERVER_PROCESSOR_POOL_SIZE = SystemPropertyUtil.getInt( + "grpc.default_rpc_server_processor_pool_size", + 100); - private final Map parserClasses = new ConcurrentHashMap<>(); - private final MarshallerRegistry defaultMarshallerRegistry = new MarshallerRegistry() { + static final RpcResponseFactory RESPONSE_FACTORY = new GrpcResponseFactory(); - @Override - public Message findResponseInstanceByRequest(final String reqCls) { - return MarshallerHelper - .findRespInstance(reqCls); - } + final Map parserClasses = new ConcurrentHashMap<>(); + final MarshallerRegistry defaultMarshallerRegistry = new MarshallerRegistry() { - @Override - public void registerResponseInstance(final String reqCls, - final Message respIns) { - MarshallerHelper.registerRespInstance(reqCls, - respIns); - } - }; + @Override + public Message findResponseInstanceByRequest(final String reqCls) { + return MarshallerHelper + .findRespInstance(reqCls); + } + + @Override + public void registerResponseInstance(final String reqCls, + final Message respIns) { + MarshallerHelper.registerRespInstance( + reqCls, respIns); + } + }; @Override public void registerProtobufSerializer(final String className, final Object... args) { @@ -77,9 +83,9 @@ public RpcServer createRpcServer(final Endpoint endpoint, final ConfigHelper 0 && port < 0xFFFF, "port out of range:" + port); final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry(); - final Server server = NettyServerBuilder // - .forAddress(new InetSocketAddress(endpoint.getIp(), endpoint.getPort())) // + final Server server = ServerBuilder.forPort(port) // .fallbackHandlerRegistry(handlerRegistry) // + .directExecutor() // .build(); final RpcServer rpcServer = new GrpcServer(server, handlerRegistry, this.parserClasses, getMarshallerRegistry()); if (helper != null) { @@ -88,10 +94,14 @@ public RpcServer createRpcServer(final Endpoint endpoint, final ConfigHelper parserClasses; - private final MarshallerRegistry marshallerRegistry; - private final RemoteAddressInterceptor remoteAddressInterceptor = new RemoteAddressInterceptor(); - private final AtomicBoolean started = new AtomicBoolean(false); + private static final Logger LOG = LoggerFactory.getLogger(GrpcServer.class); + + private static final String EXECUTOR_NAME = "grpc-default-executor"; + + private final Server server; + private final MutableHandlerRegistry handlerRegistry; + private final Map parserClasses; + private final MarshallerRegistry marshallerRegistry; + private final List serverInterceptors = new CopyOnWriteArrayList<>(); + private final List closedEventListeners = new CopyOnWriteArrayList<>(); + private final AtomicBoolean started = new AtomicBoolean(false); + + private ExecutorService defaultExecutor; public GrpcServer(Server server, MutableHandlerRegistry handlerRegistry, Map parserClasses, MarshallerRegistry marshallerRegistry) { @@ -59,6 +80,7 @@ public GrpcServer(Server server, MutableHandlerRegistry handlerRegistry, Map()) // + .threadFactory(new NamedThreadFactory(EXECUTOR_NAME + "-", true)) // + .rejectedHandler((r, executor) -> { + throw new RejectedExecutionException("[" + EXECUTOR_NAME + "], task " + r.toString() + + " rejected from " + + executor.toString()); + }) + .build(); + try { this.server.start(); } catch (final IOException e) { @@ -79,12 +117,13 @@ public void shutdown() { if (!this.started.compareAndSet(true, false)) { return; } + ExecutorServiceHelper.shutdownAndAwaitTermination(this.defaultExecutor); GrpcServerHelper.shutdownAndAwaitTermination(this.server); } @Override public void registerConnectionClosedEventListener(final ConnectionClosedEventListener listener) { - // NO-OP + this.closedEventListeners.add(listener); } @SuppressWarnings("unchecked") @@ -103,36 +142,68 @@ public void registerProcessor(final RpcProcessor processor) { final ServerCallHandler handler = ServerCalls.asyncUnaryCall( (request, responseObserver) -> { - final SocketAddress remoteAddress = RemoteAddressInterceptor.REMOTE_ADDRESS.get(); + final SocketAddress remoteAddress = RemoteAddressInterceptor.getRemoteAddress(); + final Connection conn = ConnectionInterceptor.getCurrentConnection(this.closedEventListeners); final RpcContext rpcCtx = new RpcContext() { @Override public void sendResponse(final Object responseObj) { - responseObserver.onNext((Message) responseObj); - responseObserver.onCompleted(); + try { + responseObserver.onNext((Message) responseObj); + responseObserver.onCompleted(); + } catch (final Throwable t) { + LOG.warn("[GRPC] failed to send response: {}.", t); + } } @Override public Connection getConnection() { - throw new UnsupportedOperationException("unsupported"); + if (conn == null) { + throw new IllegalStateException("fail to get connection"); + } + return conn; } @Override public String getRemoteAddress() { + // Rely on GRPC's capabilities, not magic (netty channel) return remoteAddress != null ? remoteAddress.toString() : null; } }; - processor.handleRequest(rpcCtx, request); + final RpcProcessor.ExecutorSelector selector = processor.executorSelector(); + Executor executor; + if (selector != null && request instanceof RpcRequests.AppendEntriesRequest) { + final RpcRequests.AppendEntriesRequest req = (RpcRequests.AppendEntriesRequest) request; + final RpcRequests.AppendEntriesRequestHeader.Builder header = RpcRequests.AppendEntriesRequestHeader // + .newBuilder() // + .setGroupId(req.getGroupId()) // + .setPeerId(req.getPeerId()) // + .setServerId(req.getServerId()); + executor = selector.select(interest, header.build()); + } else { + executor = processor.executor(); + } + + if (executor == null) { + executor = this.defaultExecutor; + } + + if (executor != null) { + executor.execute(() -> processor.handleRequest(rpcCtx, request)); + } else { + processor.handleRequest(rpcCtx, request); + } }); final ServerServiceDefinition serviceDef = ServerServiceDefinition // - .builder(processor.interest()) // + .builder(interest) // .addMethod(method, handler) // .build(); - this.handlerRegistry.addService(ServerInterceptors.intercept(serviceDef, this.remoteAddressInterceptor)); + this.handlerRegistry + .addService(ServerInterceptors.intercept(serviceDef, this.serverInterceptors.toArray(new ServerInterceptor[0]))); } @Override @@ -140,6 +211,10 @@ public int boundPort() { return this.server.getPort(); } + public void setDefaultExecutor(ExecutorService defaultExecutor) { + this.defaultExecutor = defaultExecutor; + } + public Server getServer() { return server; } @@ -147,4 +222,13 @@ public Server getServer() { public MutableHandlerRegistry getHandlerRegistry() { return handlerRegistry; } + + public boolean addServerInterceptor(final ServerInterceptor interceptor) { + return this.serverInterceptors.add(interceptor); + } + + private void registerDefaultServerInterceptor() { + this.serverInterceptors.add(new RemoteAddressInterceptor()); + this.serverInterceptors.add(new ConnectionInterceptor()); + } } diff --git a/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/RemoteAddressInterceptor.java b/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/RemoteAddressInterceptor.java index 3e6bfbd00..46b7817f9 100644 --- a/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/RemoteAddressInterceptor.java +++ b/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/RemoteAddressInterceptor.java @@ -33,14 +33,18 @@ */ public class RemoteAddressInterceptor implements ServerInterceptor { - static final Context.Key REMOTE_ADDRESS = Context.key("remote-address"); + private static final Context.Key REMOTE_ADDRESS = Context.key("remote-address"); @Override - public ServerCall.Listener interceptCall(final ServerCall serverCall, - final Metadata metadata, - final ServerCallHandler serverCallHandler) { + public ServerCall.Listener interceptCall(final ServerCall call, + final Metadata headers, + final ServerCallHandler next) { final Context ctx = Context.current() // - .withValue(REMOTE_ADDRESS, serverCall.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR)); - return Contexts.interceptCall(ctx, serverCall, metadata, serverCallHandler); + .withValue(REMOTE_ADDRESS, call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR)); + return Contexts.interceptCall(ctx, call, headers, next); + } + + public static SocketAddress getRemoteAddress() { + return REMOTE_ADDRESS.get(); } } diff --git a/jraft-extension/rpc-grpc-impl/src/main/java/io/grpc/internal/ServerStreamHelper.java b/jraft-extension/rpc-grpc-impl/src/main/java/io/grpc/internal/ServerStreamHelper.java new file mode 100644 index 000000000..73abb4173 --- /dev/null +++ b/jraft-extension/rpc-grpc-impl/src/main/java/io/grpc/internal/ServerStreamHelper.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.grpc.internal; + +import io.grpc.ServerCall; + +import com.alipay.sofa.jraft.util.internal.ReferenceFieldUpdater; +import com.alipay.sofa.jraft.util.internal.Updaters; + +/** + * Get grpc's server stream. + * + * @author jiachun.fjc + */ +public class ServerStreamHelper { + + private static final ReferenceFieldUpdater, ServerStream> STREAM_GETTER = Updaters + .newReferenceFieldUpdater( + ServerCallImpl.class, + "stream"); + + public static ServerStream getServerStream(final ServerCall call) { + if (call instanceof ServerCallImpl) { + return STREAM_GETTER.get((ServerCallImpl) call); + } + return null; + } +} diff --git a/jraft-extension/rpc-grpc-impl/src/main/java/io/grpc/netty/shaded/io/grpc/netty/NettyConnectionHelper.java b/jraft-extension/rpc-grpc-impl/src/main/java/io/grpc/netty/shaded/io/grpc/netty/NettyConnectionHelper.java new file mode 100644 index 000000000..1e70bfeea --- /dev/null +++ b/jraft-extension/rpc-grpc-impl/src/main/java/io/grpc/netty/shaded/io/grpc/netty/NettyConnectionHelper.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.grpc.netty.shaded.io.grpc.netty; + +import java.util.List; + +import io.grpc.internal.ServerStream; +import io.grpc.netty.shaded.io.netty.channel.Channel; +import io.grpc.netty.shaded.io.netty.util.Attribute; +import io.grpc.netty.shaded.io.netty.util.AttributeKey; + +import com.alipay.sofa.jraft.rpc.Connection; +import com.alipay.sofa.jraft.rpc.impl.ConnectionClosedEventListener; +import com.alipay.sofa.jraft.util.internal.ReferenceFieldUpdater; +import com.alipay.sofa.jraft.util.internal.Updaters; + +/** + * Get netty channel. + * + * @author jiachun.fjc + */ +public class NettyConnectionHelper { + + private static final ReferenceFieldUpdater CHANNEL_GETTER = Updaters + .newReferenceFieldUpdater( + NettyServerStream.class, + "channel"); + + private static final AttributeKey NETTY_CONN_KEY = AttributeKey + .valueOf("netty.conn"); + + public static Connection getOrCreateConnection(final ServerStream stream, + final List listeners) { + if (stream instanceof NettyServerStream) { + return attachChannel(CHANNEL_GETTER.get((NettyServerStream) stream), listeners); + } + return null; + } + + private static Connection attachChannel(final Channel channel, final List listeners) { + if (channel == null) { + return null; + } + + final Attribute attr = channel.attr(NETTY_CONN_KEY); + NettyConnection conn = attr.get(); + if (conn == null) { + final NettyConnection newConn = new NettyConnection(channel); + conn = attr.setIfAbsent(newConn); + if (conn == null) { + conn = newConn; + for (final ConnectionClosedEventListener l : listeners) { + conn.addClosedEventListener(l); + } + } + } + + return conn; + } +} + +class NettyConnection implements Connection { + + private final Channel ch; + + NettyConnection(Channel ch) { + this.ch = ch; + } + + @Override + public Object getAttribute(final String key) { + return this.ch.attr(AttributeKey.valueOf(key)).get(); + } + + @Override + public void setAttribute(final String key, final Object value) { + this.ch.attr(AttributeKey.valueOf(key)).set(value); + } + + @Override + public void close() { + this.ch.close(); + } + + void addClosedEventListener(final ConnectionClosedEventListener listener) { + this.ch.closeFuture() // + .addListener(future -> listener.onClosed(this.ch.remoteAddress().toString(), NettyConnection.this)); + } +} diff --git a/jraft-extension/rpc-grpc-impl/src/test/resources/log4j2.xml b/jraft-extension/rpc-grpc-impl/src/test/resources/log4j2.xml index 0d4541357..eaa5a7599 100644 --- a/jraft-extension/rpc-grpc-impl/src/test/resources/log4j2.xml +++ b/jraft-extension/rpc-grpc-impl/src/test/resources/log4j2.xml @@ -17,7 +17,7 @@ - + diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVOperation.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVOperation.java index 5751ca6cc..9a57ac96c 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVOperation.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVOperation.java @@ -383,6 +383,10 @@ public static String opName(byte op) { return "RESET_SEQUENCE"; case RANGE_SPLIT: return "RANGE_SPLIT"; + case DELETE_LIST: + return "DELETE_LIST"; + case CONTAINS_KEY: + return "CONTAINS_KEY"; case REVERSE_SCAN: return "REVERSE_SCAN"; default: