Skip to content

Commit

Permalink
[hotfix] Make TaskManagerRunner shutdown asynchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed May 17, 2018
1 parent c832f52 commit 4922ced
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
Expand All @@ -47,6 +48,7 @@
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;

Expand All @@ -56,6 +58,8 @@

import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
Expand All @@ -69,10 +73,12 @@
* It constructs the related components (network, I/O manager, memory manager, RPC service, HA service)
* and starts them.
*/
public class TaskManagerRunner implements FatalErrorHandler {
public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync {

private static final Logger LOG = LoggerFactory.getLogger(TaskManagerRunner.class);

private static final long FATAL_ERROR_SHUTDOWN_TIMEOUT_MS = 10000L;

private static final int STARTUP_FAILURE_RETURN_CODE = 1;

private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
Expand All @@ -98,6 +104,10 @@ public class TaskManagerRunner implements FatalErrorHandler {

private final TaskExecutor taskManager;

private final CompletableFuture<Void> terminationFuture;

private boolean shutdown;

public TaskManagerRunner(Configuration configuration, ResourceID resourceId) throws Exception {
this.configuration = checkNotNull(configuration);
this.resourceId = checkNotNull(resourceId);
Expand Down Expand Up @@ -137,6 +147,9 @@ public TaskManagerRunner(Configuration configuration, ResourceID resourceId) thr
blobCacheService,
false,
this);

this.terminationFuture = new CompletableFuture<>();
this.shutdown = false;
}

// --------------------------------------------------------------------------------------------
Expand All @@ -147,19 +160,37 @@ public void start() throws Exception {
taskManager.start();
}

public void shutDown() throws Exception {
shutDownInternally();
}

protected void shutDownInternally() throws Exception {
Exception exception = null;

@Override
public CompletableFuture<Void> closeAsync() {
synchronized (lock) {
try {
if (!shutdown) {
shutdown = true;

taskManager.shutDown();
} catch (Exception e) {
exception = e;
final CompletableFuture<Void> taskManagerTerminationFuture = taskManager.getTerminationFuture();

final CompletableFuture<Void> serviceTerminationFuture = FutureUtils.composeAfterwards(
taskManagerTerminationFuture,
this::shutDownServices);

serviceTerminationFuture.whenComplete(
(Void ignored, Throwable throwable) -> {
if (throwable != null) {
terminationFuture.completeExceptionally(throwable);
} else {
terminationFuture.complete(null);
}
});
}
}

return terminationFuture;
}

private CompletableFuture<Void> shutDownServices() {
synchronized (lock) {
Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);
Exception exception = null;

try {
blobCacheService.close();
Expand All @@ -173,33 +204,27 @@ protected void shutDownInternally() throws Exception {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}

try {
rpcService.stopService().get();
} catch (InterruptedException ie) {
exception = ExceptionUtils.firstOrSuppressed(ie, exception);

Thread.currentThread().interrupt();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}

try {
highAvailabilityServices.close();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}

ExecutorUtils.gracefulShutdown(timeout.toMilliseconds(), TimeUnit.MILLISECONDS, executor);
terminationFutures.add(rpcService.stopService());

terminationFutures.add(ExecutorUtils.nonBlockingShutdown(timeout.toMilliseconds(), TimeUnit.MILLISECONDS, executor));

if (exception != null) {
throw exception;
terminationFutures.add(FutureUtils.completedExceptionally(exception));
}

return FutureUtils.completeAll(terminationFutures);
}
}

// Expose the termination future so that callers can tell when this runner has terminated
public CompletableFuture<Void> getTerminationFuture() {
return taskManager.getTerminationFuture();
return terminationFuture;
}

// --------------------------------------------------------------------------------------------
Expand All @@ -210,12 +235,21 @@ public CompletableFuture<Void> getTerminationFuture() {
public void onFatalError(Throwable exception) {
LOG.error("Fatal error occurred while executing the TaskManager. Shutting it down...", exception);

try {
shutDown();
} catch (Throwable t) {
LOG.error("Could not properly shut down TaskManager.", t);
if (ExceptionUtils.isJvmFatalOrOutOfMemoryError(exception)) {
terminateJVM();
} else {
closeAsync();

FutureUtils.orTimeout(terminationFuture, FATAL_ERROR_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);

terminationFuture.whenComplete(
(Void ignored, Throwable throwable) -> {
terminateJVM();
});
}
}

/**
 * Terminates the JVM, reporting {@code RUNTIME_FAILURE_RETURN_CODE} as the process exit status.
 *
 * <p>The method is {@code protected} so that a subclass can override it and substitute a
 * different reaction for the hard exit.
 */
protected void terminateJVM() {
	final int exitStatus = RUNTIME_FAILURE_RETURN_CODE;
	System.exit(exitStatus);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.taskexecutor;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

import java.net.ServerSocket;
import java.util.concurrent.CompletableFuture;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

/**
* Tests for the {@link TaskManagerRunner}.
*/
public class TaskManagerRunnerTest extends TestLogger {

	/**
	 * Starts a {@link TaskManagerRunner} that is configured with a 10 ms registration timeout and
	 * a JobManager address pointing at a locally bound port on which this test never accepts a
	 * connection. The test then waits until the runner asks for JVM termination and verifies that
	 * the runner's own termination future is already done at that point.
	 *
	 * @throws Exception if the socket cannot be opened, the runner cannot be created or started,
	 *     or waiting for the termination request fails
	 */
	@Test
	public void testTaskManagerRunnerShutdown() throws Exception {
		final Configuration configuration = new Configuration();
		final ResourceID taskManagerResourceId = ResourceID.generate();

		// try-with-resources guarantees that the port is released even if creating or starting
		// the runner throws; previously the socket leaked on those paths.
		try (ServerSocket localhost = new ServerSocket(0)) {
			configuration.setString(JobManagerOptions.ADDRESS, localhost.getInetAddress().getHostName());
			configuration.setInteger(JobManagerOptions.PORT, localhost.getLocalPort());
			configuration.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, "10 ms");

			final CompletableFuture<Void> jvmTerminationFuture = new CompletableFuture<>();
			final TestingTaskManagerRunner taskManagerRunner = new TestingTaskManagerRunner(
				configuration,
				taskManagerResourceId,
				jvmTerminationFuture);

			try {
				// start() lives inside the try so that a failing or half-completed start
				// still gets its services closed in the finally block
				taskManagerRunner.start();

				// wait until we trigger the jvm termination
				jvmTerminationFuture.get();

				assertThat(taskManagerRunner.getTerminationFuture().isDone(), is(true));
			} finally {
				// the runner is closed before the enclosing try-with-resources closes the
				// socket, so a failure while closing one no longer skips closing the other
				taskManagerRunner.close();
			}
		}
	}

	/**
	 * {@link TaskManagerRunner} which completes a future instead of exiting the JVM, so that the
	 * test can observe the termination request without killing the test process.
	 */
	private static class TestingTaskManagerRunner extends TaskManagerRunner {

		/** Completed when the runner requests JVM termination. */
		private final CompletableFuture<Void> jvmTerminationFuture;

		/**
		 * @param configuration configuration handed to the {@link TaskManagerRunner} constructor
		 * @param resourceId resource id handed to the {@link TaskManagerRunner} constructor
		 * @param jvmTerminationFuture future to complete in place of terminating the JVM
		 * @throws Exception if the {@link TaskManagerRunner} constructor fails
		 */
		public TestingTaskManagerRunner(Configuration configuration, ResourceID resourceId, CompletableFuture<Void> jvmTerminationFuture) throws Exception {
			super(configuration, resourceId);
			this.jvmTerminationFuture = jvmTerminationFuture;
		}

		@Override
		protected void terminateJVM() {
			// signal the test instead of calling System.exit
			jvmTerminationFuture.complete(null);
		}
	}
}

0 comments on commit 4922ced

Please sign in to comment.