diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/InvokeCallback.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/InvokeCallback.java index d760aabf8..88b0ef52a 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/InvokeCallback.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/InvokeCallback.java @@ -25,5 +25,7 @@ public interface InvokeCallback { void complete(final Object result, final Throwable err); - Executor executor(); + default Executor executor() { + return null; + } } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/DirectExecutor.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/DirectExecutor.java new file mode 100644 index 000000000..f4c477a6f --- /dev/null +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/DirectExecutor.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.util; + +import java.util.concurrent.Executor; + +/** + * An executor that direct run command. + * + * @author jiachun.fjc + */ +public enum DirectExecutor implements Executor { + INSTANCE; + + @Override + public void execute(final Runnable command) { + command.run(); + } + + @Override + public String toString() { + return "DirectExecutor"; + } +} diff --git a/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/GrpcClient.java b/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/GrpcClient.java index 3895e02af..b5f32cf4c 100644 --- a/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/GrpcClient.java +++ b/jraft-extension/rpc-grpc-impl/src/main/java/com/alipay/sofa/jraft/rpc/impl/GrpcClient.java @@ -44,6 +44,7 @@ import com.alipay.sofa.jraft.rpc.InvokeCallback; import com.alipay.sofa.jraft.rpc.InvokeContext; import com.alipay.sofa.jraft.rpc.RpcClient; +import com.alipay.sofa.jraft.util.DirectExecutor; import com.alipay.sofa.jraft.util.Endpoint; import com.alipay.sofa.jraft.util.Requires; import com.alipay.sofa.jraft.util.Utils; @@ -101,20 +102,12 @@ public void registerConnectEventListener(final ReplicatorGroup replicatorGroup) public Object invokeSync(final Endpoint endpoint, final Object request, final InvokeContext ctx, final long timeoutMs) throws RemotingException { final CompletableFuture future = new CompletableFuture<>(); - invokeAsync(endpoint, request, ctx, new InvokeCallback() { - @Override - public void complete(final Object result, final Throwable err) { - if (err == null) { - future.complete(result); - } else { - future.completeExceptionally(err); - } - } - - @Override - public Executor executor() { - return null; + invokeAsync(endpoint, request, ctx, (result, err) -> { + if (err == null) { + future.complete(result); + } else { + future.completeExceptionally(err); } }, timeoutMs); @@ -138,26 +131,18 @@ public void invokeAsync(final Endpoint endpoint, final Object request, final Inv final Channel ch = getChannel(endpoint); final MethodDescriptor method = getCallMethod(request); final CallOptions callOpts = CallOptions.DEFAULT.withDeadlineAfter(timeoutMs, TimeUnit.MILLISECONDS); - final Executor executor = callback.executor(); + final Executor executor = callback.executor() != null ? callback.executor() : DirectExecutor.INSTANCE; ClientCalls.asyncUnaryCall(ch.newCall(method, callOpts), (Message) request, new StreamObserver() { @Override public void onNext(final Message value) { - if (executor == null) { - callback.complete(value, null); - } else { - executor.execute(() -> callback.complete(value, null)); - } + executor.execute(() -> callback.complete(value, null)); } @Override public void onError(final Throwable throwable) { - if (executor == null) { - callback.complete(null, throwable); - } else { - executor.execute(() -> callback.complete(null, throwable)); - } + executor.execute(() -> callback.complete(null, throwable)); } @Override