report hand off count finite appenderator driver (apache#3925)
pjain1 authored and gianm committed Feb 13, 2017
1 parent b7a8870 commit 8e31a46
Showing 4 changed files with 20 additions and 8 deletions.
@@ -281,7 +281,7 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception

try (
final Appenderator appenderator0 = newAppenderator(fireDepartmentMetrics, toolbox);
- final FiniteAppenderatorDriver driver = newDriver(appenderator0, toolbox);
+ final FiniteAppenderatorDriver driver = newDriver(appenderator0, toolbox, fireDepartmentMetrics);
final KafkaConsumer<byte[], byte[]> consumer = newConsumer()
) {
appenderator = appenderator0;
@@ -841,7 +841,8 @@ private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox

private FiniteAppenderatorDriver newDriver(
final Appenderator appenderator,
- final TaskToolbox toolbox
+ final TaskToolbox toolbox,
+ final FireDepartmentMetrics metrics
)
{
return new FiniteAppenderatorDriver(
@@ -851,7 +852,8 @@ private FiniteAppenderatorDriver newDriver(
new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()),
toolbox.getObjectMapper(),
tuningConfig.getMaxRowsPerSegment(),
- tuningConfig.getHandoffConditionTimeout()
+ tuningConfig.getHandoffConditionTimeout(),
+ metrics
);
}

@@ -385,7 +385,7 @@ public SegmentIdentifier allocate(DateTime timestamp, String sequenceName, Strin

try (
final Appenderator appenderator = newAppenderator(fireDepartmentMetrics, toolbox, dataSchema);
- final FiniteAppenderatorDriver driver = newDriver(appenderator, toolbox, segmentAllocator);
+ final FiniteAppenderatorDriver driver = newDriver(appenderator, toolbox, segmentAllocator, fireDepartmentMetrics);
final Firehose firehose = firehoseFactory.connect(dataSchema.getParser())
) {
final Supplier<Committer> committerSupplier = Committers.supplierFromFirehose(firehose);
@@ -504,7 +504,8 @@ private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox
private FiniteAppenderatorDriver newDriver(
final Appenderator appenderator,
final TaskToolbox toolbox,
- final SegmentAllocator segmentAllocator
+ final SegmentAllocator segmentAllocator,
+ final FireDepartmentMetrics metrics
)
{
return new FiniteAppenderatorDriver(
@@ -514,7 +515,8 @@ private FiniteAppenderatorDriver newDriver(
new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()),
toolbox.getObjectMapper(),
Integer.MAX_VALUE, // rows for a partition is already determined by the shardSpec
- 0
+ 0,
+ metrics
);
}

@@ -41,6 +41,7 @@
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.SegmentDescriptor;
+ import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.timeline.DataSegment;
@@ -77,6 +78,7 @@ public class FiniteAppenderatorDriver implements Closeable
private final ObjectMapper objectMapper;
private final int maxRowsPerSegment;
private final long handoffConditionTimeout;
+ private final FireDepartmentMetrics metrics;

// All access to "activeSegments" and "lastSegmentId" must be synchronized on "activeSegments".

@@ -100,6 +102,7 @@ public class FiniteAppenderatorDriver implements Closeable
* @param maxRowsPerSegment maximum number of rows allowed in an entire segment (not a single persist)
* @param handoffConditionTimeout maximum number of millis allowed for handoff (not counting push/publish), zero
*                                means wait forever.
* @param metrics Firedepartment metrics
*/
public FiniteAppenderatorDriver(
Appenderator appenderator,
@@ -108,7 +111,8 @@ public FiniteAppenderatorDriver(
UsedSegmentChecker usedSegmentChecker,
ObjectMapper objectMapper,
int maxRowsPerSegment,
- long handoffConditionTimeout
+ long handoffConditionTimeout,
+ FireDepartmentMetrics metrics
)
{
this.appenderator = Preconditions.checkNotNull(appenderator, "appenderator");
@@ -119,6 +123,7 @@ public FiniteAppenderatorDriver(
this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
this.maxRowsPerSegment = maxRowsPerSegment;
this.handoffConditionTimeout = handoffConditionTimeout;
+ this.metrics = Preconditions.checkNotNull(metrics, "metrics");
}

/**
@@ -469,6 +474,7 @@ public void run()
{
final SegmentIdentifier identifier = SegmentIdentifier.fromDataSegment(dataSegment);
log.info("Segment[%s] successfully handed off, dropping.", identifier);
+ metrics.incrementHandOffCount();
final ListenableFuture<?> dropFuture = appenderator.drop(identifier);
Futures.addCallback(
dropFuture,
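
For reference (not part of this commit): the counter incremented above lives in FireDepartmentMetrics, which the diff only calls into. Below is a minimal sketch of such a counter, assuming the AtomicLong style used by the class's other counters; the class name HandOffMetricsSketch and the handOffCount() getter are illustrative assumptions, since only incrementHandOffCount() appears in the diff.

import java.util.concurrent.atomic.AtomicLong;

// Sketch only: an AtomicLong-backed handoff counter in the style assumed above.
public class HandOffMetricsSketch
{
  private final AtomicLong handOffCount = new AtomicLong(0);

  // Matches the call site in the handoff callback: one increment per handed-off segment.
  public void incrementHandOffCount()
  {
    handOffCount.incrementAndGet();
  }

  // Assumed getter for reading the running total.
  public long handOffCount()
  {
    return handOffCount.get();
  }
}
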
@@ -36,6 +36,7 @@
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.Granularity;
import io.druid.query.SegmentDescriptor;
+ import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.timeline.DataSegment;
@@ -102,7 +103,8 @@ public void setUp()
new TestUsedSegmentChecker(),
OBJECT_MAPPER,
MAX_ROWS_PER_SEGMENT,
- HANDOFF_CONDITION_TIMEOUT
+ HANDOFF_CONDITION_TIMEOUT,
+ new FireDepartmentMetrics()
);
}
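
Not part of this commit: with FireDepartmentMetrics now passed into the driver under test, a follow-up assertion on the counter could look like the sketch below. It assumes a handOffCount() getter on FireDepartmentMetrics; only incrementHandOffCount() and the no-arg constructor are visible in the diff.

import io.druid.segment.realtime.FireDepartmentMetrics;
import org.junit.Assert;
import org.junit.Test;

// Sketch only: verifies the behavior this commit reports, one increment per handoff.
public class HandOffCountSketchTest
{
  @Test
  public void testCounterIncrementsOncePerHandOff()
  {
    final FireDepartmentMetrics metrics = new FireDepartmentMetrics();
    metrics.incrementHandOffCount(); // what FiniteAppenderatorDriver does after a successful handoff
    Assert.assertEquals(1, metrics.handOffCount()); // handOffCount() is an assumed getter
  }
}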

