Skip to content

Commit

Permalink
[FLINK-12614][yarn] Refactor test to not do assertions in @after methods
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Jun 6, 2019
1 parent e1a4db2 commit 3508465
Showing 7 changed files with 559 additions and 511 deletions.
Original file line number Diff line number Diff line change
@@ -153,51 +153,55 @@ private void initJobGraph() throws IOException {
*/
@Test
public void testKillYarnSessionClusterEntrypoint() throws Exception {
assumeTrue(
"This test kills processes via the pkill command. Thus, it only runs on Linux, Mac OS, Free BSD and Solaris.",
OperatingSystem.isLinux() || OperatingSystem.isMac() || OperatingSystem.isFreeBSD() || OperatingSystem.isSolaris());
runTest(() -> {
assumeTrue(
"This test kills processes via the pkill command. Thus, it only runs on Linux, Mac OS, Free BSD and Solaris.",
OperatingSystem.isLinux() || OperatingSystem.isMac() || OperatingSystem.isFreeBSD() || OperatingSystem.isSolaris());

final YarnClusterDescriptor yarnClusterDescriptor = setupYarnClusterDescriptor();
yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkShadedHadoopDir.listFiles()));
final YarnClusterDescriptor yarnClusterDescriptor = setupYarnClusterDescriptor();
yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkShadedHadoopDir.listFiles()));

final RestClusterClient<ApplicationId> restClusterClient = deploySessionCluster(yarnClusterDescriptor);
final RestClusterClient<ApplicationId> restClusterClient = deploySessionCluster(yarnClusterDescriptor);

try {
final JobID jobId = submitJob(restClusterClient);
final ApplicationId id = restClusterClient.getClusterId();
try {
final JobID jobId = submitJob(restClusterClient);
final ApplicationId id = restClusterClient.getClusterId();

waitUntilJobIsRunning(restClusterClient, jobId);
waitUntilJobIsRunning(restClusterClient, jobId);

killApplicationMaster(yarnClusterDescriptor.getYarnSessionClusterEntrypoint());
waitForApplicationAttempt(id, 2);
killApplicationMaster(yarnClusterDescriptor.getYarnSessionClusterEntrypoint());
waitForApplicationAttempt(id, 2);

waitForJobTermination(restClusterClient, jobId);
waitForJobTermination(restClusterClient, jobId);

killApplicationAndWait(id);
} finally {
restClusterClient.shutdown();
}
killApplicationAndWait(id);
} finally {
restClusterClient.shutdown();
}
});
}

@Test
public void testJobRecoversAfterKillingTaskManager() throws Exception {
final YarnClusterDescriptor yarnClusterDescriptor = setupYarnClusterDescriptor();
yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkShadedHadoopDir.listFiles()));
runTest(() -> {
final YarnClusterDescriptor yarnClusterDescriptor = setupYarnClusterDescriptor();
yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkShadedHadoopDir.listFiles()));

final RestClusterClient<ApplicationId> restClusterClient = deploySessionCluster(yarnClusterDescriptor);
try {
final JobID jobId = submitJob(restClusterClient);
waitUntilJobIsRunning(restClusterClient, jobId);
final RestClusterClient<ApplicationId> restClusterClient = deploySessionCluster(yarnClusterDescriptor);
try {
final JobID jobId = submitJob(restClusterClient);
waitUntilJobIsRunning(restClusterClient, jobId);

stopTaskManagerContainer();
waitUntilJobIsRestarted(restClusterClient, jobId, 1);
stopTaskManagerContainer();
waitUntilJobIsRestarted(restClusterClient, jobId, 1);

waitForJobTermination(restClusterClient, jobId);
waitForJobTermination(restClusterClient, jobId);

killApplicationAndWait(restClusterClient.getClusterId());
} finally {
restClusterClient.shutdown();
}
killApplicationAndWait(restClusterClient.getClusterId());
} finally {
restClusterClient.shutdown();
}
});
}

private void waitForApplicationAttempt(final ApplicationId applicationId, final int attemptId) throws Exception {
Original file line number Diff line number Diff line change
@@ -58,69 +58,71 @@ public static void setup() {

@Test
public void testPerJobMode() throws Exception {
Configuration configuration = new Configuration();
configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s");
final YarnClient yarnClient = getYarnClient();
runTest(() -> {
Configuration configuration = new Configuration();
configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s");
final YarnClient yarnClient = getYarnClient();

try (final YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
configuration,
getYarnConfiguration(),
System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR),
yarnClient,
true)) {
try (final YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
configuration,
getYarnConfiguration(),
System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR),
yarnClient,
true)) {

yarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkShadedHadoopDir.listFiles()));
yarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkShadedHadoopDir.listFiles()));

final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
.setMasterMemoryMB(768)
.setTaskManagerMemoryMB(1024)
.setSlotsPerTaskManager(1)
.setNumberTaskManagers(1)
.createClusterSpecification();
final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
.setMasterMemoryMB(768)
.setTaskManagerMemoryMB(1024)
.setSlotsPerTaskManager(1)
.setNumberTaskManagers(1)
.createClusterSpecification();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);

env.addSource(new NoDataSource())
.shuffle()
.addSink(new DiscardingSink<>());
env.addSource(new NoDataSource())
.shuffle()
.addSink(new DiscardingSink<>());

final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
final JobGraph jobGraph = env.getStreamGraph().getJobGraph();

File testingJar = YarnTestBase.findFile("..", new YarnTestUtils.TestJarFinder("flink-yarn-tests"));
File testingJar = YarnTestBase.findFile("..", new YarnTestUtils.TestJarFinder("flink-yarn-tests"));

jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI()));
jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI()));

ApplicationId applicationId = null;
ClusterClient<ApplicationId> clusterClient = null;
ApplicationId applicationId = null;
ClusterClient<ApplicationId> clusterClient = null;

try {
clusterClient = yarnClusterDescriptor.deployJobCluster(
clusterSpecification,
jobGraph,
false);
applicationId = clusterClient.getClusterId();
try {
clusterClient = yarnClusterDescriptor.deployJobCluster(
clusterSpecification,
jobGraph,
false);
applicationId = clusterClient.getClusterId();

assertThat(clusterClient, is(instanceOf(RestClusterClient.class)));
final RestClusterClient<ApplicationId> restClusterClient = (RestClusterClient<ApplicationId>) clusterClient;
assertThat(clusterClient, is(instanceOf(RestClusterClient.class)));
final RestClusterClient<ApplicationId> restClusterClient = (RestClusterClient<ApplicationId>) clusterClient;

final CompletableFuture<JobResult> jobResultCompletableFuture = restClusterClient.requestJobResult(jobGraph.getJobID());
final CompletableFuture<JobResult> jobResultCompletableFuture = restClusterClient.requestJobResult(jobGraph.getJobID());

final JobResult jobResult = jobResultCompletableFuture.get();
final JobResult jobResult = jobResultCompletableFuture.get();

assertThat(jobResult, is(notNullValue()));
assertThat(jobResult.getSerializedThrowable().isPresent(), is(false));
} finally {
if (clusterClient != null) {
clusterClient.shutdown();
}
assertThat(jobResult, is(notNullValue()));
assertThat(jobResult.getSerializedThrowable().isPresent(), is(false));
} finally {
if (clusterClient != null) {
clusterClient.shutdown();
}

if (applicationId != null) {
yarnClusterDescriptor.killCluster(applicationId);
if (applicationId != null) {
yarnClusterDescriptor.killCluster(applicationId);
}
}
}
}
});
}
}
Loading

0 comments on commit 3508465

Please sign in to comment.