Skip to content

Commit

Permalink
Reduce memory copies in HttpContentDecoder and so also the risk of me…
Browse files Browse the repository at this point in the history
…mory leaks
  • Loading branch information
Norman Maurer committed Jul 14, 2013
1 parent ecb215c commit 0393ffb
Showing 1 changed file with 41 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
package io.netty.handler.codec.http;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.MessageToMessageDecoder;
Expand Down Expand Up @@ -108,15 +106,23 @@ protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> ou
headers.set(HttpHeaders.Names.CONTENT_ENCODING, targetContentEncoding);
}

Object[] decoded = decodeContent(message, c);
out.add(message);
decodeContent(c, out);

// Replace the content.
// Replace the content length.
if (headers.contains(HttpHeaders.Names.CONTENT_LENGTH)) {
int contentLength = 0;
int size = out.size();
for (int i = 0; i < size; i++) {
Object o = out.get(i);
if (o instanceof HttpContent) {
contentLength += ((HttpContent) o).content().readableBytes();
}
}
headers.set(
HttpHeaders.Names.CONTENT_LENGTH,
Integer.toString(((ByteBufHolder) decoded[1]).content().readableBytes()));
Integer.toString(contentLength));
}
out.add(decoded);
return;
}

Expand All @@ -129,7 +135,7 @@ protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> ou
}

if (decoder != null) {
out.add(decodeContent(null, c));
decodeContent(c, out);
} else {
if (c instanceof LastHttpContent) {
decodeStarted = false;
Expand All @@ -139,36 +145,17 @@ protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> ou
}
}

private Object[] decodeContent(HttpMessage header, HttpContent c) {
ByteBuf newContent = Unpooled.buffer();
private void decodeContent(HttpContent c, List<Object> out) {
ByteBuf content = c.content();
decode(content, newContent);

decode(content, out);

if (c instanceof LastHttpContent) {
ByteBuf lastProduct = Unpooled.buffer();
finishDecode(lastProduct);
finishDecode(out);

// Generate an additional chunk if the decoder produced
// the last product on closure,
if (lastProduct.isReadable()) {
if (header == null) {
return new Object[] { new DefaultHttpContent(newContent), new DefaultLastHttpContent(lastProduct)};
} else {
return new Object[] { header, new DefaultHttpContent(newContent),
new DefaultLastHttpContent(lastProduct)};
}
} else {
if (header == null) {
return new Object[] { new DefaultLastHttpContent(newContent) };
} else {
return new Object[] { header, new DefaultLastHttpContent(newContent) };
}
}
}
if (header == null) {
return new Object[] { new DefaultHttpContent(newContent) };
} else {
return new Object[] { header, new DefaultHttpContent(newContent) };
out.add(LastHttpContent.EMPTY_LAST_CONTENT);
}
}

Expand Down Expand Up @@ -210,35 +197,46 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {

private void cleanup() {
if (decoder != null) {
// Clean-up the previous decoder if not cleaned up correctly.
ByteBuf buf = Unpooled.buffer();
finishDecode(buf);
buf.release();
// Clean-up the previous encoder if not cleaned up correctly.
if (decoder.finish()) {
for (;;) {
ByteBuf buf = (ByteBuf) decoder.readOutbound();
if (buf == null) {
break;
}
// Release the buffer
buf.release();
}
}
decoder = null;
}
}

private void decode(ByteBuf in, ByteBuf out) {
// call retain as it will be release after is written
decoder.writeInbound(in.retain());
private void decode(ByteBuf in, List<Object> out) {
// call retain here as it will call release after its written to the channel
decoder.writeOutbound(in.retain());
fetchDecoderOutput(out);
}

private void finishDecode(ByteBuf out) {
private void finishDecode(List<Object> out) {
if (decoder.finish()) {
fetchDecoderOutput(out);
}
decodeStarted = false;
decoder = null;
}

private void fetchDecoderOutput(ByteBuf out) {
private void fetchDecoderOutput(List<Object> out) {
for (;;) {
ByteBuf buf = (ByteBuf) decoder.readInbound();
ByteBuf buf = (ByteBuf) decoder.readOutbound();
if (buf == null) {
break;
}
out.writeBytes(buf);
buf.release();
if (!buf.isReadable()) {
buf.release();
continue;
}
out.add(new DefaultHttpContent(buf));
}
}
}

0 comments on commit 0393ffb

Please sign in to comment.