Skip to content

Commit

Permalink
[FLINK-18068] Use FatalErrorHandler to handle the error thrown from executing on main thread in YarnResourceManagerDriver
Browse files Browse the repository at this point in the history

This closes apache#13571.
  • Loading branch information
Jiayi-Liao authored and tillrohrmann committed Oct 12, 2020
1 parent bbc1c4d commit 56229c7
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ class YarnContainerEventHandler implements AMRMClientAsync.CallbackHandler, NMCl

@Override
public void onContainersCompleted(List<ContainerStatus> statuses) {
getMainThreadExecutor().execute(() -> {
runAsyncWithFatalHandler(() -> {
log.debug("YARN ResourceManager reported the following containers completed: {}.", statuses);
for (final ContainerStatus containerStatus : statuses) {

Expand All @@ -539,7 +539,7 @@ public void onContainersCompleted(List<ContainerStatus> statuses) {

@Override
public void onContainersAllocated(List<Container> containers) {
getMainThreadExecutor().execute(() -> {
runAsyncWithFatalHandler(() -> {
log.info("Received {} containers.", containers.size());

for (Map.Entry<Resource, List<Container>> entry : groupContainerByResource(containers).entrySet()) {
Expand All @@ -554,6 +554,16 @@ public void onContainersAllocated(List<Container> containers) {
});
}

/**
 * Submits {@code runnable} to the executor returned by {@code getMainThreadExecutor()},
 * wrapped so that any {@link Throwable} it raises is passed to {@code onError} instead of
 * propagating out of the submitted task.
 *
 * @param runnable the action to execute
 */
private void runAsyncWithFatalHandler(Runnable runnable) {
    // Build the guarded task first so the submission itself is a single, readable call.
    final Runnable guarded = () -> {
        try {
            runnable.run();
        } catch (Throwable failure) {
            // Catches Throwable (not just Exception) so Errors are reported as well.
            onError(failure);
        }
    };
    getMainThreadExecutor().execute(guarded);
}

@Override
public void onShutdownRequest() {
getResourceEventHandler().onError(new ResourceManagerException(ERROR_MESSAGE_ON_SHUTDOWN_REQUEST));
Expand Down Expand Up @@ -592,7 +602,7 @@ public void onContainerStopped(ContainerId containerId) {

@Override
public void onStartContainerError(ContainerId containerId, Throwable throwable) {
getMainThreadExecutor().execute(() -> {
runAsyncWithFatalHandler(() -> {
resourceManagerClient.releaseAssignedContainer(containerId);
getResourceEventHandler().onWorkerTerminated(new ResourceID(containerId.toString()), throwable.getMessage());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -111,6 +112,32 @@ protected Context createContext() {
return new Context();
}

/**
 * Configures the testing AMRM client's get-matching-requests function to throw a
 * {@link RuntimeException}, requests a resource, delivers an allocated container to the
 * callback handler, and then expects the resource event handler's error consumer to receive
 * a throwable whose cause chain contains that exception.
 */
@Test
public void testRunAsyncCausesFatalError() throws Exception {
    new Context() {{
        final String expectedMessage = "runAsyncCausesFatalError";
        // Completed by the error consumer once the driver reports a failure.
        final CompletableFuture<Throwable> reportedError = new CompletableFuture<>();

        addContainerRequestFutures.add(CompletableFuture.completedFuture(null));

        testingYarnAMRMClientAsyncBuilder.setGetMatchingRequestsFunction(ignored -> {
            throw new RuntimeException(expectedMessage);
        });

        resourceEventHandlerBuilder.setOnErrorConsumer(reportedError::complete);

        runTest(() -> {
            runInMainThread(() -> getDriver().requestResource(testingTaskExecutorProcessSpec));
            resourceManagerClientCallbackHandler.onContainersAllocated(ImmutableList.of(testingContainer));

            // Bounded wait so a missing error report fails the test instead of hanging it.
            final Throwable reported = reportedError.get(TIMEOUT_SEC, TimeUnit.SECONDS);
            final Optional<RuntimeException> rootCause =
                ExceptionUtils.findThrowable(reported, RuntimeException.class);

            assertTrue(rootCause.isPresent());
            assertThat(rootCause.get().getMessage(), is(expectedMessage));
        });
    }};
}

@Test
public void testShutdownRequestCausesFatalError() throws Exception {
new Context() {{
Expand Down

0 comments on commit 56229c7

Please sign in to comment.