Skip to content

Commit

Permalink
[Bugfix]修复Connect-GroupDescription解析失败的问题(didi#1010)
Browse files Browse the repository at this point in the history
1、先尝试使用IncrementalCooperativeConnectProtocol协议进行解析;
2、IncrementalCooperativeConnectProtocol协议解析失败后,再维持原先的情况,使用ConnectProtocol协议进行解析;
  • Loading branch information
ZQKC committed May 16, 2023
1 parent e975932 commit 2256e8b
Showing 1 changed file with 33 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocol;
import org.apache.kafka.connect.runtime.distributed.ExtendedWorkerState;
import org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol;
import org.slf4j.Logger;

import java.net.InetSocketAddress;
Expand Down Expand Up @@ -1342,19 +1344,7 @@ void handleResponse(AbstractResponse abstractResponse) {
memberBaseAssignment = new KSMemberConsumerAssignment(new HashSet<>());
}
} else {
ConnectProtocol.Assignment assignment = null;
if (groupMember.memberAssignment().length > 0) {
assignment = ConnectProtocol.
deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment()));
}

ConnectProtocol.WorkerState workerState = null;
if (groupMember.memberMetadata().length > 0) {
workerState = ConnectProtocol.
deserializeMetadata(ByteBuffer.wrap(groupMember.memberMetadata()));
}

memberBaseAssignment = new KSMemberConnectAssignment(assignment, workerState);
memberBaseAssignment = deserializeConnectGroupDataCompatibility(groupMember);
}

memberDescriptions.add(new KSMemberDescription(
Expand Down Expand Up @@ -1383,6 +1373,36 @@ void handleFailure(Throwable throwable) {
};
}

private KSMemberBaseAssignment deserializeConnectGroupDataCompatibility(DescribedGroupMember groupMember) {
try {
// 高版本的反序列化方式
ExtendedWorkerState workerState = null;
if (groupMember.memberMetadata().length > 0) {
workerState = IncrementalCooperativeConnectProtocol.
deserializeMetadata(ByteBuffer.wrap(groupMember.memberMetadata()));

return new KSMemberConnectAssignment(workerState.assignment(), workerState);
}
} catch (Exception e) {
// ignore
}

// 低版本的反序列化方式
ConnectProtocol.Assignment assignment = null;
if (groupMember.memberAssignment().length > 0) {
assignment = ConnectProtocol.
deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment()));
}

ConnectProtocol.WorkerState workerState = null;
if (groupMember.memberMetadata().length > 0) {
workerState = ConnectProtocol.
deserializeMetadata(ByteBuffer.wrap(groupMember.memberMetadata()));
}

return new KSMemberConnectAssignment(assignment, workerState);
}


private Set<AclOperation> validAclOperations(final int authorizedOperations) {
if (authorizedOperations == MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED) {
Expand Down

0 comments on commit 2256e8b

Please sign in to comment.