Skip to content

Commit

Permalink
Add ChannelConfig.maxMessagesPerRead and ChannelOption.MAX_MESSAGES_P…
Browse files Browse the repository at this point in the history
…ER_READ

- Fixes netty#1486
- Make sure AbstractNioMessageChannel.NioMessageUnsafe.read() only up to maxMessagesPerRead
  • Loading branch information
trustin committed Jun 25, 2013
1 parent 58b968b commit a1632e7
Show file tree
Hide file tree
Showing 24 changed files with 202 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -199,17 +199,26 @@ public int getReadTimeout() {

@Override
public RxtxChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
return (RxtxChannelConfig) super.setConnectTimeoutMillis(connectTimeoutMillis);
super.setConnectTimeoutMillis(connectTimeoutMillis);
return this;
}

@Override
public RxtxChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead);
return this;
}

@Override
public RxtxChannelConfig setWriteSpinCount(int writeSpinCount) {
return (RxtxChannelConfig) super.setWriteSpinCount(writeSpinCount);
super.setWriteSpinCount(writeSpinCount);
return this;
}

@Override
public RxtxChannelConfig setAllocator(ByteBufAllocator allocator) {
return (RxtxChannelConfig) super.setAllocator(allocator);
super.setAllocator(allocator);
return this;
}

@Override
Expand All @@ -220,16 +229,19 @@ public RxtxChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator)

@Override
public RxtxChannelConfig setAutoRead(boolean autoRead) {
return (RxtxChannelConfig) super.setAutoRead(autoRead);
super.setAutoRead(autoRead);
return this;
}

@Override
public RxtxChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
return (RxtxChannelConfig) super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
return this;
}

@Override
public RxtxChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
return (RxtxChannelConfig) super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,9 @@ public static Paritybit valueOf(int value) {
@Override
RxtxChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);

@Override
RxtxChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);

@Override
RxtxChannelConfig setWriteSpinCount(int writeSpinCount);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,12 @@ public SctpChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
return (SctpChannelConfig) super.setConnectTimeoutMillis(connectTimeoutMillis);
}

@Override
public SctpChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead);
return this;
}

@Override
public SctpChannelConfig setWriteSpinCount(int writeSpinCount) {
return (SctpChannelConfig) super.setWriteSpinCount(writeSpinCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,19 +155,28 @@ public SctpServerChannelConfig setBacklog(int backlog) {
return this;
}

@Override
public SctpServerChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead);
return this;
}

@Override
public SctpServerChannelConfig setWriteSpinCount(int writeSpinCount) {
return (SctpServerChannelConfig) super.setWriteSpinCount(writeSpinCount);
super.setWriteSpinCount(writeSpinCount);
return this;
}

@Override
public SctpServerChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
return (SctpServerChannelConfig) super.setConnectTimeoutMillis(connectTimeoutMillis);
super.setConnectTimeoutMillis(connectTimeoutMillis);
return this;
}

@Override
public SctpServerChannelConfig setAllocator(ByteBufAllocator allocator) {
return (SctpServerChannelConfig) super.setAllocator(allocator);
super.setAllocator(allocator);
return this;
}

@Override
Expand All @@ -178,16 +187,19 @@ public SctpServerChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allo

@Override
public SctpServerChannelConfig setAutoRead(boolean autoRead) {
return (SctpServerChannelConfig) super.setAutoRead(autoRead);
super.setAutoRead(autoRead);
return this;
}

@Override
public SctpServerChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
return (SctpServerChannelConfig) super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
return this;
}

@Override
public SctpServerChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
return (SctpServerChannelConfig) super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ public interface SctpChannelConfig extends ChannelConfig {
@Override
SctpChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);

@Override
SctpChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);

@Override
SctpChannelConfig setWriteSpinCount(int writeSpinCount);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ public interface SctpServerChannelConfig extends ChannelConfig {
*/
SctpServerChannelConfig setInitMaxStreams(InitMaxStreams initMaxStreams);

@Override
SctpServerChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);

@Override
SctpServerChannelConfig setWriteSpinCount(int writeSpinCount);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,12 @@ public UdtChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
return this;
}

@Override
public UdtChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead);
return this;
}

@Override
public UdtChannelConfig setWriteSpinCount(int writeSpinCount) {
super.setWriteSpinCount(writeSpinCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ public UdtServerChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis)
return this;
}

@Override
public UdtServerChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead);
return this;
}

@Override
public UdtServerChannelConfig setWriteSpinCount(int writeSpinCount) {
super.setWriteSpinCount(writeSpinCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ public interface UdtChannelConfig extends ChannelConfig {
@Override
UdtChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);

@Override
UdtChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);

@Override
UdtChannelConfig setWriteSpinCount(int writeSpinCount);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ public interface UdtServerChannelConfig extends UdtChannelConfig {
@Override
UdtServerChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);

@Override
UdtServerChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);

@Override
UdtServerChannelConfig setWriteSpinCount(int writeSpinCount);

Expand Down
16 changes: 16 additions & 0 deletions transport/src/main/java/io/netty/channel/ChannelConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,22 @@ public interface ChannelConfig {
*/
ChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);

/**
* Returns the maximum number of messages in a {@link MessageList} of
* a {@link ChannelInboundHandler#messageReceived(ChannelHandlerContext, MessageList) messageReceived()} event.
* If this value is greater than 1, an event loop might attempt to read multiple times to fill multiple messages
* into the {@link MessageList}.
*/
int getMaxMessagesPerRead();

