Skip to content

Commit

Permalink
Cleaning up code for determining whether the HTTP/2 connection header
Browse files Browse the repository at this point in the history
and initial settings frame were sent.

Motivation:

The current logic for marking the connection header and initial settings
frame is a bit complicated.

Modifications:

- Http2FrameEncoder, Http2ConnectionHandler: removed sent listeners and
set the sent flag immediately.

Result:

Just a code cleanup ... behavior is the same.
  • Loading branch information
nmittler authored and Norman Maurer committed Apr 15, 2014
1 parent 58423a4 commit a992e7f
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import static io.netty.handler.codec.http2.draft10.connection.Http2Stream.State.OPEN;
import static io.netty.handler.codec.http2.draft10.connection.Http2Stream.State.RESERVED_LOCAL;
import static io.netty.handler.codec.http2.draft10.connection.Http2Stream.State.RESERVED_REMOTE;
import static io.netty.handler.codec.http2.draft10.frame.Http2FrameCodecUtil.CONNECTION_PREFACE;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerAdapter;
Expand Down Expand Up @@ -110,6 +109,21 @@ public Http2ConnectionHandler(final Http2Connection connection,
this.outboundFlow = outboundFlow;
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// The channel just became active - send the initial settings frame to the remote
// endpoint.
sendInitialSettings(ctx);
super.handlerAdded(ctx);
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// This handler was just added to the context. In case it was handled after
// the connection became active, send the initial settings frame now.
sendInitialSettings(ctx);
}

