Skip to content

Commit

Permalink
[FLINK-10753] Improve propagation and logging of snapshot exceptions
Browse files Browse the repository at this point in the history
This closes apache#7064.
  • Loading branch information
StefanRRichter committed Nov 12, 2018
1 parent bf760f9 commit dc8e27f
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1249,11 +1249,14 @@ private void discardCheckpoint(PendingCheckpoint pendingCheckpoint, @Nullable Th

final long checkpointId = pendingCheckpoint.getCheckpointId();

final String reason = (cause != null) ? cause.getMessage() : "";
LOG.info("Discarding checkpoint {} of job {}.", checkpointId, job, cause);

LOG.info("Discarding checkpoint {} of job {} because: {}", checkpointId, job, reason);
if (cause != null) {
pendingCheckpoint.abortError(cause);
} else {
pendingCheckpoint.abortDeclined();
}

pendingCheckpoint.abortDeclined();
rememberRecentCheckpointId(checkpointId);

// we don't have to schedule another "dissolving" checkpoint any more because the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.IOException;
Expand Down Expand Up @@ -433,25 +434,23 @@ public void abortSubsumed() {
}
}


public void abortDeclined() {
try {
Exception cause = new Exception("Checkpoint was declined (tasks not ready)");
onCompletionPromise.completeExceptionally(cause);
reportFailedCheckpoint(cause);
} finally {
dispose(true);
}
abortWithCause(new Exception("Checkpoint was declined (tasks not ready)"));
}

/**
* Aborts the pending checkpoint due to an error.
* @param cause The error's exception.
*/
public void abortError(Throwable cause) {
public void abortError(@Nonnull Throwable cause) {
abortWithCause(new Exception("Checkpoint failed: " + cause.getMessage(), cause));
}

private void abortWithCause(@Nonnull Exception cause) {
try {
Exception failure = new Exception("Checkpoint failed: " + cause.getMessage(), cause);
onCompletionPromise.completeExceptionally(failure);
reportFailedCheckpoint(failure);
onCompletionPromise.completeExceptionally(cause);
reportFailedCheckpoint(cause);
} finally {
dispose(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,13 @@
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RpcCheckpointResponder implements CheckpointResponder {

private static final Logger LOG = LoggerFactory.getLogger(RpcCheckpointResponder.class);

private final CheckpointCoordinatorGateway checkpointCoordinatorGateway;

public RpcCheckpointResponder(CheckpointCoordinatorGateway checkpointCoordinatorGateway) {
Expand Down Expand Up @@ -57,6 +62,7 @@ public void declineCheckpoint(
long checkpointId,
Throwable cause) {

LOG.info("Declining checkpoint {} of job {}.", checkpointId, jobID, cause);
checkpointCoordinatorGateway.declineCheckpoint(jobID, executionAttemptID, checkpointId, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,10 @@ public final OperatorSnapshotFutures snapshotState(long checkpointId, long times
snapshotException.addSuppressed(e);
}

throw new Exception("Could not complete snapshot " + checkpointId + " for operator " +
getOperatorName() + '.', snapshotException);
String snapshotFailMessage = "Could not complete snapshot " + checkpointId + " for operator " +
getOperatorName() + ".";

throw new Exception(snapshotFailMessage, snapshotException);
}

return snapshotInProgress;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ private String migrateJob(ClassLoader classLoader, ClusterClient<?> clusterClien
} catch (Exception e) {
String exceptionString = ExceptionUtils.stringifyException(e);
if (!(exceptionString.matches("(.*\n)*.*savepoint for the job .* failed(.*\n)*") // legacy
|| exceptionString.matches("(.*\n)*.*was not running(.*\n)*")
|| exceptionString.matches("(.*\n)*.*Not all required tasks are currently running(.*\n)*") // new
|| exceptionString.matches("(.*\n)*.*Checkpoint was declined \\(tasks not ready\\)(.*\n)*"))) { // new
throw e;
Expand Down

0 comments on commit dc8e27f

Please sign in to comment.