Skip to content

Commit

Permalink
Fix excess memory usage in HttpRemoteTask
Browse files Browse the repository at this point in the history
If an HttpRemoteTask ran for a long time without transitioning to a
state where more splits could be queued, and many callers added
dependent futures to the CompletableFuture returned from
whenSplitQueueHasSpace(), there could be a large amount of memory used by
those dependents because they aren't cleaned up.
Due to JDK-8161600 there is no way to fix this with CompletableFutures, so
the implementation has been changed to use a FutureStateChange.
  • Loading branch information
cberner committed Nov 18, 2016
1 parent dc419db commit 01b7b84
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.Objects.requireNonNull;

@ThreadSafe
public class FutureStateChange<T>
Expand All @@ -46,14 +50,25 @@ public synchronized CompletableFuture<T> createNewListener()

public void complete(T newState)
{
fireStateChange(newState, directExecutor());
}

public void complete(T newState, Executor executor)
{
fireStateChange(newState, executor);
}

private void fireStateChange(T newState, Executor executor)
{
requireNonNull(executor, "executor is null");
Set<CompletableFuture<T>> futures;
synchronized (this) {
futures = ImmutableSet.copyOf(listeners);
listeners.clear();
}

for (CompletableFuture<T> future : futures) {
future.complete(newState);
executor.execute(() -> future.complete(newState));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.facebook.presto.ScheduledSplit;
import com.facebook.presto.Session;
import com.facebook.presto.TaskSource;
import com.facebook.presto.execution.FutureStateChange;
import com.facebook.presto.execution.NodeTaskMap.PartitionedSplitCountTracker;
import com.facebook.presto.execution.RemoteTask;
import com.facebook.presto.execution.StateMachine.StateChangeListener;
Expand Down Expand Up @@ -83,7 +84,6 @@
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static io.airlift.concurrent.MoreFutures.unmodifiableFuture;
import static io.airlift.http.client.FullJsonResponseHandler.createFullJsonResponseHandler;
import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
import static io.airlift.http.client.JsonBodyGenerator.jsonBodyGenerator;
Expand Down Expand Up @@ -126,10 +126,9 @@ public final class HttpRemoteTask
private final Set<PlanNodeId> noMoreSplits = new HashSet<>();
@GuardedBy("this")
private final AtomicReference<OutputBuffers> outputBuffers = new AtomicReference<>();
private final FutureStateChange<?> whenSplitQueueHasSpace = new FutureStateChange<>();
@GuardedBy("this")
private CompletableFuture<?> whenSplitQueueHasSpace = completedFuture(null);
@GuardedBy("this")
private CompletableFuture<?> unmodifiableWhenSplitQueueHasSpace = completedFuture(null);
private boolean splitQueueHasSpace = true;
@GuardedBy("this")
private OptionalInt whenSplitQueueHasSpaceThreshold = OptionalInt.empty();

Expand Down Expand Up @@ -394,42 +393,23 @@ public synchronized CompletableFuture<?> whenSplitQueueHasSpace(int threshold)
whenSplitQueueHasSpaceThreshold = OptionalInt.of(threshold);
updateSplitQueueSpace();
}
return unmodifiableWhenSplitQueueHasSpace;
if (splitQueueHasSpace) {
return completedFuture(null);
}
return whenSplitQueueHasSpace.createNewListener();
}

private synchronized void updateSplitQueueSpace()
{
if (!whenSplitQueueHasSpaceThreshold.isPresent()) {
return;
}
if (getQueuedPartitionedSplitCount() < whenSplitQueueHasSpaceThreshold.getAsInt()) {
if (!whenSplitQueueHasSpace.isDone()) {
fireSplitQueueHasSpace(whenSplitQueueHasSpace);
whenSplitQueueHasSpace = completedFuture(null);
unmodifiableWhenSplitQueueHasSpace = unmodifiableFuture(whenSplitQueueHasSpace);
}
}
else {
if (whenSplitQueueHasSpace.isDone()) {
whenSplitQueueHasSpace = new CompletableFuture<>();
unmodifiableWhenSplitQueueHasSpace = unmodifiableFuture(whenSplitQueueHasSpace);
}
splitQueueHasSpace = getQueuedPartitionedSplitCount() < whenSplitQueueHasSpaceThreshold.getAsInt();
if (splitQueueHasSpace) {
whenSplitQueueHasSpace.complete(null, executor);
}
}

private void fireSplitQueueHasSpace(CompletableFuture<?> future)
{
executor.execute(() -> {
checkState(!Thread.holdsLock(this), "Can not notify split queue future while holding the lock");
try {
future.complete(null);
}
catch (Throwable e) {
log.error(e, "Error notifying split queue future for %s", taskId);
}
});
}

private synchronized void processTaskUpdate(TaskInfo newValue, List<TaskSource> sources)
{
updateTaskInfo(newValue);
Expand Down Expand Up @@ -569,11 +549,8 @@ private synchronized void cleanUpTask()
pendingSplits.clear();
pendingSourceSplitCount = 0;
partitionedSplitCountTracker.setPartitionedSplitCount(getPartitionedSplitCount());
if (!whenSplitQueueHasSpace.isDone()) {
fireSplitQueueHasSpace(whenSplitQueueHasSpace);
whenSplitQueueHasSpace = completedFuture(null);
unmodifiableWhenSplitQueueHasSpace = unmodifiableFuture(whenSplitQueueHasSpace);
}
splitQueueHasSpace = true;
whenSplitQueueHasSpace.complete(null, executor);

// cancel pending request
if (currentRequest != null) {
Expand Down

0 comments on commit 01b7b84

Please sign in to comment.