Automatically determine numShards for parallel ingestion hash partitioning (apache#10419)

* Automatically determine numShards for parallel ingestion hash partitioning

* Fix inspection, tests, coverage

* Docs and some PR comments

* Adjust locking

* Use HllSketch instead of HyperLogLogCollector

* Fix tests

* Address some PR comments

* Fix granularity bug

* Small doc fix
jon-wei authored Sep 24, 2020
1 parent 89160c2 commit cb30b1f
Showing 20 changed files with 1,198 additions and 47 deletions.
@@ -33,7 +33,7 @@

public class HashedPartitionsSpec implements DimensionBasedPartitionsSpec
{
static final String NAME = "hashed";
public static final String NAME = "hashed";
@VisibleForTesting
static final String NUM_SHARDS = "numShards";

@@ -160,7 +160,7 @@ public List<String> getPartitionDimensions()
@Override
public String getForceGuaranteedRollupIncompatiblityReason()
{
return getNumShards() == null ? NUM_SHARDS + " must be specified" : FORCE_GUARANTEED_ROLLUP_COMPATIBLE;
return FORCE_GUARANTEED_ROLLUP_COMPATIBLE;
}

@Override
@@ -47,7 +47,7 @@

public class HashBasedNumberedShardSpec extends NumberedShardSpec
{
static final List<String> DEFAULT_PARTITION_DIMENSIONS = ImmutableList.of();
public static final List<String> DEFAULT_PARTITION_DIMENSIONS = ImmutableList.of();

private static final HashFunction HASH_FUNCTION = Hashing.murmur3_32();

@@ -159,8 +159,7 @@ public static int hash(ObjectMapper jsonMapper, List<String> partitionDimensions
}
}

@VisibleForTesting
static List<Object> getGroupKey(final List<String> partitionDimensions, final long timestamp, final InputRow inputRow)
public static List<Object> getGroupKey(final List<String> partitionDimensions, final long timestamp, final InputRow inputRow)
{
if (partitionDimensions.isEmpty()) {
return Rows.toGroupKey(timestamp, inputRow);
11 changes: 9 additions & 2 deletions docs/ingestion/native-batch.md
@@ -294,11 +294,18 @@ How the worker task creates segments is:
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should always be `hashed`|none|yes|
|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data.|null|yes|
|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. This property and `targetRowsPerSegment` cannot both be set.|null|no|
|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions.|null|no|
|targetRowsPerSegment|A target row count for each partition. If `numShards` is left unspecified, the Parallel task will determine a partition count automatically such that each partition has a row count close to the target, assuming evenly distributed keys in the input data. A target per-segment row count of 5 million is used if both `numShards` and `targetRowsPerSegment` are null. |null (or 5,000,000 if both `numShards` and `targetRowsPerSegment` are null)|no|
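
To make the defaults in the table concrete, here is a minimal sketch of how the two properties could interact. It is not Druid's actual code: the class and method names are hypothetical, and only the 5,000,000 default and the rule that `numShards` and `targetRowsPerSegment` cannot both be set come from the table.

```java
import java.util.OptionalInt;

// Hypothetical helper for illustration only; not part of Druid.
final class HashedPartitioningDefaults
{
  // Default from the table above, used when both properties are null.
  static final int DEFAULT_TARGET_ROWS_PER_SEGMENT = 5_000_000;

  /**
   * Returns the row target that automatic shard determination would use,
   * or empty when numShards is given explicitly and no estimate is needed.
   */
  static OptionalInt effectiveTargetRows(Integer numShards, Integer targetRowsPerSegment)
  {
    if (numShards != null && targetRowsPerSegment != null) {
      throw new IllegalArgumentException("numShards and targetRowsPerSegment cannot both be set");
    }
    if (numShards != null) {
      return OptionalInt.empty();
    }
    return OptionalInt.of(
        targetRowsPerSegment == null ? DEFAULT_TARGET_ROWS_PER_SEGMENT : targetRowsPerSegment
    );
  }
}
```

Under these assumptions, leaving both properties unset yields a target of 5,000,000 rows, while an explicit `numShards` means no row target is needed.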

The Parallel task with hash-based partitioning is similar to [MapReduce](https://en.wikipedia.org/wiki/MapReduce).
The task runs in 2 phases, i.e., `partial segment generation` and `partial segment merge`.
The task runs in up to 3 phases: `partial dimension cardinality`, `partial segment generation` and `partial segment merge`.
- The `partial dimension cardinality` phase is optional and runs only if `numShards` is not specified.
The Parallel task splits the input data and assigns the splits to worker tasks based on the split hint spec.
Each worker task (type `partial_dimension_cardinality`) estimates the cardinality of the partitioning dimensions for
each time chunk. The Parallel task aggregates these estimates from the worker tasks, finds the highest
cardinality across all of the time chunks in the input data, and divides it by `targetRowsPerSegment` (rounding up) to
automatically determine `numShards`.
- In the `partial segment generation` phase, just like the Map phase in MapReduce,
the Parallel task splits the input data based on the split hint spec
and assigns each split to a worker task. Each worker task (type `partial_index_generate`) reads the assigned split,
@@ -131,8 +131,9 @@

public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
{
public static final HashFunction HASH_FUNCTION = Hashing.murmur3_128();

private static final Logger log = new Logger(IndexTask.class);
private static final HashFunction HASH_FUNCTION = Hashing.murmur3_128();
private static final String TYPE = "index";

private static String makeGroupId(IndexIngestionSpec ingestionSchema)
@@ -599,7 +600,8 @@ private PartitionAnalysis determineShardSpecs(
if (partitionsSpec.getType() == SecondaryPartitionType.HASH) {
return PartialHashSegmentGenerateTask.createHashPartitionAnalysisFromPartitionsSpec(
granularitySpec,
(HashedPartitionsSpec) partitionsSpec
(HashedPartitionsSpec) partitionsSpec,
null // not overriding numShards
);
} else if (partitionsSpec.getType() == SecondaryPartitionType.LINEAR) {
return createLinearPartitionAnalysis(granularitySpec, (DynamicPartitionsSpec) partitionsSpec);
@@ -28,6 +28,7 @@
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.batch.parallel.LegacySinglePhaseSubTask;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionCardinalityTask;
import org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionDistributionTask;
import org.apache.druid.indexing.common.task.batch.parallel.PartialGenericSegmentMergeTask;
import org.apache.druid.indexing.common.task.batch.parallel.PartialHashSegmentGenerateTask;
@@ -62,6 +63,7 @@
// for backward compatibility
@Type(name = SinglePhaseSubTask.OLD_TYPE_NAME, value = LegacySinglePhaseSubTask.class),
@Type(name = PartialHashSegmentGenerateTask.TYPE, value = PartialHashSegmentGenerateTask.class),
@Type(name = PartialDimensionCardinalityTask.TYPE, value = PartialDimensionCardinalityTask.class),
@Type(name = PartialRangeSegmentGenerateTask.TYPE, value = PartialRangeSegmentGenerateTask.class),
@Type(name = PartialDimensionDistributionTask.TYPE, value = PartialDimensionDistributionTask.class),
@Type(name = PartialGenericSegmentMergeTask.TYPE, value = PartialGenericSegmentMergeTask.class),
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.common.task.batch.parallel;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.datasketches.hll.HllSketch;
import org.joda.time.Interval;

import java.util.Map;
import java.util.Objects;

public class DimensionCardinalityReport implements SubTaskReport
{
// We choose logK=11 because the following link shows that HllSketch with K=2048 has roughly the same
// serialized size as HyperLogLogCollector.
// http://datasketches.apache.org/docs/HLL/HllSketchVsDruidHyperLogLogCollector.html
public static final int HLL_SKETCH_LOG_K = 11;

static final String TYPE = "dimension_cardinality";
private static final String PROP_CARDINALITIES = "cardinalities";


private final String taskId;

/**
* A map of intervals to byte arrays, representing {@link HllSketch} objects,
* serialized using {@link HllSketch#toCompactByteArray()}.
*
* The HllSketch objects should be created with the HLL_SKETCH_LOG_K constant defined in this class.
*
* The sketches are used to determine cardinality estimates for each interval.
*/
private final Map<Interval, byte[]> intervalToCardinalities;

@JsonCreator
public DimensionCardinalityReport(
@JsonProperty("taskId") String taskId,
@JsonProperty(PROP_CARDINALITIES) Map<Interval, byte[]> intervalToCardinalities
)
{
this.taskId = taskId;
this.intervalToCardinalities = intervalToCardinalities;
}

@Override
@JsonProperty
public String getTaskId()
{
return taskId;
}

@JsonProperty(PROP_CARDINALITIES)
public Map<Interval, byte[]> getIntervalToCardinalities()
{
return intervalToCardinalities;
}

@Override
public String toString()
{
return "DimensionCardinalityReport{" +
"taskId='" + taskId + '\'' +
", intervalToCardinalities=" + intervalToCardinalities +
'}';
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DimensionCardinalityReport that = (DimensionCardinalityReport) o;
return Objects.equals(getTaskId(), that.getTaskId()) &&
Objects.equals(getIntervalToCardinalities(), that.getIntervalToCardinalities());
}

@Override
public int hashCode()
{
return Objects.hash(getTaskId(), getIntervalToCardinalities());
}

public static HllSketch createHllSketchForReport()
{
return new HllSketch(HLL_SKETCH_LOG_K);
}
}
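
One thing worth checking in the `equals` method above: `Map.equals` compares values with `Object.equals`, and Java arrays inherit reference equality. The sketch below uses only the JDK, with `String` keys standing in for `Interval`, to show that two maps holding equal-content but distinct `byte[]` values do not compare equal. Whether this matters for `DimensionCardinalityReport` depends on how reports are compared in practice, which the diff does not show. `ByteArrayMapEquality`, `sameContents`, and `mapOf` are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Illustration only: String keys stand in for Interval; not Druid code.
final class ByteArrayMapEquality
{
  // Hypothetical helper: compares two maps of byte arrays by array contents.
  static boolean sameContents(Map<String, byte[]> a, Map<String, byte[]> b)
  {
    if (a.size() != b.size()) {
      return false;
    }
    for (Map.Entry<String, byte[]> entry : a.entrySet()) {
      // containsKey guards against a missing key being treated as a null value.
      if (!b.containsKey(entry.getKey())
          || !Arrays.equals(entry.getValue(), b.get(entry.getKey()))) {
        return false;
      }
    }
    return true;
  }

  // Builds a single-entry mutable map for the demonstration.
  static Map<String, byte[]> mapOf(String key, byte[] value)
  {
    Map<String, byte[]> map = new HashMap<>();
    map.put(key, value);
    return map;
  }
}
```

Two maps built with `mapOf` from separate but identical arrays are unequal under `Map.equals` yet equal under `sameContents`.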
@@ -30,11 +30,15 @@
import com.google.common.collect.Multimap;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.Union;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.Counters;
@@ -271,9 +275,9 @@ SinglePhaseParallelIndexTaskRunner createSinglePhaseTaskRunner(TaskToolbox toolb
}

@VisibleForTesting
PartialHashSegmentGenerateParallelIndexTaskRunner createPartialHashSegmentGenerateRunner(TaskToolbox toolbox)
PartialDimensionCardinalityParallelIndexTaskRunner createPartialDimensionCardinalityRunner(TaskToolbox toolbox)
{
return new PartialHashSegmentGenerateParallelIndexTaskRunner(
return new PartialDimensionCardinalityParallelIndexTaskRunner(
toolbox,
getId(),
getGroupId(),
@@ -282,6 +286,22 @@ PartialHashSegmentGenerateParallelIndexTaskRunner createPartialHashSegmentGenera
);
}

@VisibleForTesting
PartialHashSegmentGenerateParallelIndexTaskRunner createPartialHashSegmentGenerateRunner(
TaskToolbox toolbox,
Integer numShardsOverride
)
{
return new PartialHashSegmentGenerateParallelIndexTaskRunner(
toolbox,
getId(),
getGroupId(),
ingestionSchema,
getContext(),
numShardsOverride
);
}

@VisibleForTesting
PartialDimensionDistributionParallelIndexTaskRunner createPartialDimensionDistributionRunner(TaskToolbox toolbox)
{
@@ -499,17 +519,67 @@ private TaskStatus runMultiPhaseParallel(TaskToolbox toolbox) throws Exception

private TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Exception
{
TaskState state;

if (!(ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof HashedPartitionsSpec)) {
// only range and hash partitioning are supported for multiphase parallel ingestion, see runMultiPhaseParallel()
throw new ISE(
"forceGuaranteedRollup is set but partitionsSpec [%s] is not a single_dim or hash partition spec.",
ingestionSchema.getTuningConfig().getPartitionsSpec()
);
}

final Integer numShardsOverride;
HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec) ingestionSchema.getTuningConfig().getPartitionsSpec();
if (partitionsSpec.getNumShards() == null) {
// 0. need to determine numShards by scanning the data
LOG.info("numShards is unspecified, beginning %s phase.", PartialDimensionCardinalityTask.TYPE);
ParallelIndexTaskRunner<PartialDimensionCardinalityTask, DimensionCardinalityReport> cardinalityRunner =
createRunner(
toolbox,
this::createPartialDimensionCardinalityRunner
);

if (cardinalityRunner == null) {
throw new ISE("Could not create cardinality runner for hash partitioning.");
}

state = runNextPhase(cardinalityRunner);
if (state.isFailure()) {
return TaskStatus.failure(getId());
}

int effectiveMaxRowsPerSegment = partitionsSpec.getMaxRowsPerSegment() == null
? PartitionsSpec.DEFAULT_MAX_ROWS_PER_SEGMENT
: partitionsSpec.getMaxRowsPerSegment();
LOG.info("effective maxRowsPerSegment is: " + effectiveMaxRowsPerSegment);

if (cardinalityRunner.getReports() == null) {
throw new ISE("Could not determine cardinalities for hash partitioning.");
}
numShardsOverride = determineNumShardsFromCardinalityReport(
cardinalityRunner.getReports().values(),
effectiveMaxRowsPerSegment
);

LOG.info("Automatically determined numShards: " + numShardsOverride);
} else {
numShardsOverride = null;
}

// 1. Partial segment generation phase
ParallelIndexTaskRunner<PartialHashSegmentGenerateTask, GeneratedPartitionsReport<GenericPartitionStat>> indexingRunner
= createRunner(toolbox, this::createPartialHashSegmentGenerateRunner);
ParallelIndexTaskRunner<PartialHashSegmentGenerateTask, GeneratedPartitionsReport<GenericPartitionStat>> indexingRunner =
createRunner(
toolbox,
f -> createPartialHashSegmentGenerateRunner(toolbox, numShardsOverride)
);

TaskState state = runNextPhase(indexingRunner);
state = runNextPhase(indexingRunner);
if (state.isFailure()) {
return TaskStatus.failure(getId());
}

// 2. Partial segment merge phase

// partition (interval, partitionId) -> partition locations
Map<Pair<Interval, Integer>, List<GenericPartitionLocation>> partitionToLocations =
groupGenericPartitionLocationsPerPartition(indexingRunner.getReports());
@@ -582,6 +652,50 @@ private TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) thro
return TaskStatus.fromCode(getId(), mergeState);
}

@VisibleForTesting
public static int determineNumShardsFromCardinalityReport(
Collection<DimensionCardinalityReport> reports,
int maxRowsPerSegment
)
{
// aggregate all the sub-reports
Map<Interval, Union> finalCollectors = new HashMap<>();
reports.forEach(report -> {
Map<Interval, byte[]> intervalToCardinality = report.getIntervalToCardinalities();
for (Map.Entry<Interval, byte[]> entry : intervalToCardinality.entrySet()) {
Union union = finalCollectors.computeIfAbsent(
entry.getKey(),
(key) -> {
return new Union(DimensionCardinalityReport.HLL_SKETCH_LOG_K);
}
);
HllSketch entryHll = HllSketch.wrap(Memory.wrap(entry.getValue()));
union.update(entryHll);
}
});

// determine the highest cardinality in any interval
long maxCardinality = 0;
for (Union union : finalCollectors.values()) {
maxCardinality = Math.max(maxCardinality, (long) union.getEstimate());
}

LOG.info("Estimated max cardinality: " + maxCardinality);

// determine numShards based on maxRowsPerSegment and the highest per-interval cardinality
long numShards = maxCardinality / maxRowsPerSegment;
if (maxCardinality % maxRowsPerSegment != 0) {
// if there's a remainder add 1 so we stay under maxRowsPerSegment
numShards += 1;
}
try {
return Math.toIntExact(numShards);
}
catch (ArithmeticException ae) {
throw new ISE("Estimated numShards [%s] exceeds integer bounds.", numShards);
}
}
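
The arithmetic in `determineNumShardsFromCardinalityReport` can be traced with a JDK-only sketch. As a loud assumption, exact `Set<String>` unions stand in for the `HllSketch`/`Union` estimates and `String` keys stand in for `Interval`, so the counts here are exact rather than approximate. The class and method names are hypothetical.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustration only: exact sets replace HLL sketches; not Druid code.
final class NumShardsSketch
{
  static int numShards(Collection<Map<String, Set<String>>> reports, int maxRowsPerSegment)
  {
    // Merge the per-interval key sets from every sub-task report.
    Map<String, Set<String>> merged = new HashMap<>();
    for (Map<String, Set<String>> report : reports) {
      report.forEach(
          (interval, keys) -> merged.computeIfAbsent(interval, k -> new HashSet<>()).addAll(keys)
      );
    }

    // Find the highest cardinality in any interval.
    long maxCardinality = 0;
    for (Set<String> keys : merged.values()) {
      maxCardinality = Math.max(maxCardinality, keys.size());
    }

    // Ceiling division, matching the "add 1 if there's a remainder" step.
    long shards = maxCardinality / maxRowsPerSegment;
    if (maxCardinality % maxRowsPerSegment != 0) {
      shards += 1;
    }
    // Throws ArithmeticException if the result does not fit in an int.
    return Math.toIntExact(shards);
  }
}
```

With five distinct keys in the busiest interval and a limit of 2 rows per segment, this sketch returns 3. An empty collection of reports yields 0 here; how callers of the real method handle that case is not shown in the diff.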

private Map<Interval, PartitionBoundaries> determineAllRangePartitions(Collection<DimensionDistributionReport> reports)
{
Multimap<Interval, StringDistribution> intervalToDistributions = ArrayListMultimap.create();