Skip to content

Commit

Permalink
[FLINK-20981][coordination] Allow initializing jobs to be suspended
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Jan 15, 2021
1 parent 4da9ff9 commit 78b4e60
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -373,10 +373,11 @@ public static ArchivedExecutionGraph createFromInitializingJob(

ErrorInfo failureInfo = null;
if (throwable != null) {
Preconditions.checkState(jobStatus == JobStatus.FAILED);
Preconditions.checkState(
jobStatus == JobStatus.FAILED || jobStatus == JobStatus.SUSPENDED);
long failureTime = System.currentTimeMillis();
failureInfo = new ErrorInfo(throwable, failureTime);
timestamps[JobStatus.FAILED.ordinal()] = failureTime;
timestamps[jobStatus.ordinal()] = failureTime;
}

return new ArchivedExecutionGraph(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.core.testutils.CommonTestUtils;
Expand Down Expand Up @@ -51,9 +52,12 @@
import java.util.Map;
import java.util.Map.Entry;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

/** Tests for the {@link ArchivedExecutionGraph}. */
Expand Down Expand Up @@ -135,6 +139,20 @@ public void testSerialization() throws IOException, ClassNotFoundException {
verifySerializability(archivedGraph);
}

@Test
public void testCreateFromInitializingJobForSuspendedJob() {
final ArchivedExecutionGraph suspendedExecutionGraph =
ArchivedExecutionGraph.createFromInitializingJob(
new JobID(),
"TestJob",
JobStatus.SUSPENDED,
new Exception("Test suspension exception"),
System.currentTimeMillis());

assertThat(suspendedExecutionGraph.getState(), is(JobStatus.SUSPENDED));
assertThat(suspendedExecutionGraph.getFailureInfo(), notNullValue());
}

private static void compareExecutionGraph(
AccessExecutionGraph runtimeGraph, AccessExecutionGraph archivedGraph)
throws IOException, ClassNotFoundException {
Expand Down

0 comments on commit 78b4e60

Please sign in to comment.