Skip to content

Commit

Permalink
[FLINK-11417] Make access to ExecutionGraph single threaded from JobM…
Browse files Browse the repository at this point in the history
…aster main thread

This closes apache#7568.
  • Loading branch information
StefanRRichter committed Feb 5, 2019
1 parent 2296299 commit 85bae44
Show file tree
Hide file tree
Showing 55 changed files with 1,893 additions and 1,403 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* Utility class for Flink's functions.
Expand Down Expand Up @@ -95,4 +96,23 @@ public static <A> Consumer<A> uncheckedConsumer(ThrowingConsumer<A, ?> throwingC
}
};
}

/**
* Converts a {@link SupplierWithException} into a {@link Supplier} which throws all checked exceptions
* as unchecked.
*
* @param supplierWithException to convert into a {@link Supplier}
* @return {@link Supplier} which throws all checked exceptions as unchecked.
*/
public static <T> Supplier<T> uncheckedSupplier(SupplierWithException<T, ?> supplierWithException) {
return () -> {
T result = null;
try {
result = supplierWithException.get();
} catch (Throwable t) {
ExceptionUtils.rethrow(t);
}
return result;
};
}
}
1 change: 1 addition & 0 deletions flink-end-to-end-tests/test-scripts/test_ha_datastream.sh
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ function run_ha_test() {
# change the pid dir to start log files always from 0, this is important for checks in the
# jm killing loop
set_conf "env.pid.dir" "${TEST_DATA_DIR}"
set_conf "env.java.opts" "-ea"
start_local_zk
start_cluster

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.concurrent;

import javax.annotation.Nonnull;

import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
* Interface for an executor that runs tasks in the main thread of an {@link org.apache.flink.runtime.rpc.RpcEndpoint}.
*/
public interface ComponentMainThreadExecutor extends ScheduledExecutor {

/**
* Returns true if the method was called in the thread of this executor.
*/
void assertRunningInMainThread();

/**
* Dummy implementation of ComponentMainThreadExecutor.
*/
final class DummyComponentMainThreadExecutor implements ComponentMainThreadExecutor {

/** Customized message for the exception that is thrown on method invocation. */
private final String exceptionMessageOnInvocation;

public DummyComponentMainThreadExecutor(String exceptionMessageOnInvocation) {
this.exceptionMessageOnInvocation = exceptionMessageOnInvocation;
}

@Override
public void assertRunningInMainThread() {
throw createException();
}

@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
throw createException();
}

@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
throw createException();
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
throw createException();
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
throw createException();
}

@Override
public void execute(@Nonnull Runnable command) {
throw createException();
}

private UnsupportedOperationException createException() {
return new UnsupportedOperationException(exceptionMessageOnInvocation);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.concurrent;

import javax.annotation.Nonnull;

import java.util.concurrent.ScheduledExecutorService;

/**
* Adapter class for a {@link ScheduledExecutorService} which shall be used as a
* {@link ComponentMainThreadExecutor}. It enhances the given executor with an assert that the current thread is the
* main thread of the executor.
*/
public class ComponentMainThreadExecutorServiceAdapter
extends ScheduledExecutorServiceAdapter implements ComponentMainThreadExecutor {

/** A runnable that should assert that the current thread is the expected main thread. */
@Nonnull
private final Runnable mainThreadCheck;

public ComponentMainThreadExecutorServiceAdapter(
@Nonnull ScheduledExecutorService scheduledExecutorService,
@Nonnull Runnable mainThreadCheck) {
super(scheduledExecutorService);
this.mainThreadCheck = mainThreadCheck;
}

@Override
public void assertRunningInMainThread() {
mainThreadCheck.run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

Expand All @@ -61,7 +64,6 @@ public class FutureUtils {
// retrying operations
// ------------------------------------------------------------------------


/**
* Retry the given operation the given number of times in case of a failure.
*
Expand Down Expand Up @@ -802,6 +804,110 @@ public void onComplete(Throwable failure, U success) {
return result;
}

/**
* This function takes a {@link CompletableFuture} and a function to apply to this future. If the input future
* is already done, this function returns {@link CompletableFuture#thenApply(Function)}. Otherwise, the return
* value is {@link CompletableFuture#thenApplyAsync(Function, Executor)} with the given executor.
*
* @param completableFuture the completable future for which we want to apply.
* @param executor the executor to run the apply function if the future is not yet done.
* @param applyFun the function to apply.
* @param <IN> type of the input future.
* @param <OUT> type of the output future.
* @return a completable future that is applying the given function to the input future.
*/
public static <IN, OUT> CompletableFuture<OUT> thenApplyAsyncIfNotDone(
CompletableFuture<IN> completableFuture,
Executor executor,
Function<? super IN, ? extends OUT> applyFun) {
return completableFuture.isDone() ?
completableFuture.thenApply(applyFun) :
completableFuture.thenApplyAsync(applyFun, executor);
}

/**
* This function takes a {@link CompletableFuture} and a function to compose with this future. If the input future
* is already done, this function returns {@link CompletableFuture#thenCompose(Function)}. Otherwise, the return
* value is {@link CompletableFuture#thenComposeAsync(Function, Executor)} with the given executor.
*
* @param completableFuture the completable future for which we want to compose.
* @param executor the executor to run the compose function if the future is not yet done.
* @param composeFun the function to compose.
* @param <IN> type of the input future.
* @param <OUT> type of the output future.
* @return a completable future that is a composition of the input future and the function.
*/
public static <IN, OUT> CompletableFuture<OUT> thenComposeAsyncIfNotDone(
CompletableFuture<IN> completableFuture,
Executor executor,
Function<? super IN, ? extends CompletionStage<OUT>> composeFun) {
return completableFuture.isDone() ?
completableFuture.thenCompose(composeFun) :
completableFuture.thenComposeAsync(composeFun, executor);
}

/**
* This function takes a {@link CompletableFuture} and a bi-consumer to call on completion of this future. If the
* input future is already done, this function returns {@link CompletableFuture#whenComplete(BiConsumer)}.
* Otherwise, the return value is {@link CompletableFuture#whenCompleteAsync(BiConsumer, Executor)} with the given
* executor.
*
* @param completableFuture the completable future for which we want to call #whenComplete.
* @param executor the executor to run the whenComplete function if the future is not yet done.
* @param whenCompleteFun the bi-consumer function to call when the future is completed.
* @param <IN> type of the input future.
* @return the new completion stage.
*/
public static <IN> CompletableFuture<IN> whenCompleteAsyncIfNotDone(
CompletableFuture<IN> completableFuture,
Executor executor,
BiConsumer<? super IN, ? super Throwable> whenCompleteFun) {
return completableFuture.isDone() ?
completableFuture.whenComplete(whenCompleteFun) :
completableFuture.whenCompleteAsync(whenCompleteFun, executor);
}

/**
* This function takes a {@link CompletableFuture} and a consumer to accept the result of this future. If the input
* future is already done, this function returns {@link CompletableFuture#thenAccept(Consumer)}. Otherwise, the
* return value is {@link CompletableFuture#thenAcceptAsync(Consumer, Executor)} with the given executor.
*
* @param completableFuture the completable future for which we want to call #thenAccept.
* @param executor the executor to run the thenAccept function if the future is not yet done.
* @param consumer the consumer function to call when the future is completed.
* @param <IN> type of the input future.
* @return the new completion stage.
*/
public static <IN> CompletableFuture<Void> thenAcceptAsyncIfNotDone(
CompletableFuture<IN> completableFuture,
Executor executor,
Consumer<? super IN> consumer) {
return completableFuture.isDone() ?
completableFuture.thenAccept(consumer) :
completableFuture.thenAcceptAsync(consumer, executor);
}

/**
* This function takes a {@link CompletableFuture} and a handler function for the result of this future. If the
* input future is already done, this function returns {@link CompletableFuture#handle(BiFunction)}. Otherwise,
* the return value is {@link CompletableFuture#handleAsync(BiFunction, Executor)} with the given executor.
*
* @param completableFuture the completable future for which we want to call #handle.
* @param executor the executor to run the handle function if the future is not yet done.
* @param handler the handler function to call when the future is completed.
* @param <IN> type of the handler input argument.
* @param <OUT> type of the handler return value.
* @return the new completion stage.
*/
public static <IN, OUT> CompletableFuture<OUT> handleAsyncIfNotDone(
CompletableFuture<IN> completableFuture,
Executor executor,
BiFunction<? super IN, Throwable, ? extends OUT> handler) {
return completableFuture.isDone() ?
completableFuture.handle(handler) :
completableFuture.handleAsync(handler, executor);
}

/**
* Runnable to complete the given future with a {@link TimeoutException}.
*/
Expand Down
Loading

0 comments on commit 85bae44

Please sign in to comment.