Skip to content

Commit

Permalink
adapt to latest core changes... (reactor#160)
Browse files Browse the repository at this point in the history
  - Operators hooks now take a Context
  - Mono.doAfterTerminate(BiConsumer) is now doAfterSuccessOrError
  - Mono.when methods that produce a Tuple have been renamed zip

Fixed a test that was failing on CI by adding a bit more leeway via sleep.

* attempt at unblocking the startRouterAndAwait test on travis
  • Loading branch information
simonbasle authored Sep 5, 2017
1 parent e97084d commit c348fbe
Show file tree
Hide file tree
Showing 10 changed files with 20 additions and 15 deletions.
4 changes: 2 additions & 2 deletions src/main/java/reactor/ipc/netty/FutureMono.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ public void subscribe(CoreSubscriber<? super Void> s) {

if (f == null) {
Operators.error(s,
Operators.onOperatorError(new NullPointerException(
"Deferred supplied null")));
Operators.onOperatorError(new NullPointerException("Deferred supplied null"),
s.currentContext()));
return;
}

Expand Down
4 changes: 3 additions & 1 deletion src/main/java/reactor/ipc/netty/channel/FluxReceive.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;

/**
* @author Stephane Maldini
Expand Down Expand Up @@ -340,7 +341,8 @@ final boolean onInboundComplete() {

final boolean onInboundError(Throwable err) {
if (isCancelled() || inboundDone) {
Operators.onErrorDropped(err);
Context c = receiver == null ? Context.empty() : receiver.currentContext();
Operators.onErrorDropped(err, c);
return false;
}
CoreSubscriber<?> receiver = this.receiver;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ final Mono<Void> withWebsocketSupport(URI url,
ops,
ops))));
if (websocketHandler != noopHandler()) {
handshake = handshake.doAfterTerminate(ops);
handshake = handshake.doAfterSuccessOrError(ops);
}
return handshake;
}
Expand All @@ -695,7 +695,7 @@ else if (isWebsocket()) {
if (websocketHandler != noopHandler()) {
handshake =
handshake.then(Mono.defer(() -> Mono.from(websocketHandler.apply(ops, ops)))
.doAfterTerminate(ops));
.doAfterSuccessOrError(ops));
}
return handshake;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void onSubscribe(Subscription s) {
@Override
public void onNext(MultipartTokenizer.Token token) {
if (done) {
Operators.onNextDropped(token);
Operators.onNextDropped(token, actual.currentContext());
return;
}

Expand Down Expand Up @@ -116,7 +116,7 @@ public void onNext(MultipartTokenizer.Token token) {
@Override
public void onError(Throwable t) {
if (done) {
Operators.onErrorDropped(t);
Operators.onErrorDropped(t, actual.currentContext());
return;
}
UnicastProcessor<ByteBuf> w = window;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void onComplete() {
@Override
public void onError(Throwable throwable) {
if (this.done) {
Operators.onErrorDropped(throwable);
Operators.onErrorDropped(throwable, actual.currentContext());
return;
}

Expand All @@ -112,7 +112,7 @@ public void onError(Throwable throwable) {
@Override
public void onNext(ByteBuf byteBuf) {
if (this.done) {
Operators.onNextDropped(byteBuf);
Operators.onNextDropped(byteBuf, actual.currentContext());
return;
}

Expand Down Expand Up @@ -168,15 +168,17 @@ public void request(long n) {
if (Integer.MAX_VALUE > n) { // TODO: Support smaller request sizes
actual.onError(Operators.onOperatorError(this, new
IllegalArgumentException(
"This operation only supports unbounded requests, was " + n), n));
"This operation only supports unbounded requests, was " + n), n,
actual.currentContext()));
return;
}
Subscription s = this.subscription;
if (s != null) {
s.request(n);
}
} catch (Throwable throwable) {
actual.onError(Operators.onOperatorError(this, throwable, n));
actual.onError(Operators.onOperatorError(this, throwable, n,
actual.currentContext()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ final Mono<Void> withWebsocketSupport(String url,
if (replace(ops)) {
return FutureMono.from(ops.handshakerResult)
.then(Mono.defer(() -> Mono.from(websocketHandler.apply(ops, ops))))
.doAfterTerminate(ops);
.doAfterSuccessOrError(ops);
}
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public Mono<Void> disposeLater() {
cnsrvlMono = FutureMono.from((Future) cacheNativeServerGroup.terminationFuture());
}

return Mono.when(clMono, sslMono, slMono, cnclMono, cnslMono, cnsrvlMono).then();
return Mono.zip(clMono, sslMono, slMono, cnclMono, cnslMono, cnsrvlMono).then();
});
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/reactor/ipc/netty/tcp/TcpResources.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ protected void _dispose(){
* @return the Mono that represents the end of disposal
*/
protected Mono<Void> _disposeLater() {
return Mono.when(
return Mono.zip(
defaultLoops.disposeLater(),
defaultPools.disposeLater())
.then();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ private void failOnClientServerError(boolean clientError, boolean serverError,
}

private Mono<HttpClientRequest> doLoginFirst(Mono<HttpClientRequest> request, int port) {
return Mono.when(request, login(port))
return Mono.zip(request, login(port))
.map(tuple -> {
HttpClientRequest req = tuple.getT1();
req.addHeader("Authorization", tuple.getT2());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ public void startRouterAndAwait()

//shutdown the router to unblock the thread
ref.get().shutdown();
Thread.sleep(100);
assertThat(f.isDone()).isTrue();
}
}

0 comments on commit c348fbe

Please sign in to comment.