diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java index 63804dfe1a5d..9f4cf49f7119 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java @@ -22,10 +22,13 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufHolder; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.buffer.UnpooledDirectByteBuf; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import io.netty.util.ReferenceCountUtil; +import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -45,6 +48,13 @@ public final class ChannelOutboundBuffer { private static final int INITIAL_CAPACITY = 32; + private static final int threadLocalDirectBufferSize; + + static { + threadLocalDirectBufferSize = SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 64 * 1024); + logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", threadLocalDirectBufferSize); + } + private static final Recycler RECYCLER = new Recycler() { @Override protected ChannelOutboundBuffer newObject(Handle handle) { @@ -224,10 +234,44 @@ private static long total(Object msg) { } public Object current() { + return current(true); + } + + public Object current(boolean preferDirect) { if (isEmpty()) { return null; } else { - return buffer[flushed].msg; + // TODO: Think of a smart way to handle ByteBufHolder messages + Object msg = buffer[flushed].msg; + if (threadLocalDirectBufferSize <= 0 || !preferDirect) { + return msg; + } + if (msg instanceof ByteBuf) { + ByteBuf buf = (ByteBuf) msg; + if (buf.isDirect()) { + return buf; + } else { + int readableBytes = buf.readableBytes(); + if (readableBytes == 0) { + return buf; + } + + // Non-direct buffers are copied into JDK's own internal direct buffer on every I/O. + // We can do a better job by using our pooled allocator. If the current allocator does not + // pool a direct buffer, we use a ThreadLocal based pool. + ByteBufAllocator alloc = channel.alloc(); + ByteBuf directBuf; + if (alloc.isDirectBufferPooled()) { + directBuf = alloc.directBuffer(readableBytes); + } else { + directBuf = ThreadLocalPooledByteBuf.newInstance(); + } + directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); + current(directBuf); + return directBuf; + } + } + return msg; } } @@ -339,7 +383,7 @@ public ByteBuffer[] nioBuffers() { if (nioBufferCount + count > nioBuffers.length) { this.nioBuffers = nioBuffers = doubleNioBufferArray(nioBuffers, nioBufferCount); } - if (buf.isDirect() || !alloc.isDirectBufferPooled()) { + if (buf.isDirect() || threadLocalDirectBufferSize <= 0) { if (buf.nioBufferCount() == 1) { nioBuffers[nioBufferCount ++] = buf.internalNioBuffer(readerIndex, readableBytes); } else { @@ -371,7 +415,12 @@ private static int fillBufferArray(ByteBuf buf, ByteBuffer[] nioBuffers, int nio private static int fillBufferArrayNonDirect(Entry entry, ByteBuf buf, int readerIndex, int readableBytes, ByteBufAllocator alloc, ByteBuffer[] nioBuffers, int nioBufferCount) { - ByteBuf directBuf = alloc.directBuffer(readableBytes); + ByteBuf directBuf; + if (alloc.isDirectBufferPooled()) { + directBuf = alloc.directBuffer(readableBytes); + } else { + directBuf = ThreadLocalPooledByteBuf.newInstance(); + } directBuf.writeBytes(buf, readerIndex, readableBytes); buf.release(); entry.msg = directBuf; @@ -543,4 +592,35 @@ public void clear() { } } + static final class ThreadLocalPooledByteBuf extends UnpooledDirectByteBuf { + private final Recycler.Handle handle; + + private static final Recycler RECYCLER = new Recycler() { + @Override + protected ThreadLocalPooledByteBuf newObject(Handle handle) { + return new ThreadLocalPooledByteBuf(handle); + } + }; + + private ThreadLocalPooledByteBuf(Recycler.Handle handle) { + super(UnpooledByteBufAllocator.DEFAULT, 256, Integer.MAX_VALUE); + this.handle = handle; + } + + static ThreadLocalPooledByteBuf newInstance() { + ThreadLocalPooledByteBuf buf = RECYCLER.get(); + buf.setRefCnt(1); + return buf; + } + + @Override + protected void deallocate() { + if (capacity() > threadLocalDirectBufferSize) { + super.deallocate(); + } else { + clear(); + RECYCLER.recycle(this, handle); + } + } + } } diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java index 5b01a9c58d46..9732af9cec80 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java @@ -320,7 +320,7 @@ protected AbstractUnsafe newUnsafe() { @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) { - Object msg = in.current(); + Object msg = in.current(false); if (msg == null) { break; } diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java index 0fa5af4fbeaf..fa73967eb904 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java @@ -283,7 +283,7 @@ protected void doWrite(ChannelOutboundBuffer in) throws Exception { // Use a copy because the original msgs will be recycled by AbstractChannel. final Object[] msgsCopy = new Object[in.size()]; for (int i = 0; i < msgsCopy.length; i ++) { - msgsCopy[i] = ReferenceCountUtil.retain(in.current()); + msgsCopy[i] = ReferenceCountUtil.retain(in.current(false)); in.remove(); } diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index 7674b5dfe40d..ee271e47a72b 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -145,7 +145,7 @@ protected void doWrite(ChannelOutboundBuffer in) throws Exception { int writeSpinCount = -1; for (;;) { - Object msg = in.current(); + Object msg = in.current(true); if (msg == null) { // Wrote all messages. clearOpWrite(); @@ -159,16 +159,7 @@ protected void doWrite(ChannelOutboundBuffer in) throws Exception { in.remove(); continue; } - if (!buf.isDirect()) { - ByteBufAllocator alloc = alloc(); - if (alloc.isDirectBufferPooled()) { - // Non-direct buffers are copied into JDK's own internal direct buffer on every I/O. - // We can do a better job by using our pooled allocator. If the current allocator does not - // pool a direct buffer, we rely on JDK's direct buffer pool. - buf = alloc.directBuffer(readableBytes).writeBytes(buf); - in.current(buf); - } - } + boolean done = false; long flushedAmount = 0; if (writeSpinCount == -1) { diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java index 0b1ce33c107c..8ca219752952 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java @@ -155,7 +155,7 @@ protected void doRead() { @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) { - Object msg = in.current(); + Object msg = in.current(false); if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java index 4d5c14eccde5..1c9d3ec3bc71 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java @@ -235,7 +235,7 @@ protected int doReadMessages(List buf) throws Exception { @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) { - final Object o = in.current(); + final Object o = in.current(false); if (o == null) { break; }