Skip to content

Commit

Permalink
Use GroupBy V2 as default (apache#3953)
Browse files Browse the repository at this point in the history
* Use GroupBy V2 as default

* Remove unused line

* Change assert to exception propagation
  • Loading branch information
jon-wei authored and b-slim committed Feb 18, 2017
1 parent 361d9d9 commit bc33b68
Show file tree
Hide file tree
Showing 11 changed files with 160 additions and 28 deletions.
2 changes: 1 addition & 1 deletion docs/content/configuration/broker.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ The broker uses processing configs for nested groupBy queries. And, optionally,
|`druid.processing.buffer.sizeBytes`|This specifies a buffer size for the storage of intermediate results. The computation engine in both the Historical and Realtime nodes will use a scratch buffer of this size to do all of their intermediate computations off-heap. Larger values allow for more aggregations in a single pass over the data while smaller values can require more passes depending on the query that is being executed.|1073741824 (1GB)|
|`druid.processing.buffer.poolCacheMaxCount`|processing buffer pool caches the buffers for later use, this is the maximum count cache will grow to. note that pool can create more buffers than it can cache if necessary.|Integer.MAX_VALUE|
|`druid.processing.formatString`|Realtime and historical nodes use this format string to name their processing threads.|processing-%s|
|`druid.processing.numMergeBuffers`|The number of direct memory buffers available for merging query results. The buffers are sized by `druid.processing.buffer.sizeBytes`. This property is effectively a concurrency limit for queries that require merging buffers. By default, no queries use these buffers, so the default pool size is zero.|0|
|`druid.processing.numMergeBuffers`|The number of direct memory buffers available for merging query results. The buffers are sized by `druid.processing.buffer.sizeBytes`. This property is effectively a concurrency limit for queries that require merging buffers. If you are using any queries that require merge buffers (currently, just groupBy v2) then you should have at least two of these.|`max(2, druid.processing.numThreads / 4)`|
|`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)|
|`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)|
|`druid.processing.fifo`|If the processing queue should treat tasks of equal priority in a FIFO manner|`false`|
Expand Down
2 changes: 1 addition & 1 deletion docs/content/configuration/historical.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Druid uses Jetty to serve HTTP requests.
|`druid.processing.buffer.sizeBytes`|This specifies a buffer size for the storage of intermediate results. The computation engine in both the Historical and Realtime nodes will use a scratch buffer of this size to do all of their intermediate computations off-heap. Larger values allow for more aggregations in a single pass over the data while smaller values can require more passes depending on the query that is being executed.|1073741824 (1GB)|
|`druid.processing.buffer.poolCacheMaxCount`|processing buffer pool caches the buffers for later use, this is the maximum count cache will grow to. note that pool can create more buffers than it can cache if necessary.|Integer.MAX_VALUE|
|`druid.processing.formatString`|Realtime and historical nodes use this format string to name their processing threads.|processing-%s|
|`druid.processing.numMergeBuffers`|The number of direct memory buffers available for merging query results. The buffers are sized by `druid.processing.buffer.sizeBytes`. This property is effectively a concurrency limit for queries that require merging buffers. By default, no queries use these buffers, so the default pool size is zero.|0|
|`druid.processing.numMergeBuffers`|The number of direct memory buffers available for merging query results. The buffers are sized by `druid.processing.buffer.sizeBytes`. This property is effectively a concurrency limit for queries that require merging buffers. If you are using any queries that require merge buffers (currently, just groupBy v2) then you should have at least two of these.|`max(2, druid.processing.numThreads / 4)`|
|`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)|
|`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)|
|`druid.processing.fifo`|If the processing queue should treat tasks of equal priority in a FIFO manner|`false`|
Expand Down
2 changes: 1 addition & 1 deletion docs/content/configuration/realtime.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ The realtime node uses several of the global configs in [Configuration](../confi
|--------|-----------|-------|
|`druid.processing.buffer.sizeBytes`|This specifies a buffer size for the storage of intermediate results. The computation engine in both the Historical and Realtime nodes will use a scratch buffer of this size to do all of their intermediate computations off-heap. Larger values allow for more aggregations in a single pass over the data while smaller values can require more passes depending on the query that is being executed.|1073741824 (1GB)|
|`druid.processing.formatString`|Realtime and historical nodes use this format string to name their processing threads.|processing-%s|
|`druid.processing.numMergeBuffers`|The number of direct memory buffers available for merging query results. The buffers are sized by `druid.processing.buffer.sizeBytes`. This property is effectively a concurrency limit for queries that require merging buffers. By default, no queries use these buffers, so the default pool size is zero.|0|
|`druid.processing.numMergeBuffers`|The number of direct memory buffers available for merging query results. The buffers are sized by `druid.processing.buffer.sizeBytes`. This property is effectively a concurrency limit for queries that require merging buffers. If you are using any queries that require merge buffers (currently, just groupBy v2) then you should have at least two of these.|`max(2, druid.processing.numThreads / 4)`|
|`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)|
|`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)|
|`druid.processing.tmpDir`|Path where temporary files created while processing a query should be stored. If specified, this configuration takes priority over the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|
Expand Down
11 changes: 3 additions & 8 deletions docs/content/querying/groupbyquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ See [Multi-value dimensions](multi-value-dimensions.html) for more details.

