Skip to content

Commit

Permalink
Merge pull request spotify#149 from togi/rx3-discard-wrapper
Browse files Browse the repository at this point in the history
Ported a fix for DiscardAfterDisposeWrapper that was missing in mobius-rx3
  • Loading branch information
togi authored Jan 11, 2022
2 parents 6a2c815 + 0e4e423 commit f27ea14
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@

/**
* Wraps a {@link Connection} or a {@link Consumer} and blocks them from receiving any further
* values after the wrapper has been disposed.
* values after the wrapper has been disposed. Does not prevent races between {@link
* #accept(Object)} and {@link #dispose()} for wrapped {@link Connection}s; the behaviour if such a
* race happens is up to the original connection.
*/
class DiscardAfterDisposeWrapper<I> implements Consumer<I>, Disposable {
private final Consumer<I> consumer;
@Nullable private final Disposable disposable;
private boolean disposed;
private volatile boolean disposed;

static <I> DiscardAfterDisposeWrapper<I> wrapConnection(Connection<I> connection) {
checkNotNull(connection);
Expand All @@ -50,15 +52,15 @@ private DiscardAfterDisposeWrapper(Consumer<I> consumer, @Nullable Disposable di
}

@Override
public synchronized void accept(I effect) {
public void accept(I effect) {
if (disposed) {
return;
}
consumer.accept(effect);
}

@Override
public synchronized void dispose() {
public void dispose() {
disposed = true;
if (disposable != null) {
disposable.dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,12 @@

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

import com.spotify.mobius.Connectable;
import com.spotify.mobius.Connection;
import com.spotify.mobius.functions.Consumer;
import com.spotify.mobius.test.RecordingConsumer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -44,29 +35,22 @@ public class DiscardAfterDisposeConnectableTest {

private RecordingConsumer<String> recordingConsumer;
private Connection<Integer> connection;
private Semaphore blockEffectPerformer;
private Semaphore signalEffectHasBeenPerformed;
private BlockableConnection blockableConnection;
private TestConnection testConnection;

private DiscardAfterDisposeConnectable<Integer, String> underTest;

private final ExecutorService executorService = Executors.newCachedThreadPool();

@Before
public void setUp() throws Exception {
blockEffectPerformer = new Semaphore(0);
signalEffectHasBeenPerformed = new Semaphore(0);

recordingConsumer = new RecordingConsumer<>();
blockableConnection = new BlockableConnection(recordingConsumer);
testConnection = new TestConnection(recordingConsumer);

underTest =
new DiscardAfterDisposeConnectable<>(
new Connectable<Integer, String>() {
@Nonnull
@Override
public Connection<Integer> connect(Consumer<String> output) {
return blockableConnection;
return testConnection;
}
});
}
Expand Down Expand Up @@ -100,94 +84,49 @@ public Connection<Integer> connect(Consumer<String> output) {
}

@Test
public void delegatesEffectsToActualSink() throws Exception {
public void forwardsMessagesToWrappedConsumer() throws Exception {
connection = underTest.connect(recordingConsumer);
connection.accept(1);
recordingConsumer.assertValues("Value is: 1");
connection.accept(14);
recordingConsumer.assertValues("Value is: 14");
}

@Test
public void delegatesDisposeToActualSink() throws Exception {
public void delegatesDisposeToActualConnection() throws Exception {
connection = underTest.connect(recordingConsumer);
connection.dispose();
assertThat(blockableConnection.disposed, is(true));
assertThat(testConnection.disposed, is(true));
}

@Test
public void discardsEventsAfterDisposal() throws Exception {
// given a disposed connection
connection = underTest.connect(recordingConsumer);

// given the effect performer is blocked
blockableConnection.block = true;

// when an effect is requested
Future<?> effectPerformedFuture = executorService.submit(() -> connection.accept(1));

// and the sink is disposed
connection.dispose();

// before the effect gets performed
// (needs permitting the blocked effect performer to proceed)
blockEffectPerformer.release();

// (get the result of the future to ensure the effect has been performed, also propagating
// exceptions if any - result should happen quickly, but it's good to have a timeout in case
// something is messed up)
effectPerformedFuture.get(10, TimeUnit.SECONDS);

// then no events are emitted
recordingConsumer.assertValues();
}

@Test
public void discardsEffectsAfterDisposal() throws Exception {
// given a disposed sink
connection = underTest.connect(recordingConsumer);
connection.dispose();

// when an effect is performed
// when a message arrives
connection.accept(1);

// then no effects or events happen
blockableConnection.assertEffects();
// it is discarded
recordingConsumer.assertValues();
}

private class BlockableConnection implements Connection<Integer> {
private static class TestConnection implements Connection<Integer> {

private final List<Integer> recordedEffects = new ArrayList<>();
private boolean disposed;
private final Consumer<String> eventConsumer;
private volatile boolean block = false;

BlockableConnection(Consumer<String> eventConsumer) {
TestConnection(Consumer<String> eventConsumer) {
this.eventConsumer = eventConsumer;
}

void assertEffects(Integer... values) {
assertThat(recordedEffects, equalTo(Arrays.asList(values)));
}

@Override
public void accept(final Integer effect) {
if (block) {
try {
if (!blockEffectPerformer.tryAcquire(5, TimeUnit.SECONDS)) {
throw new IllegalStateException("timed out waiting for effect performer unblock");
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
recordedEffects.add(effect);
eventConsumer.accept("Value is: " + effect);
signalEffectHasBeenPerformed.release();
}

@Override
public void dispose() {
disposed = true;
signalEffectHasBeenPerformed.release();
}
}
}

0 comments on commit f27ea14

Please sign in to comment.