Skip to content

Commit

Permalink
[FLINK-20675][checkpointing] Only decline async checkpoint failure when task is still running
Browse files Browse the repository at this point in the history
  • Loading branch information
Myasuka committed Jan 28, 2021
1 parent 9ce6bf7 commit be6eb95
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
Expand All @@ -50,6 +51,7 @@ final class AsyncCheckpointRunnable implements Runnable, Closeable {
private final String taskName;
private final Consumer<AsyncCheckpointRunnable> registerConsumer;
private final Consumer<AsyncCheckpointRunnable> unregisterConsumer;
private final Supplier<Boolean> isTaskRunning;
private final Environment taskEnvironment;

public boolean isRunning() {
Expand Down Expand Up @@ -79,7 +81,8 @@ enum AsyncCheckpointState {
Consumer<AsyncCheckpointRunnable> register,
Consumer<AsyncCheckpointRunnable> unregister,
Environment taskEnvironment,
AsyncExceptionHandler asyncExceptionHandler) {
AsyncExceptionHandler asyncExceptionHandler,
Supplier<Boolean> isTaskRunning) {

this.operatorSnapshotsInProgress = checkNotNull(operatorSnapshotsInProgress);
this.checkpointMetaData = checkNotNull(checkpointMetaData);
Expand All @@ -90,6 +93,7 @@ enum AsyncCheckpointState {
this.unregisterConsumer = unregister;
this.taskEnvironment = checkNotNull(taskEnvironment);
this.asyncExceptionHandler = checkNotNull(asyncExceptionHandler);
this.isTaskRunning = isTaskRunning;
}

@Override
Expand Down Expand Up @@ -254,19 +258,29 @@ private void handleExecutionException(Exception e) {
+ '.',
e);

// We only report the exception for the original cause of fail and cleanup.
// Otherwise this followup exception could race the original exception in failing
// the task.
try {
taskEnvironment.declineCheckpoint(
if (isTaskRunning.get()) {
// We only report the exception for the original cause of fail and cleanup.
// Otherwise this followup exception could race the original exception in
// failing the task.
try {
taskEnvironment.declineCheckpoint(
checkpointMetaData.getCheckpointId(),
new CheckpointException(
CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION,
checkpointException));
} catch (Exception unhandled) {
AsynchronousException asyncException = new AsynchronousException(unhandled);
asyncExceptionHandler.handleAsyncException(
"Failure in asynchronous checkpoint materialization",
asyncException);
}
} else {
// We never decline the checkpoint once the task is no longer running, to avoid an
// unexpected job failover caused by exceeding the tolerable checkpoint failure threshold.
LOG.warn(
"As task is already not running, no longer decline checkpoint {}.",
checkpointMetaData.getCheckpointId(),
new CheckpointException(
CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION,
checkpointException));
} catch (Exception unhandled) {
AsynchronousException asyncException = new AsynchronousException(unhandled);
asyncExceptionHandler.handleAsyncException(
"Failure in asynchronous checkpoint materialization", asyncException);
checkpointException);
}

currentState = AsyncCheckpointState.DISCARDED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,7 @@ private boolean performCheckpoint(
checkpointOptions,
checkpointMetrics,
operatorChain,
this::isCanceled);
this::isRunning);
});

return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ void checkpointState(
CheckpointOptions checkpointOptions,
CheckpointMetricsBuilder checkpointMetrics,
OperatorChain<?, ?> operatorChain,
Supplier<Boolean> isCanceled)
Supplier<Boolean> isRunning)
throws Exception;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public void checkpointState(
CheckpointOptions options,
CheckpointMetricsBuilder metrics,
OperatorChain<?, ?> operatorChain,
Supplier<Boolean> isCanceled)
Supplier<Boolean> isRunning)
throws Exception {

checkNotNull(options);
Expand Down Expand Up @@ -296,8 +296,8 @@ public void checkpointState(
new HashMap<>(operatorChain.getNumberOfOperators());
try {
if (takeSnapshotSync(
snapshotFutures, metadata, metrics, options, operatorChain, isCanceled)) {
finishAndReportAsync(snapshotFutures, metadata, metrics, options);
snapshotFutures, metadata, metrics, options, operatorChain, isRunning)) {
finishAndReportAsync(snapshotFutures, metadata, metrics, isRunning);
} else {
cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined"));
}
Expand Down Expand Up @@ -500,7 +500,7 @@ private void finishAndReportAsync(
Map<OperatorID, OperatorSnapshotFutures> snapshotFutures,
CheckpointMetaData metadata,
CheckpointMetricsBuilder metrics,
CheckpointOptions options) {
Supplier<Boolean> isRunning) {
// we are transferring ownership over snapshotInProgressList for cleanup to the thread,
// active on submit
asyncOperationsThreadPool.execute(
Expand All @@ -513,7 +513,8 @@ private void finishAndReportAsync(
registerConsumer(),
unregisterConsumer(),
env,
asyncExceptionHandler));
asyncExceptionHandler,
isRunning));
}

