Skip to content

Commit

Permalink
Merge pull request apache#1444 from druid-io/logging-improvement
Browse files Browse the repository at this point in the history
Separate bootstrap threads from loading threads on historical startup
  • Loading branch information
xvrl committed Jun 17, 2015
2 parents f7a7dae + 06c97b6 commit d276d2c
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ public class SegmentLoaderConfig
@JsonProperty("numLoadingThreads")
private int numLoadingThreads = 1;

@JsonProperty("numBootstrapThreads")
private Integer numBootstrapThreads = null;

@JsonProperty
private File infoDir = null;

Expand Down Expand Up @@ -72,6 +75,10 @@ public int getNumLoadingThreads()
return numLoadingThreads;
}

public int getNumBootstrapThreads() {
return numBootstrapThreads == null ? numLoadingThreads : numBootstrapThreads;
}

public File getInfoDir()
{
if (infoDir == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler

private volatile PathChildrenCache loadQueueCache;
private volatile boolean started = false;
private final ListeningExecutorService loadingExec;

public BaseZkCoordinator(
ObjectMapper jsonMapper,
Expand All @@ -68,12 +67,6 @@ public BaseZkCoordinator(
this.config = config;
this.me = me;
this.curator = curator;
this.loadingExec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(
config.getNumLoadingThreads(),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ZkCoordinator-%s").build()
)
);
}

@LifecycleStart
Expand All @@ -95,7 +88,10 @@ public void start() throws IOException
loadQueueLocation,
true,
true,
loadingExec
Executors.newFixedThreadPool(
config.getNumLoadingThreads(),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ZkCoordinator-%s").build()
)
);

try {
Expand Down Expand Up @@ -217,9 +213,4 @@ public boolean isStarted()
public abstract void loadLocalCache();

public abstract DataSegmentChangeHandler getDataSegmentChangeHandler();

public ListeningExecutorService getLoadingExecutor()
{
return loadingExec;
}
}
102 changes: 56 additions & 46 deletions server/src/main/java/io/druid/server/coordination/ZkCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.emitter.EmittingLogger;
import io.druid.concurrent.Execs;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.server.initialization.ZkPathsConfig;
Expand All @@ -34,13 +35,18 @@

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
*/
Expand Down Expand Up @@ -86,8 +92,10 @@ public void loadLocalCache()
}

List<DataSegment> cachedSegments = Lists.newArrayList();
for (File file : baseDir.listFiles()) {
log.info("Loading segment cache file [%s]", file);
File[] segmentsToLoad = baseDir.listFiles();
for (int i = 0; i < segmentsToLoad.length; i++) {
File file = segmentsToLoad[i];
log.info("Loading segment cache file [%d/%d][%s].", i, segmentsToLoad.length, file);
try {
DataSegment segment = jsonMapper.readValue(file, DataSegment.class);
if (serverManager.isSegmentCached(segment)) {
Expand Down Expand Up @@ -179,69 +187,71 @@ public void addSegment(DataSegment segment, DataSegmentChangeCallback callback)
}
}

public void addSegments(Iterable<DataSegment> segments, final DataSegmentChangeCallback callback)
private void addSegments(Collection<DataSegment> segments, final DataSegmentChangeCallback callback)
{
try(final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer =
new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) {
ExecutorService loadingExecutor = null;
try (final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer =
new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) {

backgroundSegmentAnnouncer.startAnnouncing();

final List<ListenableFuture> segmentLoading = Lists.newArrayList();
loadingExecutor = Execs.multiThreaded(config.getNumBootstrapThreads(), "ZkCoordinator-loading-%s");

final int numSegments = segments.size();
final CountDownLatch latch = new CountDownLatch(numSegments);
final AtomicInteger counter = new AtomicInteger(0);
final CopyOnWriteArrayList<DataSegment> failedSegments = new CopyOnWriteArrayList<>();
for (final DataSegment segment : segments) {
segmentLoading.add(
getLoadingExecutor().submit(
new Callable<Void>()
{
@Override
public Void call() throws SegmentLoadingException
{
loadingExecutor.submit(
new Runnable() {
@Override
public void run() {
try {
log.info("Loading segment[%d/%d][%s]", counter.getAndIncrement(), numSegments, segment.getIdentifier());
final boolean loaded = loadSegment(segment, callback);
if (loaded) {
try {
log.info("Loading segment %s", segment.getIdentifier());
final boolean loaded = loadSegment(segment, callback);
if (loaded) {
try {
backgroundSegmentAnnouncer.announceSegment(segment);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SegmentLoadingException(e, "Loading Interrupted");
}
}
return null;
} catch(SegmentLoadingException e) {
log.error(e, "[%s] failed to load", segment.getIdentifier());
throw e;
backgroundSegmentAnnouncer.announceSegment(segment);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SegmentLoadingException(e, "Loading Interrupted");
}
}
} catch (SegmentLoadingException e) {
log.error(e, "[%s] failed to load", segment.getIdentifier());
failedSegments.add(segment);
} finally {
latch.countDown();
}
)
}
}
);
}

int failed = 0;
for(ListenableFuture future : segmentLoading) {
try {
future.get();
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
throw new SegmentLoadingException(e, "Loading Interrupted");
} catch(ExecutionException e) {
failed++;
try{
latch.await();

if(failedSegments.size() > 0) {
log.makeAlert("%,d errors seen while loading segments", failedSegments.size())
.addData("failedSegments", failedSegments);
}
}
if(failed > 0) {
throw new SegmentLoadingException("%,d errors seen while loading segments", failed);
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
log.makeAlert(e, "LoadingInterrupted");
}

backgroundSegmentAnnouncer.finishAnnouncing();
}
catch (SegmentLoadingException e) {
log.makeAlert(e, "Failed to load segments")
.addData("segments", segments)
.emit();
log.makeAlert(e, "Failed to load segments -- likely problem with announcing.")
.addData("numSegments", segments.size())
.emit();
}
finally {
callback.execute();
if (loadingExecutor != null) {
loadingExecutor.shutdownNow();
}
}
}

Expand Down

0 comments on commit d276d2c

Please sign in to comment.