Skip to content

Commit

Permalink
In persistAndMerge, increase the scope of try-catch block so that any…
Browse files Browse the repository at this point in the history
… exception while persisting hydrants is caught and consequently that sink is abandoned or the task will forever wait for handoff to happen.
  • Loading branch information
himanshug committed Feb 24, 2016
1 parent 6c9e1a2 commit a3b37e9
Showing 1 changed file with 35 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@
import io.druid.common.guava.ThreadRenamingRunnable;
import io.druid.common.utils.VMUtils;
import io.druid.concurrent.Execs;
import io.druid.data.input.Committer;
import io.druid.concurrent.TaskThreadPriority;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.query.MetricsEmittingQueryRunner;
import io.druid.query.NoopQueryRunner;
Expand All @@ -59,7 +59,6 @@
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QueryRunnerHelper;
import io.druid.query.QueryToolChest;
import io.druid.query.ReportTimelineMissingSegmentQueryRunner;
import io.druid.query.SegmentDescriptor;
import io.druid.query.spec.SpecificSegmentQueryRunner;
import io.druid.query.spec.SpecificSegmentSpec;
Expand All @@ -69,7 +68,6 @@
import io.druid.segment.Metadata;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.ReferenceCountingSegment;
import io.druid.segment.Segment;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.indexing.DataSchema;
Expand All @@ -90,7 +88,6 @@
import org.joda.time.Period;

import javax.annotation.Nullable;
import javax.ws.rs.HEAD;
import java.io.Closeable;
import java.io.File;
import java.io.FilenameFilter;
Expand Down Expand Up @@ -481,50 +478,52 @@ private void persistAndMerge(final long truncatedTime, final Sink sink)
mergeExecutor.execute(
new ThreadRenamingRunnable(threadName)
{
final Interval interval = sink.getInterval();
Stopwatch mergeStopwatch = null;

@Override
public void doRun()
{
final Interval interval = sink.getInterval();

// Bail out if this sink has been abandoned by a previously-executed task.
if (sinks.get(truncatedTime) != sink) {
log.info("Sink[%s] was abandoned, bailing out of persist-n-merge.", sink);
return;
}
try {
// Bail out if this sink has been abandoned by a previously-executed task.
if (sinks.get(truncatedTime) != sink) {
log.info("Sink[%s] was abandoned, bailing out of persist-n-merge.", sink);
return;
}

// Use a file to indicate that pushing has completed.
final File persistDir = computePersistDir(schema, interval);
final File mergedTarget = new File(persistDir, "merged");
final File isPushedMarker = new File(persistDir, "isPushedMarker");
// Use a file to indicate that pushing has completed.
final File persistDir = computePersistDir(schema, interval);
final File mergedTarget = new File(persistDir, "merged");
final File isPushedMarker = new File(persistDir, "isPushedMarker");

if (!isPushedMarker.exists()) {
removeSegment(sink, mergedTarget);
if (mergedTarget.exists()) {
log.wtf("Merged target[%s] exists?!", mergedTarget);
if (!isPushedMarker.exists()) {
removeSegment(sink, mergedTarget);
if (mergedTarget.exists()) {
log.wtf("Merged target[%s] exists?!", mergedTarget);
return;
}
} else {
log.info("Already pushed sink[%s]", sink);
return;
}
} else {
log.info("Already pushed sink[%s]", sink);
return;
}

/*
Note: it the plumber crashes after persisting a subset of hydrants then might duplicate data as these
hydrants will be read but older commitMetadata will be used. fixing this possibly needs structural
changes to plumber.
*/
for (FireHydrant hydrant : sink) {
synchronized (hydrant) {
if (!hydrant.hasSwapped()) {
log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", hydrant, sink);
final int rowCount = persistHydrant(hydrant, schema, interval, null);
metrics.incrementRowOutputCount(rowCount);
for (FireHydrant hydrant : sink) {
synchronized (hydrant) {
if (!hydrant.hasSwapped()) {
log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", hydrant, sink);
final int rowCount = persistHydrant(hydrant, schema, interval, null);
metrics.incrementRowOutputCount(rowCount);
}
}
}
}
final long mergeThreadCpuTime = VMUtils.safeGetThreadCpuTime();
final Stopwatch mergeStopwatch = Stopwatch.createStarted();
try {
final long mergeThreadCpuTime = VMUtils.safeGetThreadCpuTime();
mergeStopwatch = Stopwatch.createStarted();

List<QueryableIndex> indexes = Lists.newArrayList();
for (FireHydrant fireHydrant : sink) {
Segment segment = fireHydrant.getSegment();
Expand Down Expand Up @@ -575,7 +574,9 @@ public void doRun()
}
}
finally {
mergeStopwatch.stop();
if (mergeStopwatch != null) {
mergeStopwatch.stop();
}
}
}
}
Expand Down

0 comments on commit a3b37e9

Please sign in to comment.