private Consumer<AsyncCheckpointRunnable> registerConsumer() {
Expand All @@ -538,7 +539,7 @@ private boolean takeSnapshotSync(
CheckpointMetricsBuilder checkpointMetrics,
CheckpointOptions checkpointOptions,
OperatorChain<?, ?> operatorChain,
Supplier<Boolean> isCanceled)
Supplier<Boolean> isRunning)
throws Exception {

for (final StreamOperatorWrapper<?, ?> operatorWrapper :
Expand Down Expand Up @@ -576,7 +577,7 @@ private boolean takeSnapshotSync(
checkpointOptions,
operatorChain,
operatorWrapper.getStreamOperator(),
isCanceled,
isRunning,
channelStateWriteResult,
storage));
}
Expand All @@ -603,13 +604,13 @@ private OperatorSnapshotFutures buildOperatorSnapshotFutures(
CheckpointOptions checkpointOptions,
OperatorChain<?, ?> operatorChain,
StreamOperator<?> op,
Supplier<Boolean> isCanceled,
Supplier<Boolean> isRunning,
ChannelStateWriteResult channelStateWriteResult,
CheckpointStreamFactory storage)
throws Exception {
OperatorSnapshotFutures snapshotInProgress =
checkpointStreamOperator(
op, checkpointMetaData, checkpointOptions, storage, isCanceled);
op, checkpointMetaData, checkpointOptions, storage, isRunning);
if (op == operatorChain.getMainOperator()) {
snapshotInProgress.setInputChannelStateFuture(
channelStateWriteResult
Expand Down Expand Up @@ -684,7 +685,7 @@ private static OperatorSnapshotFutures checkpointStreamOperator(
CheckpointMetaData checkpointMetaData,
CheckpointOptions checkpointOptions,
CheckpointStreamFactory storageLocation,
Supplier<Boolean> isCanceled)
Supplier<Boolean> isRunning)
throws Exception {
try {
return op.snapshotState(
Expand All @@ -693,7 +694,7 @@ private static OperatorSnapshotFutures checkpointStreamOperator(
checkpointOptions,
storageLocation);
} catch (Exception ex) {
if (!isCanceled.get()) {
if (isRunning.get()) {
LOG.info(ex.getMessage(), ex);
}
throw ex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,22 @@

import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Tests for {@link AsyncCheckpointRunnable}. */
public class AsyncCheckpointRunnableTest {

@Test
public void testAsyncCheckpointException() {
public void testDeclineWithAsyncCheckpointExceptionWhenRunning() {
testAsyncCheckpointException(() -> true);
}

/**
 * Runs the shared async-checkpoint-exception scenario with a supplier that reports the task
 * as not running. For that case the shared helper asserts that the test environment recorded
 * no decline cause.
 */
@Test
public void testDeclineWithAsyncCheckpointExceptionWhenNotRunning() {
    final Supplier<Boolean> taskNotRunning = () -> false;
    testAsyncCheckpointException(taskNotRunning);
}

private void testAsyncCheckpointException(Supplier<Boolean> isTaskRunning) {
final Map<OperatorID, OperatorSnapshotFutures> snapshotsInProgress = new HashMap<>();
snapshotsInProgress.put(
new OperatorID(),
Expand All @@ -66,13 +76,18 @@ public void testAsyncCheckpointException() {
r -> {},
r -> {},
environment,
(msg, ex) -> {});
(msg, ex) -> {},
isTaskRunning);
runnable.run();

Assert.assertTrue(environment.getCause() instanceof CheckpointException);
Assert.assertSame(
((CheckpointException) environment.getCause()).getCheckpointFailureReason(),
CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION);
if (isTaskRunning.get()) {
Assert.assertTrue(environment.getCause() instanceof CheckpointException);
Assert.assertSame(
((CheckpointException) environment.getCause()).getCheckpointFailureReason(),
CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION);
} else {
Assert.assertNull(environment.getCause());
}
}

private static class TestEnvironment extends StreamMockEnvironment {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ public void testReportingFromSnapshotToTaskStateManager() throws Exception {
asyncCheckpointRunnable -> {},
asyncCheckpointRunnable -> {},
testStreamTask.getEnvironment(),
testStreamTask);
testStreamTask,
() -> true);

checkpointMetrics.setAlignmentDurationNanos(0L);
checkpointMetrics.setBytesProcessedDuringAlignment(0L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent)
new CheckpointOptions(SAVEPOINT, CheckpointStorageLocationReference.getDefault()),
new CheckpointMetricsBuilder(),
operatorChain,
() -> false);
() -> true);

assertEquals(false, broadcastedPriorityEvent.get());
}
Expand All @@ -192,7 +192,7 @@ public void testSkipChannelStateForSavepoints() throws Exception {
new CheckpointMetricsBuilder(),
new OperatorChain<>(
new NoOpStreamTask<>(new DummyEnvironment()), new NonRecordWriter<>()),
() -> false);
() -> true);
}

@Test
Expand Down Expand Up @@ -249,7 +249,7 @@ public void testNotifyCheckpointAbortedBeforeAsyncPhase() throws Exception {
CheckpointOptions.forCheckpointWithDefaultLocation(),
new CheckpointMetricsBuilder(),
operatorChain,
() -> true);
() -> false);
assertFalse(checkpointOperator.isCheckpointed());
assertEquals(-1, stateManager.getReportedCheckpointId());
assertEquals(0, subtaskCheckpointCoordinator.getAbortedCheckpointSize());
Expand Down Expand Up @@ -300,7 +300,7 @@ public void testBroadcastCancelCheckpointMarkerOnAbortingFromCoordinator() throw
CheckpointOptions.forCheckpointWithDefaultLocation(),
new CheckpointMetricsBuilder(),
operatorChain,
() -> true);
() -> false);

