Skip to content

Commit

Permalink
Fix reactor#134: Handle pending writes when outbound completion is re…
Browse files Browse the repository at this point in the history
…ached

In some use cases it may appear that the last flush operation that
is performed by PublisherSender#onComplete/onError will not able
to write the data completely. In these cases the underlaying Netty
implementation will schedule an additional task which will try to
flush the remaining data. This additional flush operation will call
ChannelPipeline#flush. Extend ChannelOperationsHandler#drain so that
it is able to handle these additional callbacks.
  • Loading branch information
violetagg committed Aug 2, 2017
1 parent 0682f17 commit 37e75af
Show file tree
Hide file tree
Showing 2 changed files with 196 additions and 46 deletions.
158 changes: 112 additions & 46 deletions src/main/java/reactor/ipc/netty/channel/ChannelOperationsHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.EmptyByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel.Unsafe;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.FileRegion;
import io.netty.util.ReferenceCountUtil;
Expand All @@ -55,7 +57,7 @@
* @author Stephane Maldini
*/
final class ChannelOperationsHandler extends ChannelDuplexHandler
implements NettyPipeline.SendOptions {
implements NettyPipeline.SendOptions, ChannelFutureListener {

final PublisherSender inner;
final BiConsumer<?, ? super ByteBuf> encoder;
Expand All @@ -66,13 +68,15 @@ final class ChannelOperationsHandler extends ChannelDuplexHandler
* Cast the supplied queue (SpscLinkedArrayQueue) to use its atomic dual-insert
* backed by {@link BiPredicate#test}
**/
BiPredicate<ChannelPromise, Object> pendingWriteOffer;
BiPredicate<ChannelFuture, Object> pendingWriteOffer;
Queue<?> pendingWrites;
ChannelHandlerContext ctx;
boolean flushOnEach;

long pendingBytes;
ContextHandler<?> lastContext;
ContextHandler<?> lastContext;

private Unsafe unsafe;

volatile boolean innerActive;
volatile boolean removed;
Expand Down Expand Up @@ -151,10 +155,6 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio
ctx.channel()
.isWritable());
}
if (ctx.channel()
.isWritable()) {
inner.request(prefetch);
}
drain();
}

Expand Down Expand Up @@ -182,6 +182,7 @@ public void flush(ChannelHandlerContext ctx) throws Exception {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
this.unsafe = ctx.channel().unsafe();
inner.request(prefetch);
}

Expand Down Expand Up @@ -239,7 +240,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
if (pendingWrites == null) {
this.pendingWrites = Queues.unbounded()
.get();
this.pendingWriteOffer = (BiPredicate<ChannelPromise, Object>) pendingWrites;
this.pendingWriteOffer = (BiPredicate<ChannelFuture, Object>) pendingWrites;
}

if (!pendingWriteOffer.test(promise, msg)) {
Expand All @@ -259,17 +260,27 @@ public NettyPipeline.SendOptions flushOnEach() {
return this;
}

@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
inner.request(1L);
}
}

