Skip to content

Commit

Permalink
[netty#2254] Correctly handle Channel.read() and ChannelHandlerContex…
Browse files Browse the repository at this point in the history
…t.read()

This includes also when it is called from channelRead(...) and channelReadComplete(...) methods.
  • Loading branch information
Norman Maurer committed Feb 22, 2014
1 parent 60b830b commit 52535a1
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ protected void doRegister() throws Exception {
protected abstract AbstractEpollUnsafe newUnsafe();

protected abstract class AbstractEpollUnsafe extends AbstractUnsafe {
protected boolean readPending;

/**
* Called once EPOLLIN event is ready to be processed
Expand All @@ -142,6 +143,13 @@ void epollRdHupReady() {
// NOOP
}

@Override
public void beginRead() {
// Channel.read() or ChannelHandlerContext.read() was called
readPending = true;
super.beginRead();
}

@Override
protected void flush0() {
// Flush immediately only when there's no pending flush.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public EventLoopGroup childEventLoopGroup() {
}

final class EpollServerSocketUnsafe extends AbstractEpollUnsafe {

@Override
public void connect(SocketAddress socketAddress, SocketAddress socketAddress2, ChannelPromise channelPromise) {
// Connect not supported by ServerChannel implementations
Expand All @@ -99,35 +100,41 @@ void epollInReady() {
final ChannelPipeline pipeline = pipeline();
Throwable exception = null;
try {
for (;;) {
int socketFd = Native.accept(fd);
if (socketFd == -1) {
// this means everything was handled for now
break;
}
try {
pipeline.fireChannelRead(new EpollSocketChannel(EpollServerSocketChannel.this,
childEventLoopGroup().next(), socketFd));
} catch (Throwable t) {
// keep on reading as we use epoll ET and need to consume everything from the socket
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(t);
try {
for (;;) {
int socketFd = Native.accept(fd);
if (socketFd == -1) {
// this means everything was handled for now
break;
}
try {
readPending = false;
pipeline.fireChannelRead(new EpollSocketChannel(EpollServerSocketChannel.this,
childEventLoopGroup().next(), socketFd));
} catch (Throwable t) {
// keep on reading as we use epoll ET and need to consume everything from the socket
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(t);
}
}
} catch (Throwable t) {
exception = t;
}
pipeline.fireChannelReadComplete();

} catch (Throwable t) {
exception = t;
}
// This must be triggered before the channelReadComplete() to give the user the chance
// to call Channel.read() again.
// See https://github.com/netty/netty/issues/2254
if (!config().isAutoRead()) {
clearEpollIn();
}
pipeline.fireChannelReadComplete();

if (exception != null) {
pipeline.fireExceptionCaught(exception);
if (exception != null) {
pipeline.fireExceptionCaught(exception);
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (config.isAutoRead() && !readPending) {
clearEpollIn();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ private void closeOnRead(ChannelPipeline pipeline) {
private boolean handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) {
if (byteBuf != null) {
if (byteBuf.isReadable()) {
readPending = false;
pipeline.fireChannelRead(byteBuf);
} else {
byteBuf.release();
Expand Down Expand Up @@ -602,6 +603,7 @@ void epollInReady() {
close = localReadAmount < 0;
break;
}
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;

Expand All @@ -620,13 +622,6 @@ void epollInReady() {
break;
}
}
// This must be triggered before the channelReadComplete() to give the user the chance
// to call Channel.read() again.
// See https://github.com/netty/netty/issues/2254
if (!config.isAutoRead()) {
clearEpollIn();
}

pipeline.fireChannelReadComplete();
allocHandle.record(totalReadAmount);

Expand All @@ -646,6 +641,16 @@ public void run() {
}
});
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (config.isAutoRead() && !readPending) {
clearEpollIn();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,21 +56,6 @@ protected AbstractNioUnsafe newUnsafe() {
private final class NioByteUnsafe extends AbstractNioUnsafe {
private RecvByteBufAllocator.Handle allocHandle;

private void removeReadOp() {
SelectionKey key = selectionKey();
// Check first if the key is still valid as it may be canceled as part of the deregistration
// from the EventLoop
// See https://github.com/netty/netty/issues/2104
if (!key.isValid()) {
return;
}
int interestOps = key.interestOps();
if ((interestOps & readInterestOp) != 0) {
// only remove readInterestOp if needed
key.interestOps(interestOps & ~readInterestOp);
}
}

private void closeOnRead(ChannelPipeline pipeline) {
SelectionKey key = selectionKey();
setInputShutdown();
Expand All @@ -84,19 +69,15 @@ private void closeOnRead(ChannelPipeline pipeline) {
}
}

private void handleReadException(ChannelPipeline pipeline, ChannelConfig config,
private void handleReadException(ChannelPipeline pipeline,
ByteBuf byteBuf, Throwable cause, boolean close) {
if (byteBuf != null) {
if (byteBuf.isReadable()) {
readPending = false;
pipeline.fireChannelRead(byteBuf);
} else {
byteBuf.release();
}
// This must be triggered before the channelReadComplete() to give the user the chance
// to call Channel.read() again.
if (!config.isAutoRead()) {
removeReadOp();
}
}
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(cause);
Expand Down Expand Up @@ -132,7 +113,7 @@ public void read() {
close = localReadAmount < 0;
break;
}

readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;

Expand All @@ -155,12 +136,6 @@ public void read() {
break;
}
} while (++ messages < maxMessagesPerRead);
// This must be triggered before the channelReadComplete() to give the user the chance
// to call Channel.read() again.
// See https://github.com/netty/netty/issues/2254
if (!config.isAutoRead()) {
removeReadOp();
}

pipeline.fireChannelReadComplete();
allocHandle.record(totalReadAmount);
Expand All @@ -170,7 +145,17 @@ public void read() {
close = false;
}
} catch (Throwable t) {
handleReadException(pipeline, config, byteBuf, t, close);
handleReadException(pipeline, byteBuf, t, close);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (config.isAutoRead() && !readPending) {
removeReadOp();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,30 @@ public interface NioUnsafe extends Unsafe {

protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {

protected boolean readPending;

protected final void removeReadOp() {
SelectionKey key = selectionKey();
// Check first if the key is still valid as it may be canceled as part of the deregistration
// from the EventLoop
// See https://github.com/netty/netty/issues/2104
if (!key.isValid()) {
return;
}
int interestOps = key.interestOps();
if ((interestOps & readInterestOp) != 0) {
// only remove readInterestOp if needed
key.interestOps(interestOps & ~readInterestOp);
}
}

@Override
public void beginRead() {
// Channel.read() or ChannelHandlerContext.read() was called
readPending = true;
super.beginRead();
}

@Override
public SelectableChannel ch() {
return javaChannel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,6 @@ private final class NioMessageUnsafe extends AbstractNioUnsafe {

private final List<Object> readBuf = new ArrayList<Object>();

private void removeReadOp() {
SelectionKey key = selectionKey();
int interestOps = key.interestOps();
if ((interestOps & readInterestOp) != 0) {
// only remove readInterestOp if needed
key.interestOps(interestOps & ~readInterestOp);
}
}

@Override
public void read() {
assert eventLoop().inEventLoop();
Expand All @@ -65,58 +56,63 @@ public void read() {
final ChannelPipeline pipeline = pipeline();
boolean closed = false;
Throwable exception = null;

try {
for (;;) {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}

// stop reading and remove op
if (!config.isAutoRead()) {
break;
}

if (readBuf.size() >= maxMessagesPerRead) {
break;
try {
for (;;) {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}

// stop reading and remove op
if (!config.isAutoRead()) {
break;
}

if (readBuf.size() >= maxMessagesPerRead) {
break;
}
}
} catch (Throwable t) {
exception = t;
}
readPending = false;
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
} catch (Throwable t) {
exception = t;
}

int size = readBuf.size();
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
// This must be triggered before the channelReadComplete() to give the user the chance
// to call Channel.read() again.
// See https://github.com/netty/netty/issues/2254
if (!config.isAutoRead()) {
removeReadOp();
}
readBuf.clear();
pipeline.fireChannelReadComplete();

readBuf.clear();
pipeline.fireChannelReadComplete();
if (exception != null) {
if (exception instanceof IOException) {
// ServerChannel should not be closed even on IOException because it can often continue
// accepting incoming connections. (e.g. too many open files)
closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
}

if (exception != null) {
if (exception instanceof IOException) {
// ServerChannel should not be closed even on IOException because it can often continue
// accepting incoming connections. (e.g. too many open files)
closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
pipeline.fireExceptionCaught(exception);
}

pipeline.fireExceptionCaught(exception);
}

if (closed) {
if (isOpen()) {
close(voidPromise());
if (closed) {
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (config.isAutoRead() && !readPending) {
removeReadOp();
}
}
}
Expand Down

0 comments on commit 52535a1

Please sign in to comment.