assertEquals(1, recordOrEvents.size());
Object recordOrEvent = recordOrEvents.get(0);
Expand Down Expand Up @@ -355,7 +355,7 @@ public void testNotifyCheckpointAbortedDuringAsyncPhase() throws Exception {
CheckpointOptions.forCheckpointWithDefaultLocation(),
new CheckpointMetricsBuilder(),
operatorChain,
() -> true);
() -> false);
rawKeyedStateHandleFuture.awaitRun();
assertEquals(1, subtaskCheckpointCoordinator.getAsyncCheckpointRunnableSize());
assertFalse(rawKeyedStateHandleFuture.isCancelled());
Expand Down Expand Up @@ -385,7 +385,7 @@ public void testNotifyCheckpointAbortedAfterAsyncPhase() throws Exception {
CheckpointOptions.forCheckpointWithDefaultLocation(),
new CheckpointMetricsBuilder(),
operatorChain,
() -> true);
() -> false);
subtaskCheckpointCoordinator.notifyCheckpointAborted(
checkpointId, operatorChain, () -> true);
assertEquals(0, subtaskCheckpointCoordinator.getAbortedCheckpointSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void checkpointState(
CheckpointOptions checkpointOptions,
CheckpointMetricsBuilder checkpointMetrics,
OperatorChain<?, ?> operatorChain,
Supplier<Boolean> isCanceled) {}
Supplier<Boolean> isRunning) {}

@Override
public void notifyCheckpointComplete(
Expand Down

0 comments on commit be6eb95

Please sign in to comment.