Skip to content

Commit

Permalink
ByteToMessageDecoder Cumulator improments (netty#9877)
Browse files Browse the repository at this point in the history
Motivation:
ByteToMessageDecoder's default MERGE_CUMULATOR will allocate a new buffer and
copy if the refCnt() of the cumulation is > 1. However this is overly
conservative because we maybe able to avoid allocate/copy if the current
cumulation can accommodate the input buffer without a reallocation. Also when the
reallocation and copy does occur the new buffer is sized just large enough to
accommodate the current the current amount of data. If some data remains in the
cumulation after decode this will require a new allocation/copy when more data
arrives.

Modifications:
- Use maxFastWritableBytes to avoid allocation/copy if the current buffer can
  accommodate the input data without a reallocation operation.
- Use ByteBufAllocator#calculateNewCapacity(..) to get the size of the buffer
  when a reallocation/copy operation is necessary.

Result:
ByteToMessageDecoder MERGE_CUMULATOR won't allocate/copy if the cumulation
buffer can accommodate data without a reallocation, and when a reallocation
occurs we are more likely to leave additional space for future data in an effort
to reduce overall reallocations.
  • Loading branch information
Scottmitch authored Dec 13, 2019
1 parent e7091c0 commit ee70272
Showing 1 changed file with 27 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package io.netty.handler.codec;

import static io.netty.util.internal.ObjectUtil.checkPositive;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
Expand All @@ -30,6 +28,9 @@

import java.util.List;

import static io.netty.util.internal.ObjectUtil.checkPositive;
import static java.lang.Integer.MAX_VALUE;

/**
* {@link ChannelInboundHandlerAdapter} which decodes bytes in a stream-like fashion from one {@link ByteBuf} to an
* other Message type.
Expand Down Expand Up @@ -80,20 +81,17 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
try {
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
// Expand cumulation (by replace it) when either there is not more room in the buffer
// or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
// duplicate().retain() or if its read-only.
//
// See:
// - https://github.com/netty/netty/issues/2327
// - https://github.com/netty/netty/issues/1764
cumulation = expandCumulation(alloc, cumulation, in);
} else {
cumulation.writeBytes(in);
final int required = in.readableBytes();
if (required > cumulation.maxWritableBytes() ||
(required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) ||
cumulation.isReadOnly()) {
// Expand cumulation (by replacing it) under the following conditions:
// - cumulation cannot be resized to accommodate the additional data
// - cumulation can be expanded with a reallocation operation to accommodate but the buffer is
// assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe.
return expandCumulation(alloc, cumulation, in);
}
return cumulation;
return cumulation.writeBytes(in);
} finally {
// We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw
// for whatever release (for example because of OutOfMemoryError)
Expand All @@ -110,7 +108,6 @@ public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in)
public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
ByteBuf buffer;
try {
if (cumulation.refCnt() > 1) {
// Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the
Expand All @@ -119,20 +116,18 @@ public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in)
// See:
// - https://github.com/netty/netty/issues/2327
// - https://github.com/netty/netty/issues/1764
buffer = expandCumulation(alloc, cumulation, in);
return expandCumulation(alloc, cumulation, in);
}
final CompositeByteBuf composite;
if (cumulation instanceof CompositeByteBuf) {
composite = (CompositeByteBuf) cumulation;
} else {
CompositeByteBuf composite;
if (cumulation instanceof CompositeByteBuf) {
composite = (CompositeByteBuf) cumulation;
} else {
composite = alloc.compositeBuffer(Integer.MAX_VALUE);
composite.addComponent(true, cumulation);
}
composite.addComponent(true, in);
in = null;
buffer = composite;
composite = alloc.compositeBuffer(MAX_VALUE);
composite.addComponent(true, cumulation);
}
return buffer;
composite.addComponent(true, in);
in = null;
return composite;
} finally {
if (in != null) {
// We must release if the ownership was not transferred as otherwise it may produce a leak if
Expand Down Expand Up @@ -362,7 +357,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
super.userEventTriggered(ctx, evt);
}

private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) throws Exception {
private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) {
CodecOutputList out = CodecOutputList.newInstance();
try {
channelInputClosed(ctx, out);
Expand Down Expand Up @@ -522,8 +517,9 @@ protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> ou
}
}

static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf oldCumulation, ByteBuf in) {
ByteBuf newCumulation = alloc.buffer(oldCumulation.readableBytes() + in.readableBytes());
private static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf oldCumulation, ByteBuf in) {
ByteBuf newCumulation = alloc.buffer(alloc.calculateNewCapacity(
oldCumulation.readableBytes() + in.readableBytes(), MAX_VALUE));
ByteBuf toRelease = newCumulation;
try {
newCumulation.writeBytes(oldCumulation);
Expand Down

0 comments on commit ee70272

Please sign in to comment.