diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/BoltRpcClient.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/BoltRpcClient.java index 29f96b3de..cbab1b373 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/BoltRpcClient.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/BoltRpcClient.java @@ -20,6 +20,7 @@ import java.util.concurrent.Executor; import com.alipay.remoting.ConnectionEventType; +import com.alipay.remoting.RejectedExecutionPolicy; import com.alipay.remoting.Url; import com.alipay.remoting.config.switches.GlobalSwitch; import com.alipay.remoting.rpc.RpcAddressParser; @@ -41,13 +42,14 @@ */ public class BoltRpcClient implements RpcClient { - public static final String BOLT_ADDRESS_PARSER = "BOLT_ADDRESS_PARSER"; - public static final String BOLT_CTX = "BOLT_CTX"; + public static final String BOLT_ADDRESS_PARSER = "BOLT_ADDRESS_PARSER"; + public static final String BOLT_CTX = "BOLT_CTX"; + public static final String BOLT_REJECTED_EXECUTION_POLICY = "BOLT_REJECTED_EXECUTION_POLICY"; private final com.alipay.remoting.rpc.RpcClient rpcClient; private com.alipay.remoting.InvokeContext defaultInvokeCtx; - private RpcAddressParser defaultAddressParser = new RpcAddressParser(); + private RpcAddressParser defaultAddressParser = new RpcAddressParser(); public BoltRpcClient(com.alipay.remoting.rpc.RpcClient rpcClient) { this.rpcClient = Requires.requireNonNull(rpcClient, "rpcClient"); @@ -106,7 +108,7 @@ public void invokeAsync(final Endpoint endpoint, final Object request, final Inv final RpcAddressParser addressParser = getAddressParser(ctx); try { final Url url = addressParser.parse(endpoint.toString()); - this.rpcClient.invokeWithCallback(url, request, getBoltInvokeCtx(ctx), getBoltCallback(callback), + this.rpcClient.invokeWithCallback(url, request, getBoltInvokeCtx(ctx), getBoltCallback(callback, ctx), (int) timeoutMs); } catch (final com.alipay.remoting.rpc.exception.InvokeTimeoutException e) { throw new InvokeTimeoutException(e); @@ -140,6 +142,11 @@ private RpcAddressParser getAddressParser(final InvokeContext ctx) { this.defaultAddressParser); } + private RejectedExecutionPolicy getRejectedPolicy(final InvokeContext ctx) { + return ctx == null ? RejectedExecutionPolicy.CALLER_HANDLE_EXCEPTION : ctx.getOrDefault( + BOLT_REJECTED_EXECUTION_POLICY, RejectedExecutionPolicy.CALLER_HANDLE_EXCEPTION); + } + private com.alipay.remoting.InvokeContext getBoltInvokeCtx(final InvokeContext ctx) { if (ctx == null) { return this.defaultInvokeCtx; @@ -161,16 +168,19 @@ private com.alipay.remoting.InvokeContext getBoltInvokeCtx(final InvokeContext c return boltCtx; } - private BoltCallback getBoltCallback(final InvokeCallback callback) { - return new BoltCallback(callback); + private BoltCallback getBoltCallback(final InvokeCallback callback, final InvokeContext ctx) { + Requires.requireNonNull(callback, "callback"); + return new BoltCallback(callback, getRejectedPolicy(ctx)); } - private static class BoltCallback implements com.alipay.remoting.InvokeCallback { + private static class BoltCallback implements com.alipay.remoting.RejectionProcessableInvokeCallback { - private final InvokeCallback callback; + private final InvokeCallback callback; + private final RejectedExecutionPolicy rejectedPolicy; - private BoltCallback(final InvokeCallback callback) { + private BoltCallback(final InvokeCallback callback, final RejectedExecutionPolicy rejectedPolicy) { this.callback = callback; + this.rejectedPolicy = rejectedPolicy; } @Override @@ -187,5 +197,10 @@ public void onException(final Throwable err) { public Executor getExecutor() { return this.callback.executor(); } + + @Override + public RejectedExecutionPolicy rejectedExecutionPolicy() { + return this.rejectedPolicy; + } } }