Skip to content

Commit

Permalink
[FLINK-21518][tests] Fix testMinCheckpointPause instability
Browse files Browse the repository at this point in the history
The instability is caused by last assertion depending on timing.
Solve by waiting on condition.

Another issue is that failure can cause RejectionExecutionException.
This is caused by shutting down executor (in tests finally block)
without shutting down CheckpointCoordinator.
Solve by shutting down CheckpointCoordinator first.
  • Loading branch information
rkhachatryan committed Feb 27, 2021
1 parent c02efc6 commit 8c610fc
Showing 1 changed file with 9 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ public void testMinCheckpointPause() throws Exception {
// will use a different thread to allow checkpoint triggering before exiting from
// receiveAcknowledgeMessage
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
CheckpointCoordinator coordinator = null;
try {
int pause = 1000;
JobVertexID jobVertexId = new JobVertexID();
Expand All @@ -291,7 +292,7 @@ public void testMinCheckpointPause() throws Exception {
ExecutionVertex vertex = graph.getJobVertex(jobVertexId).getTaskVertices()[0];
ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();

CheckpointCoordinator coordinator =
coordinator =
new CheckpointCoordinatorBuilder()
.setTimer(new ScheduledExecutorServiceAdapter(executorService))
.setCheckpointCoordinatorConfiguration(
Expand Down Expand Up @@ -319,9 +320,14 @@ public void testMinCheckpointPause() throws Exception {
TASK_MANAGER_LOCATION_INFO);
Thread.sleep(pause / 2);
assertEquals(0, coordinator.getNumberOfPendingCheckpoints());
Thread.sleep(pause);
assertEquals(1, coordinator.getNumberOfPendingCheckpoints());
// make sure that the 2nd request is eventually processed
while (coordinator.getNumberOfPendingCheckpoints() == 0) {
Thread.sleep(1);
}
} finally {
if (coordinator != null) {
coordinator.shutdown();
}
executorService.shutdownNow();
}
}
Expand Down

0 comments on commit 8c610fc

Please sign in to comment.