Skip to content

Commit

Permalink
Merge pull request apache#2598 from himanshug/handoff_timeout
Browse files Browse the repository at this point in the history
optional ability to configure handoff wait timeout on realtime tasks
  • Loading branch information
fjy committed Mar 8, 2016
2 parents de869f6 + 0402636 commit e7018f5
Show file tree
Hide file tree
Showing 11 changed files with 175 additions and 11 deletions.
1 change: 1 addition & 0 deletions docs/content/ingestion/stream-pull.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|persistThreadPriority|int|If `-XX:+UseThreadPriorities` is properly enabled, this will set the thread priority of the persisting thread to `Thread.NORM_PRIORITY` plus this value within the bounds of `Thread.MIN_PRIORITY` and `Thread.MAX_PRIORITY`. A value of 0 indicates to not change the thread priority.|no (default = 0; inherit and do not override)|
|mergeThreadPriority|int|If `-XX:+UseThreadPriorities` is properly enabled, this will set the thread priority of the merging thread to `Thread.NORM_PRIORITY` plus this value within the bounds of `Thread.MIN_PRIORITY` and `Thread.MAX_PRIORITY`. A value of 0 indicates to not change the thread priority.|no (default = 0; inherit and do not override)|
|reportParseExceptions|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion. If false, unparseable rows and fields will be skipped. If an entire row is skipped, the "unparseable" counter will be incremented. If some fields in a row were parseable and some were not, the parseable fields will be indexed and the "unparseable" counter will not be incremented.|false|
|handoffConditionTimeout|long|Milliseconds to wait for segment handoff. It must be >= 0 and 0 means wait forerver.|0|

