Skip to content

Commit

Permalink
Improve Client Channel Pooling (reactor#48)
Browse files Browse the repository at this point in the history
* fix reactor#45 Premature close on pool connection and rework context bridging
* Fix incorrect thread colocation for pooling (now uses all group threads)
* Rework connection pool state handling

* Mitigate possible double acquisition until pool implementation update
  • Loading branch information
smaldini authored Feb 19, 2017
1 parent 7d95fbb commit ab3f93d
Show file tree
Hide file tree
Showing 25 changed files with 803 additions and 312 deletions.
4 changes: 1 addition & 3 deletions src/main/java/reactor/ipc/netty/NettyPipeline.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2016 Pivotal Software Inc, All Rights Reserved.
* Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,7 +32,6 @@
* -> ssl & trace log ? [SslLoggingHandler]
* -> ssl ? [SslReader]
* -> log ? [LoggingHandler]
* => [BridgeSetup]
* -> http ? [HttpCodecHandler]
* -> http ws ? [HttpAggregator]
* -> http server ? [HttpServerHandler]
Expand All @@ -55,7 +54,6 @@ public interface NettyPipeline {
String SslLoggingHandler = LEFT + "sslLoggingHandler";
String ProxyHandler = LEFT + "proxyHandler";
String ReactiveBridge = RIGHT + "reactiveBridge";
String BridgeSetup = LEFT + "bridgeSetup";
String HttpEncoder = LEFT + "httpEncoder";
String HttpDecoder = LEFT + "httpDecoder";
String HttpAggregator = LEFT + "httpAggregator";
Expand Down
13 changes: 12 additions & 1 deletion src/main/java/reactor/ipc/netty/channel/AbortedException.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,28 @@
*/
public class AbortedException extends RuntimeException {

final boolean nostack;

/**
* Simple connection abort exception
* Simple connection abort exception without stack
*/
public AbortedException() {
super("Connection reset by peer");
nostack = true;
}

public AbortedException(String message) {
super(message);
nostack = false;
}

@Override
public synchronized Throwable fillInStackTrace() {
return nostack ? this : super.fillInStackTrace();
}



/**
* Return true if connection has been simply aborted on a tcp level by verifying if
* the given inbound error.
Expand Down
31 changes: 22 additions & 9 deletions src/main/java/reactor/ipc/netty/channel/ChannelOperations.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.ByteToMessageCodec;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
Expand Down Expand Up @@ -113,8 +112,18 @@ public static <INBOUND extends NettyInbound, OUTBOUND extends NettyOutbound> Cha
.get();
}

static void set(Channel ch, ChannelOperations<?, ?> ops) {
ch.attr(ChannelOperations.OPERATIONS_KEY).set(ops);
static ChannelOperations<?, ?> tryGetAndSet(Channel ch, ChannelOperations<?, ?> ops) {
Attribute<ChannelOperations> attr = ch.attr(ChannelOperations.OPERATIONS_KEY);
for (; ; ) {
ChannelOperations<?, ?> op = attr.get();
if (op != null) {
return op;
}

if (attr.compareAndSet(null, ops)) {
return null;
}
}
}

final BiFunction<? super INBOUND, ? super OUTBOUND, ? extends Publisher<Void>>
Expand Down Expand Up @@ -252,7 +261,9 @@ public final void onError(Throwable t) {
Subscription s =
OUTBOUND_CLOSE.getAndSet(this, Operators.cancelledSubscription());
if (s == Operators.cancelledSubscription() || isDisposed()) {
Operators.onErrorDropped(t);
if(log.isDebugEnabled()){
log.error("An outbound error could not be processed", t);
}
return;
}
onOutboundError(t);
Expand Down Expand Up @@ -300,7 +311,7 @@ protected final boolean isInboundDone() {
* @return true if inbound traffic is not expected anymore
*/
protected final boolean isInboundCancelled() {
return inbound.isCancelled();
return inbound.isCancelled() || !channel.isActive();
}

/**
Expand Down Expand Up @@ -405,7 +416,7 @@ protected void onInboundComplete() {
*/
protected void onOutboundComplete() {
if (log.isDebugEnabled()) {
log.debug("[{}] User Handler requesting close connection", formatName());
log.debug("[{}] {} User Handler requesting close connection", formatName(), channel());
}
markOutboundCloseable();
onHandlerTerminate();
Expand Down Expand Up @@ -448,7 +459,8 @@ protected final void applyHandler() {
// channel.pipeline()
// .fireUserEventTriggered(NettyPipeline.handlerStartedEvent());
if (log.isDebugEnabled()) {
log.debug("[{}] handler is being applied: {}", formatName(), handler);
log.debug("[{}] {} handler is being applied: {}", formatName(), channel
(), handler);
}
handler.apply((INBOUND) this, (OUTBOUND) this)
.subscribe(this);
Expand Down Expand Up @@ -482,7 +494,8 @@ protected final boolean discreteRemoteClose(Throwable err) {
protected final void onHandlerTerminate() {
if (replace(null)) {
if(log.isTraceEnabled()){
log.trace("Disposing ChannelOperation from a channel", new Exception("ChannelOperation terminal stack"));
log.trace("{} Disposing ChannelOperation from a channel", channel(), new Exception
("ChannelOperation terminal stack"));
}
try {
Operators.terminate(OUTBOUND_CLOSE, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ final class ChannelOperationsHandler extends ChannelDuplexHandler
final PublisherSender inner;
final BiConsumer<?, ? super ByteBuf> encoder;
final int prefetch;
final ContextHandler<?> originContext;

/**
* Cast the supplied queue (SpscLinkedArrayQueue) to use its atomic dual-insert
Expand All @@ -69,9 +70,9 @@ final class ChannelOperationsHandler extends ChannelDuplexHandler
Queue<?> pendingWrites;
ChannelHandlerContext ctx;
boolean flushOnEach;
long pendingBytes;

ContextHandler<?> parentContext;
long pendingBytes;
ContextHandler<?> lastContext;

volatile boolean innerActive;
volatile boolean removed;
Expand All @@ -82,8 +83,14 @@ final class ChannelOperationsHandler extends ChannelDuplexHandler
this.inner = new PublisherSender(this);
this.prefetch = 32;
this.encoder = NOOP_ENCODER;
this.parentContext = contextHandler; // only set if parent context is closable,
// pool will usually fetch context via parentContext()
this.lastContext = null;
this.originContext = contextHandler; // only set if parent context is closable,
// pool will usually fetch context via lastContext()
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
originContext.createOperations(ctx.channel(), null);
}

@Override
Expand All @@ -94,8 +101,10 @@ final public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ops.onHandlerTerminate();
}
else {
parentContext().terminateChannel(ctx.channel());
parentContext().fireContextError(new AbortedException());
if (lastContext != null) {
lastContext.terminateChannel(ctx.channel());
lastContext.fireContextError(new AbortedException());
}
}
}
catch (Throwable err) {
Expand Down Expand Up @@ -134,22 +143,11 @@ final public void channelRead(ChannelHandlerContext ctx, Object msg)
}
}

ContextHandler<?> parentContext() {
if (parentContext != null) {
return parentContext;
}
ChannelOperations<?, ?> ops = ChannelOperations.get(ctx.channel());

if (ops != null) {
return ops.parentContext();
}
throw new IllegalStateException("Tried to call parent context when none is " + "attached");
}

@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
if (log.isDebugEnabled()) {
log.debug("Write state change {}",
log.debug("{} Write state change {}",
ctx.channel(),
ctx.channel()
.isWritable());
}
Expand All @@ -169,8 +167,10 @@ final public void exceptionCaught(ChannelHandlerContext ctx, Throwable err)
ops.onInboundError(err);
}
else {
parentContext().terminateChannel(ctx.channel());
parentContext().fireContextError(err);
if (lastContext != null) {
lastContext.terminateChannel(ctx.channel());
lastContext.fireContextError(err);
}
}
}

Expand All @@ -182,11 +182,7 @@ public void flush(ChannelHandlerContext ctx) throws Exception {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
if (ctx.channel()
.isActive()) {
parentContext().createOperations(ctx.channel(), null);
inner.request(prefetch);
}
inner.request(prefetch);
}

@Override
Expand All @@ -203,15 +199,26 @@ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
final public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
if (log.isTraceEnabled()) {
log.trace("User event {}", evt);
log.trace("{} End of the pipeline, User event {}", ctx.channel(), evt);
}
if (evt == NettyPipeline.handlerTerminatedEvent()) {
parentContext().terminateChannel(ctx.channel());
ContextHandler<?> c = lastContext;
if (c == null){
if (log.isDebugEnabled()){
log.debug("{} No context to dispose", ctx.channel());
}
return;
}
if (log.isDebugEnabled()){
log.debug("{} Disposing context {}", ctx.channel(), c);
}
lastContext = null;
c.terminateChannel(ctx.channel());
return;
}
if (evt instanceof NettyPipeline.SendOptionsChangeEvent) {
if (log.isDebugEnabled()) {
log.debug("New sending options");
log.debug("{} New sending options", ctx.channel());
}
((NettyPipeline.SendOptionsChangeEvent) evt).configurator()
.accept(this);
Expand All @@ -226,7 +233,7 @@ final public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
if (log.isDebugEnabled()) {
log.debug("Writing object {}", msg);
log.debug("{} Writing object {}", ctx.channel(), msg);
}

if (pendingWrites == null) {
Expand Down Expand Up @@ -278,7 +285,7 @@ else if (msg instanceof FileRegion) {
pendingBytes = Operators.addCap(pendingBytes, ((FileRegion) msg).count());
}
if (log.isTraceEnabled()) {
log.trace("Pending write size = {}", pendingBytes);
log.trace("{} Pending write size = {}", ctx.channel(), pendingBytes);
}
if (inner != null && inner.justFlushed) {
inner.justFlushed = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ protected Tuple2<String, Integer> getSNI() {
}

@Override
public void accept(Channel ch) {
protected void doPipeline(Channel ch) {
addSslAndLogHandlers(clientOptions, this, loggingHandler, secure, getSNI(), ch.pipeline());
addProxyHandler(clientOptions, ch.pipeline());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,18 @@ public final void setFuture(Future<?> future) {
log.debug("Connecting new channel: {}", future.toString());
}
this.f = (ChannelFuture) future;
f.addListener(this);
sink.setCancellation(this);

if(future.isDone()){
try {
operationComplete((ChannelFuture) future);
}
catch (Exception e){
fireContextError(e);
}
return;
}
f.addListener(this);
}

@Override
Expand Down
Loading

0 comments on commit ab3f93d

Please sign in to comment.