Skip to content

Commit

Permalink
[INLONG-2922][SDK] Fix NPE in syncSendMessage to send message (apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
baomingyu authored Mar 5, 2022
1 parent 6eb25e3 commit 5e46b65
Showing 1 changed file with 5 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ private SendResult syncSendInternalMessage(NettyClient client,
EncodeObject encodeObject, String msgUUID,
long timeout, TimeUnit timeUnit)
throws ExecutionException, InterruptedException, TimeoutException {
client = clientMgr.getClientByRoundRobin();

if (client == null) {
return SendResult.NO_CONNECTION;
}
Expand Down Expand Up @@ -220,7 +220,7 @@ public SendResult syncSendMessage(EncodeObject encodeObject, String msgUUID,
metricWorker.recordNumByKey(encodeObject.getMessageId(),
encodeObject.getGroupId(), encodeObject.getStreamId(),
Utils.getLocalIp(), encodeObject.getDt(), encodeObject.getPackageTime(), encodeObject.getRealCnt());
NettyClient client = null;
NettyClient client = clientMgr.getClientByRoundRobin();
SendResult message = null;
try {
message = syncSendInternalMessage(client, encodeObject, msgUUID, timeout, timeUnit);
Expand Down Expand Up @@ -259,7 +259,9 @@ public SendResult syncSendMessage(EncodeObject encodeObject, String msgUUID,
syncCallables.remove(encodeObject.getMessageId());
return SendResult.UNKOWN_ERROR;
}
scanThread.resetTimeoutChannel(client.getChannel());
if (client != null) {
scanThread.resetTimeoutChannel(client.getChannel());
}
if (message == SendResult.OK) {
metricWorker.recordSuccessByMessageId(encodeObject.getMessageId());
}
Expand Down

0 comments on commit 5e46b65

Please sign in to comment.