Before enabling thread priority settings, users are highly encouraged to read the [original pull request](https://github.com/druid-io/druid/pull/984) and other documentation about proper use of `-XX:+UseThreadPriorities`.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.index.YeOldePlumberSchool;
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.IOConfig;
Expand Down Expand Up @@ -144,7 +143,8 @@ static RealtimeTuningConfig convertTuningConfig(
buildV9Directly,
0,
0,
true
true,
null
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,48 @@ public void testDefaultResource() throws Exception
Assert.assertEquals(task.getId(), task.getTaskResource().getAvailabilityGroup());
}


@Test(timeout = 60_000L, expected = ExecutionException.class)
public void testHandoffTimeout() throws Exception
{
final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
final RealtimeIndexTask task = makeRealtimeTask(null, true, 100L);
final TaskToolbox taskToolbox = makeToolbox(task, mdc, tempFolder.newFolder());
final ListenableFuture<TaskStatus> statusFuture = runTask(task, taskToolbox);

// Wait for firehose to show up, it starts off null.
while (task.getFirehose() == null) {
Thread.sleep(50);
}

final TestFirehose firehose = (TestFirehose) task.getFirehose();

firehose.addRows(
ImmutableList.<InputRow>of(
new MapBasedInputRow(
now,
ImmutableList.of("dim1"),
ImmutableMap.<String, Object>of("dim1", "foo", "met1", "1")
)
)
);

// Stop the firehose, this will drain out existing events.
firehose.close();

// Wait for publish.
while (mdc.getPublished().isEmpty()) {
Thread.sleep(50);
}

Assert.assertEquals(1, task.getMetrics().processed());
Assert.assertNotNull(Iterables.getOnlyElement(mdc.getPublished()));


// handoff would timeout, resulting in exception
statusFuture.get();
}

@Test(timeout = 60_000L)
public void testBasics() throws Exception
{
Expand Down Expand Up @@ -818,10 +860,15 @@ public TaskStatus call() throws Exception

private RealtimeIndexTask makeRealtimeTask(final String taskId)
{
return makeRealtimeTask(taskId, true);
return makeRealtimeTask(taskId, true, 0);
}

private RealtimeIndexTask makeRealtimeTask(final String taskId, boolean reportParseExceptions)
{
return makeRealtimeTask(taskId, reportParseExceptions, 0);
}

private RealtimeIndexTask makeRealtimeTask(final String taskId, boolean reportParseExceptions, long handoffTimeout)
{
ObjectMapper objectMapper = new DefaultObjectMapper();
DataSchema dataSchema = new DataSchema(
Expand Down Expand Up @@ -849,7 +896,8 @@ private RealtimeIndexTask makeRealtimeTask(final String taskId, boolean reportPa
buildV9Directly,
0,
0,
reportParseExceptions
reportParseExceptions,
handoffTimeout
);
return new RealtimeIndexTask(
taskId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,8 @@ public Plumber findPlumber(
null,
0,
0,
true
true,
null
)
),
null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1187,6 +1187,7 @@ private RealtimeIndexTask newRealtimeIndexTask()
null,
0,
0,
null,
null
);
FireDepartment fireDepartment = new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.io.Files;
import io.druid.segment.IndexSpec;
import io.druid.segment.realtime.plumber.IntervalStartVersioningPolicy;
Expand All @@ -47,6 +48,7 @@ public class RealtimeTuningConfig implements TuningConfig
private static final IndexSpec defaultIndexSpec = new IndexSpec();
private static final Boolean defaultBuildV9Directly = Boolean.FALSE;
private static final Boolean defaultReportParseExceptions = Boolean.FALSE;
private static final long defaultHandoffConditionTimeout = 0;

private static File createNewBasePersistDirectory()
{
Expand All @@ -69,7 +71,8 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final File basePersis
defaultBuildV9Directly,
0,
0,
defaultReportParseExceptions
defaultReportParseExceptions,
defaultHandoffConditionTimeout
);
}

Expand All @@ -86,6 +89,7 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final File basePersis
private final int persistThreadPriority;
private final int mergeThreadPriority;
private final boolean reportParseExceptions;
private final long handoffConditionTimeout;

@JsonCreator
public RealtimeTuningConfig(
Expand All @@ -101,7 +105,8 @@ public RealtimeTuningConfig(
@JsonProperty("buildV9Directly") Boolean buildV9Directly,
@JsonProperty("persistThreadPriority") int persistThreadPriority,
@JsonProperty("mergeThreadPriority") int mergeThreadPriority,
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
@JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout
)
{
this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory;
Expand All @@ -123,6 +128,11 @@ public RealtimeTuningConfig(
this.reportParseExceptions = reportParseExceptions == null
? defaultReportParseExceptions
: reportParseExceptions;

this.handoffConditionTimeout = handoffConditionTimeout == null
? defaultHandoffConditionTimeout
: handoffConditionTimeout;
Preconditions.checkArgument(this.handoffConditionTimeout >= 0, "handoffConditionTimeout must be >= 0");
}

@JsonProperty
Expand Down Expand Up @@ -203,6 +213,12 @@ public boolean isReportParseExceptions()
return reportParseExceptions;
}

@JsonProperty
public long getHandoffConditionTimeout()
{
return handoffConditionTimeout;
}

public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy)
{
return new RealtimeTuningConfig(
Expand All @@ -218,7 +234,8 @@ public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy)
buildV9Directly,
persistThreadPriority,
mergeThreadPriority,
reportParseExceptions
reportParseExceptions,
handoffConditionTimeout
);
}

Expand All @@ -237,7 +254,8 @@ public RealtimeTuningConfig withBasePersistDirectory(File dir)
buildV9Directly,
persistThreadPriority,
mergeThreadPriority,
reportParseExceptions
reportParseExceptions,
handoffConditionTimeout
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,7 @@ public void finishJob()
persistAndMerge(entry.getKey(), entry.getValue());
}

