Skip to content

Commit

Permalink
[FLINK-5022] Suppress RejectedExecutionExceptions if the ExecutorServ…
Browse files Browse the repository at this point in the history
…ice has been shut down

This PR suppresses occurring RejectedExecutionExceptions if an ExecutorService has been shut
down. This only works for ExecutorServices at the moment. All other exceptions are logged.

This closes apache#2757
  • Loading branch information
tillrohrmann authored and StephanEwen committed Nov 8, 2016
1 parent c21ecae commit d390824
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@
import akka.dispatch.Mapper;
import akka.dispatch.OnComplete;
import akka.dispatch.Recover;
import akka.japi.Procedure;
import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Tuple2;
import scala.concurrent.Await;
Expand All @@ -43,6 +46,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

Expand All @@ -53,6 +57,8 @@
*/
public class FlinkFuture<T> implements Future<T> {

private static final Logger LOG = LoggerFactory.getLogger(FlinkFuture.class);

protected scala.concurrent.Future<T> scalaFuture;

FlinkFuture() {
Expand Down Expand Up @@ -335,8 +341,25 @@ public static <T> Future<T> supplyAsync(Callable<T> callable, Executor executor)
// Helper functions and types
//-----------------------------------------------------------------------------------

private static ExecutionContext createExecutionContext(Executor executor) {
return ExecutionContexts$.MODULE$.fromExecutor(executor);
private static ExecutionContext createExecutionContext(final Executor executor) {
return ExecutionContexts$.MODULE$.fromExecutor(executor, new Procedure<Throwable>() {
@Override
public void apply(Throwable throwable) throws Exception {
if (executor instanceof ExecutorService) {
ExecutorService executorService = (ExecutorService) executor;
// only log the exception if the executor service is still running
if (!executorService.isShutdown()) {
logThrowable(throwable);
}
} else {
logThrowable(throwable);
}
}

private void logThrowable(Throwable throwable) {
LOG.warn("Uncaught exception in execution context.", throwable);
}
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmanager
import java.io.{File, IOException}
import java.net._
import java.util.UUID
import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException}
import java.util.concurrent.{ExecutorService, ForkJoinPool, TimeUnit, TimeoutException}

import akka.actor.Status.{Failure, Success}
import akka.actor._
Expand Down Expand Up @@ -62,7 +62,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages._
import org.apache.flink.runtime.messages.Messages.Disconnect
import org.apache.flink.runtime.messages.RegistrationMessages._
import org.apache.flink.runtime.messages.{Acknowledge, StackTrace}
import org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, SendStackTrace}
import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState
import org.apache.flink.runtime.messages.accumulators._
import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint, DeclineCheckpoint}
Expand All @@ -87,7 +87,6 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ForkJoinPool
import scala.language.postfixOps

/**
Expand Down

0 comments on commit d390824

Please sign in to comment.