Skip to content

Commit

Permalink
support TCP half-close
Browse files Browse the repository at this point in the history
  • Loading branch information
Steven Hazel committed May 2, 2012
1 parent 9f1c394 commit b7a19a0
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 1 deletion.
15 changes: 15 additions & 0 deletions src/main/java/org/jboss/netty/channel/AbstractChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ private static Integer allocateId(Channel channel) {
private final ChannelFuture succeededFuture = new SucceededChannelFuture(this);
private final ChannelCloseFuture closeFuture = new ChannelCloseFuture();
private volatile int interestOps = OP_READ;
private boolean handleHalfClose = false;

/** Cache for the string representation of this channel */
private boolean strValConnected;
Expand Down Expand Up @@ -178,6 +179,14 @@ public boolean isOpen() {
return !closeFuture.isDone();
}

public void setHandleHalfClose(boolean value) {
handleHalfClose = value;
}

public boolean getHandleHalfClose() {
return handleHalfClose;
}

/**
* Marks this channel as closed. This method is intended to be called by
* an internal component - please do not call it unless you know what you
Expand Down Expand Up @@ -211,6 +220,12 @@ public ChannelFuture close() {
return closeFuture;
}

@Override
public ChannelFuture shutdownOutput() {
ChannelFuture returnedFuture = Channels.shutdownOutput(this);
return returnedFuture;
}

@Override
public ChannelFuture getCloseFuture() {
return closeFuture;
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/org/jboss/netty/channel/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ public interface Channel extends Comparable<Channel> {
*/
static int OP_READ_WRITE = OP_READ | OP_WRITE;

static int OP_HALF_CLOSE = 8;

/**
* Returns the unique integer ID of this channel.
*/
Expand Down Expand Up @@ -300,6 +302,8 @@ public interface Channel extends Comparable<Channel> {
*/
ChannelFuture close();

ChannelFuture shutdownOutput();

/**
* Returns the {@link ChannelFuture} which will be notified when this
* channel is closed. This method always returns the same future instance.
Expand Down Expand Up @@ -335,6 +339,10 @@ public interface Channel extends Comparable<Channel> {
*/
boolean isWritable();

void setHandleHalfClose(boolean value);

boolean getHandleHalfClose();

/**
* Changes the {@code interestOps} of this channel asynchronously.
*
Expand Down
63 changes: 63 additions & 0 deletions src/main/java/org/jboss/netty/channel/Channels.java
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,34 @@ public static void fireChannelClosed(ChannelHandlerContext ctx) {
ctx.getChannel(), ChannelState.OPEN, Boolean.FALSE));
}

/**
* Sends a {@code "channelReadClosed"} event to the first
* {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of
* the specified {@link Channel}.
*/
public static void fireChannelReadClosed(Channel channel) {
channel.getPipeline().sendUpstream(
new UpstreamChannelStateEvent(
channel, ChannelState.OPEN, "write only"));

// Notify the parent handler.
if (channel.getParent() != null) {
fireChildChannelStateChanged(channel.getParent(), channel);
}
}

/**
* Sends a {@code "channelReadClosed"} event to the
* {@link ChannelUpstreamHandler} which is placed in the closest upstream
* from the handler associated with the specified
* {@link ChannelHandlerContext}.
*/
public static void fireChannelReadClosed(ChannelHandlerContext ctx) {
ctx.sendUpstream(
new UpstreamChannelStateEvent(
ctx.getChannel(), ChannelState.OPEN, "write only"));
}

/**
* Sends a {@code "exceptionCaught"} event to the first
* {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of
Expand Down Expand Up @@ -738,12 +766,47 @@ public static void close(
ctx.getChannel(), future, ChannelState.OPEN, Boolean.FALSE));
}

/**
* Sends a {@code "shutdownOutput"} request to the last
* {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} of
* the specified {@link Channel}.
*
* @param channel the channel to shutdownOutput
*
* @return the {@link ChannelFuture} which will be notified on closure
*/
public static ChannelFuture shutdownOutput(Channel channel) {
ChannelFuture future = future(channel);
channel.getPipeline().sendDownstream(new DownstreamChannelStateEvent(
channel, future, ChannelState.OPEN, "read only"));
return future;
}

/**
* Sends a {@code "shutdownOutput"} request to the
* {@link ChannelDownstreamHandler} which is placed in the shutdownOutputst
* downstream from the handler associated with the specified
* {@link ChannelHandlerContext}.
*
* @param ctx the context
* @param future the future which will be notified on closure
*/
public static void shutdownOutput(
ChannelHandlerContext ctx, ChannelFuture future) {
ctx.sendDownstream(new DownstreamChannelStateEvent(
ctx.getChannel(), future, ChannelState.OPEN, "read only"));
}

private static void validateInterestOps(int interestOps) {
switch (interestOps) {
case Channel.OP_NONE:
case Channel.OP_READ:
case Channel.OP_WRITE:
case Channel.OP_READ_WRITE:
case Channel.OP_HALF_CLOSE:
case Channel.OP_READ | Channel.OP_HALF_CLOSE:
case Channel.OP_WRITE | Channel.OP_HALF_CLOSE:
case Channel.OP_READ_WRITE | Channel.OP_HALF_CLOSE:
break;
default:
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public void eventSunk(
case OPEN:
if (Boolean.FALSE.equals(value)) {
channel.worker.close(channel, future);
} else if ("read only".equals(value)) {
channel.worker.shutdownOutput(channel, future);
}
break;
case BOUND:
Expand Down
17 changes: 16 additions & 1 deletion src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,12 @@ private boolean read(SelectionKey k) {

if (ret < 0 || failure) {
k.cancel(); // Some JDK implementations run into an infinite loop without this.
close(channel, succeededFuture(channel));
if (channel.getHandleHalfClose()) {
fireChannelReadClosed(channel);
} else {
close(channel, succeededFuture(channel));
}

return false;
}

Expand Down Expand Up @@ -598,6 +603,16 @@ void close(NioSocketChannel channel, ChannelFuture future) {
}
}

void shutdownOutput(NioSocketChannel channel, ChannelFuture future) {
try {
channel.socket.socket().shutdownOutput();
future.setSuccess();
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}

private void cleanUpWriteBuffer(NioSocketChannel channel) {
Exception cause = null;
boolean fireExceptionCaught = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ public ChannelFuture close()
return null;
}

@Override
public ChannelFuture shutdownOutput()
{
// TODO Auto-generated method stub
return null;
}

@Override
public ChannelFuture connect(SocketAddress remoteAddress)
{
Expand Down

0 comments on commit b7a19a0

Please sign in to comment.