Skip to content

Commit

Permalink
Fix/error response on future (sofastack#513)
Browse files Browse the repository at this point in the history
* forget to set error_response

* forget to set error_response

* forget to set error_response

* forget to set error_response
  • Loading branch information
fengjiachun authored Oct 5, 2020
1 parent c2f2a7f commit c0daf8d
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -216,33 +216,35 @@ public void complete(final Object result, final Throwable err) {

if (err == null) {
Status status = Status.OK();
Message msg;
if (result instanceof ErrorResponse) {
status = handleErrorResponse((ErrorResponse) result);
msg = (Message) result;
} else if (result instanceof Message) {
final Descriptors.FieldDescriptor fd = ((Message) result).getDescriptorForType() //
.findFieldByNumber(RpcResponseFactory.ERROR_RESPONSE_NUM);
if (fd != null && ((Message) result).hasField(fd)) {
final ErrorResponse eResp = (ErrorResponse) ((Message) result).getField(fd);
status = handleErrorResponse(eResp);
msg = eResp;
} else {
if (done != null) {
done.setResponse((T) result);
}
msg = (T) result;
}
} else {
if (done != null) {
done.setResponse((T) result);
}
msg = (T) result;
}
if (done != null) {
try {
if (status.isOk()) {
done.setResponse((T) msg);
}
done.run(status);
} catch (final Throwable t) {
LOG.error("Fail to run RpcResponseClosure, the request is {}.", request, t);
}
}
if (!future.isDone()) {
future.setResult((Message) result);
future.setResult(msg);
}
} else {
if (done != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,4 +243,41 @@ public void testInvokeWithDoneOnException() throws Exception {
assertNotNull(done.status);
assertEquals(RaftError.ETIMEDOUT.getNumber(), done.status.getCode());
}

@Test
public void testInvokeWithDOneOnErrorResponse() throws Exception {
final InvokeContext invokeCtx = new InvokeContext();
invokeCtx.put(InvokeContext.CRC_SWITCH, false);
final ArgumentCaptor<InvokeCallback> callbackArg = ArgumentCaptor.forClass(InvokeCallback.class);
final CliRequests.GetPeersRequest request = CliRequests.GetPeersRequest.newBuilder() //
.setGroupId("id") //
.setLeaderId("127.0.0.1:8001") //
.build();

MockRpcResponseClosure<ErrorResponse> done = new MockRpcResponseClosure<>();
Future<Message> future = this.clientService.invokeWithDone(this.endpoint, request, invokeCtx, done, -1);
Mockito.verify(this.rpcClient).invokeAsync(eq(this.endpoint), eq(request), eq(invokeCtx),
callbackArg.capture(), eq((long) this.rpcOptions.getRpcDefaultTimeout()));
InvokeCallback cb = callbackArg.getValue();
assertNotNull(cb);
assertNotNull(future);

assertNull(done.getResponse());
assertNull(done.status);
assertFalse(future.isDone());

final Message resp = this.rpcResponseFactory.newResponse(CliRequests.GetPeersResponse.getDefaultInstance(),
new Status(-1, "failed"));
cb.complete(resp, null);

final Message msg = future.get();

assertTrue(msg instanceof ErrorResponse);
assertEquals(((ErrorResponse) msg).getErrorMsg(), "failed");

done.latch.await();
assertNotNull(done.status);
assertTrue(!done.status.isOk());
assertEquals(done.status.getErrorMsg(), "failed");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -243,4 +243,41 @@ public void testInvokeWithDoneOnException() throws Exception {
assertNotNull(done.status);
assertEquals(RaftError.ETIMEDOUT.getNumber(), done.status.getCode());
}

@Test
public void testInvokeWithDOneOnErrorResponse() throws Exception {
final InvokeContext invokeCtx = new InvokeContext();
invokeCtx.put(InvokeContext.CRC_SWITCH, false);
final ArgumentCaptor<InvokeCallback> callbackArg = ArgumentCaptor.forClass(InvokeCallback.class);
final CliRequests.GetPeersRequest request = CliRequests.GetPeersRequest.newBuilder() //
.setGroupId("id") //
.setLeaderId("127.0.0.1:8001") //
.build();

MockRpcResponseClosure<ErrorResponse> done = new MockRpcResponseClosure<>();
Future<Message> future = this.clientService.invokeWithDone(this.endpoint, request, invokeCtx, done, -1);
Mockito.verify(this.rpcClient).invokeAsync(eq(this.endpoint), eq(request), eq(invokeCtx),
callbackArg.capture(), eq((long) this.rpcOptions.getRpcDefaultTimeout()));
InvokeCallback cb = callbackArg.getValue();
assertNotNull(cb);
assertNotNull(future);

assertNull(done.getResponse());
assertNull(done.status);
assertFalse(future.isDone());

final Message resp = this.rpcResponseFactory.newResponse(CliRequests.GetPeersResponse.getDefaultInstance(),
new Status(-1, "failed"));
cb.complete(resp, null);

final Message msg = future.get();

assertTrue(msg instanceof ErrorResponse);
assertEquals(((ErrorResponse) msg).getErrorMsg(), "failed");

done.latch.await();
assertNotNull(done.status);
assertTrue(!done.status.isOk());
assertEquals(done.status.getErrorMsg(), "failed");
}
}

0 comments on commit c0daf8d

Please sign in to comment.