Skip to content

Commit

Permalink
Fix bug in HTTP/2 outbound flow control
Browse files Browse the repository at this point in the history
Motivation:

The outbound flow controller logic does not properly reset the allocated
bytes between successive invocations of the priority algorithm.

Modifications:

Updated the priority algorithm to reset the allocated bytes for each
stream.

Result:

Each call to the priority algorithm now starts with zero allocated bytes
for each stream.
  • Loading branch information
nmittler committed Nov 13, 2014
1 parent d220afa commit f23f3b9
Show file tree
Hide file tree
Showing 11 changed files with 493 additions and 326 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import io.netty.util.collection.IntObjectMap;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -388,8 +387,7 @@ public final int numChildren() {

@Override
public final Collection<? extends Http2Stream> children() {
DefaultStream[] childrenArray = children.values(DefaultStream.class);
return Arrays.asList(childrenArray);
return children.values();
}

@Override
Expand Down Expand Up @@ -421,10 +419,10 @@ public Http2Stream setPriority(int parentStreamId, short weight, boolean exclusi
if (newParent != parent() || exclusive) {
List<ParentChangedEvent> events = null;
if (newParent.isDescendantOf(this)) {
events = new ArrayList<ParentChangedEvent>(2 + (exclusive ? newParent.children().size() : 0));
events = new ArrayList<ParentChangedEvent>(2 + (exclusive ? newParent.numChildren(): 0));
parent.takeChild(newParent, false, events);
} else {
events = new ArrayList<ParentChangedEvent>(1 + (exclusive ? newParent.children().size() : 0));
events = new ArrayList<ParentChangedEvent>(1 + (exclusive ? newParent.numChildren() : 0));
}
newParent.takeChild(this, exclusive, events);
notifyParentChanged(events);
Expand Down Expand Up @@ -563,7 +561,7 @@ final void takeChild(DefaultStream child, boolean exclusive, List<ParentChangedE
// move any previous children to the child node, becoming grand children
// of this node.
if (!children.isEmpty()) {
for (DefaultStream grandchild : removeAllChildren().values(DefaultStream.class)) {
for (DefaultStream grandchild : removeAllChildren().values()) {
child.takeChild(grandchild, false, events);
}
}
Expand All @@ -590,7 +588,7 @@ final void removeChild(DefaultStream child) {
totalChildWeights -= child.weight();

// Move up any grand children to be directly dependent on this node.
for (DefaultStream grandchild : child.children.values(DefaultStream.class)) {
for (DefaultStream grandchild : child.children.values()) {
takeChild(grandchild, false, events);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelPromiseAggregator;

import java.util.ArrayDeque;

Expand Down Expand Up @@ -268,26 +267,20 @@ public ChannelFuture writeHeaders(final ChannelHandlerContext ctx, final int str
// There were previous DATA frames sent. We need to send the HEADERS only after the most
// recent DATA frame to keep them in sync...

// Wrap the original promise in an aggregate which will complete the original promise
// once the headers are written.
final ChannelPromiseAggregator aggregatePromise = new ChannelPromiseAggregator(promise);
final ChannelPromise innerPromise = ctx.newPromise();
aggregatePromise.add(innerPromise);

// Only write the HEADERS frame after the previous DATA frame has been written.
final Http2Stream theStream = stream;
lastDataWrite.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
// The DATA write failed, also fail this write.
innerPromise.setFailure(future.cause());
promise.setFailure(future.cause());
return;
}

// Perform the write.
writeHeaders(ctx, theStream, headers, streamDependency, weight, exclusive, padding,
endOfStream, innerPromise);
endOfStream, promise);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
*/
public static final double DEFAULT_WINDOW_UPDATE_RATIO = 0.5;

/**
* The default maximum connection size used as a limit when the number of active streams is
* large. Set to 2 MiB.
*/
public static final int DEFAULT_MAX_CONNECTION_WINDOW_SIZE = 1048576 * 2;

/**
* A value for the window update ratio to be use in order to disable window updates for
* a stream (i.e. {@code 0}).
Expand All @@ -41,6 +47,7 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro

private final Http2Connection connection;
private final Http2FrameWriter frameWriter;
private int maxConnectionWindowSize = DEFAULT_MAX_CONNECTION_WINDOW_SIZE;
private int initialWindowSize = DEFAULT_WINDOW_SIZE;

public DefaultHttp2InboundFlowController(Http2Connection connection, Http2FrameWriter frameWriter) {
Expand All @@ -59,6 +66,14 @@ public void streamAdded(Http2Stream stream) {
});
}

public DefaultHttp2InboundFlowController setMaxConnectionWindowSize(int maxConnectionWindowSize) {
if (maxConnectionWindowSize <= 0) {
throw new IllegalArgumentException("maxConnectionWindowSize must be > 0");
}
this.maxConnectionWindowSize = maxConnectionWindowSize;
return this;
}

@Override
public void initialInboundWindowSize(int newWindowSize) throws Http2Exception {
int deltaWindowSize = newWindowSize - initialWindowSize;
Expand Down Expand Up @@ -114,7 +129,6 @@ public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, in
int dataLength = data.readableBytes() + padding;
boolean windowUpdateSent = false;
try {
// Apply the connection-level flow control.
windowUpdateSent = applyConnectionFlowControl(ctx, dataLength);

// Apply the stream-level flow control.
Expand Down Expand Up @@ -214,6 +228,25 @@ void setWindowUpdateRatio(double ratio) {
windowUpdateRatio = ratio;
}

/**
* Returns the initial size of this window.
*/
int initialWindowSize() {
int maxWindowSize = initialWindowSize;
if (streamId == CONNECTION_STREAM_ID) {
// Determine the maximum number of streams that we can allow without integer overflow
// of maxWindowSize * numStreams. Also take care to avoid division by zero when
// maxWindowSize == 0.
int maxNumStreams = Integer.MAX_VALUE;
if (maxWindowSize > 0) {
maxNumStreams /= maxWindowSize;
}
int numStreams = Math.min(maxNumStreams, Math.max(1, connection.numActiveStreams()));
maxWindowSize = Math.min(maxConnectionWindowSize, maxWindowSize * numStreams);
}
return maxWindowSize;
}

/**
* Updates the flow control window for this stream if it is appropriate.
*
Expand All @@ -224,7 +257,7 @@ boolean updateWindowIfAppropriate(ChannelHandlerContext ctx) throws Http2Excepti
return false;
}

int threshold = (int) (initialWindowSize * windowUpdateRatio);
int threshold = (int) (initialWindowSize() * windowUpdateRatio);
if (window <= threshold) {
updateWindow(ctx);
return true;
Expand Down Expand Up @@ -290,10 +323,8 @@ void updatedInitialWindowSize(int delta) throws Http2Exception {
*/
void updateWindow(ChannelHandlerContext ctx) throws Http2Exception {
// Expand the window for this stream back to the size of the initial window.
int deltaWindowSize = initialWindowSize - window;
int deltaWindowSize = initialWindowSize() - window;
addAndGet(deltaWindowSize);

// Send a window update for the stream/connection.
frameWriter.writeWindowUpdate(ctx, streamId, deltaWindowSize, ctx.newPromise());
}
}
Expand Down
Loading

0 comments on commit f23f3b9

Please sign in to comment.