Skip to content

Commit

Permalink
fix reactor#138: Emit error signal when the connection is closed prem…
Browse files Browse the repository at this point in the history
…aturely

- Throw IOException when channel inactive is received
- Ensure an empty context will not be emitted
  • Loading branch information
violetagg committed Sep 7, 2017
1 parent 545a1bf commit c8db8a4
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package reactor.ipc.netty.http.client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
Expand Down Expand Up @@ -132,11 +133,6 @@ static HttpOperations bindHttp(Channel channel,
chunkedTransfer(true);
}

@Override
protected boolean shouldEmitEmptyContext() {
return true;
}

@Override
public HttpClientRequest addCookie(Cookie cookie) {
if (!hasSentHeaders()) {
Expand Down Expand Up @@ -260,6 +256,15 @@ protected void onInboundCancel() {
channel().close();
}

@Override
protected void onInboundComplete() {
if (responseState == null) {
parentContext().fireContextError(new IOException("Connection closed prematurely"));
return;
}
super.onInboundComplete();
}

@Override
public HttpClientRequest header(CharSequence name, CharSequence value) {
if (!hasSentHeaders()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,16 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

Expand All @@ -27,9 +36,11 @@
import reactor.core.publisher.Mono;
import reactor.ipc.netty.FutureMono;
import reactor.ipc.netty.NettyContext;
import reactor.ipc.netty.SocketUtils;
import reactor.ipc.netty.http.client.HttpClient;
import reactor.ipc.netty.http.client.HttpClientResponse;
import reactor.ipc.netty.http.server.HttpServer;
import reactor.ipc.netty.resources.PoolResources;
import reactor.test.StepVerifier;

public class ChannelOperationsHandlerTest {
Expand Down Expand Up @@ -81,4 +92,89 @@ private void doTestPrefetchSize(int writeBufferLowWaterMark, int writeBufferHigh

assertThat(handler.prefetch == (handler.inner.requested - handler.inner.produced)).isTrue();
}

@Test
public void testChannelInactiveThrowsAbortedException() throws Exception {
ExecutorService threadPool = Executors.newCachedThreadPool();

int abortServerPort = SocketUtils.findAvailableTcpPort();
ConnectionAbortServer abortServer = new ConnectionAbortServer(abortServerPort);

threadPool.submit(abortServer);

if(!abortServer.await(10, TimeUnit.SECONDS)){
throw new IOException("Fail to start test server");
}

Mono<HttpClientResponse> response =
HttpClient.create(ops -> ops.host("localhost")
.port(abortServerPort)
.poolResources(PoolResources.fixed("http", 1)))
.get("/", req -> req.sendString(Flux.just("a", "b", "c")));

StepVerifier.create(response)
.expectError()
.verify(Duration.ofSeconds(1));

abortServer.close();
}

private static final class ConnectionAbortServer extends CountDownLatch implements Runnable {

private final int port;
private final ServerSocketChannel server;
private volatile boolean read = false;
private volatile Thread thread;

private ConnectionAbortServer(int port) {
super(1);
this.port = port;
try {
server = ServerSocketChannel.open();
}
catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public void run() {
try {
server.configureBlocking(true);
server.socket()
.bind(new InetSocketAddress(port));
countDown();
thread = Thread.currentThread();
while (true) {
SocketChannel ch = server.accept();

while (true) {
int bytes = ch.read(ByteBuffer.allocate(256));
if (bytes > 0) {
if (!read) {
read = true;
}
else {
ch.close();
return;
}
}
}
}
}
catch (IOException e) {
}
}

public void close() throws IOException {
Thread thread = this.thread;
if (thread != null) {
thread.interrupt();
}
ServerSocketChannel server = this.server;
if (server != null) {
server.close();
}
}
}
}

0 comments on commit c8db8a4

Please sign in to comment.