final long forceEndWaitTime = System.currentTimeMillis() + config.getHandoffConditionTimeout();
while (!sinks.isEmpty()) {
try {
log.info(
Expand All @@ -629,7 +630,19 @@ public String apply(Sink input)

synchronized (handoffCondition) {
while (!sinks.isEmpty()) {
handoffCondition.wait();
if (config.getHandoffConditionTimeout() == 0) {
handoffCondition.wait();
} else {
long curr = System.currentTimeMillis();
if (forceEndWaitTime - curr > 0) {
handoffCondition.wait(forceEndWaitTime - curr);
} else {
throw new ISE(
"Segment handoff wait timeout. [%s] segments might not have completed handoff.",
sinks.size()
);
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@

package io.druid.segment.indexing;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.segment.IndexSpec;
import io.druid.segment.TestHelper;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -42,4 +47,76 @@ public void testSpecificBasePersistDirectory()
);
Assert.assertEquals(new File("/tmp/nonexistent"), tuningConfig.getBasePersistDirectory());
}

@Test
public void testSerdeWithDefaults() throws Exception
{
String jsonStr = "{\"type\":\"realtime\"}";

ObjectMapper mapper = TestHelper.getObjectMapper();
RealtimeTuningConfig config = (RealtimeTuningConfig) mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
jsonStr,
TuningConfig.class
)
),
TuningConfig.class
);

Assert.assertNotNull(config.getBasePersistDirectory());
Assert.assertEquals(false, config.getBuildV9Directly());
Assert.assertEquals(0, config.getHandoffConditionTimeout());
Assert.assertEquals(new IndexSpec(), config.getIndexSpec());
Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod());
Assert.assertEquals(new NoneShardSpec(), config.getShardSpec());
Assert.assertEquals(0, config.getMaxPendingPersists());
Assert.assertEquals(75000, config.getMaxRowsInMemory());
Assert.assertEquals(0, config.getMergeThreadPriority());
Assert.assertEquals(0, config.getPersistThreadPriority());
Assert.assertEquals(new Period("PT10M"), config.getWindowPeriod());
Assert.assertEquals(false, config.isReportParseExceptions());
}

@Test
public void testSerdeWithNonDefaults() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"realtime\",\n"
+ " \"maxRowsInMemory\": 100,\n"
+ " \"intermediatePersistPeriod\": \"PT1H\",\n"
+ " \"windowPeriod\": \"PT1H\",\n"
+ " \"basePersistDirectory\": \"/tmp/xxx\",\n"
+ " \"maxPendingPersists\": 100,\n"
+ " \"buildV9Directly\": true,\n"
+ " \"persistThreadPriority\": 100,\n"
+ " \"mergeThreadPriority\": 100,\n"
+ " \"reportParseExceptions\": true,\n"
+ " \"handoffConditionTimeout\": 100\n"
+ "}";

ObjectMapper mapper = TestHelper.getObjectMapper();
RealtimeTuningConfig config = (RealtimeTuningConfig) mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
jsonStr,
TuningConfig.class
)
),
TuningConfig.class
);

Assert.assertEquals("/tmp/xxx", config.getBasePersistDirectory().toString());
Assert.assertEquals(true, config.getBuildV9Directly());
Assert.assertEquals(100, config.getHandoffConditionTimeout());
Assert.assertEquals(new IndexSpec(), config.getIndexSpec());
Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod());
Assert.assertEquals(new NoneShardSpec(), config.getShardSpec());
Assert.assertEquals(100, config.getMaxPendingPersists());
Assert.assertEquals(100, config.getMaxRowsInMemory());
Assert.assertEquals(100, config.getMergeThreadPriority());
Assert.assertEquals(100, config.getPersistThreadPriority());
Assert.assertEquals(new Period("PT1H"), config.getWindowPeriod());
Assert.assertEquals(true, config.isReportParseExceptions());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws IOException
null,
0,
0,
null,
null
);
plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString()));
Expand Down Expand Up @@ -240,6 +241,7 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws IOException
null,
0,
0,
null,
null
);

Expand All @@ -256,6 +258,7 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws IOException
null,
0,
0,
null,
null
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ public void setUp() throws Exception
buildV9Directly,
0,
0,
false
false,
null
);

realtimePlumberSchool = new RealtimePlumberSchool(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public void testSwap() throws Exception
null,
0,
0,
null,
null
);
final Sink sink = new Sink(interval, schema, tuningConfig, version);
Expand Down

0 comments on commit e7018f5

Please sign in to comment.