GroupBy queries can be executed using two different strategies. The default strategy for a cluster is determined by the
"druid.query.groupBy.defaultStrategy" runtime property on the broker. This can be overridden using "groupByStrategy" in
the query context. If neither the context field nor the property is set, the "v1" strategy will be used.
the query context. If neither the context field nor the property is set, the "v2" strategy will be used.

- "v1", the default, generates per-segment results on data nodes (historical, realtime, middleManager) using a map which
is partially on-heap (dimension keys and the map itself) and partially off-heap (the aggregated values). Data nodes then
Expand Down Expand Up @@ -168,7 +168,7 @@ When using the "v1" strategy, the following runtime properties apply:

|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.groupBy.defaultStrategy`|Default groupBy query strategy.|v1|
|`druid.query.groupBy.defaultStrategy`|Default groupBy query strategy.|v2|
|`druid.query.groupBy.maxIntermediateRows`|Maximum number of intermediate rows for the per-segment grouping engine. This is a tuning parameter that does not impose a hard limit; rather, it potentially shifts merging work from the per-segment engine to the overall merging index. Queries that exceed this limit will not fail.|50000|
|`druid.query.groupBy.maxResults`|Maximum number of results. Queries that exceed this limit will fail.|500000|
|`druid.query.groupBy.singleThreaded`|Merge results using a single thread.|false|
Expand All @@ -177,17 +177,12 @@ When using the "v2" strategy, the following runtime properties apply:

|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.groupBy.defaultStrategy`|Default groupBy query strategy.|v1|
|`druid.query.groupBy.defaultStrategy`|Default groupBy query strategy.|v2|
|`druid.query.groupBy.bufferGrouperInitialBuckets`|Initial number of buckets in the off-heap hash table used for grouping results. Set to 0 to use a reasonable default.|0|
|`druid.query.groupBy.bufferGrouperMaxLoadFactor`|Maximum load factor of the off-heap hash table used for grouping results. When the load factor exceeds this size, the table will be grown or spilled to disk. Set to 0 to use a reasonable default.|0|
|`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for the string dictionary during merging. When the dictionary exceeds this size, a spill to disk will be triggered.|100000000|
|`druid.query.groupBy.maxOnDiskStorage`|Maximum amount of disk space to use, per-query, for spilling result sets to disk when either the merging buffer or the dictionary fills up. Queries that exceed this limit will fail. Set to zero to disable disk spilling.|0 (disabled)|

Additionally, the "v2" strategy uses merging buffers for merging. It is currently the only query implementation that
does so. By default, Druid is configured without any merging buffer pool, so to use the "v2" strategy you must also
set `druid.processing.numMergeBuffers` to some non-zero number. Furthermore, if you want to execute deeply nested groupBys,
you must set `druid.processing.numMergeBuffers` to at least 2.

This may require allocating more direct memory. The amount of direct memory needed by Druid is at least
`druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)`. You can
ensure at least this amount of direct memory is available by providing `-XX:MaxDirectMemorySize=<VALUE>` at the command
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public int getNumThreads()
@Config("${base_path}.numMergeBuffers")
public int getNumMergeBuffers()
{
return 0;
return Math.max(2, getNumThreads() / 4);
}

@Config(value = "${base_path}.columnCache.sizeBytes")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class GroupByQueryConfig
private static final String CTX_KEY_MAX_MERGING_DICTIONARY_SIZE = "maxMergingDictionarySize";

@JsonProperty
private String defaultStrategy = GroupByStrategySelector.STRATEGY_V1;
private String defaultStrategy = GroupByStrategySelector.STRATEGY_V2;

@JsonProperty
private boolean singleThreaded = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public void testDeserialization() throws Exception
} else {
Assert.assertTrue(config.getNumThreads() < Runtime.getRuntime().availableProcessors());
}
Assert.assertEquals(Math.max(2, config.getNumThreads() / 4), config.getNumMergeBuffers());
Assert.assertEquals(0, config.columnCacheSizeBytes());
Assert.assertFalse(config.isFifo());
Assert.assertEquals(System.getProperty("java.io.tmpdir"), config.getTmpDir());
Expand All @@ -55,7 +56,7 @@ public void testDeserialization() throws Exception
Properties props = new Properties();
props.setProperty("druid.processing.buffer.sizeBytes", "1");
props.setProperty("druid.processing.buffer.poolCacheMaxCount", "1");
props.setProperty("druid.processing.numThreads", "5");
props.setProperty("druid.processing.numThreads", "256");
props.setProperty("druid.processing.columnCache.sizeBytes", "1");
props.setProperty("druid.processing.fifo", "true");
props.setProperty("druid.processing.tmpDir", "/test/path");
Expand All @@ -65,7 +66,8 @@ public void testDeserialization() throws Exception

Assert.assertEquals(1, config.intermediateComputeSizeBytes());
Assert.assertEquals(1, config.poolCacheMaxCount());
Assert.assertEquals(5, config.getNumThreads());
Assert.assertEquals(256, config.getNumThreads());
Assert.assertEquals(64, config.getNumMergeBuffers());
Assert.assertEquals(1, config.columnCacheSizeBytes());
Assert.assertTrue(config.isFifo());
Assert.assertEquals("/test/path", config.getTmpDir());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package io.druid.query.groupby;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand All @@ -28,8 +29,10 @@
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularities;
import io.druid.java.util.common.guava.MergeSequence;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.aggregation.AggregatorFactory;
Expand All @@ -49,7 +52,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.Map;

/**
*/
Expand All @@ -61,15 +64,6 @@ public class GroupByQueryRunnerFactoryTest
@Test
public void testMergeRunnersEnsureGroupMerging() throws Exception
{
QueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(new GroupByQueryConfig());
QueryRunner mergedRunner = factory.mergeRunners(
Executors.newSingleThreadExecutor(),
ImmutableList.of(
factory.createRunner(createSegment()),
factory.createRunner(createSegment())
)
);

GroupByQuery query = GroupByQuery
.builder()
.setDataSource("xx")
Expand All @@ -86,6 +80,41 @@ public void testMergeRunnersEnsureGroupMerging() throws Exception
)
.build();

final QueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(new GroupByQueryConfig());

QueryRunner mergedRunner = factory.getToolchest().mergeResults(
new QueryRunner()
{
@Override
public Sequence run(Query query, Map responseContext)
{
return factory.getToolchest().mergeResults(
new QueryRunner()
{
@Override
public Sequence run(Query query, Map responseContext)
{
try {
return new MergeSequence(
query.getResultOrdering(),
Sequences.simple(
Arrays.asList(
factory.createRunner(createSegment()).run(query, responseContext),
factory.createRunner(createSegment()).run(query, responseContext)
)
)
);
} catch (Exception e) {
Throwables.propagate(e);
return null;
}
}
}
).run(query, responseContext);
}
}
);

Sequence<Row> result = mergedRunner.run(query, Maps.newHashMap());

List<Row> expectedResults = Arrays.asList(
Expand Down
Loading

0 comments on commit bc33b68

Please sign in to comment.