Skip to content

Commit

Permalink
2.x: concat to report isDisposed consistently with termination (React…
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Jun 27, 2017
1 parent 4c19753 commit 39e5d91
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ void drain() {
boolean empty = t == null;

if (d && empty) {
disposed = true;
actual.onComplete();
return;
}
Expand Down Expand Up @@ -367,7 +368,7 @@ public void onComplete() {

@Override
public boolean isDisposed() {
return d.isDisposed();
return cancelled;
}

@Override
Expand Down Expand Up @@ -400,7 +401,7 @@ void drain() {
Throwable ex = error.get();
if (ex != null) {
queue.clear();

cancelled = true;
actual.onError(error.terminate());
return;
}
Expand All @@ -414,6 +415,7 @@ void drain() {
v = queue.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
cancelled = true;
this.d.dispose();
error.addThrowable(ex);
actual.onError(error.terminate());
Expand All @@ -423,6 +425,7 @@ void drain() {
boolean empty = v == null;

if (d && empty) {
cancelled = true;
Throwable ex = error.terminate();
if (ex != null) {
actual.onError(ex);
Expand All @@ -440,6 +443,7 @@ void drain() {
o = ObjectHelper.requireNonNull(mapper.apply(v), "The mapper returned a null ObservableSource");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
cancelled = true;
this.d.dispose();
queue.clear();
error.addThrowable(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,18 @@

package io.reactivex.internal.operators.observable;

import static org.junit.Assert.assertTrue;

import java.util.List;
import java.util.concurrent.Callable;

import org.junit.Test;

import io.reactivex.*;
import io.reactivex.disposables.Disposables;
import io.reactivex.disposables.*;
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.Function;
import io.reactivex.internal.functions.Functions;
import io.reactivex.observers.TestObserver;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
Expand Down Expand Up @@ -367,4 +370,66 @@ protected void subscribeActual(Observer<? super Integer> observer) {
RxJavaPlugins.reset();
}
}

@SuppressWarnings("unchecked")
@Test
public void concatReportsDisposedOnComplete() {
final Disposable[] disposable = { null };

Observable.fromArray(Observable.just(1), Observable.just(2))
.hide()
.concatMap(Functions.<Observable<Integer>>identity())
.subscribe(new Observer<Integer>() {

@Override
public void onSubscribe(Disposable d) {
disposable[0] = d;
}

@Override
public void onNext(Integer t) {
}

@Override
public void onError(Throwable e) {
}

@Override
public void onComplete() {
}
});

assertTrue(disposable[0].isDisposed());
}

@Test
@SuppressWarnings("unchecked")
public void concatReportsDisposedOnError() {
final Disposable[] disposable = { null };

Observable.fromArray(Observable.just(1), Observable.<Integer>error(new TestException()))
.hide()
.concatMap(Functions.<Observable<Integer>>identity())
.subscribe(new Observer<Integer>() {

@Override
public void onSubscribe(Disposable d) {
disposable[0] = d;
}

@Override
public void onNext(Integer t) {
}

@Override
public void onError(Throwable e) {
}

@Override
public void onComplete() {
}
});

assertTrue(disposable[0].isDisposed());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1041,4 +1041,118 @@ public void subscribe(ObservableEmitter<Integer> s) throws Exception {

assertEquals(1, calls[0]);
}

@Test
public void concatReportsDisposedOnComplete() {
final Disposable[] disposable = { null };

Observable.concat(Observable.just(1), Observable.just(2))
.subscribe(new Observer<Integer>() {

@Override
public void onSubscribe(Disposable d) {
disposable[0] = d;
}

@Override
public void onNext(Integer t) {
}

@Override
public void onError(Throwable e) {
}

@Override
public void onComplete() {
}
});

assertTrue(disposable[0].isDisposed());
}

@Test
@SuppressWarnings("unchecked")
public void concatReportsDisposedOnCompleteDelayError() {
final Disposable[] disposable = { null };

Observable.concatArrayDelayError(Observable.just(1), Observable.just(2))
.subscribe(new Observer<Integer>() {

@Override
public void onSubscribe(Disposable d) {
disposable[0] = d;
}

@Override
public void onNext(Integer t) {
}

@Override
public void onError(Throwable e) {
}

@Override
public void onComplete() {
}
});

assertTrue(disposable[0].isDisposed());
}

@Test
public void concatReportsDisposedOnError() {
final Disposable[] disposable = { null };

Observable.concat(Observable.just(1), Observable.<Integer>error(new TestException()))
.subscribe(new Observer<Integer>() {

@Override
public void onSubscribe(Disposable d) {
disposable[0] = d;
}

@Override
public void onNext(Integer t) {
}

@Override
public void onError(Throwable e) {
}

@Override
public void onComplete() {
}
});

assertTrue(disposable[0].isDisposed());
}

@Test
@SuppressWarnings("unchecked")
public void concatReportsDisposedOnErrorDelayError() {
final Disposable[] disposable = { null };

Observable.concatArrayDelayError(Observable.just(1), Observable.<Integer>error(new TestException()))
.subscribe(new Observer<Integer>() {

@Override
public void onSubscribe(Disposable d) {
disposable[0] = d;
}

@Override
public void onNext(Integer t) {
}

@Override
public void onError(Throwable e) {
}

@Override
public void onComplete() {
}
});

assertTrue(disposable[0].isDisposed());
}
}

0 comments on commit 39e5d91

Please sign in to comment.