Skip to content

Commit

Permalink
Always upstream full memcache messages.
Browse files Browse the repository at this point in the history
This changeset is related to netty#2182, which exposes the failure in
the http codec, but the memcache codec works very similar. In addition,
better failure handling in the decoder has been added.
  • Loading branch information
daschl authored and Norman Maurer committed Feb 23, 2014
1 parent 64be9b2 commit 3f53ba2
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
package io.netty.handler.codec.memcache.binary;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.memcache.AbstractMemcacheObjectDecoder;
import io.netty.handler.codec.memcache.DefaultLastMemcacheContent;
import io.netty.handler.codec.memcache.DefaultMemcacheContent;
import io.netty.handler.codec.memcache.LastMemcacheContent;
import io.netty.handler.codec.memcache.MemcacheContent;
import io.netty.handler.codec.memcache.MemcacheMessage;
import io.netty.util.CharsetUtil;

import java.util.List;
Expand Down Expand Up @@ -71,15 +74,19 @@ protected AbstractBinaryMemcacheDecoder(int chunkSize) {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
switch (state) {
case READ_HEADER:
case READ_HEADER: try {
if (in.readableBytes() < 24) {
return;
}
resetDecoder();

currentHeader = decodeHeader(in);
state = State.READ_EXTRAS;
case READ_EXTRAS:
} catch (Exception e) {
out.add(invalidMessage(e));
return;
}
case READ_EXTRAS: try {
byte extrasLength = currentHeader.getExtrasLength();
if (extrasLength > 0) {
if (in.readableBytes() < extrasLength) {
Expand All @@ -90,7 +97,11 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
}

state = State.READ_KEY;
case READ_KEY:
} catch (Exception e) {
out.add(invalidMessage(e));
return;
}
case READ_KEY: try {
short keyLength = currentHeader.getKeyLength();
if (keyLength > 0) {
if (in.readableBytes() < keyLength) {
Expand All @@ -103,8 +114,12 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t

out.add(buildMessage(currentHeader, currentExtras, currentKey));
currentExtras = null;
state = State.READ_VALUE;
case READ_VALUE:
state = State.READ_CONTENT;
} catch (Exception e) {
out.add(invalidMessage(e));
return;
}
case READ_CONTENT: try {
int valueLength = currentHeader.getTotalBodyLength()
- currentHeader.getKeyLength()
- currentHeader.getExtrasLength();
Expand Down Expand Up @@ -142,11 +157,44 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t

state = State.READ_HEADER;
return;
} catch (Exception e) {
out.add(invalidChunk(e));
return;
}
case BAD_MESSAGE:
in.skipBytes(actualReadableBytes());
return;
default:
throw new Error("Unknown state reached: " + state);
}
}

/**
* Helper method to create a message indicating a invalid decoding result.
*
* @param cause the cause of the decoding failure.
* @return a valid message indicating failure.
*/
private M invalidMessage(Exception cause) {
state = State.BAD_MESSAGE;
M message = buildInvalidMessage();
message.setDecoderResult(DecoderResult.failure(cause));
return message;
}

/**
* Helper method to create a content chunk indicating a invalid decoding result.
*
* @param cause the cause of the decoding failure.
* @return a valid content chunk indicating failure.
*/
private MemcacheContent invalidChunk(Exception cause) {
state = State.BAD_MESSAGE;
MemcacheContent chunk = new DefaultLastMemcacheContent(Unpooled.EMPTY_BUFFER);
chunk.setDecoderResult(DecoderResult.failure(cause));
return chunk;
}

/**
* When the channel goes inactive, release all frames to prevent data leaks.
*
Expand Down Expand Up @@ -192,6 +240,13 @@ protected void resetDecoder() {
*/
protected abstract M buildMessage(H header, ByteBuf extras, String key);

/**
* Helper method to create a upstream message when the incoming parsing did fail.
*
* @return a message indicating a decoding failure.
*/
protected abstract M buildInvalidMessage();

/**
* Contains all states this decoder can possibly be in.
* <p/>
Expand All @@ -218,7 +273,12 @@ enum State {
/**
* Currently reading the value chunks (optional).
*/
READ_VALUE
READ_CONTENT,

/**
* Something went wrong while decoding the message or chunks.
*/
BAD_MESSAGE
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@
import java.util.List;

/**
* A memcache object aggregator for the binary protocol.
* An object aggregator for the memcache binary protocol.
*
* It aggregates {@link BinaryMemcacheMessage}s and {@link MemcacheContent} into {@link FullBinaryMemcacheRequest}s
* or {@link FullBinaryMemcacheResponse}s.
*/
public class BinaryMemcacheObjectAggregator extends AbstractMemcacheObjectAggregator {

Expand All @@ -50,8 +53,8 @@ protected void decode(ChannelHandlerContext ctx, MemcacheObject msg, List<Object
MemcacheMessage m = (MemcacheMessage) msg;

if (!m.getDecoderResult().isSuccess()) {
out.add(toFullMessage(m));
this.currentMessage = null;
out.add(ReferenceCountUtil.retain(m));
return;
}

Expand Down Expand Up @@ -111,4 +114,34 @@ protected void decode(ChannelHandlerContext ctx, MemcacheObject msg, List<Object
}
}

/**
* Convert a invalid message into a full message.
*
* This method makes sure that upstream handlers always get a full message returned, even
* when invalid chunks are failing.
*
* @param msg the message to transform.
* @return a full message containing parts of the original message.
*/
private static FullMemcacheMessage toFullMessage(final MemcacheMessage msg) {
if (msg instanceof FullMemcacheMessage) {
return ((FullMemcacheMessage) msg).retain();
}

FullMemcacheMessage fullMsg;
if (msg instanceof BinaryMemcacheRequest) {
BinaryMemcacheRequest req = (BinaryMemcacheRequest) msg;
fullMsg = new DefaultFullBinaryMemcacheRequest(req.getHeader(), req.getKey(), req.getExtras(),
Unpooled.EMPTY_BUFFER);
} else if (msg instanceof BinaryMemcacheResponse) {
BinaryMemcacheResponse res = (BinaryMemcacheResponse) msg;
fullMsg = new DefaultFullBinaryMemcacheResponse(res.getHeader(), res.getKey(), res.getExtras(),
Unpooled.EMPTY_BUFFER);
} else {
throw new IllegalStateException();
}

return fullMsg;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.netty.handler.codec.memcache.binary;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

/**
* The decoder part which takes care of decoding the request-specific headers.
Expand Down Expand Up @@ -51,4 +52,12 @@ protected BinaryMemcacheRequest buildMessage(BinaryMemcacheRequestHeader header,
return new DefaultBinaryMemcacheRequest(header, key, extras);
}

@Override
protected BinaryMemcacheRequest buildInvalidMessage() {
return new DefaultBinaryMemcacheRequest(
new DefaultBinaryMemcacheRequestHeader(),
"",
Unpooled.EMPTY_BUFFER
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.netty.handler.codec.memcache.binary;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

/**
* The decoder which takes care of decoding the response headers.
Expand Down Expand Up @@ -51,4 +52,12 @@ protected BinaryMemcacheResponse buildMessage(BinaryMemcacheResponseHeader heade
return new DefaultBinaryMemcacheResponse(header, key, extras);
}

@Override
protected BinaryMemcacheResponse buildInvalidMessage() {
return new DefaultBinaryMemcacheResponse(
new DefaultBinaryMemcacheResponseHeader(),
"",
Unpooled.EMPTY_BUFFER
);
}
}

0 comments on commit 3f53ba2

Please sign in to comment.