From 01b7b84d754f58757731002139bfed32030b0f5a Mon Sep 17 00:00:00 2001 From: Christopher Berner Date: Fri, 18 Nov 2016 11:01:23 -0800 Subject: [PATCH] Fix excess memory usage in HttpRemoteTask If an HttpRemoteTask ran for a long time without transitioning to a state where more splits could be queued, and many callers added dependent futures to the CompletableFuture returned from whenSplitQueueHasSpace(), there could be a large amount of memory used by those dependents because they aren't cleaned up. Due to JDK-8161600 there is no way to fix this with CompletableFutures, so the implementation has been changed to use a FutureStateChange. --- .../presto/execution/FutureStateChange.java | 17 ++++++- .../server/remotetask/HttpRemoteTask.java | 47 +++++-------------- 2 files changed, 28 insertions(+), 36 deletions(-) diff --git a/presto-main/src/main/java/com/facebook/presto/execution/FutureStateChange.java b/presto-main/src/main/java/com/facebook/presto/execution/FutureStateChange.java index 7ceb64e07b1f1..46b1f20be39d2 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/FutureStateChange.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/FutureStateChange.java @@ -21,6 +21,10 @@ import java.util.HashSet; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static java.util.Objects.requireNonNull; @ThreadSafe public class FutureStateChange @@ -46,6 +50,17 @@ public synchronized CompletableFuture createNewListener() public void complete(T newState) { + fireStateChange(newState, directExecutor()); + } + + public void complete(T newState, Executor executor) + { + fireStateChange(newState, executor); + } + + private void fireStateChange(T newState, Executor executor) + { + requireNonNull(executor, "executor is null"); Set> futures; synchronized (this) { futures = ImmutableSet.copyOf(listeners); @@ -53,7 +68,7 @@ public void complete(T newState) } for (CompletableFuture future : futures) { - future.complete(newState); + executor.execute(() -> future.complete(newState)); } } } diff --git a/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTask.java b/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTask.java index 45a84a64e750d..c5fc6448129d8 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTask.java +++ b/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTask.java @@ -17,6 +17,7 @@ import com.facebook.presto.ScheduledSplit; import com.facebook.presto.Session; import com.facebook.presto.TaskSource; +import com.facebook.presto.execution.FutureStateChange; import com.facebook.presto.execution.NodeTaskMap.PartitionedSplitCountTracker; import com.facebook.presto.execution.RemoteTask; import com.facebook.presto.execution.StateMachine.StateChangeListener; @@ -83,7 +84,6 @@ import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; -import static io.airlift.concurrent.MoreFutures.unmodifiableFuture; import static io.airlift.http.client.FullJsonResponseHandler.createFullJsonResponseHandler; import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom; import static io.airlift.http.client.JsonBodyGenerator.jsonBodyGenerator; @@ -126,10 +126,9 @@ public final class HttpRemoteTask private final Set noMoreSplits = new HashSet<>(); @GuardedBy("this") private final AtomicReference outputBuffers = new AtomicReference<>(); + private final FutureStateChange whenSplitQueueHasSpace = new FutureStateChange<>(); @GuardedBy("this") - private CompletableFuture whenSplitQueueHasSpace = completedFuture(null); - @GuardedBy("this") - private CompletableFuture unmodifiableWhenSplitQueueHasSpace = completedFuture(null); + private boolean splitQueueHasSpace = true; @GuardedBy("this") private OptionalInt whenSplitQueueHasSpaceThreshold = OptionalInt.empty(); @@ -394,7 +393,10 @@ public synchronized CompletableFuture whenSplitQueueHasSpace(int threshold) whenSplitQueueHasSpaceThreshold = OptionalInt.of(threshold); updateSplitQueueSpace(); } - return unmodifiableWhenSplitQueueHasSpace; + if (splitQueueHasSpace) { + return completedFuture(null); + } + return whenSplitQueueHasSpace.createNewListener(); } private synchronized void updateSplitQueueSpace() @@ -402,34 +404,12 @@ private synchronized void updateSplitQueueSpace() if (!whenSplitQueueHasSpaceThreshold.isPresent()) { return; } - if (getQueuedPartitionedSplitCount() < whenSplitQueueHasSpaceThreshold.getAsInt()) { - if (!whenSplitQueueHasSpace.isDone()) { - fireSplitQueueHasSpace(whenSplitQueueHasSpace); - whenSplitQueueHasSpace = completedFuture(null); - unmodifiableWhenSplitQueueHasSpace = unmodifiableFuture(whenSplitQueueHasSpace); - } - } - else { - if (whenSplitQueueHasSpace.isDone()) { - whenSplitQueueHasSpace = new CompletableFuture<>(); - unmodifiableWhenSplitQueueHasSpace = unmodifiableFuture(whenSplitQueueHasSpace); - } + splitQueueHasSpace = getQueuedPartitionedSplitCount() < whenSplitQueueHasSpaceThreshold.getAsInt(); + if (splitQueueHasSpace) { + whenSplitQueueHasSpace.complete(null, executor); } } - private void fireSplitQueueHasSpace(CompletableFuture future) - { - executor.execute(() -> { - checkState(!Thread.holdsLock(this), "Can not notify split queue future while holding the lock"); - try { - future.complete(null); - } - catch (Throwable e) { - log.error(e, "Error notifying split queue future for %s", taskId); - } - }); - } - private synchronized void processTaskUpdate(TaskInfo newValue, List sources) { updateTaskInfo(newValue); @@ -569,11 +549,8 @@ private synchronized void cleanUpTask() pendingSplits.clear(); pendingSourceSplitCount = 0; partitionedSplitCountTracker.setPartitionedSplitCount(getPartitionedSplitCount()); - if (!whenSplitQueueHasSpace.isDone()) { - fireSplitQueueHasSpace(whenSplitQueueHasSpace); - whenSplitQueueHasSpace = completedFuture(null); - unmodifiableWhenSplitQueueHasSpace = unmodifiableFuture(whenSplitQueueHasSpace); - } + splitQueueHasSpace = true; + whenSplitQueueHasSpace.complete(null, executor); // cancel pending request if (currentRequest != null) {