Skip to content

Commit

Permalink
Set readPending to false when ever a read is done
Browse files Browse the repository at this point in the history
Motivation:
readPending is currently only set to false if data is delivered to the application, however this may result in duplicate events being received from the selector in the event that the socket was closed.

Modifications:
- We should set readPending to false before each read attempt for all
transports besides NIO.
- Based upon the Javadocs it is possible that NIO may have spurious
wakeups [1]. In this case we should be more cautious and only set
readPending to false if data was actually read.

[1] https://docs.oracle.com/javase/7/docs/api/java/nio/channels/SelectionKey.html
That a selection key's ready set indicates that its channel is ready for some operation category is a hint, but not a guarantee, that an operation in such a category may be performed by a thread without causing the thread to block.

Result:
Notification from the selector (or simulated events from kqueue/epoll ET) in the event of socket closure.
Fixes netty#7255
  • Loading branch information
Scottmitch committed Oct 25, 2017
1 parent 424bb09 commit 413c7c2
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,6 @@ private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Thro
EpollRecvByteAllocatorHandle allocHandle) {
if (byteBuf != null) {
if (byteBuf.isReadable()) {
readPending = false;
pipeline.fireChannelRead(byteBuf);
} else {
byteBuf.release();
Expand Down Expand Up @@ -784,6 +783,7 @@ void epollInReady() {
// we use a direct buffer here as the native implementations only be able
// to handle direct buffers.
byteBuf = allocHandle.allocate(allocator);
readPending = false;
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read, release the buffer.
Expand All @@ -793,7 +793,6 @@ void epollInReady() {
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ void readReady(final KQueueRecvByteAllocatorHandle allocHandle) {
// we use a direct buffer here as the native implementations only be able
// to handle direct buffers.
byteBuf = allocHandle.allocate(allocator);
readPending = false;
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read, release the buffer.
Expand All @@ -535,7 +536,6 @@ void readReady(final KQueueRecvByteAllocatorHandle allocHandle) {
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;

Expand Down Expand Up @@ -572,7 +572,6 @@ private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Thro
KQueueRecvByteAllocatorHandle allocHandle) {
if (byteBuf != null) {
if (byteBuf.isReadable()) {
readPending = false;
pipeline.fireChannelRead(byteBuf);
} else {
byteBuf.release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,16 @@ public final void read() {
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// Based upon the Javadocs it is possible that NIO may have spurious wake ups [1]. In this
// case we should be more cautious and only set readPending to false if data was actually
// read.
// [1] https://docs.oracle.com/javase/7/docs/api/java/nio/channels/SelectionKey.html
// That a selection key's ready set indicates that its channel is ready for some operation
// category is a hint, but not a guarantee, that an operation in such a category may be
// performed by a thread without causing the thread to block.
readPending = false;
}
break;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Thro
RecvByteBufAllocator.Handle allocHandle) {
if (byteBuf != null) {
if (byteBuf.isReadable()) {
readPending = false;
pipeline.fireChannelRead(byteBuf);
} else {
byteBuf.release();
Expand All @@ -102,9 +101,6 @@ protected void doRead() {
// during the same read loop readPending was set to false.
return;
}
// In OIO we should set readPending to false even if the read was not successful so we can schedule
// another read on the event loop if no reads are done.
readPending = false;

final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
Expand All @@ -117,6 +113,7 @@ protected void doRead() {
try {
byteBuf = allocHandle.allocate(allocator);
do {
readPending = false;
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
if (!byteBuf.isReadable()) { // nothing was read. release the buffer.
Expand All @@ -140,7 +137,6 @@ protected void doRead() {
final int maxCapacity = byteBuf.maxCapacity();
if (capacity == maxCapacity) {
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = allocHandle.allocate(allocator);
} else {
Expand All @@ -158,7 +154,6 @@ protected void doRead() {
// It is possible we allocated a buffer because the previous one was not writable, but then didn't use
// it because allocHandle.continueReading() returned false.
if (byteBuf.isReadable()) {
readPending = false;
pipeline.fireChannelRead(byteBuf);
} else {
byteBuf.release();
Expand Down

0 comments on commit 413c7c2

Please sign in to comment.