Skip to content

Commit

Permalink
[FLINK-13946] Make the ClusterClient autocloseable
Browse files Browse the repository at this point in the history
  • Loading branch information
kl0u committed Sep 6, 2019
1 parent 80fa99d commit e8650cd
Show file tree
Hide file tree
Showing 18 changed files with 45 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void setDefaultParallelism(int defaultParallelism) {
public int getDefaultParallelism() {
return defaultParallelism;
}

// ------------------------------------------------------------------------
// Executing programs
// ------------------------------------------------------------------------
Expand All @@ -123,14 +123,8 @@ public JobExecutionResult executePlan(Plan plan) throws Exception {
private JobExecutionResult executePlanWithJars(JobWithJars program) throws Exception {
checkNotNull(program);

ClusterClient<?> client = null;
try {
client = new RestClusterClient<>(clientConfiguration, "RemoteExecutor");
try (ClusterClient<?> client = new RestClusterClient<>(clientConfiguration, "RemoteExecutor")) {
return client.run(program, defaultParallelism).getJobExecutionResult();
} finally {
if (client != null) {
client.shutdown();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ private <T> void runProgram(
logAndSysout("Job has been submitted with JobID " + jobGraph.getJobID());

try {
client.shutdown();
client.close();
} catch (Exception e) {
LOG.info("Could not properly shut down the client.", e);
}
Expand Down Expand Up @@ -284,7 +284,7 @@ private <T> void runProgram(
}
}
try {
client.shutdown();
client.close();
} catch (Exception e) {
LOG.info("Could not properly shut down the client.", e);
}
Expand Down Expand Up @@ -944,7 +944,7 @@ private <T> void runClusterAction(CustomCommandLine<T> activeCommandLine, Comman
clusterAction.runAction(clusterClient);
} finally {
try {
clusterClient.shutdown();
clusterClient.close();
} catch (Exception e) {
LOG.info("Could not properly shut down the cluster client.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
*
* @param <T> type of the cluster id
*/
public abstract class ClusterClient<T> {
public abstract class ClusterClient<T> implements AutoCloseable {

protected final Logger log = LoggerFactory.getLogger(getClass());

Expand Down Expand Up @@ -101,14 +101,8 @@ public ClusterClient(Configuration flinkConfig) {
this.timeout = AkkaUtils.getClientTimeout(flinkConfig);
}

// ------------------------------------------------------------------------
// Startup & Shutdown
// ------------------------------------------------------------------------

/**
* User overridable hook to close the client, possibly closes internal services.
*/
public void shutdown() throws Exception {
@Override
public void close() throws Exception {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ private void startLeaderRetrievers() throws Exception {
}

@Override
public void shutdown() {
public void close() {
ExecutorUtils.gracefulShutdown(restClusterClientConfiguration.getRetryDelay(), TimeUnit.MILLISECONDS, retryExecutorService);

this.restClient.shutdown(Time.seconds(5));
Expand All @@ -203,6 +203,12 @@ public void shutdown() {
} catch (Exception e) {
log.error("An error occurred during stopping the ClientHighAvailabilityServices", e);
}

try {
super.close();
} catch (Exception e) {
log.error("Error while closing the Cluster Client", e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void testTriggerSavepointSuccess() throws Exception {
assertTrue(buffer.toString().contains(savepointPath));
}
finally {
clusterClient.shutdown();
clusterClient.close();
restoreStdOutAndStdErr();
}
}
Expand Down Expand Up @@ -124,7 +124,7 @@ public void testTriggerSavepointFailure() throws Exception {
}
}
finally {
clusterClient.shutdown();
clusterClient.close();
restoreStdOutAndStdErr();
}
}
Expand Down Expand Up @@ -175,7 +175,7 @@ public void testTriggerSavepointCustomTarget() throws Exception {
assertTrue(buffer.toString().contains(savepointDirectory));
}
finally {
clusterClient.shutdown();
clusterClient.close();

restoreStdOutAndStdErr();
}
Expand Down Expand Up @@ -206,7 +206,7 @@ public void testDisposeSavepointSuccess() throws Exception {
assertTrue(outMsg.contains("disposed"));
}
finally {
clusterClient.shutdown();
clusterClient.close();
restoreStdOutAndStdErr();
}
}
Expand Down Expand Up @@ -243,7 +243,7 @@ public void testDisposeWithJar() throws Exception {

assertEquals(disposePath, actualSavepointPath);
} finally {
clusterClient.shutdown();
clusterClient.close();
restoreStdOutAndStdErr();
}
}
Expand Down Expand Up @@ -272,7 +272,7 @@ public void testDisposeSavepointFailure() throws Exception {
}
}
finally {
clusterClient.shutdown();
clusterClient.close();
restoreStdOutAndStdErr();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public void testJobSubmitCancel() throws Exception {
restClusterClient.cancel(jobId);
Assert.assertTrue(terminationHandler.jobCanceled);
} finally {
restClusterClient.shutdown();
restClusterClient.close();
}
}
}
Expand All @@ -264,7 +264,7 @@ public void testDetachedJobSubmission() throws Exception {
assertThat(jobSubmissionResult, is(not(instanceOf(JobExecutionResult.class))));
assertThat(jobSubmissionResult.getJobID(), is(jobId));
} finally {
restClusterClient.shutdown();
restClusterClient.close();
}
}

Expand Down Expand Up @@ -383,7 +383,7 @@ public void testSubmitJobAndWaitForExecutionResult() throws Exception {
assertThat(cause.get().getMessage(), equalTo("expected"));
}
} finally {
restClusterClient.shutdown();
restClusterClient.close();
}
}
}
Expand Down Expand Up @@ -432,7 +432,7 @@ public void testDisposeSavepoint() throws Exception {
}
}
} finally {
restClusterClient.shutdown();
restClusterClient.close();
}
}
}
Expand Down Expand Up @@ -504,7 +504,7 @@ public void testListJobs() throws Exception {
JobStatusMessage job2 = jobDetailsIterator.next();
Assert.assertNotEquals("The job status should not be equal.", job1.getJobState(), job2.getJobState());
} finally {
restClusterClient.shutdown();
restClusterClient.close();
}
}
}
Expand All @@ -528,7 +528,7 @@ public void testGetAccumulators() throws Exception {
assertEquals("testValue", accumulators.get("testKey").get().toString());
}
} finally {
restClusterClient.shutdown();
restClusterClient.close();
}
}
}
Expand Down Expand Up @@ -583,7 +583,7 @@ public void testRetriableSendOperationIfConnectionErrorOrServiceUnavailable() th

