Skip to content

Commit

Permalink
[FLINK-10482] Fix double counting of checkpoint stat
Browse files Browse the repository at this point in the history
  • Loading branch information
azagrebin authored and tillrohrmann committed Dec 6, 2018
1 parent 64f6d0b commit 114cb2c
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.flink.runtime.checkpoint;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;

import static org.apache.flink.util.Preconditions.checkArgument;
Expand All @@ -26,6 +29,7 @@
* Counts of checkpoints.
*/
public class CheckpointStatsCounts implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(CheckpointStatsCounts.class);

private static final long serialVersionUID = -5229425063269482528L;

Expand Down Expand Up @@ -147,9 +151,8 @@ void incrementInProgressCheckpoints() {
* {@link #incrementInProgressCheckpoints()}.
*/
void incrementCompletedCheckpoints() {
if (--numInProgressCheckpoints < 0) {
throw new IllegalStateException("Incremented the completed number of checkpoints " +
"without incrementing the in progress checkpoints before.");
if (canDecrementOfInProgressCheckpointsNumber()) {
numInProgressCheckpoints--;
}
numCompletedCheckpoints++;
}
Expand All @@ -161,9 +164,8 @@ void incrementCompletedCheckpoints() {
* {@link #incrementInProgressCheckpoints()}.
*/
void incrementFailedCheckpoints() {
if (--numInProgressCheckpoints < 0) {
throw new IllegalStateException("Incremented the completed number of checkpoints " +
"without incrementing the in progress checkpoints before.");
if (canDecrementOfInProgressCheckpointsNumber()) {
numInProgressCheckpoints--;
}
numFailedCheckpoints++;
}
Expand All @@ -181,4 +183,14 @@ CheckpointStatsCounts createSnapshot() {
numCompletedCheckpoints,
numFailedCheckpoints);
}

private boolean canDecrementOfInProgressCheckpointsNumber() {
boolean decrementLeadsToNegativeNumber = numInProgressCheckpoints - 1 < 0;
if (decrementLeadsToNegativeNumber) {
String errorMessage = "Incremented the completed number of checkpoints " +
"without incrementing the in progress checkpoints before.";
LOG.warn(errorMessage);
}
return !decrementLeadsToNegativeNumber;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -964,19 +964,18 @@ public CompletableFuture<String> triggerSavepoint(
return checkpointCoordinator
.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
.thenApply(CompletedCheckpoint::getExternalPointer)
.thenApplyAsync(path -> {
if (cancelJob) {
.handleAsync((path, throwable) -> {
if (throwable != null) {
if (cancelJob) {
startCheckpointScheduler(checkpointCoordinator);
}
throw new CompletionException(throwable);
} else if (cancelJob) {
log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID());
cancel(timeout);
}
return path;
}, getMainThreadExecutor())
.exceptionally(throwable -> {
if (cancelJob) {
startCheckpointScheduler(checkpointCoordinator);
}
throw new CompletionException(throwable);
});
}, getMainThreadExecutor());
}

private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertTrue;

/** Test checkpoint statistics counters. */
public class CheckpointStatsCountsTest {

/**
* Tests that counts are reported correctly.
*/
@Test
public void testCounts() throws Exception {
public void testCounts() {
CheckpointStatsCounts counts = new CheckpointStatsCounts();
assertEquals(0, counts.getNumberOfRestoredCheckpoints());
assertEquals(0, counts.getTotalNumberOfCheckpoints());
Expand Down Expand Up @@ -80,27 +81,23 @@ public void testCounts() throws Exception {
* incrementing the in progress checkpoints before throws an Exception.
*/
@Test
public void testCompleteOrFailWithoutInProgressCheckpoint() throws Exception {
public void testCompleteOrFailWithoutInProgressCheckpoint() {
CheckpointStatsCounts counts = new CheckpointStatsCounts();
try {
counts.incrementCompletedCheckpoints();
fail("Did not throw expected Exception");
} catch (IllegalStateException ignored) {
}

try {
counts.incrementFailedCheckpoints();
fail("Did not throw expected Exception");
} catch (IllegalStateException ignored) {
}
counts.incrementCompletedCheckpoints();
assertTrue("Number of checkpoints in progress should never be negative",
counts.getNumberOfInProgressCheckpoints() >= 0);

counts.incrementFailedCheckpoints();
assertTrue("Number of checkpoints in progress should never be negative",
counts.getNumberOfInProgressCheckpoints() >= 0);
}

/**
* Tests that that taking snapshots of the state are independent from the
* parent.
*/
@Test
public void testCreateSnapshot() throws Exception {
public void testCreateSnapshot() {
CheckpointStatsCounts counts = new CheckpointStatsCounts();
counts.incrementRestoredCheckpoints();
counts.incrementRestoredCheckpoints();
Expand Down

0 comments on commit 114cb2c

Please sign in to comment.