Skip to content

Commit

Permalink
okhttp: outbound flow control.
Browse files Browse the repository at this point in the history
Fixes grpc#371
  • Loading branch information
Xudong Ma committed May 22, 2015
1 parent e50f671 commit 0d48087
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 2 deletions.
4 changes: 4 additions & 0 deletions core/src/main/java/io/grpc/transport/AbstractStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ protected enum Phase {

/**
* The number of bytes currently queued, waiting to be sent. When this falls below
* onReadyThreshold, {@link StreamListener#onReady()} will be called.
*/
private int numSentBytesQueued;

Expand Down Expand Up @@ -158,6 +159,9 @@ public void setOnReadyThreshold(int onReadyThreshold) {
checkArgument(onReadyThreshold > 0, "onReadyThreshold must be > 0");
boolean doNotify;
synchronized (onReadyLock) {
if (this.onReadyThreshold <= numSentBytesQueued && onReadyThreshold > numSentBytesQueued) {
shouldNotifyOnReady = true;
}
this.onReadyThreshold = onReadyThreshold;
doNotify = needToNotifyOnReady();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ public void allocated() {
onStreamAllocated();
}

void onStreamSentBytes(int numBytes) {
onSentBytes(numBytes);
}

public void transportHeadersReceived(List<Header> headers, boolean endOfStream) {
synchronized (lock) {
if (endOfStream) {
Expand Down Expand Up @@ -173,6 +177,10 @@ protected void sendFrame(WritableBuffer frame, boolean endOfStream, boolean flus
buffer = EMPTY_BUFFER;
} else {
buffer = ((OkHttpWritableBuffer) frame).buffer();
int size = (int) buffer.size();
if (size > 0) {
onSendingBytes(size);
}
}
// If buffer > frameWriter.maxDataLength() the flow-controller will ensure that it is
// properly chunked.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ synchronized void initialOutboundWindowSize(int newWindowSize) {
OutboundFlowState state = (OutboundFlowState) stream.getOutboundFlowState();
if (state == null) {
// Create the OutboundFlowState with the new window size.
state = new OutboundFlowState(stream.id());
state = new OutboundFlowState(stream);
stream.setOutboundFlowState(state);
} else {
state.incrementStreamWindow(delta);
Expand Down Expand Up @@ -167,7 +167,7 @@ private void flush() {
private OutboundFlowState state(OkHttpClientStream stream) {
OutboundFlowState state = (OutboundFlowState) stream.getOutboundFlowState();
if (state == null) {
state = new OutboundFlowState(stream.id());
state = new OutboundFlowState(stream);
stream.setOutboundFlowState(state);
}
return state;
Expand Down Expand Up @@ -245,12 +245,18 @@ private final class OutboundFlowState {
int queuedBytes;
int window = initialWindowSize;
int allocatedBytes;
OkHttpClientStream stream;

OutboundFlowState(int streamId) {
this.streamId = streamId;
pendingWriteQueue = new ArrayDeque<Frame>(2);
}

OutboundFlowState(OkHttpClientStream stream) {
this(stream.id());
this.stream = stream;
}

int window() {
return window;
}
Expand Down Expand Up @@ -397,6 +403,8 @@ void write() {
} catch (IOException e) {
throw new RuntimeException(e);
}
stream.onStreamSentBytes(bytesToWrite);

if (enqueued) {
// It's enqueued - remove it from the head of the pending write queue.
queuedBytes -= bytesToWrite;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,72 @@ public void receiveWindowUpdateForUnknownStream() throws Exception {
verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
}

@Test
public void shouldBeInitiallyReady() throws Exception {
MockStreamListener listener = new MockStreamListener();
OkHttpClientStream stream = clientTransport.newStream(
method,new Metadata.Headers(), listener);
assertTrue(stream.isReady());
assertTrue(listener.isOnReadyCalled());
stream.cancel();
assertFalse(stream.isReady());
}

@Test
public void notifyOnReady() throws Exception {
final int messageLength = 15;
setInitialWindowSize(0);
MockStreamListener listener = new MockStreamListener();
OkHttpClientStream stream = clientTransport.newStream(
method,new Metadata.Headers(), listener);
stream.setOnReadyThreshold(HEADER_LENGTH + 20);
assertTrue(stream.isReady());
// Be notified at the beginning.
assertTrue(listener.isOnReadyCalled());

// Write a message that will not exceed the notification threshold and queue it.
InputStream input = new ByteArrayInputStream(new byte[messageLength]);
stream.writeMessage(input, input.available());
stream.flush();
assertTrue(stream.isReady());

// Write another two messages, still be queued.
input = new ByteArrayInputStream(new byte[messageLength]);
stream.writeMessage(input, input.available());
stream.flush();
assertFalse(stream.isReady());
input = new ByteArrayInputStream(new byte[messageLength]);
stream.writeMessage(input, input.available());
stream.flush();
assertFalse(stream.isReady());

// Let the first message out.
frameHandler.windowUpdate(0, HEADER_LENGTH + messageLength);
frameHandler.windowUpdate(3, HEADER_LENGTH + messageLength);
assertFalse(stream.isReady());
assertFalse(listener.isOnReadyCalled());

// Let the second message out.
frameHandler.windowUpdate(0, HEADER_LENGTH + messageLength);
frameHandler.windowUpdate(3, HEADER_LENGTH + messageLength);
assertTrue(stream.isReady());
assertTrue(listener.isOnReadyCalled());

// Now the first message is still in the queue, and it's size is smaller than the threshold.
// Increase the threshold should have no affection.
stream.setOnReadyThreshold(messageLength * 10);
assertFalse(listener.isOnReadyCalled());
// Decrease the threshold should have no affection too.
stream.setOnReadyThreshold(HEADER_LENGTH);
assertFalse(listener.isOnReadyCalled());
// But now increase the threshold to larger than the queued message size, onReady should be
// triggered.
stream.setOnReadyThreshold(HEADER_LENGTH + messageLength + 1);
assertTrue(listener.isOnReadyCalled());

stream.cancel();
}

private void waitForStreamPending(int expected) throws Exception {
int duration = TIME_OUT_MS / 10;
for (int i = 0; i < 10; i++) {
Expand Down Expand Up @@ -922,6 +988,7 @@ private static class MockStreamListener implements ClientStreamListener {
Metadata.Trailers trailers;
CountDownLatch closed = new CountDownLatch(1);
ArrayList<String> messages = new ArrayList<String>();
boolean onReadyCalled;

MockStreamListener() {
}
Expand All @@ -948,6 +1015,13 @@ public void closed(Status status, Metadata.Trailers trailers) {

@Override
public void onReady() {
onReadyCalled = true;
}

boolean isOnReadyCalled() {
boolean value = onReadyCalled;
onReadyCalled = false;
return value;
}

void waitUntilStreamClosed() throws InterruptedException {
Expand Down

0 comments on commit 0d48087

Please sign in to comment.