Skip to content

Commit

Permalink
ability to not rollup at index time, make pre aggregation an option (a…
Browse files Browse the repository at this point in the history
…pache#3020)

* ability to not rollup at index time, make pre aggregation an option

* rename getRowIndexForRollup to getPriorIndex

* fix doc misspelling

* test query using no-rollup indexes

* fix benchmark fail due to jmh bug
  • Loading branch information
kaijianding authored and fjy committed Aug 2, 2016
1 parent 0bdaaa2 commit 50d52a2
Show file tree
Hide file tree
Showing 50 changed files with 1,247 additions and 154 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ public class IncrementalIndexReadBenchmark
@Param({"basic"})
private String schema;

@Param({"true", "false"})
private boolean rollup;

private static final Logger log = new Logger(IncrementalIndexReadBenchmark.class);
private static final int RNG_SEED = 9999;
private IncrementalIndex incIndex;
Expand Down Expand Up @@ -125,6 +128,7 @@ private IncrementalIndex makeIncIndex()
.withQueryGranularity(QueryGranularities.NONE)
.withMetrics(schemaInfo.getAggsArray())
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.withRollup(rollup)
.build(),
true,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ public class IndexIngestionBenchmark
@Param({"basic"})
private String schema;

@Param({"true", "false"})
private boolean rollup;

private static final Logger log = new Logger(IndexIngestionBenchmark.class);
private static final int RNG_SEED = 9999;

Expand Down Expand Up @@ -107,11 +110,12 @@ private IncrementalIndex makeIncIndex()
.withQueryGranularity(QueryGranularities.NONE)
.withMetrics(schemaInfo.getAggsArray())
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.withRollup(rollup)
.build(),
true,
false,
true,
rowsPerSegment
rowsPerSegment * 2
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ public class IndexMergeBenchmark
@Param({"basic"})
private String schema;

@Param({"true", "false"})
private boolean rollup;

private static final Logger log = new Logger(IndexMergeBenchmark.class);
private static final int RNG_SEED = 9999;
private static final IndexMerger INDEX_MERGER;
Expand Down Expand Up @@ -155,6 +158,7 @@ private IncrementalIndex makeIncIndex()
.withQueryGranularity(QueryGranularities.NONE)
.withMetrics(schemaInfo.getAggsArray())
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.withRollup(rollup)
.build(),
true,
false,
Expand All @@ -174,7 +178,7 @@ public void merge(Blackhole blackhole) throws Exception
log.info(tmpFile.getAbsolutePath() + " isFile: " + tmpFile.isFile() + " isDir:" + tmpFile.isDirectory());
tmpFile.deleteOnExit();

File mergedFile = INDEX_MERGER.mergeQueryableIndex(indexesToMerge, schemaInfo.getAggsArray(), tmpFile, new IndexSpec());
File mergedFile = INDEX_MERGER.mergeQueryableIndex(indexesToMerge, rollup, schemaInfo.getAggsArray(), tmpFile, new IndexSpec());

blackhole.consume(mergedFile);

Expand All @@ -192,7 +196,7 @@ public void mergeV9(Blackhole blackhole) throws Exception
log.info(tmpFile.getAbsolutePath() + " isFile: " + tmpFile.isFile() + " isDir:" + tmpFile.isDirectory());
tmpFile.deleteOnExit();

File mergedFile = INDEX_MERGER_V9.mergeQueryableIndex(indexesToMerge, schemaInfo.getAggsArray(), tmpFile, new IndexSpec());
File mergedFile = INDEX_MERGER_V9.mergeQueryableIndex(indexesToMerge, rollup, schemaInfo.getAggsArray(), tmpFile, new IndexSpec());

blackhole.consume(mergedFile);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ public class IndexPersistBenchmark
@Param({"basic"})
private String schema;

@Param({"true", "false"})
private boolean rollup;

private static final Logger log = new Logger(IndexPersistBenchmark.class);
private static final int RNG_SEED = 9999;

Expand Down Expand Up @@ -156,6 +159,7 @@ private IncrementalIndex makeIncIndex()
.withQueryGranularity(QueryGranularities.NONE)
.withMetrics(schemaInfo.getAggsArray())
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.withRollup(rollup)
.build(),
true,
false,
Expand Down
2 changes: 2 additions & 0 deletions docs/content/ingestion/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ This spec is used to generated segments with uniform intervals.
| type | string | The type of granularity spec. | no (default == 'uniform') |
| segmentGranularity | string | The granularity to create segments at. | no (default == 'DAY') |
| queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. | no (default == 'NONE') |
| rollup | boolean | rollup or not | no (default == true) |
| intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | yes for batch, no for real-time |

### Arbitrary Granularity Spec
Expand All @@ -196,6 +197,7 @@ This spec is used to generate segments with arbitrary intervals (it tries to cre
|-------|------|-------------|----------|
| type | string | The type of granularity spec. | no (default == 'uniform') |
| queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. | no (default == 'NONE') |
| rollup | boolean | rollup or not | no (default == true) |
| intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | yes for batch, no for real-time |

# IO Config
Expand Down
6 changes: 5 additions & 1 deletion docs/content/ingestion/tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,18 @@ Append tasks append a list of segments together into a single segment (one after

### Merge Task

Merge tasks merge a list of segments together. Any common timestamps are merged. The grammar is:
Merge tasks merge a list of segments together. Any common timestamps are merged.
If rollup is disabled as part of ingestion, common timestamps are not merged and rows are reordered by their timestamp.

The grammar is:

```json
{
"type": "merge",
"id": <task_id>,
"dataSource": <task_datasource>,
"aggregations": <list of aggregators>,
"rollup": <whether or not to rollup data during a merge>,
"segments": <JSON list of DataSegment objects to merge>
}
```
Expand Down
6 changes: 6 additions & 0 deletions docs/content/querying/segmentmetadataquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Segment metadata queries return per-segment information about:
* Interval the segment covers
* Column type of all the columns in the segment
* Estimated total segment byte size in if it was stored in a flat format
* Is the segment rolled up
* Segment id

```json
Expand Down Expand Up @@ -143,6 +144,11 @@ null if the aggregators are unknown or unmergeable (if merging is enabled).

* The form of the result is a map of column name to aggregator.

#### rollup

* `rollup` in the result is true/false/null.
* When merging is enabled, if some are rollup, others are not, result is null.

### lenientAggregatorMerge

Conflicts between aggregator metadata across segments can occur if some segments have unknown aggregators, or if
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ public boolean run()
new UniformGranularitySpec(
config.getGranularitySpec().getSegmentGranularity(),
config.getGranularitySpec().getQueryGranularity(),
config.getGranularitySpec().isRollup(),
intervals
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ private static IncrementalIndex makeIncrementalIndex(
.withDimensionsSpec(config.getSchema().getDataSchema().getParser())
.withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity())
.withMetrics(aggs)
.withRollup(config.getSchema().getDataSchema().getGranularitySpec().isRollup())
.build();

OnheapIncrementalIndex newIndex = new OnheapIncrementalIndex(
Expand Down Expand Up @@ -514,13 +515,14 @@ protected File mergeQueryableIndex(
ProgressIndicator progressIndicator
) throws IOException
{
boolean rollup = config.getSchema().getDataSchema().getGranularitySpec().isRollup();
if (config.isBuildV9Directly()) {
return HadoopDruidIndexerConfig.INDEX_MERGER_V9.mergeQueryableIndex(
indexes, aggs, file, config.getIndexSpec(), progressIndicator
indexes, rollup, aggs, file, config.getIndexSpec(), progressIndicator
);
} else {
return HadoopDruidIndexerConfig.INDEX_MERGER.mergeQueryableIndex(
indexes, aggs, file, config.getIndexSpec(), progressIndicator
indexes, rollup, aggs, file, config.getIndexSpec(), progressIndicator
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public Job addInputPaths(HadoopDruidIndexerConfig config, Job job) throws IOExce
new UniformGranularitySpec(
segmentGranularity,
config.getGranularitySpec().getQueryGranularity(),
config.getGranularitySpec().isRollup(),
Lists.newArrayList(bucketsToRun)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public void finishJob()
}

fileToUpload = new File(tmpSegmentDir, "merged");
theIndexMerger.mergeQueryableIndex(indexes, schema.getAggregators(), fileToUpload, config.getIndexSpec());
theIndexMerger.mergeQueryableIndex(indexes, schema.getGranularitySpec().isRollup(), schema.getAggregators(), fileToUpload, config.getIndexSpec());
}

// Map merged segment so we can extract dimensions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class MergeTask extends MergeTaskBase
{
@JsonIgnore
private final List<AggregatorFactory> aggregators;
private final Boolean rollup;
private final IndexSpec indexSpec;

@JsonCreator
Expand All @@ -52,12 +53,14 @@ public MergeTask(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segments") List<DataSegment> segments,
@JsonProperty("aggregations") List<AggregatorFactory> aggregators,
@JsonProperty("rollup") Boolean rollup,
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("context") Map<String, Object> context
)
{
super(id, dataSource, segments, context);
this.aggregators = Preconditions.checkNotNull(aggregators, "null aggregations");
this.rollup = rollup == null ? Boolean.TRUE : rollup;
this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec;
}

Expand All @@ -82,6 +85,7 @@ public QueryableIndex apply(@Nullable File input)
}
}
),
rollup,
aggregators.toArray(new AggregatorFactory[aggregators.size()]),
outDir,
indexSpec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ public void testMergeTaskSerde() throws Exception
"foo",
segments,
aggregators,
true,
indexSpec,
null
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,14 @@ public static SegmentAnalysis mergeAnalyses(
mergedId = "merged";
}

final Boolean rollup;

if (arg1.isRollup() != null && arg2.isRollup() != null && arg1.isRollup().equals(arg2.isRollup())) {
rollup = arg1.isRollup();
} else {
rollup = null;
}

return new SegmentAnalysis(
mergedId,
newIntervals,
Expand All @@ -363,7 +371,8 @@ public static SegmentAnalysis mergeAnalyses(
arg1.getNumRows() + arg2.getNumRows(),
aggregators.isEmpty() ? null : aggregators,
timestampSpec,
queryGranularity
queryGranularity,
rollup
);
}

Expand All @@ -378,7 +387,8 @@ public static SegmentAnalysis finalizeAnalysis(SegmentAnalysis analysis)
analysis.getNumRows(),
analysis.getAggregators(),
analysis.getTimestampSpec(),
analysis.getQueryGranularity()
analysis.getQueryGranularity(),
analysis.isRollup()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,19 @@ public Sequence<SegmentAnalysis> run(Query<SegmentAnalysis> inQ, Map<String, Obj
queryGranularity = null;
}

Boolean rollup = null;
if (query.hasRollup()) {
if (metadata == null) {
metadata = segment.asStorageAdapter().getMetadata();
}
rollup = metadata != null ? metadata.isRollup() : null;
if (rollup == null) {
// in this case, this segment is built before no-rollup function is coded,
// thus it is built with rollup
rollup = Boolean.TRUE;
}
}

return Sequences.simple(
Arrays.asList(
new SegmentAnalysis(
Expand All @@ -158,7 +171,8 @@ public Sequence<SegmentAnalysis> run(Query<SegmentAnalysis> inQ, Map<String, Obj
numRows,
aggregators,
timestampSpec,
queryGranularity
queryGranularity,
rollup
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
private final Map<String, AggregatorFactory> aggregators;
private final TimestampSpec timestampSpec;
private final QueryGranularity queryGranularity;
private final Boolean rollup;

@JsonCreator
public SegmentAnalysis(
Expand All @@ -50,7 +51,8 @@ public SegmentAnalysis(
@JsonProperty("numRows") long numRows,
@JsonProperty("aggregators") Map<String, AggregatorFactory> aggregators,
@JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@JsonProperty("queryGranularity") QueryGranularity queryGranularity
@JsonProperty("queryGranularity") QueryGranularity queryGranularity,
@JsonProperty("rollup") Boolean rollup
)
{
this.id = id;
Expand All @@ -61,6 +63,7 @@ public SegmentAnalysis(
this.aggregators = aggregators;
this.timestampSpec = timestampSpec;
this.queryGranularity = queryGranularity;
this.rollup = rollup;
}

@JsonProperty
Expand Down Expand Up @@ -105,6 +108,12 @@ public QueryGranularity getQueryGranularity()
return queryGranularity;
}

@JsonProperty
public Boolean isRollup()
{
return rollup;
}

@JsonProperty
public Map<String, AggregatorFactory> getAggregators()
{
Expand All @@ -123,6 +132,7 @@ public String toString()
", aggregators=" + aggregators +
", timestampSpec=" + timestampSpec +
", queryGranularity=" + queryGranularity +
", rollup=" + rollup +
'}';
}

Expand All @@ -141,6 +151,7 @@ public boolean equals(Object o)
SegmentAnalysis that = (SegmentAnalysis) o;
return size == that.size &&
numRows == that.numRows &&
rollup == that.rollup &&
Objects.equals(id, that.id) &&
Objects.equals(interval, that.interval) &&
Objects.equals(columns, that.columns) &&
Expand All @@ -156,7 +167,7 @@ public boolean equals(Object o)
@Override
public int hashCode()
{
return Objects.hash(id, interval, columns, size, numRows, aggregators, timestampSpec, queryGranularity);
return Objects.hash(id, interval, columns, size, numRows, aggregators, timestampSpec, queryGranularity, rollup);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public enum AnalysisType
AGGREGATORS,
MINMAX,
TIMESTAMPSPEC,
QUERYGRANULARITY;
QUERYGRANULARITY,
ROLLUP;

@JsonValue
@Override
Expand Down Expand Up @@ -199,6 +200,11 @@ public boolean hasQueryGranularity()
return analysisTypes.contains(AnalysisType.QUERYGRANULARITY);
}

public boolean hasRollup()
{
return analysisTypes.contains(AnalysisType.ROLLUP);
}

public boolean hasMinMax()
{
return analysisTypes.contains(AnalysisType.MINMAX);
Expand Down
Loading

0 comments on commit 50d52a2

Please sign in to comment.