restClusterClient.sendRequest(PingRestHandlerHeaders.INSTANCE).get();
} finally {
restClusterClient.shutdown();
restClusterClient.close();
}
}
}
Expand All @@ -598,7 +598,7 @@ public void testJobSubmissionFailureThrowsProgramInvocationException() throws Ex
} catch (final ProgramInvocationException expected) {
// expected
} finally {
restClusterClient.shutdown();
restClusterClient.close();
}
}
}
Expand Down Expand Up @@ -635,7 +635,7 @@ public void testSendIsNotRetriableIfHttpNotFound() throws Exception {
} catch (Exception e) {
assertThat(ExceptionUtils.findThrowableWithMessage(e, exceptionMessage).isPresent(), is(true));
} finally {
restClusterClient.shutdown();
restClusterClient.close();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ object FlinkShell {
case Some(Left(miniCluster)) => miniCluster.close()
case Some(Right(yarnCluster)) =>
yarnCluster.shutDownCluster()
yarnCluster.shutdown()
yarnCluster.close()
case _ =>
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ private static JobExecutionResult executeRemotely(StreamGraph streamGraph,
}
finally {
try {
client.shutdown();
client.close();
} catch (Exception e) {
LOG.warn("Could not properly shut down the cluster client.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ private <T> void cancelQueryInternal(ExecutionContext<T> context, String resultI
} finally {
try {
if (clusterClient != null) {
clusterClient.shutdown();
clusterClient.close();
}
} catch (Exception e) {
// ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ private <T> void deployJobOnNewCluster(
} finally {
try {
if (clusterClient != null) {
clusterClient.shutdown();
clusterClient.close();
}
} catch (Exception e) {
// ignore
Expand Down Expand Up @@ -172,7 +172,7 @@ private <T> void deployJobOnExistingCluster(
} finally {
try {
if (clusterClient != null) {
clusterClient.shutdown();
clusterClient.close();
}
} catch (Exception e) {
// ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void after() {

if (clusterClient != null) {
try {
clusterClient.shutdown();
clusterClient.close();
} catch (Exception e) {
exception = e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void setUp() throws Exception {
@After
public void tearDown() {
if (client != null) {
client.shutdown();
client.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ public Long map(Long value) throws Exception {
taskManagerProcess.destroy();
}
if (clusterClient != null) {
clusterClient.shutdown();
clusterClient.close();
}
if (dispatcherResourceManagerComponent != null) {
dispatcherResourceManagerComponent.deregisterApplicationAndClose(ApplicationStatus.SUCCEEDED, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public String map(Integer value) throws Exception {

assertEquals(expected, result);
} finally {
restClusterClient.shutdown();
restClusterClient.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public void testKillYarnSessionClusterEntrypoint() throws Exception {

killApplicationAndWait(id);
} finally {
restClusterClient.shutdown();
restClusterClient.close();
}
});
}
Expand All @@ -199,7 +199,7 @@ public void testJobRecoversAfterKillingTaskManager() throws Exception {

killApplicationAndWait(restClusterClient.getClusterId());
} finally {
restClusterClient.shutdown();
restClusterClient.close();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void testPerJobMode() throws Exception {
waitApplicationFinishedElseKillIt(applicationId, yarnAppTerminateTimeout, yarnClusterDescriptor);
} finally {
if (clusterClient != null) {
clusterClient.shutdown();
clusterClient.close();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public void testFlinkContainerMemory() throws Exception {
assertThat((int) (taskManagerInfo.getHardwareDescription().getSizeOfManagedMemory() >> 20), is(expectedManagedMemoryMB));
} finally {
restClient.shutdown(TIMEOUT);
clusterClient.shutdown();
clusterClient.close();
}

clusterDescriptor.killCluster(clusterId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ public int run(String[] args) throws CliArgsException, FlinkException {
yarnClusterDescriptor.getDynamicPropertiesEncoded());
} catch (Exception e) {
try {
clusterClient.shutdown();
clusterClient.close();
} catch (Exception ex) {
LOG.info("Could not properly shutdown cluster client.", ex);
}
Expand Down Expand Up @@ -705,7 +705,7 @@ private void shutdownCluster(
clusterClient.shutDownCluster();

try {
clusterClient.shutdown();
clusterClient.close();
} catch (Exception e) {
LOG.info("Could not properly shutdown cluster client.", e);
}
Expand Down

0 comments on commit e8650cd

Please sign in to comment.