Skip to content

Commit

Permalink
let the failed closure ran in callback executor (sofastack#505)
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun authored Sep 17, 2020
1 parent 33a4a65 commit 91ebdb1
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 7 deletions.
27 changes: 25 additions & 2 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/RpcUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package com.alipay.sofa.jraft.rpc;

import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -83,7 +84,7 @@ public static Future<?> runClosureInThread(final Closure done) {
}

/**
* Run a task in thread pool,returns the future object.
* Run a task in thread pool, returns the future object.
*/
public static Future<?> runInThread(final Runnable runnable) {
return RPC_CLOSURE_EXECUTOR.submit(runnable);
Expand All @@ -101,7 +102,29 @@ public static Future<?> runClosureInThread(final Closure done, final Status stat
try {
done.run(status);
} catch (final Throwable t) {
LOG.error("Fail to run done closure", t);
LOG.error("Fail to run done closure.", t);
}
});
}

/**
* Run closure with status in specified executor
*/
public static void runClosureInExecutor(final Executor executor, final Closure done, final Status status) {
if (done == null) {
return;
}

if (executor == null) {
runClosureInThread(done, status);
return;
}

executor.execute(() -> {
try {
done.run(status);
} catch (final Throwable t) {
LOG.error("Fail to run done closure.", t);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,13 @@ public <T extends Message> Future<Message> invokeWithDone(final Endpoint endpoin
final Executor rpcExecutor) {
final RpcClient rc = this.rpcClient;
final FutureImpl<Message> future = new FutureImpl<>();
final Executor currExecutor = rpcExecutor != null ? rpcExecutor : this.rpcExecutor;
try {
if (rc == null) {
future.failure(new IllegalStateException("Client service is uninitialized."));
// should be in another thread to avoid dead locking.
RpcUtils.runClosureInThread(done, new Status(RaftError.EINTERNAL, "Client service is uninitialized."));
RpcUtils.runClosureInExecutor(currExecutor, done, new Status(RaftError.EINTERNAL,
"Client service is uninitialized."));
return future;
}

Expand Down Expand Up @@ -259,19 +261,20 @@ public void complete(final Object result, final Throwable err) {

@Override
public Executor executor() {
return rpcExecutor != null ? rpcExecutor : AbstractClientService.this.rpcExecutor;
return currExecutor;
}
}, timeoutMs <= 0 ? this.rpcOptions.getRpcDefaultTimeout() : timeoutMs);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
future.failure(e);
// should be in another thread to avoid dead locking.
Utils.runClosureInThread(done, new Status(RaftError.EINTR, "Sending rpc was interrupted"));
RpcUtils.runClosureInExecutor(currExecutor, done,
new Status(RaftError.EINTR, "Sending rpc was interrupted"));
} catch (final RemotingException e) {
future.failure(e);
// should be in another thread to avoid dead locking.
Utils.runClosureInThread(done,
new Status(RaftError.EINTERNAL, "Fail to send a RPC request:" + e.getMessage()));
RpcUtils.runClosureInExecutor(currExecutor, done, new Status(RaftError.EINTERNAL,
"Fail to send a RPC request:" + e.getMessage()));

}

Expand Down

0 comments on commit 91ebdb1

Please sign in to comment.