Skip to content

Commit

Permalink
Fix PathChildrenCache's ExecutorService leak (apache#3726)
Browse files Browse the repository at this point in the history
* Fix PathChildrenCache's executorService leak in Announcer, CuratorInventoryManager and RemoteTaskRunner

* Use a single ExecutorService for all workerStatusPathChildrenCaches in RemoteTaskRunner
  • Loading branch information
leventov authored and gianm committed Dec 7, 2016
1 parent dc8f814 commit 70e83be
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 267 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.ByteSource;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -134,7 +135,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
private final Duration shutdownTimeout;
private final IndexerZkConfig indexerZkConfig;
private final CuratorFramework cf;
private final PathChildrenCacheFactory pathChildrenCacheFactory;
private final PathChildrenCacheFactory workerStatusPathChildrenCacheFactory;
private final ExecutorService workerStatusPathChildrenCacheExecutor;
private final PathChildrenCache workerPathCache;
private final HttpClient httpClient;
private final Supplier<WorkerBehaviorConfig> workerConfigRef;
Expand Down Expand Up @@ -181,7 +183,7 @@ public RemoteTaskRunner(
RemoteTaskRunnerConfig config,
IndexerZkConfig indexerZkConfig,
CuratorFramework cf,
PathChildrenCacheFactory pathChildrenCacheFactory,
PathChildrenCacheFactory.Builder pathChildrenCacheFactory,
HttpClient httpClient,
Supplier<WorkerBehaviorConfig> workerConfigRef,
ScheduledExecutorService cleanupExec,
Expand All @@ -193,8 +195,12 @@ public RemoteTaskRunner(
this.shutdownTimeout = config.getTaskShutdownLinkTimeout().toStandardDuration(); // Fail fast
this.indexerZkConfig = indexerZkConfig;
this.cf = cf;
this.pathChildrenCacheFactory = pathChildrenCacheFactory;
this.workerPathCache = pathChildrenCacheFactory.make(cf, indexerZkConfig.getAnnouncementsPath());
this.workerPathCache = pathChildrenCacheFactory.build().make(cf, indexerZkConfig.getAnnouncementsPath());
this.workerStatusPathChildrenCacheExecutor = PathChildrenCacheFactory.Builder.createDefaultExecutor();
this.workerStatusPathChildrenCacheFactory = pathChildrenCacheFactory
.withExecutorService(workerStatusPathChildrenCacheExecutor)
.withShutdownExecutorOnClose(false)
.build();
this.httpClient = httpClient;
this.workerConfigRef = workerConfigRef;
this.cleanupExec = MoreExecutors.listeningDecorator(cleanupExec);
Expand Down Expand Up @@ -337,10 +343,17 @@ public void stop()

resourceManagement.stopManagement();

Closer closer = Closer.create();
for (ZkWorker zkWorker : zkWorkers.values()) {
zkWorker.close();
closer.register(zkWorker);
}
closer.register(workerPathCache);
try {
closer.close();
}
finally {
workerStatusPathChildrenCacheExecutor.shutdown();
}
workerPathCache.close();

if (runPendingTasksExec != null) {
runPendingTasksExec.shutdown();
Expand Down Expand Up @@ -889,7 +902,7 @@ private ListenableFuture<ZkWorker> addWorker(final Worker worker)
cancelWorkerCleanup(worker.getHost());

final String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker.getHost());
final PathChildrenCache statusCache = pathChildrenCacheFactory.make(cf, workerStatusPath);
final PathChildrenCache statusCache = workerStatusPathChildrenCacheFactory.make(cf, workerStatusPath);
final SettableFuture<ZkWorker> retVal = SettableFuture.create();
final ZkWorker zkWorker = new ZkWorker(
worker,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import com.metamx.http.client.HttpClient;
import io.druid.curator.cache.SimplePathChildrenCacheFactory;
import io.druid.curator.cache.PathChildrenCacheFactory;
import io.druid.guice.annotations.Global;
import io.druid.indexing.overlord.autoscaling.NoopResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerConfig;
Expand Down Expand Up @@ -81,10 +81,7 @@ public RemoteTaskRunner build()
remoteTaskRunnerConfig,
zkPaths,
curator,
new SimplePathChildrenCacheFactory
.Builder()
.withCompressed(true)
.build(),
new PathChildrenCacheFactory.Builder().withCompressed(true),
httpClient,
workerConfigRef,
factory.create(1, "RemoteTaskRunner-Scheduled-Cleanup--%d"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

import io.druid.common.guava.DSuppliers;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.curator.cache.SimplePathChildrenCacheFactory;
import io.druid.curator.cache.PathChildrenCacheFactory;
import io.druid.indexing.common.IndexingServiceCondition;
import io.druid.indexing.common.TaskLocation;
import io.druid.indexing.common.TaskStatus;
Expand Down Expand Up @@ -120,7 +120,7 @@ public String getBase()
}, null, null, null, null, null
),
cf,
new SimplePathChildrenCacheFactory.Builder().build(),
new PathChildrenCacheFactory.Builder(),
null,
DSuppliers.of(new AtomicReference<>(WorkerBehaviorConfig.defaultConfig())),
ScheduledExecutors.fixed(1, "Remote-Task-Runner-Cleanup--%d"),
Expand Down

This file was deleted.

25 changes: 18 additions & 7 deletions server/src/main/java/io/druid/curator/announcement/Announcer.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,14 @@
import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Sets;

import io.druid.curator.ShutdownNowIgnoringExecutorService;
import com.google.common.io.Closer;
import io.druid.curator.cache.PathChildrenCacheFactory;
import io.druid.curator.cache.SimplePathChildrenCacheFactory;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.java.util.common.logger.Logger;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.transaction.CuratorTransaction;
import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
Expand Down Expand Up @@ -64,6 +61,7 @@ public class Announcer

private final CuratorFramework curator;
private final PathChildrenCacheFactory factory;
private final ExecutorService pathChildrenCacheExecutor;

private final List<Announceable> toAnnounce = Lists.newArrayList();
private final List<Announceable> toUpdate = Lists.newArrayList();
Expand All @@ -79,7 +77,13 @@ public Announcer(
)
{
this.curator = curator;
this.factory = new SimplePathChildrenCacheFactory(false, true, new ShutdownNowIgnoringExecutorService(exec));
this.pathChildrenCacheExecutor = exec;
this.factory = new PathChildrenCacheFactory.Builder()
.withCacheData(false)
.withCompressed(true)
.withExecutorService(exec)
.withShutdownExecutorOnClose(false)
.build();
}

@LifecycleStart
Expand Down Expand Up @@ -114,8 +118,15 @@ public void stop()

started = false;

for (Map.Entry<String, PathChildrenCache> entry : listeners.entrySet()) {
CloseQuietly.close(entry.getValue());
Closer closer = Closer.create();
for (PathChildrenCache cache : listeners.values()) {
closer.register(cache);
}
try {
CloseQuietly.close(closer);
}
finally {
pathChildrenCacheExecutor.shutdown();
}

for (Map.Entry<String, ConcurrentMap<String, byte[]>> entry : announcements.entrySet()) {
Expand Down
Loading

0 comments on commit 70e83be

Please sign in to comment.