/**
* Sets the maximum number of messages in a {@link MessageList} of
* a {@link ChannelInboundHandler#messageReceived(ChannelHandlerContext, MessageList) messageReceived()} event.
* If this value is greater than 1, an event loop might attempt to read multiple times to fill multiple messages
* into the {@link MessageList}.
*/
ChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);

/**
* Returns the maximum loop count for a write operation until
* {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value.
Expand Down
2 changes: 2 additions & 0 deletions transport/src/main/java/io/netty/channel/ChannelOption.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class ChannelOption<T> extends UniqueName {

public static final ChannelOption<Integer> CONNECT_TIMEOUT_MILLIS =
new ChannelOption<Integer>("CONNECT_TIMEOUT_MILLIS");
public static final ChannelOption<Integer> MAX_MESSAGES_PER_READ =
new ChannelOption<Integer>("MAX_MESSAGES_PER_READ");
public static final ChannelOption<Integer> WRITE_SPIN_COUNT =
new ChannelOption<Integer>("WRITE_SPIN_COUNT");
public static final ChannelOption<Integer> WRITE_BUFFER_HIGH_WATER_MARK =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class DefaultChannelConfig implements ChannelConfig {
private volatile ByteBufAllocator allocator = DEFAULT_ALLOCATOR;
private volatile RecvByteBufAllocator rcvBufAllocator = DEFAULT_RCVBUF_ALLOCATOR;
private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT;
private volatile int maxMessagesPerRead = 16;
private volatile int writeSpinCount = 16;
private volatile boolean autoRead = true;
private volatile int writeBufferHighWaterMark = 64 * 1024;
Expand All @@ -53,7 +54,10 @@ public DefaultChannelConfig(Channel channel) {

@Override
public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(null, CONNECT_TIMEOUT_MILLIS, WRITE_SPIN_COUNT, ALLOCATOR, AUTO_READ, RCVBUF_ALLOCATOR);
return getOptions(
null,
CONNECT_TIMEOUT_MILLIS, MAX_MESSAGES_PER_READ, WRITE_SPIN_COUNT,
ALLOCATOR, AUTO_READ, RCVBUF_ALLOCATOR);
}

protected Map<ChannelOption<?>, Object> getOptions(
Expand Down Expand Up @@ -94,6 +98,9 @@ public <T> T getOption(ChannelOption<T> option) {
if (option == CONNECT_TIMEOUT_MILLIS) {
return (T) Integer.valueOf(getConnectTimeoutMillis());
}
if (option == MAX_MESSAGES_PER_READ) {
return (T) Integer.valueOf(getMaxMessagesPerRead());
}
if (option == WRITE_SPIN_COUNT) {
return (T) Integer.valueOf(getWriteSpinCount());
}
Expand All @@ -116,6 +123,8 @@ public <T> boolean setOption(ChannelOption<T> option, T value) {

if (option == CONNECT_TIMEOUT_MILLIS) {
setConnectTimeoutMillis((Integer) value);
} else if (option == MAX_MESSAGES_PER_READ) {
setMaxMessagesPerRead((Integer) value);
} else if (option == WRITE_SPIN_COUNT) {
setWriteSpinCount((Integer) value);
} else if (option == ALLOCATOR) {
Expand Down Expand Up @@ -153,6 +162,20 @@ public ChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
return this;
}

@Override
public int getMaxMessagesPerRead() {
return maxMessagesPerRead;
}

@Override
public ChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
if (maxMessagesPerRead <= 0) {
throw new IllegalArgumentException("maxMessagesPerRead: " + maxMessagesPerRead + " (expected: > 0)");
}
this.maxMessagesPerRead = maxMessagesPerRead;
return this;
}

@Override
public int getWriteSpinCount() {
return writeSpinCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.netty.channel.nio;

import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.MessageList;

Expand Down Expand Up @@ -54,29 +55,32 @@ public void read() {
}
}

final ChannelConfig config = config();
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
final boolean autoRead = config.isAutoRead();
final ChannelPipeline pipeline = pipeline();
boolean closed = false;
MessageList<Object> msgBuf = MessageList.newInstance();
Throwable exception = null;
loop: for (;;) {
try {
for (;;) {
int localRead = doReadMessages(msgBuf);
if (localRead == 0) {
break loop;
}
if (localRead < 0) {
closed = true;
break loop;
}
if (!config().isAutoRead()) {
break loop;
}
int readMessages = 0;
try {
for (;;) {
int localRead = doReadMessages(msgBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}

readMessages += localRead;
if (readMessages >= maxMessagesPerRead | !autoRead) {
break;
}
} catch (Throwable t) {
exception = t;
break;
}
} catch (Throwable t) {
exception = t;
}

pipeline.fireMessageReceived(msgBuf);
Expand All @@ -86,7 +90,7 @@ public void read() {
closed = true;
}

pipeline().fireExceptionCaught(exception);
pipeline.fireExceptionCaught(exception);
}

if (closed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ public interface DatagramChannelConfig extends ChannelConfig {
*/
DatagramChannelConfig setNetworkInterface(NetworkInterface networkInterface);

@Override
DatagramChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);

@Override
DatagramChannelConfig setWriteSpinCount(int writeSpinCount);

Expand Down
Loading

0 comments on commit a1632e7

Please sign in to comment.