Skip to content

Commit

Permalink
[FLINK-12595][kinesis] Interrupt thread at right time to avoid deadlock
Browse files Browse the repository at this point in the history
- Inside testOriginalExceptionIsPreservedWhenInterruptedDuringShutdown,
consumerThread.interrupt() was getting absorbed inside
KinesisDataFetcher's while(running) loop, so
TestableKinesisDataFetcherForShardConsumerException's awaitTermination()
was never interrupted by it. This led to a deadlock, with
KinesisDataFetcher waiting on the test code to send the interrupt, and
the test code waiting for KinesisDataFetcher to throw the expected
exception.
- Now, the test code waits until KinesisDataFetcher is inside
awaitTermination() before producing the interrupt, so it can be sure
that the interrupt it produces will be received/handled inside
awaitTermination().
  • Loading branch information
rehevkor5 authored and tweise committed Jul 22, 2019
1 parent 5da6e4c commit 091266d
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,7 @@ public void testOriginalExceptionIsPreservedWhenInterruptedDuringShutdown() thro
DummyFlinkKinesisConsumer<String> consumer = new DummyFlinkKinesisConsumer<>(
TestUtils.getStandardProperties(), fetcher, 1, 0);

CheckedThread consumerThread = new CheckedThread() {
CheckedThread consumerThread = new CheckedThread("FlinkKinesisConsumer") {
@Override
public void go() throws Exception {
consumer.run(new TestSourceContext<>());
Expand All @@ -858,6 +858,10 @@ public void go() throws Exception {
// ShardConsumer exception (from deserializer) will result in fetcher being shut down.
fetcher.waitUntilShutdown(20, TimeUnit.SECONDS);

// Ensure that KinesisDataFetcher has exited its while(running) loop and is inside its awaitTermination()
// method before we interrupt its thread, so that our interrupt doesn't get absorbed by any other mechanism.
fetcher.waitUntilAwaitTermination(20, TimeUnit.SECONDS);

// Interrupt the thread so that KinesisDataFetcher#awaitTermination() will throw InterruptedException.
consumerThread.interrupt();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.streaming.connectors.kinesis.testutils;

import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
Expand All @@ -32,13 +33,19 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

/**
* Extension of the {@link KinesisDataFetcher} for testing what happens when the thread is interrupted during
* {@link #awaitTermination()}.
*/
public class TestableKinesisDataFetcherForShardConsumerException<T> extends TestableKinesisDataFetcher<T> {
public volatile boolean wasInterrupted = false;

private OneShotLatch awaitTerminationWaiter = new OneShotLatch();

public TestableKinesisDataFetcherForShardConsumerException(final List<String> fakeStreams,
final SourceFunction.SourceContext<T> sourceContext,
final Properties fakeConfiguration,
Expand All @@ -54,7 +61,12 @@ public TestableKinesisDataFetcherForShardConsumerException(final List<String> fa
subscribedStreamsToLastDiscoveredShardIdsStateUnderTest, fakeKinesis);
}

public volatile boolean wasInterrupted = false;
/**
* Block until awaitTermination() has been called on this class.
*
* <p>Waits on the {@code awaitTerminationWaiter} latch, which {@link #awaitTermination()} triggers as its
* first statement. Test code can call this before interrupting the fetcher's thread to make sure the
* fetcher has already entered {@link #awaitTermination()}.
*
* @param timeout maximum time to wait, expressed in {@code timeUnit}
* @param timeUnit unit of the {@code timeout} argument
* @throws InterruptedException propagated from the latch wait; presumably raised when the calling thread is
*     interrupted while waiting (confirm against OneShotLatch#await)
* @throws TimeoutException propagated from the latch wait; presumably raised when the latch is not triggered
*     within the timeout (confirm against OneShotLatch#await)
*/
public void waitUntilAwaitTermination(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
awaitTerminationWaiter.await(timeout, timeUnit);
}

@Override
protected ExecutorService createShardConsumersThreadPool(final String subtaskName) {
Expand All @@ -65,6 +77,7 @@ protected ExecutorService createShardConsumersThreadPool(final String subtaskNam

@Override
public void awaitTermination() throws InterruptedException {
awaitTerminationWaiter.trigger();
try {
// Force this method to only exit by thread getting interrupted.
while (true) {
Expand Down

0 comments on commit 091266d

Please sign in to comment.