Skip to content

Commit

Permalink
[FLINK-24213][qs] Use single lock in ServerConnection
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Sep 9, 2021
1 parent b5ac92e commit 2e721ab
Showing 1 changed file with 10 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
final class ServerConnection<REQ extends MessageBody, RESP extends MessageBody> {
private static final Logger LOG = LoggerFactory.getLogger(ServerConnection.class);

private final Object connectionLock = new Object();
private final Object connectionLock;

@GuardedBy("connectionLock")
private InternalConnection<REQ, RESP> internalConnection;
Expand All @@ -60,7 +60,8 @@ final class ServerConnection<REQ extends MessageBody, RESP extends MessageBody>

private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

private ServerConnection(InternalConnection<REQ, RESP> internalConnection) {
private ServerConnection(Object lock, InternalConnection<REQ, RESP> internalConnection) {
this.connectionLock = lock;
this.internalConnection = internalConnection;
forwardCloseFuture();
}
Expand Down Expand Up @@ -119,11 +120,14 @@ ServerConnection<REQ, RESP> createPendingConnection(
final String clientName,
final MessageSerializer<REQ, RESP> serializer,
final KvStateRequestStats stats) {
final Object lock = new Object();

return new ServerConnection<>(
lock,
new PendingConnection<>(
channel ->
new EstablishedConnection<>(
clientName, serializer, channel, stats)));
lock, clientName, serializer, channel, stats)));
}

interface InternalConnection<REQ, RESP> {
Expand Down Expand Up @@ -280,7 +284,7 @@ public REQ getRequest() {
private static class EstablishedConnection<REQ extends MessageBody, RESP extends MessageBody>
implements ClientHandlerCallback<RESP>, InternalConnection<REQ, RESP> {

private final Object lock = new Object();
private final Object lock;

/** The actual TCP channel. */
private final Channel channel;
Expand All @@ -307,11 +311,13 @@ private static class EstablishedConnection<REQ extends MessageBody, RESP extends
* @param channel The actual TCP channel
*/
EstablishedConnection(
final Object lock,
final String clientName,
final MessageSerializer<REQ, RESP> serializer,
final Channel channel,
final KvStateRequestStats stats) {

this.lock = lock;
this.channel = Preconditions.checkNotNull(channel);

// Add the client handler with the callback
Expand Down

0 comments on commit 2e721ab

Please sign in to comment.