Skip to content

Commit

Permalink
FIX: force a read operation for peer instead of self (netty#7454)
Browse files Browse the repository at this point in the history
* FIX: force a read operation for peer instead of self

Motivation:
When A is in `writeInProgress` and call self close, A should
`finishPeerRead` for B(A' peer).

Modifications:
Call `finishPeerRead` with peer in `LocalChannel#doClose`

Result:
Clear confuse of code logic

* FIX: preserves order of close after write in same event loop

Motivation:
If client and server(client's peer channel) are in same event loop, client writes data to
server in `ChannelActive`. Server receives the data and write it
back. The client's read can't be triggered becasue client's
`ChannelActive` is not finished at this point and its `readInProgress`
is false. Then server closes itself, it will also close the client's
channel. And client has no chance to receive the data.

Modifications:
1. Add a test case to demonstrate the problem
2. When `doClose` peer, we always call
`peer.eventLoop().execute()` and `registerInProgress` is not needed.
3. Remove test case
`testClosePeerInWritePromiseCompleteSameEventLoopPreservesOrder`. This
test case can't pass becasue of this commit. IMHO, I think it is OK,
becasue it is reasonable that the client flushes the data to socket,
then server close the channel without received the data.
4. For mismatch test in SniClientTest, the client should receive server's alert before closed(caused by server's close)

Result:
The problem is gone.
  • Loading branch information
louxiu authored and Scottmitch committed Dec 8, 2017
1 parent 0cac1a6 commit 805ac00
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 116 deletions.
10 changes: 5 additions & 5 deletions handler/src/test/java/io/netty/handler/ssl/SniClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.junit.Assume;
import org.junit.Test;

import java.nio.channels.ClosedChannelException;
import javax.net.ssl.SSLException;

public class SniClientTest {

Expand Down Expand Up @@ -67,7 +67,7 @@ public void testSniSNIMatcherMatchesClientJdkSslServerJdkSsl() throws Exception
SniClientJava8TestUtil.testSniClient(SslProvider.JDK, SslProvider.JDK, true);
}

@Test(timeout = 30000, expected = ClosedChannelException.class)
@Test(timeout = 30000, expected = SSLException.class)
public void testSniSNIMatcherDoesNotMatchClientJdkSslServerJdkSsl() throws Exception {
Assume.assumeTrue(PlatformDependent.javaVersion() >= 8);
SniClientJava8TestUtil.testSniClient(SslProvider.JDK, SslProvider.JDK, false);
Expand All @@ -80,7 +80,7 @@ public void testSniSNIMatcherMatchesClientOpenSslServerOpenSsl() throws Exceptio
SniClientJava8TestUtil.testSniClient(SslProvider.OPENSSL, SslProvider.OPENSSL, true);
}

@Test(timeout = 30000, expected = ClosedChannelException.class)
@Test(timeout = 30000, expected = SSLException.class)
public void testSniSNIMatcherDoesNotMatchClientOpenSslServerOpenSsl() throws Exception {
Assume.assumeTrue(PlatformDependent.javaVersion() >= 8);
Assume.assumeTrue(OpenSsl.isAvailable());
Expand All @@ -94,7 +94,7 @@ public void testSniSNIMatcherMatchesClientJdkSslServerOpenSsl() throws Exception
SniClientJava8TestUtil.testSniClient(SslProvider.JDK, SslProvider.OPENSSL, true);
}

@Test(timeout = 30000, expected = ClosedChannelException.class)
@Test(timeout = 30000, expected = SSLException.class)
public void testSniSNIMatcherDoesNotMatchClientJdkSslServerOpenSsl() throws Exception {
Assume.assumeTrue(PlatformDependent.javaVersion() >= 8);
Assume.assumeTrue(OpenSsl.isAvailable());
Expand All @@ -108,7 +108,7 @@ public void testSniSNIMatcherMatchesClientOpenSslServerJdkSsl() throws Exception
SniClientJava8TestUtil.testSniClient(SslProvider.OPENSSL, SslProvider.JDK, true);
}

@Test(timeout = 30000, expected = ClosedChannelException.class)
@Test(timeout = 30000, expected = SSLException.class)
public void testSniSNIMatcherDoesNotMatchClientOpenSslServerJdkSsl() throws Exception {
Assume.assumeTrue(PlatformDependent.javaVersion() >= 8);
Assume.assumeTrue(OpenSsl.isAvailable());
Expand Down
54 changes: 22 additions & 32 deletions transport/src/main/java/io/netty/channel/local/LocalChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ public void run() {
private volatile LocalAddress remoteAddress;
private volatile ChannelPromise connectPromise;
private volatile boolean readInProgress;
private volatile boolean registerInProgress;
private volatile boolean writeInProgress;
private volatile Future<?> finishReadFuture;

Expand Down Expand Up @@ -172,13 +171,8 @@ protected void doRegister() throws Exception {
// See https://github.com/netty/netty/issues/2400
if (peer != null && parent() != null) {
// Store the peer in a local variable as it may be set to null if doClose() is called.
// Because of this we also set registerInProgress to true as we check for this in doClose() and make sure
// we delay the fireChannelInactive() to be fired after the fireChannelActive() and so keep the correct
// order of events.
//
// See https://github.com/netty/netty/issues/2144
final LocalChannel peer = this.peer;
registerInProgress = true;
state = State.CONNECTED;

peer.remoteAddress = parent() == null ? null : parent().localAddress();
Expand All @@ -191,7 +185,6 @@ protected void doRegister() throws Exception {
peer.eventLoop().execute(new Runnable() {
@Override
public void run() {
registerInProgress = false;
ChannelPromise promise = peer.connectPromise;

// Only trigger fireChannelActive() if the promise was not null and was not completed yet.
Expand Down Expand Up @@ -237,7 +230,9 @@ protected void doClose() throws Exception {
state = State.CLOSED;

// Preserve order of event and force a read operation now before the close operation is processed.
finishPeerRead(this);
if (writeInProgress && peer != null) {
finishPeerRead(peer);
}

ChannelPromise promise = connectPromise;
if (promise != null) {
Expand All @@ -249,34 +244,29 @@ protected void doClose() throws Exception {

if (peer != null) {
this.peer = null;
// Need to execute the close in the correct EventLoop (see https://github.com/netty/netty/issues/1777).
// Also check if the registration was not done yet. In this case we submit the close to the EventLoop
// to make sure its run after the registration completes
// (see https://github.com/netty/netty/issues/2144).
// Always call peer.eventLoop().execute() even if peer.eventLoop().inEventLoop() is true.
// This ensures that if both channels are on the same event loop, the peer's channelInActive
// event is triggered *after* this peer's channelInActive event
EventLoop peerEventLoop = peer.eventLoop();
final boolean peerIsActive = peer.isActive();
if (peerEventLoop.inEventLoop() && !registerInProgress) {
peer.tryClose(peerIsActive);
} else {
try {
peerEventLoop.execute(new Runnable() {
@Override
public void run() {
peer.tryClose(peerIsActive);
}
});
} catch (Throwable cause) {
logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!",
this, peer, cause);
if (peerEventLoop.inEventLoop()) {
peer.releaseInboundBuffers();
} else {
// inboundBuffers is a SPSC so we may leak if the event loop is shutdown prematurely or
// rejects the close Runnable but give a best effort.
peer.close();
try {
peerEventLoop.execute(new Runnable() {
@Override
public void run() {
peer.tryClose(peerIsActive);
}
PlatformDependent.throwException(cause);
});
} catch (Throwable cause) {
logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!",
this, peer, cause);
if (peerEventLoop.inEventLoop()) {
peer.releaseInboundBuffers();
} else {
// inboundBuffers is a SPSC so we may leak if the event loop is shutdown prematurely or
// rejects the close Runnable but give a best effort.
peer.close();
}
PlatformDependent.throwException(cause);
}
}
} finally {
Expand Down
146 changes: 67 additions & 79 deletions transport/src/test/java/io/netty/channel/local/LocalChannelTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,73 @@ public void operationComplete(ChannelFuture future) throws Exception {
}
}

@Test
public void testCloseAfterWriteInSameEventLoopPreservesOrder() throws InterruptedException {
Bootstrap cb = new Bootstrap();
ServerBootstrap sb = new ServerBootstrap();
final CountDownLatch messageLatch = new CountDownLatch(3);
final ByteBuf data = Unpooled.wrappedBuffer(new byte[1024]);

try {
cb.group(sharedGroup)
.channel(LocalChannel.class)
.handler(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(data.retainedDuplicate());
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (data.equals(msg)) {
ReferenceCountUtil.safeRelease(msg);
messageLatch.countDown();
} else {
super.channelRead(ctx, msg);
}
}
});

sb.group(sharedGroup)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (data.equals(msg)) {
messageLatch.countDown();
ctx.writeAndFlush(data);
ctx.close();
} else {
super.channelRead(ctx, msg);
}
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
messageLatch.countDown();
super.channelInactive(ctx);
}
});

Channel sc = null;
Channel cc = null;
try {
// Start server
sc = sb.bind(TEST_ADDRESS).syncUninterruptibly().channel();

// Connect to the server
cc = cb.connect(sc.localAddress()).syncUninterruptibly().channel();
assertTrue(messageLatch.await(5, SECONDS));
assertFalse(cc.isOpen());
} finally {
closeChannel(cc);
closeChannel(sc);
}
} finally {
data.release();
}
}

@Test
public void testWriteInWritePromiseCompletePreservesOrder() throws InterruptedException {
Bootstrap cb = new Bootstrap();
Expand Down Expand Up @@ -620,85 +687,6 @@ public void operationComplete(ChannelFuture future) throws Exception {
}
}

@Test
public void testClosePeerInWritePromiseCompleteSameEventLoopPreservesOrder() throws InterruptedException {
Bootstrap cb = new Bootstrap();
ServerBootstrap sb = new ServerBootstrap();
final CountDownLatch messageLatch = new CountDownLatch(2);
final CountDownLatch serverChannelLatch = new CountDownLatch(1);
final ByteBuf data = Unpooled.wrappedBuffer(new byte[1024]);
final AtomicReference<Channel> serverChannelRef = new AtomicReference<Channel>();

try {
cb.group(sharedGroup)
.channel(LocalChannel.class)
.handler(new TestHandler());

sb.group(sharedGroup)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg.equals(data)) {
ReferenceCountUtil.safeRelease(msg);
messageLatch.countDown();
} else {
super.channelRead(ctx, msg);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
messageLatch.countDown();
super.channelInactive(ctx);
}
});
serverChannelRef.set(ch);
serverChannelLatch.countDown();
}
});

Channel sc = null;
Channel cc = null;
try {
// Start server
sc = sb.bind(TEST_ADDRESS).syncUninterruptibly().channel();

// Connect to the server
cc = cb.connect(sc.localAddress()).syncUninterruptibly().channel();

assertTrue(serverChannelLatch.await(5, SECONDS));

final Channel ccCpy = cc;
// Make sure a write operation is executed in the eventloop
cc.pipeline().lastContext().executor().execute(new Runnable() {
@Override
public void run() {
ChannelPromise promise = ccCpy.newPromise();
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
serverChannelRef.get().close();
}
});
ccCpy.writeAndFlush(data.retainedDuplicate(), promise);
}
});

assertTrue(messageLatch.await(5, SECONDS));
assertFalse(cc.isOpen());
assertFalse(serverChannelRef.get().isOpen());
} finally {
closeChannel(cc);
closeChannel(sc);
}
} finally {
data.release();
}
}

@Test
public void testWriteWhilePeerIsClosedReleaseObjectAndFailPromise() throws InterruptedException {
Bootstrap cb = new Bootstrap();
Expand Down

0 comments on commit 805ac00

Please sign in to comment.