Skip to content

Commit

Permalink
[FLINK-24846][streaming] Ignoring completing async operator record if…
Browse files Browse the repository at this point in the history
… mailbox is closed already
  • Loading branch information
akalash authored and pnowojski committed Dec 15, 2021
1 parent a6e1cc6 commit 4065bfb
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -301,8 +302,18 @@ private void outputCompletedElement() {
queue.emitCompletedElement(timestampedCollector);
// if there are more completed elements, emit them with subsequent mails
if (queue.hasCompletedElements()) {
mailboxExecutor.execute(
this::outputCompletedElement, "AsyncWaitOperator#outputCompletedElement");
try {
mailboxExecutor.execute(
this::outputCompletedElement,
"AsyncWaitOperator#outputCompletedElement");
} catch (RejectedExecutionException mailboxClosedException) {
// This exception can only happen if the operator is cancelled which means all
// pending records can be safely ignored since they will be processed one more
// time after recovery.
LOG.debug(
"Attempt to complete element is ignored since the mailbox rejected the execution.",
mailboxClosedException);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
Expand All @@ -68,6 +70,7 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.testcontainers.shaded.com.google.common.collect.Lists;

import java.util.ArrayDeque;
import java.util.ArrayList;
Expand Down Expand Up @@ -110,6 +113,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
private static final long TIMEOUT = 1000L;

@Rule public Timeout timeoutRule = new Timeout(100, TimeUnit.SECONDS);
@Rule public final SharedObjects sharedObjects = SharedObjects.create();

private abstract static class MyAbstractAsyncFunction<IN>
extends RichAsyncFunction<IN, Integer> {
Expand Down Expand Up @@ -1022,6 +1026,55 @@ public void testRestartWithFullQueue() throws Exception {
assertThat(outputElements, Matchers.equalTo(expectedOutput));
}

@Test
public void testIgnoreAsyncOperatorRecordsOnDrain() throws Exception {
// given: Async wait operator which are able to collect result futures.
StreamTaskMailboxTestHarnessBuilder<Integer> builder =
new StreamTaskMailboxTestHarnessBuilder<>(
OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
.addInput(BasicTypeInfo.INT_TYPE_INFO);
SharedReference<List<ResultFuture<?>>> resultFutures = sharedObjects.add(new ArrayList<>());
try (StreamTaskMailboxTestHarness<Integer> harness =
builder.setupOutputForSingletonOperatorChain(
new AsyncWaitOperatorFactory<>(
new CollectableFuturesAsyncFunction<>(resultFutures),
TIMEOUT,
5,
AsyncDataStream.OutputMode.ORDERED))
.build()) {
// when: Processing at least two elements in reverse order to keep completed queue not
// empty.
harness.processElement(new StreamRecord<>(1));
harness.processElement(new StreamRecord<>(2));

for (ResultFuture<?> resultFuture : Lists.reverse(resultFutures.get())) {
resultFuture.complete(Collections.emptyList());
}

// then: All records from async operator should be ignored during drain since they will
// be processed on recovery.
harness.finishProcessing();
assertTrue(harness.getOutput().isEmpty());
}
}

private static class CollectableFuturesAsyncFunction<IN> implements AsyncFunction<IN, IN> {

private static final long serialVersionUID = -4214078239227288637L;

private final SharedReference<List<ResultFuture<?>>> resultFutures;

private CollectableFuturesAsyncFunction(
SharedReference<List<ResultFuture<?>>> resultFutures) {
this.resultFutures = resultFutures;
}

@Override
public void asyncInvoke(IN input, ResultFuture<IN> resultFuture) throws Exception {
resultFutures.get().add(resultFuture);
}
}

private static class ControllableAsyncFunction<IN> implements AsyncFunction<IN, IN> {

private static final long serialVersionUID = -4214078239267288636L;
Expand Down

0 comments on commit 4065bfb

Please sign in to comment.