ChannelFuture doWrite(Object msg, ChannelPromise promise, PublisherSender inner) {
if (flushOnEach || //fastpath
inner == null && pendingWrites.isEmpty() || //last drained element
!ctx.channel()
.isWritable() //force flush if write buffer full
) {
pendingBytes = 0L;
if (inner != null) {

ChannelFuture future = ctx.writeAndFlush(msg, promise);

if (inner != null && !hasPendingWriteBytes()) {
inner.justFlushed = true;
}
return ctx.writeAndFlush(msg, promise);
return future;
}
else {
if (msg instanceof ByteBuf) {
Expand All @@ -290,7 +301,15 @@ else if (msg instanceof FileRegion) {
if (inner != null && inner.justFlushed) {
inner.justFlushed = false;
}
return ctx.write(msg, promise);
ChannelFuture future = ctx.write(msg, promise);
if (!ctx.channel().isWritable()) {
pendingBytes = 0L;
ctx.flush();
if (inner != null && !hasPendingWriteBytes()) {
inner.justFlushed = true;
}
}
return future;
}
}

Expand Down Expand Up @@ -332,24 +351,27 @@ void drain() {

if (pendingWrites == null || innerActive || !ctx.channel()
.isWritable()) {
if (!ctx.channel().isWritable() && hasPendingWriteBytes()) {
ctx.flush();
}
if (WIP.decrementAndGet(this) == 0) {
break;
}
continue;
}

ChannelPromise promise;
ChannelFuture future;
Object v = pendingWrites.poll();

try {
promise = (ChannelPromise) v;
future = (ChannelFuture) v;
}
catch (Throwable e) {
ctx.fireExceptionCaught(e);
return;
}

boolean empty = promise == null;
boolean empty = future == null;

if (empty) {
if (WIP.decrementAndGet(this) == 0) {
Expand All @@ -360,50 +382,72 @@ void drain() {

v = pendingWrites.poll();

if (v instanceof Publisher) {
Publisher<?> p = (Publisher<?>) v;

if (p instanceof Callable) {
@SuppressWarnings("unchecked") Callable<?> supplier =
(Callable<?>) p;

Object vr;

try {
vr = supplier.call();
}
catch (Throwable e) {
promise.setFailure(e);
continue;
}

if (vr == null) {
promise.setSuccess();
continue;
if (!innerActive && v == PublisherSender.PENDING_WRITES) {
boolean last = pendingWrites.isEmpty();
if (!future.isDone() && hasPendingWriteBytes()) {
ctx.flush();
if (!future.isDone() && hasPendingWriteBytes()) {
pendingWriteOffer.test(future, v);
}

if (inner.unbounded) {
doWrite(vr, promise, null);
}
if (last && WIP.decrementAndGet(this) == 0) {
break;
}
}
else if (future instanceof ChannelPromise) {
ChannelPromise promise = (ChannelPromise) future;
if (v instanceof Publisher) {
Publisher<?> p = (Publisher<?>) v;

if (p instanceof Callable) {
@SuppressWarnings("unchecked") Callable<?> supplier =
(Callable<?>) p;

Object vr;

try {
vr = supplier.call();
}
catch (Throwable e) {
promise.setFailure(e);
continue;
}

if (vr == null) {
promise.setSuccess();
continue;
}

if (inner.unbounded) {
doWrite(vr, promise, null);
}
else {
innerActive = true;
inner.promise = promise;
inner.onSubscribe(Operators.scalarSubscription(inner, vr));
}
}
else {
innerActive = true;
inner.promise = promise;
inner.onSubscribe(Operators.scalarSubscription(inner, vr));
p.subscribe(inner);
}
}
else {
innerActive = true;
inner.promise = promise;
p.subscribe(inner);
doWrite(v, promise, null);
}
}
else {
doWrite(v, promise, null);
}
}
}
}

private boolean hasPendingWriteBytes() {
// On close the outboundBuffer is made null. After that point
// adding messages and flushes to outboundBuffer is not allowed.
ChannelOutboundBuffer outBuffer = this.unsafe.outboundBuffer();
return outBuffer != null ? outBuffer.totalPendingWriteBytes() > 0 : false;
}

static final class PublisherSender
implements CoreSubscriber<Object>, Subscription, ChannelFutureListener {

Expand Down Expand Up @@ -456,8 +500,10 @@ public void onComplete() {
if (!justFlushed) {
if (parent.ctx.channel()
.isActive()) {
justFlushed = true;
parent.ctx.flush();
if (!parent.hasPendingWriteBytes()) {
justFlushed = true;
}
}
else {
promise.setFailure(new AbortedException("Connection has been closed"));
Expand All @@ -467,6 +513,9 @@ public void onComplete() {
}

if (f != null) {
if (!f.isDone() && parent.hasPendingWriteBytes()) {
parent.pendingWriteOffer.test(f, PENDING_WRITES);
}
f.addListener(this);
}
else {
Expand All @@ -486,8 +535,10 @@ public void onError(Throwable t) {
produced(p);
if (parent.ctx.channel()
.isActive()) {
justFlushed = true;
parent.ctx.flush();
if (!parent.hasPendingWriteBytes()) {
justFlushed = true;
}
}
else {
promise.setFailure(new AbortedException("Connection has been closed"));
Expand All @@ -496,6 +547,9 @@ public void onError(Throwable t) {
}

if (f != null) {
if (!f.isDone() && parent.hasPendingWriteBytes()) {
parent.pendingWriteOffer.test(f, PENDING_WRITES);
}
f.addListener(this);
}
else {
Expand All @@ -513,6 +567,9 @@ public void onNext(Object t) {
.isWritable()) {
request(1L);
}
else {
lastWrite.addListener(parent);
}
}

@Override
Expand Down Expand Up @@ -723,6 +780,8 @@ final void produced(long n) {
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<PublisherSender> WIP =
AtomicIntegerFieldUpdater.newUpdater(PublisherSender.class, "wip");

private static final PendingWritesOnCompletion PENDING_WRITES = new PendingWritesOnCompletion();
}

@SuppressWarnings("rawtypes")
Expand All @@ -733,4 +792,11 @@ final void produced(long n) {

static final BiConsumer<?, ? super ByteBuf> NOOP_ENCODER = (a, b) -> {
};

private static final class PendingWritesOnCompletion {
@Override
public String toString() {
return "[Pending Writes on Completion]";
}
}
}
Loading

0 comments on commit 37e75af

Please sign in to comment.