Skip to content

Commit

Permalink
Merge pull request ReactiveX#3556 from akarnokd/ToMapErrorPropagation1x
Browse files Browse the repository at this point in the history
1.x: fix toMap and toMultimap not handling exceptions of the callbacks
  • Loading branch information
Aaron Tull committed Dec 8, 2015
2 parents 8316519 + 4949ee3 commit 9d4f9c3
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 19 deletions.
33 changes: 30 additions & 3 deletions src/main/java/rx/internal/operators/OperatorToMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@

import rx.Observable.Operator;
import rx.Subscriber;
import rx.exceptions.Exceptions;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.observers.Subscribers;

/**
* Maps the elements of the source observable into a java.util.Map instance and
Expand Down Expand Up @@ -75,9 +77,24 @@ public OperatorToMap(

@Override
public Subscriber<? super T> call(final Subscriber<? super Map<K, V>> subscriber) {

Map<K, V> localMap;

try {
localMap = mapFactory.call();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
subscriber.onError(ex);
Subscriber<? super T> parent = Subscribers.empty();
parent.unsubscribe();
return parent;
}

final Map<K, V> fLocalMap = localMap;

return new Subscriber<T>(subscriber) {

private Map<K, V> map = mapFactory.call();
private Map<K, V> map = fLocalMap;

@Override
public void onStart() {
Expand All @@ -86,8 +103,18 @@ public void onStart() {

@Override
public void onNext(T v) {
K key = keySelector.call(v);
V value = valueSelector.call(v);
K key;
V value;

try {
key = keySelector.call(v);
value = valueSelector.call(v);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
subscriber.onError(ex);
return;
}

map.put(key, value);
}

Expand Down
42 changes: 38 additions & 4 deletions src/main/java/rx/internal/operators/OperatorToMultimap.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import java.util.Map;

import rx.Observable.Operator;
import rx.exceptions.Exceptions;
import rx.Subscriber;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.observers.Subscribers;

/**
* Maps the elements of the source observable into a multimap
Expand Down Expand Up @@ -103,8 +105,24 @@ public OperatorToMultimap(

@Override
public Subscriber<? super T> call(final Subscriber<? super Map<K, Collection<V>>> subscriber) {

Map<K, Collection<V>> localMap;

try {
localMap = mapFactory.call();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
subscriber.onError(ex);

Subscriber<? super T> parent = Subscribers.empty();
parent.unsubscribe();
return parent;
}

final Map<K, Collection<V>> fLocalMap = localMap;

return new Subscriber<T>(subscriber) {
private Map<K, Collection<V>> map = mapFactory.call();
private Map<K, Collection<V>> map = fLocalMap;

@Override
public void onStart() {
Expand All @@ -113,11 +131,27 @@ public void onStart() {

@Override
public void onNext(T v) {
K key = keySelector.call(v);
V value = valueSelector.call(v);
K key;
V value;

try {
key = keySelector.call(v);
value = valueSelector.call(v);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
subscriber.onError(ex);
return;
}

Collection<V> collection = map.get(key);
if (collection == null) {
collection = collectionFactory.call(key);
try {
collection = collectionFactory.call(key);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
subscriber.onError(ex);
return;
}
map.put(key, collection);
}
collection.add(value);
Expand Down
81 changes: 69 additions & 12 deletions src/test/java/rx/internal/operators/OperatorToMapTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,19 @@
package rx.internal.operators;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.*;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.*;

import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.junit.*;
import org.mockito.*;

import rx.Observable;
import rx.Observer;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.exceptions.TestException;
import rx.functions.*;
import rx.internal.util.UtilityFunctions;
import rx.observers.TestSubscriber;

public class OperatorToMapTest {
@Mock
Expand Down Expand Up @@ -224,4 +219,66 @@ public Integer call(String t1) {
verify(objectObserver, times(1)).onError(any(Throwable.class));
}

@Test
public void testKeySelectorThrows() {
TestSubscriber<Object> ts = TestSubscriber.create();

Observable.just(1, 2).toMap(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer v) {
throw new TestException();
}
}).subscribe(ts);

ts.assertError(TestException.class);
ts.assertNoValues();
ts.assertNotCompleted();
}

@Test
public void testValueSelectorThrows() {
TestSubscriber<Object> ts = TestSubscriber.create();

Observable.just(1, 2).toMap(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer v) {
return v;
}
}, new Func1<Integer, Integer>() {
@Override
public Integer call(Integer v) {
throw new TestException();
}
}).subscribe(ts);

ts.assertError(TestException.class);
ts.assertNoValues();
ts.assertNotCompleted();
}

@Test
public void testMapFactoryThrows() {
TestSubscriber<Object> ts = TestSubscriber.create();

Observable.just(1, 2).toMap(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer v) {
return v;
}
}, new Func1<Integer, Integer>() {
@Override
public Integer call(Integer v) {
return v;
}
}, new Func0<Map<Integer, Integer>>() {
@Override
public Map<Integer, Integer> call() {
throw new TestException();
}
}).subscribe(ts);

ts.assertError(TestException.class);
ts.assertNoValues();
ts.assertNotCompleted();
}
}
96 changes: 96 additions & 0 deletions src/test/java/rx/internal/operators/OperatorToMultimapTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@

import rx.Observable;
import rx.Observer;
import rx.exceptions.TestException;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.internal.operators.OperatorToMultimap.DefaultMultimapCollectionFactory;
import rx.internal.operators.OperatorToMultimap.DefaultToMultimapFactory;
import rx.internal.util.UtilityFunctions;
import rx.observers.TestSubscriber;

public class OperatorToMultimapTest {
@Mock
Expand Down Expand Up @@ -269,4 +271,98 @@ public Collection<String> call(Integer t1) {
verify(objectObserver, never()).onNext(expected);
verify(objectObserver, never()).onCompleted();
}

@Test
public void testKeySelectorThrows() {
TestSubscriber<Object> ts = TestSubscriber.create();

Observable.just(1, 2).toMultimap(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer v) {
throw new TestException();
}
}).subscribe(ts);

ts.assertError(TestException.class);
ts.assertNoValues();
ts.assertNotCompleted();
}

@Test
public void testValueSelectorThrows() {
TestSubscriber<Object> ts = TestSubscriber.create();

Observable.just(1, 2).toMultimap(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer v) {
return v;
}
}, new Func1<Integer, Integer>() {
@Override
public Integer call(Integer v) {
throw new TestException();
}
}).subscribe(ts);

ts.assertError(TestException.class);
ts.assertNoValues();
ts.assertNotCompleted();
}

@Test
public void testMapFactoryThrows() {
TestSubscriber<Object> ts = TestSubscriber.create();

Observable.just(1, 2).toMultimap(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer v) {
return v;
}
}, new Func1<Integer, Integer>() {
@Override
public Integer call(Integer v) {
return v;
}
}, new Func0<Map<Integer, Collection<Integer>>>() {
@Override
public Map<Integer, Collection<Integer>> call() {
throw new TestException();
}
}).subscribe(ts);

ts.assertError(TestException.class);
ts.assertNoValues();
ts.assertNotCompleted();
}

@Test
public void testCollectionFactoryThrows() {
TestSubscriber<Object> ts = TestSubscriber.create();

Observable.just(1, 2).toMultimap(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer v) {
return v;
}
}, new Func1<Integer, Integer>() {
@Override
public Integer call(Integer v) {
return v;
}
}, new Func0<Map<Integer, Collection<Integer>>>() {
@Override
public Map<Integer, Collection<Integer>> call() {
return new HashMap<Integer, Collection<Integer>>();
}
}, new Func1<Integer, Collection<Integer>>() {
@Override
public Collection<Integer> call(Integer k) {
throw new TestException();
}
}).subscribe(ts);

ts.assertError(TestException.class);
ts.assertNoValues();
ts.assertNotCompleted();
}
}

0 comments on commit 9d4f9c3

Please sign in to comment.