@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
// Avoid NotYetConnectedException
Expand Down Expand Up @@ -145,11 +159,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
@Override
public void channelRead(ChannelHandlerContext ctx, Object inMsg) throws Exception {
try {
if (inMsg == CONNECTION_PREFACE) {
// The connection preface has been received from the remote endpoint, we're
// beginning an HTTP2 connection. Send the initial settings to the remote endpoint.
sendInitialSettings(ctx);
} else if (inMsg instanceof Http2DataFrame) {
if (inMsg instanceof Http2DataFrame) {
handleInboundData(ctx, (Http2DataFrame) inMsg);
} else if (inMsg instanceof Http2HeadersFrame) {
handleInboundHeaders(ctx, (Http2HeadersFrame) inMsg);
Expand Down Expand Up @@ -181,12 +191,6 @@ public void channelRead(ChannelHandlerContext ctx, Object inMsg) throws Exceptio
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
try {
if (!initialSettingsReceived) {
throw protocolError(
"Attempting to send frame (%s) before initial settings received", msg
.getClass().getName());
}

if (msg instanceof Http2DataFrame) {
handleOutboundData(ctx, (Http2DataFrame) msg, promise);
} else if (msg instanceof Http2HeadersFrame) {
Expand Down Expand Up @@ -649,25 +653,25 @@ private void verifyInitialSettingsReceived() throws Http2Exception {
/**
* Sends the initial settings frame upon establishment of the connection, if not already sent.
*/
private void sendInitialSettings(ChannelHandlerContext ctx) throws Http2Exception {
if (initialSettingsSent) {
throw protocolError("Already sent initial settings.");
}

// Create and send the frame to the remote endpoint.
DefaultHttp2SettingsFrame frame =
new DefaultHttp2SettingsFrame.Builder()
.setInitialWindowSize(inboundFlow.getInitialInboundWindowSize())
.setMaxConcurrentStreams(connection.remote().getMaxStreams())
.setPushEnabled(connection.local().isPushToAllowed()).build();

ctx.writeAndFlush(frame).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
initialSettingsSent = true;
private void sendInitialSettings(final ChannelHandlerContext ctx) throws Http2Exception {
if (!initialSettingsSent && ctx.channel().isActive()) {
initialSettingsSent = true;

// Create and send the frame to the remote endpoint.
DefaultHttp2SettingsFrame frame =
new DefaultHttp2SettingsFrame.Builder()
.setInitialWindowSize(inboundFlow.getInitialInboundWindowSize())
.setMaxConcurrentStreams(connection.remote().getMaxStreams())
.setPushEnabled(connection.local().isPushToAllowed()).build();
ctx.writeAndFlush(frame).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess() && ctx.channel().isOpen()) {
// The write failed, close the connection.
ctx.close();
}
}
}
});
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import static io.netty.handler.codec.http2.draft10.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.draft10.Http2Exception.format;
import static io.netty.handler.codec.http2.draft10.frame.Http2FrameCodecUtil.CONNECTION_PREFACE;
import static io.netty.handler.codec.http2.draft10.frame.Http2FrameCodecUtil.FRAME_HEADER_LENGTH;
import static io.netty.handler.codec.http2.draft10.frame.Http2FrameCodecUtil.FRAME_LENGTH_MASK;
import static io.netty.handler.codec.http2.draft10.frame.Http2FrameCodecUtil.connectionPrefaceBuf;
Expand Down Expand Up @@ -125,10 +124,6 @@ private void processHttp2Preface(ChannelHandlerContext ctx, ByteBuf in) throws H
return;
}

// Fire the connection preface to notify the connection handler that it should send the
// initial settings frame.
ctx.fireChannelRead(CONNECTION_PREFACE);

// Start processing the first header.
state = State.FRAME_HEADER;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

package io.netty.handler.codec.http2.draft10.frame.encoder;

import static io.netty.handler.codec.http2.draft10.Http2Exception.protocolError;
import static io.netty.handler.codec.http2.draft10.frame.Http2FrameCodecUtil.connectionPrefaceBuf;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
Expand All @@ -34,7 +33,6 @@
public class Http2FrameEncoder extends MessageToByteEncoder<Http2Frame> {

private final Http2FrameMarshaller frameMarshaller;
private ChannelFutureListener prefaceWriteListener;
private boolean prefaceWritten;

public Http2FrameEncoder() {
Expand Down Expand Up @@ -68,12 +66,6 @@ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
protected void encode(ChannelHandlerContext ctx, Http2Frame frame, ByteBuf out)
throws Exception {
try {
if (!prefaceWritten) {
throw protocolError(
"Attempting to send frame before connection preface written: %s", frame
.getClass().getName());
}

frameMarshaller.marshall(frame, out, ctx.alloc());
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
Expand All @@ -84,20 +76,17 @@ protected void encode(ChannelHandlerContext ctx, Http2Frame frame, ByteBuf out)
* Sends the HTTP2 connection preface to the remote endpoint, if not already sent.
*/
private void sendPreface(final ChannelHandlerContext ctx) {
if (!prefaceWritten && prefaceWriteListener == null && ctx.channel().isActive()) {
prefaceWriteListener = new ChannelFutureListener() {
if (!prefaceWritten && ctx.channel().isActive()) {
prefaceWritten = true;
ctx.writeAndFlush(connectionPrefaceBuf()).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
prefaceWritten = true;
prefaceWriteListener = null;
} else if (ctx.channel().isOpen()) {
if (!future.isSuccess() && ctx.channel().isOpen()) {
// The write failed, close the connection.
ctx.close();
}
}
};
ctx.writeAndFlush(connectionPrefaceBuf()).addListener(prefaceWriteListener);
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

package io.netty.handler.codec.http2.draft10.frame;

import static io.netty.handler.codec.http2.draft10.frame.Http2FrameCodecUtil.CONNECTION_PREFACE;
import static io.netty.util.CharsetUtil.UTF_8;
import static org.junit.Assert.assertTrue;
import io.netty.bootstrap.Bootstrap;
Expand Down Expand Up @@ -114,8 +113,8 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
});

// Wait a bit and verify that the connection was closed.
assertTrue(serverHandler.awaitClose());
assertTrue(clientHandler.awaitClose());
serverHandler.awaitClose();
clientHandler.awaitClose();
}

@Test
Expand All @@ -127,24 +126,21 @@ protected void initChannel(Channel ch) throws Exception {
p.addLast("codec", new Http2FrameCodec());
p.addLast("wrongFrameGenerator", new ChannelHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg == CONNECTION_PREFACE) {
ByteBuf buf = Unpooled.copiedBuffer("01234567", UTF_8);
Http2PingFrame frame =
new DefaultHttp2PingFrame.Builder().setData(buf).build();
ctx.writeAndFlush(frame);
} else {
super.channelRead(ctx, msg);
}
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = Unpooled.copiedBuffer("01234567", UTF_8);
Http2PingFrame frame =
new DefaultHttp2PingFrame.Builder().setData(buf).build();
ctx.writeAndFlush(frame);
super.channelActive(ctx);
}
});
p.addLast("handler", clientHandler);
}
});

// Wait a bit and verify that the connection was closed.
assertTrue(serverHandler.awaitClose());
assertTrue(clientHandler.awaitClose());
serverHandler.awaitClose();
clientHandler.awaitClose();
}

@Test
Expand Down Expand Up @@ -181,29 +177,23 @@ private Channel createClientChannel(ChannelHandler handler) {
private static class CaptureHandler extends ChannelHandlerAdapter {
final DefaultPromise<Http2SettingsFrame> settings;
final DefaultPromise<Http2SettingsFrame> settingsAck;
final DefaultPromise<Void> initFuture;
Channel channel;
final DefaultPromise<Void> closeFuture;
//Channel channel;

CaptureHandler(EventExecutor executor) {
settings = new DefaultPromise<Http2SettingsFrame>(executor);
settingsAck = new DefaultPromise<Http2SettingsFrame>(executor);
initFuture = new DefaultPromise<Void>(executor);
closeFuture = new DefaultPromise<Void>(executor);
}

public boolean awaitClose() throws Exception {
initFuture.await();
for (int i = 0; channel.isOpen() && i < 5; ++i) {
Thread.sleep(10);
}
return !channel.isOpen();
public void awaitClose() throws Exception {
closeFuture.await(5000);
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
channel = ctx.channel();
initFuture.setSuccess(null);

super.channelActive(ctx);
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
closeFuture.setSuccess(null);
super.channelInactive(ctx);
}

@Override
Expand Down

0 comments on commit a992e7f

Please sign in to comment.