Commit
[FLINK-23528][connectors/kinesis] Graceful shutdown of Kinesis Consumer in EFO mode
dzikosc authored and dannycranmer committed Jul 25, 2022
1 parent 7496b68 commit 4bbf319
Showing 4 changed files with 74 additions and 24 deletions.
@@ -819,7 +819,7 @@ public void shutdownFetcher() {
                 LOG.warn("Encountered exception closing record publisher factory", e);
             }
         } finally {
-            shardConsumersExecutor.shutdown();
+            gracefulShutdownShardConsumers();
 
             cancelFuture.complete(null);
 
@@ -852,6 +852,11 @@ protected void deregisterStreamConsumer() {
         StreamConsumerRegistrarUtil.deregisterStreamConsumers(configProps, streams);
     }
 
+    /** Gracefully stops shardConsumersExecutor without interrupting running threads. */
+    private void gracefulShutdownShardConsumers() {
+        shardConsumersExecutor.shutdown();
+    }
+
     /**
      * Returns a flag indicating if this fetcher is running.
      *
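The hunk above swaps a direct executor shutdown for a helper whose Javadoc spells out the intent: stop shardConsumersExecutor without interrupting running threads. As a point of reference only (not part of the commit), the sketch below illustrates the underlying java.util.concurrent.ExecutorService semantics the helper relies on: shutdown() rejects new work but lets in-flight tasks finish, while shutdownNow() interrupts them. The class name, pool size, and the bounded awaitTermination fallback are illustrative assumptions, not connector code.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class GracefulExecutorShutdownSketch {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService shardConsumersExecutor = Executors.newFixedThreadPool(4);

        shardConsumersExecutor.submit(() -> {
            // stand-in for a running shard-consumer task
        });

        // Graceful: stop accepting new tasks, but let running/queued tasks complete.
        shardConsumersExecutor.shutdown();

        // Illustrative fallback only: bound the wait, then interrupt stragglers.
        if (!shardConsumersExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
            shardConsumersExecutor.shutdownNow();
        }
    }
}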
@@ -361,8 +361,7 @@ private boolean consumeAllRecordsFromKinesisShard(
                 }
             } else if (subscriptionEvent.isSubscriptionComplete()) {
                 // The subscription is complete, but the shard might not be, so we return incomplete
-                result = false;
-                break;
+                return false;
             } else {
                 handleError(subscriptionEvent.getThrowable());
                 result = false;
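The comment in this hunk carries the key distinction: a completed subscription does not mean a completed shard, so the method now reports the shard as incomplete instead of breaking out of the loop with a stale result flag. The hypothetical sketch below (SubscriberSketch and runShard are invented names, not connector API) shows how a caller can use that boolean to keep re-subscribing until the shard itself is done.

import java.util.function.Consumer;

public class ResubscribeLoopSketch {

    /** Invented stand-in for a shard subscriber. */
    interface SubscriberSketch {
        /** Returns true only when the shard is fully consumed, false if just the subscription ended. */
        boolean subscribeAndConsume(Consumer<String> recordHandler) throws InterruptedException;
    }

    static void runShard(SubscriberSketch subscriber, Consumer<String> recordHandler)
            throws InterruptedException {
        boolean shardComplete = false;
        while (!shardComplete) {
            // A completed subscription is not a completed shard: a false return
            // simply triggers another subscription rather than ending the shard.
            shardComplete = subscriber.subscribeAndConsume(recordHandler);
        }
    }
}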
@@ -39,7 +39,7 @@
 import org.apache.flink.util.TestNameProvider;
 
 import org.junit.Before;
-import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -58,6 +58,8 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_CONSUMER_NAME;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RECORD_PUBLISHER_TYPE;
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_POSITION;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
@@ -67,13 +69,13 @@ public class FlinkKinesisITCase extends TestLogger {
     private String stream;
     private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisITCase.class);
 
-    @ClassRule
-    public static final MiniClusterWithClientResource MINI_CLUSTER =
+    @Rule
+    public final MiniClusterWithClientResource miniCluster =
             new MiniClusterWithClientResource(
                     new MiniClusterResourceConfiguration.Builder().build());
 
-    @ClassRule
-    public static KinesaliteContainer kinesalite =
+    @Rule
+    public KinesaliteContainer kinesalite =
             new KinesaliteContainer(DockerImageName.parse(DockerImageVersions.KINESALITE));
 
     @Rule public TemporaryFolder temp = new TemporaryFolder();
@@ -86,19 +88,31 @@ public class FlinkKinesisITCase extends TestLogger {
 
     @Before
     public void setupClient() throws Exception {
-        client = new KinesisPubsubClient(kinesalite.getContainerProperties());
+        client = new KinesisPubsubClient(getContainerProperties());
         stream = TestNameProvider.getCurrentTestName().replaceAll("\\W", "");
         client.createTopic(stream, 1, new Properties());
     }
 
     @Test
     public void testStopWithSavepoint() throws Exception {
-        testStopWithSavepoint(false);
+        testStopWithSavepoint(false, false);
     }
 
     @Test
     public void testStopWithSavepointWithDrain() throws Exception {
-        testStopWithSavepoint(true);
+        testStopWithSavepoint(true, false);
     }
 
+    @Test
+    @Ignore("Kinesalite does not support EFO")
+    public void testStopWithSavepointWithEfo() throws Exception {
+        testStopWithSavepoint(false, true);
+    }
+
+    @Test
+    @Ignore("Kinesalite does not support EFO")
+    public void testStopWithSavepointWithDrainAndEfo() throws Exception {
+        testStopWithSavepoint(true, true);
+    }
+
     /**
Expand All @@ -113,7 +127,7 @@ public void testStopWithSavepointWithDrain() throws Exception {
* <li>With the fix, the job proceeds and we can lift the backpressure.
* </ol>
*/
private void testStopWithSavepoint(boolean drain) throws Exception {
private void testStopWithSavepoint(boolean drain, boolean efo) throws Exception {
// add elements to the test stream
int numElements = 1000;
client.sendMessage(
@@ -124,14 +138,10 @@ private void testStopWithSavepoint(boolean drain) throws Exception {
         env.setParallelism(1);
         env.enableCheckpointing(100L);
 
-        Properties config = kinesalite.getContainerProperties();
-        config.setProperty(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON.name());
-        FlinkKinesisConsumer<String> consumer =
-                new FlinkKinesisConsumer<>(stream, STRING_SCHEMA, config);
 
         SharedReference<CountDownLatch> savepointTrigger = sharedObjects.add(new CountDownLatch(1));
-        DataStream<String> stream =
-                env.addSource(consumer).map(new WaitingMapper(savepointTrigger));
+        DataStream<String> outputStream =
+                env.addSource(createKinesisConsumer(efo)).map(new WaitingMapper(savepointTrigger));
 
         // call stop with savepoint in another thread
         ForkJoinTask<Object> stopTask =
                 ForkJoinPool.commonPool()
@@ -142,7 +152,7 @@
                                     return null;
                                 });
         try {
-            List<String> result = stream.executeAndCollect(10000);
+            List<String> result = outputStream.executeAndCollect(10000);
             if (drain) {
                 assertThat(
                         result,
@@ -165,10 +175,24 @@ private void testStopWithSavepoint(boolean drain) throws Exception {
         }
     }
 
+    private FlinkKinesisConsumer<String> createKinesisConsumer(boolean efo) {
+        Properties config = getContainerProperties();
+        config.setProperty(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON.name());
+        if (efo) {
+            config.putIfAbsent(RECORD_PUBLISHER_TYPE, "EFO");
+            config.putIfAbsent(EFO_CONSUMER_NAME, "efo-flink-app");
+        }
+        return new FlinkKinesisConsumer<>(stream, STRING_SCHEMA, config);
+    }
+
+    private Properties getContainerProperties() {
+        return kinesalite.getContainerProperties();
+    }
+
     private String stopWithSavepoint(boolean drain) throws Exception {
         JobStatusMessage job =
-                MINI_CLUSTER.getClusterClient().listJobs().get().stream().findFirst().get();
-        return MINI_CLUSTER
+                miniCluster.getClusterClient().listJobs().get().stream().findFirst().get();
+        return miniCluster
                 .getClusterClient()
                 .stopWithSavepoint(
                         job.getJobId(),
@@ -188,13 +212,15 @@ private static class WaitingMapper
 
         WaitingMapper(SharedReference<CountDownLatch> savepointTrigger) {
             this.savepointTrigger = savepointTrigger;
-            checkpointDeadline = Deadline.fromNow(Duration.ofDays(1));
+            // effectively set 1 hour timeout on the wait
+            // this is reduced to 1 second once the data starts flowing
+            checkpointDeadline = Deadline.fromNow(Duration.ofMinutes(10));
        }
 
         private void readObject(ObjectInputStream stream)
                 throws ClassNotFoundException, IOException {
             stream.defaultReadObject();
-            checkpointDeadline = Deadline.fromNow(Duration.ofDays(1));
+            checkpointDeadline = Deadline.fromNow(Duration.ofMinutes(10));
         }
 
         @Override
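The new createKinesisConsumer helper above shows the only configuration difference between the polling and EFO paths: the record publisher type and an EFO consumer name. Below is a minimal standalone sketch of the same wiring, not part of the commit, assuming SimpleStringSchema stands in for the test's STRING_SCHEMA and with the stream name, region/endpoint, and credentials left to caller-supplied properties.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class EfoConsumerSketch {
    public static FlinkKinesisConsumer<String> create(String streamName, Properties baseConfig) {
        Properties config = new Properties();
        config.putAll(baseConfig); // region/endpoint/credentials supplied by the caller
        // The two EFO switches used by the (currently @Ignore'd) EFO tests:
        config.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, "EFO");
        config.setProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME, "efo-flink-app");
        return new FlinkKinesisConsumer<>(streamName, new SimpleStringSchema(), config);
    }
}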
@@ -31,6 +31,7 @@
 import java.time.Duration;
 
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT;
+import static org.junit.Assert.assertFalse;
 
 /** Tests for {@link FanOutShardSubscriber}. */
 public class FanOutShardSubscriberTest {
@@ -120,6 +121,25 @@ public void testMultipleErrorsThrownPassesFirstErrorToConsumer() throws Exception {
         subscriber.subscribeToShardAndConsumeRecords(startingPosition, event -> {});
     }
 
+    @Test
+    public void testSubscriptionCompletion() throws Exception {
+        FakeKinesisFanOutBehavioursFactory.AbstractSingleShardFanOutKinesisV2 errorKinesisV2 =
+                FakeKinesisFanOutBehavioursFactory.emptyBatchFollowedBySingleRecord();
+
+        FanOutShardSubscriber subscriber =
+                new FanOutShardSubscriber(
+                        "consumerArn",
+                        "shardId",
+                        errorKinesisV2,
+                        DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT);
+
+        StartingPosition startingPosition = StartingPosition.builder().build();
+        boolean result =
+                subscriber.subscribeToShardAndConsumeRecords(startingPosition, event -> {});
+
+        assertFalse(result);
+    }
+
     @Test
     public void testTimeoutSubscribingToShard() throws Exception {
         thrown.expect(FanOutShardSubscriber.RecoverableFanOutSubscriberException.class);
