Skip to content

Commit

Permalink
Use a Thread-local based direct buffer pool if non pooled allocator i…
Browse files Browse the repository at this point in the history
…s used
  • Loading branch information
Norman Maurer committed Sep 5, 2013
1 parent 95576d6 commit 6716dca
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.buffer.UnpooledDirectByteBuf;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

Expand All @@ -45,6 +48,13 @@ public final class ChannelOutboundBuffer {

private static final int INITIAL_CAPACITY = 32;

private static final int threadLocalDirectBufferSize;

static {
threadLocalDirectBufferSize = SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 64 * 1024);
logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", threadLocalDirectBufferSize);
}

private static final Recycler<ChannelOutboundBuffer> RECYCLER = new Recycler<ChannelOutboundBuffer>() {
@Override
protected ChannelOutboundBuffer newObject(Handle handle) {
Expand Down Expand Up @@ -224,10 +234,44 @@ private static long total(Object msg) {
}

public Object current() {
return current(true);
}

public Object current(boolean preferDirect) {
if (isEmpty()) {
return null;
} else {
return buffer[flushed].msg;
// TODO: Think of a smart way to handle ByteBufHolder messages
Object msg = buffer[flushed].msg;
if (threadLocalDirectBufferSize <= 0 || !preferDirect) {
return msg;
}
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.isDirect()) {
return buf;
} else {
int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
return buf;
}

// Non-direct buffers are copied into JDK's own internal direct buffer on every I/O.
// We can do a better job by using our pooled allocator. If the current allocator does not
// pool a direct buffer, we use a ThreadLocal based pool.
ByteBufAllocator alloc = channel.alloc();
ByteBuf directBuf;
if (alloc.isDirectBufferPooled()) {
directBuf = alloc.directBuffer(readableBytes);
} else {
directBuf = ThreadLocalPooledByteBuf.newInstance();
}
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
current(directBuf);
return directBuf;
}
}
return msg;
}
}

Expand Down Expand Up @@ -339,7 +383,7 @@ public ByteBuffer[] nioBuffers() {
if (nioBufferCount + count > nioBuffers.length) {
this.nioBuffers = nioBuffers = doubleNioBufferArray(nioBuffers, nioBufferCount);
}
if (buf.isDirect() || !alloc.isDirectBufferPooled()) {
if (buf.isDirect() || threadLocalDirectBufferSize <= 0) {
if (buf.nioBufferCount() == 1) {
nioBuffers[nioBufferCount ++] = buf.internalNioBuffer(readerIndex, readableBytes);
} else {
Expand Down Expand Up @@ -371,7 +415,12 @@ private static int fillBufferArray(ByteBuf buf, ByteBuffer[] nioBuffers, int nio

private static int fillBufferArrayNonDirect(Entry entry, ByteBuf buf, int readerIndex, int readableBytes,
ByteBufAllocator alloc, ByteBuffer[] nioBuffers, int nioBufferCount) {
ByteBuf directBuf = alloc.directBuffer(readableBytes);
ByteBuf directBuf;
if (alloc.isDirectBufferPooled()) {
directBuf = alloc.directBuffer(readableBytes);
} else {
directBuf = ThreadLocalPooledByteBuf.newInstance();
}
directBuf.writeBytes(buf, readerIndex, readableBytes);
buf.release();
entry.msg = directBuf;
Expand Down Expand Up @@ -543,4 +592,35 @@ public void clear() {
}
}

static final class ThreadLocalPooledByteBuf extends UnpooledDirectByteBuf {
private final Recycler.Handle handle;

private static final Recycler<ThreadLocalPooledByteBuf> RECYCLER = new Recycler<ThreadLocalPooledByteBuf>() {
@Override
protected ThreadLocalPooledByteBuf newObject(Handle handle) {
return new ThreadLocalPooledByteBuf(handle);
}
};

private ThreadLocalPooledByteBuf(Recycler.Handle handle) {
super(UnpooledByteBufAllocator.DEFAULT, 256, Integer.MAX_VALUE);
this.handle = handle;
}

static ThreadLocalPooledByteBuf newInstance() {
ThreadLocalPooledByteBuf buf = RECYCLER.get();
buf.setRefCnt(1);
return buf;
}

@Override
protected void deallocate() {
if (capacity() > threadLocalDirectBufferSize) {
super.deallocate();
} else {
clear();
RECYCLER.recycle(this, handle);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ protected AbstractUnsafe newUnsafe() {
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) {
Object msg = in.current();
Object msg = in.current(false);
if (msg == null) {
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ protected void doWrite(ChannelOutboundBuffer in) throws Exception {
// Use a copy because the original msgs will be recycled by AbstractChannel.
final Object[] msgsCopy = new Object[in.size()];
for (int i = 0; i < msgsCopy.length; i ++) {
msgsCopy[i] = ReferenceCountUtil.retain(in.current());
msgsCopy[i] = ReferenceCountUtil.retain(in.current(false));
in.remove();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = -1;

for (;;) {
Object msg = in.current();
Object msg = in.current(true);
if (msg == null) {
// Wrote all messages.
clearOpWrite();
Expand All @@ -159,16 +159,7 @@ protected void doWrite(ChannelOutboundBuffer in) throws Exception {
in.remove();
continue;
}
if (!buf.isDirect()) {
ByteBufAllocator alloc = alloc();
if (alloc.isDirectBufferPooled()) {
// Non-direct buffers are copied into JDK's own internal direct buffer on every I/O.
// We can do a better job by using our pooled allocator. If the current allocator does not
// pool a direct buffer, we rely on JDK's direct buffer pool.
buf = alloc.directBuffer(readableBytes).writeBytes(buf);
in.current(buf);
}
}

boolean done = false;
long flushedAmount = 0;
if (writeSpinCount == -1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ protected void doRead() {
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) {
Object msg = in.current();
Object msg = in.current(false);

if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ protected int doReadMessages(List<Object> buf) throws Exception {
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) {
final Object o = in.current();
final Object o = in.current(false);
if (o == null) {
break;
}
Expand Down

0 comments on commit 6716dca

Please sign in to comment.