SQL support for nested groupBys. (apache#3806)
* SQL support for nested groupBys.

Allows, for example, doing exact count distinct by writing:

  SELECT COUNT(*) FROM (SELECT DISTINCT col FROM druid.foo)

Contrast with approximate count distinct, which is:

  SELECT COUNT(DISTINCT col) FROM druid.foo

* Add deeply-nested groupBy docs, tests, and maxQueryCount config.

* Extract magic constants into statics.

* Rework rules to put preconditions in the "matches" method.
gianm authored and jon-wei committed Jan 12, 2017
1 parent 7662061 commit e86859b
Showing 31 changed files with 2,118 additions and 1,002 deletions.
@@ -45,6 +45,7 @@
import io.druid.segment.serde.ComplexMetrics;
import io.druid.sql.calcite.planner.Calcites;
import io.druid.sql.calcite.planner.PlannerConfig;
import io.druid.sql.calcite.rel.QueryMaker;
import io.druid.sql.calcite.table.DruidTable;
import io.druid.sql.calcite.util.CalciteTests;
import io.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
@@ -133,9 +134,8 @@ public void setup() throws Exception
final Map<String, Table> tableMap = ImmutableMap.<String, Table>of(
"foo",
new DruidTable(
walker,
new QueryMaker(walker, plannerConfig),
new TableDataSource("foo"),
plannerConfig,
ImmutableMap.of(
"__time", ValueType.LONG,
"dimSequential", ValueType.STRING,
1 change: 1 addition & 0 deletions docs/content/configuration/broker.md
@@ -101,6 +101,7 @@ The broker's [SQL planner](../querying/sql.html) can be configured through the f

|Property|Description|Default|
|--------|-----------|-------|
|`druid.sql.planner.maxQueryCount`|Maximum number of queries to issue, including nested queries. Set to 1 to disable sub-queries, or set to 0 for unlimited.|8|
|`druid.sql.planner.maxSemiJoinRowsInMemory`|Maximum number of rows to keep in memory for executing two-stage semi-join queries like `SELECT * FROM Employee WHERE DeptName IN (SELECT DeptName FROM Dept)`.|100000|
|`druid.sql.planner.maxTopNLimit`|Maximum threshold for a [TopN query](../querying/topnquery.html). Higher limits will be planned as [GroupBy queries](../querying/groupbyquery.html) instead.|100000|
|`druid.sql.planner.metadataRefreshPeriod`|Throttle for metadata refreshes.|PT1M|
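
For instance, a broker `runtime.properties` fragment that caps SQL planning at one subquery level might look like the sketch below (an illustrative example, not part of this commit; the value shown is hypothetical, and the shipped default is 8):

```properties
# Allow at most two queries per SQL statement: one outer query plus one subquery.
druid.sql.planner.maxQueryCount=2
```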
6 changes: 6 additions & 0 deletions docs/content/querying/groupbyquery.md
@@ -156,6 +156,12 @@ indexing mechanism, and runs the outer query on these materialized results. "v2"
inner query's results stream with an off-heap fact map and an on-heap string dictionary that can spill to disk. Both
strategies perform the outer query on the broker in a single-threaded fashion.

Note that groupBys require a separate merge buffer on the broker for each layer beyond the first layer of the groupBy.
With the v2 groupBy strategy, this can potentially lead to deadlocks for groupBys nested beyond two layers, since the
merge buffers are limited in number and are acquired one-by-one and not as a complete set. At this time we recommend
that you avoid deeply-nested groupBys with the v2 strategy. Doubly-nested groupBys (groupBy -> groupBy -> table) are
safe and do not suffer from this issue.
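
The number of merge buffers is controlled by the broker's processing configuration. As an illustrative sketch (these are standard processing properties, but the values here are hypothetical), raising the buffer count reduces, though does not eliminate, the chance of the deadlock described above:

```properties
# Off-heap merge buffers available for groupBy v2 merging; each groupBy layer
# beyond the first holds one buffer until the query completes.
druid.processing.numMergeBuffers=4
# Size of each processing/merge buffer in bytes (hypothetical value: 512MB).
druid.processing.buffer.sizeBytes=536870912
```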

#### Server configuration

When using the "v1" strategy, the following runtime properties apply:
42 changes: 38 additions & 4 deletions docs/content/querying/sql.md
@@ -77,6 +77,20 @@ If `druid.sql.planner.useFallback` is enabled, full SQL is possible on metadata
recommended in production since it can generate unscalable query plans. The JDBC driver allows accessing
table and column metadata through `connection.getMetaData()` even if useFallback is off.

### Approximate queries

The following SQL queries and features may be executed using approximate algorithms:

- `COUNT(DISTINCT col)` aggregations use [HyperLogLog](http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf), a
fast approximate distinct counting algorithm. If you need exact distinct counts, you can instead use
`SELECT COUNT(*) FROM (SELECT DISTINCT col FROM druid.foo)`, which will use a slower and more resource-intensive exact
algorithm. Both forms are shown side by side after this list.
- TopN-style queries with a single grouping column, like
`SELECT col1, SUM(col2) FROM druid.foo GROUP BY col1 ORDER BY SUM(col2) DESC LIMIT 100`, by default will be executed
as [TopN queries](topnquery.html), which use an approximate algorithm. To disable this behavior and use exact
algorithms for topN-style queries, set
[druid.sql.planner.useApproximateTopN](../configuration/broker.html#sql-planner-configuration) to "false".
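
As a side-by-side illustration, here is a sketch of both count-distinct forms against the hypothetical `druid.foo` table used throughout these docs:

```sql
-- Approximate distinct count: one pass, using HyperLogLog.
SELECT COUNT(DISTINCT col) FROM druid.foo

-- Exact distinct count: planned as a nested groupBy, slower but precise.
SELECT COUNT(*) FROM (SELECT DISTINCT col FROM druid.foo)
```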

### Time functions

Druid's SQL language supports a number of time operations, including:
@@ -85,12 +99,33 @@
- `EXTRACT(<granularity> FROM __time)` for grouping or filtering on time parts, like `SELECT EXTRACT(HOUR FROM __time), SUM(cnt) FROM druid.foo GROUP BY EXTRACT(HOUR FROM __time)`
- Comparisons to `TIMESTAMP '<time string>'` for time filters, like `SELECT COUNT(*) FROM druid.foo WHERE __time >= TIMESTAMP '2000-01-01 00:00:00' AND __time < TIMESTAMP '2001-01-01 00:00:00'` (both operations are combined in the sketch after this list)
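
Putting these together, a sketch of an hourly breakdown over a fixed year, reusing the hypothetical `druid.foo` table and `cnt` column from the examples above:

```sql
SELECT EXTRACT(HOUR FROM __time) AS hour_of_day, SUM(cnt)
FROM druid.foo
WHERE __time >= TIMESTAMP '2000-01-01 00:00:00'
  AND __time < TIMESTAMP '2001-01-01 00:00:00'
GROUP BY EXTRACT(HOUR FROM __time)
```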

### Semi-joins
### Subqueries

Druid's SQL layer supports many types of subqueries, including the ones listed below.

#### Nested groupBy

Subqueries involving `FROM (SELECT ... GROUP BY ...)` may be executed as
[nested groupBys](groupbyquery.html#nested-groupbys). For example, the following query can be used to perform an
exact distinct count using a nested groupBy.

```sql
SELECT COUNT(*) FROM (SELECT DISTINCT col FROM druid.foo)
```
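
The same pattern extends to aggregating on top of a grouped subquery. For instance, this sketch (using hypothetical columns `dim1` and `dim2`) counts the number of distinct `dim2` values for each `dim1`, and is a doubly-nested groupBy of the safe groupBy -> groupBy -> table shape discussed below:

```sql
SELECT dim1, COUNT(*)
FROM (SELECT dim1, dim2 FROM druid.foo GROUP BY dim1, dim2)
GROUP BY dim1
```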

Note that groupBys require a separate merge buffer on the broker for each layer beyond the first layer of the groupBy.
With the v2 groupBy strategy, this can potentially lead to deadlocks for groupBys nested beyond two layers, since the
merge buffers are limited in number and are acquired one-by-one and not as a complete set. At this time we recommend
that you avoid deeply-nested groupBys with the v2 strategy. Doubly-nested groupBys (groupBy -> groupBy -> table) are
safe and do not suffer from this issue. If you like, you can forbid deeper nesting by setting
`druid.sql.planner.maxQueryCount = 2`.

#### Semi-joins

Semi-joins involving `IN (SELECT ...)`, like the following, are planned with a special process.
Semi-join subqueries involving `WHERE ... IN (SELECT ...)`, like the following, are executed with a special process.

```sql
SELECT x, count(*)
SELECT x, COUNT(*)
FROM druid.foo
WHERE x IN (SELECT x FROM druid.bar WHERE y = 'baz')
GROUP BY x
@@ -121,7 +156,6 @@ Additionally, some Druid features are not supported by the SQL language. Some un

- [Multi-value dimensions](multi-value-dimensions.html).
- [Query-time lookups](lookups.html).
- [Nested groupBy queries](groupbyquery.html#nested-groupbys).
- Extensions, including [approximate histograms](../development/extensions-core/approximate-histograms.html) and
[DataSketches](../development/extensions-core/datasketches-aggregators.html).

@@ -138,6 +138,35 @@
@RunWith(Parameterized.class)
public class GroupByQueryRunnerTest
{
public static final ObjectMapper DEFAULT_MAPPER = new DefaultObjectMapper(new SmileFactory());
public static final DruidProcessingConfig DEFAULT_PROCESSING_CONFIG = new DruidProcessingConfig()
{
@Override
public String getFormatString()
{
return null;
}

@Override
public int intermediateComputeSizeBytes()
{
return 10 * 1024 * 1024;
}

@Override
public int getNumMergeBuffers()
{
// There are some tests that need to allocate two buffers (simulating two levels of merging)
return 2;
}

@Override
public int getNumThreads()
{
return 2;
}
};

private final QueryRunner<Row> runner;
private GroupByQueryRunnerFactory factory;
private GroupByQueryConfig config;
@@ -253,13 +282,22 @@ public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
final GroupByQueryConfig config
)
{
return makeQueryRunnerFactory(new DefaultObjectMapper(new SmileFactory()), config);
return makeQueryRunnerFactory(DEFAULT_MAPPER, config, DEFAULT_PROCESSING_CONFIG);
}

public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
final ObjectMapper mapper,
final GroupByQueryConfig config
)
{
return makeQueryRunnerFactory(mapper, config, DEFAULT_PROCESSING_CONFIG);
}

public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
final ObjectMapper mapper,
final GroupByQueryConfig config,
final DruidProcessingConfig processingConfig
)
{
final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
final StupidPool<ByteBuffer> bufferPool = new StupidPool<>(
@@ -269,7 +307,7 @@ public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
@Override
public ByteBuffer get()
{
return ByteBuffer.allocateDirect(10 * 1024 * 1024);
return ByteBuffer.allocateDirect(processingConfig.intermediateComputeSizeBytes());
}
}
);
@@ -279,10 +317,10 @@ public ByteBuffer get()
@Override
public ByteBuffer get()
{
return ByteBuffer.allocateDirect(10 * 1024 * 1024);
return ByteBuffer.allocateDirect(processingConfig.intermediateComputeSizeBytes());
}
},
2 // There are some tests that need to allocate two buffers (simulating two levels of merging)
processingConfig.getNumMergeBuffers()
);
final GroupByStrategySelector strategySelector = new GroupByStrategySelector(
configSupplier,
@@ -293,20 +331,7 @@ public ByteBuffer get()
bufferPool
),
new GroupByStrategyV2(
new DruidProcessingConfig()
{
@Override
public String getFormatString()
{
return null;
}

@Override
public int getNumThreads()
{
return 2;
}
},
processingConfig,
configSupplier,
bufferPool,
mergeBufferPool,
6 changes: 4 additions & 2 deletions sql/src/main/java/io/druid/sql/calcite/DruidSchema.java
@@ -50,6 +50,7 @@
import io.druid.segment.column.ValueType;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.sql.calcite.planner.PlannerConfig;
import io.druid.sql.calcite.rel.QueryMaker;
import io.druid.sql.calcite.table.DruidTable;
import io.druid.timeline.DataSegment;
import org.apache.calcite.linq4j.tree.Expression;
@@ -77,6 +78,7 @@ public class DruidSchema extends AbstractSchema
private final TimelineServerView serverView;
private final PlannerConfig config;
private final ExecutorService cacheExec;
private final QueryMaker queryMaker;
private final ConcurrentMap<String, Table> tables;

// For awaitInitialization.
@@ -102,6 +104,7 @@ public DruidSchema(
this.serverView = Preconditions.checkNotNull(serverView, "serverView");
this.config = Preconditions.checkNotNull(config, "config");
this.cacheExec = ScheduledExecutors.fixed(1, "DruidSchema-Cache-%d");
this.queryMaker = new QueryMaker(walker, config);
this.tables = Maps.newConcurrentMap();
}

@@ -349,9 +352,8 @@ private DruidTable computeTable(final String dataSource)
}

return new DruidTable(
walker,
queryMaker,
new TableDataSource(dataSource),
config,
columnValueTypes
);
}
42 changes: 19 additions & 23 deletions sql/src/main/java/io/druid/sql/calcite/expression/Expressions.java
@@ -47,7 +47,7 @@
import io.druid.segment.column.Column;
import io.druid.sql.calcite.aggregation.PostAggregatorFactory;
import io.druid.sql.calcite.filtration.Filtration;
import io.druid.sql.calcite.table.DruidTable;
import io.druid.sql.calcite.table.RowSignature;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rex.RexCall;
@@ -112,23 +112,23 @@ private Expressions()
}

/**
* Translate a field access, possibly through a projection, to an underlying Druid table.
* Translate a field access, possibly through a projection, to an underlying Druid dataSource.
*
* @param druidTable underlying Druid table
* @param project projection, or null
* @param fieldNumber number of the field to access
* @param rowSignature row signature of underlying Druid dataSource
* @param project projection, or null
* @param fieldNumber number of the field to access
*
* @return row expression
*/
public static RexNode fromFieldAccess(
final DruidTable druidTable,
final RowSignature rowSignature,
final Project project,
final int fieldNumber
)
{
if (project == null) {
// I don't think the factory impl matters here.
return RexInputRef.of(fieldNumber, druidTable.getRowType(new JavaTypeFactoryImpl()));
return RexInputRef.of(fieldNumber, rowSignature.getRelDataType(new JavaTypeFactoryImpl()));
} else {
return project.getChildExps().get(fieldNumber);
}
@@ -332,13 +332,11 @@ public static String toMathExpression(
/**
* Translates "condition" to a Druid filter, or returns null if we cannot translate the condition.
*
* @param druidTable Druid table, if the rows come from a table scan; null otherwise
* @param rowOrder order of columns in the rows to be filtered
* @param expression Calcite row expression
* @param rowSignature row signature of the dataSource to be filtered
* @param expression Calcite row expression
*/
public static DimFilter toFilter(
final DruidTable druidTable,
final List<String> rowOrder,
final RowSignature rowSignature,
final RexNode expression
)
{
@@ -347,7 +345,7 @@ public static DimFilter toFilter(
|| expression.getKind() == SqlKind.NOT) {
final List<DimFilter> filters = Lists.newArrayList();
for (final RexNode rexNode : ((RexCall) expression).getOperands()) {
final DimFilter nextFilter = toFilter(druidTable, rowOrder, rexNode);
final DimFilter nextFilter = toFilter(rowSignature, rexNode);
if (nextFilter == null) {
return null;
}
@@ -364,21 +362,19 @@
}
} else {
// Handle filter conditions on everything else.
return toLeafFilter(druidTable, rowOrder, expression);
return toLeafFilter(rowSignature, expression);
}
}

/**
* Translates "condition" to a Druid filter, assuming it does not contain any boolean expressions. Returns null
* if we cannot translate the condition.
*
* @param druidTable Druid table, if the rows come from a table scan; null otherwise
* @param rowOrder order of columns in the rows to be filtered
* @param expression Calcite row expression
* @param rowSignature row signature of the dataSource to be filtered
* @param expression Calcite row expression
*/
private static DimFilter toLeafFilter(
final DruidTable druidTable,
final List<String> rowOrder,
final RowSignature rowSignature,
final RexNode expression
)
{
@@ -392,8 +388,8 @@ private static DimFilter toLeafFilter(

if (kind == SqlKind.LIKE) {
final List<RexNode> operands = ((RexCall) expression).getOperands();
final RowExtraction rex = EXPRESSION_CONVERTER.convert(rowOrder, operands.get(0));
if (rex == null || !rex.isFilterable(druidTable)) {
final RowExtraction rex = EXPRESSION_CONVERTER.convert(rowSignature.getRowOrder(), operands.get(0));
if (rex == null || !rex.isFilterable(rowSignature)) {
return null;
}
return new LikeDimFilter(
@@ -428,8 +424,8 @@ private static DimFilter toLeafFilter(
}

// lhs must be translatable to a RowExtraction to be filterable
final RowExtraction rex = EXPRESSION_CONVERTER.convert(rowOrder, lhs);
if (rex == null || !rex.isFilterable(druidTable)) {
final RowExtraction rex = EXPRESSION_CONVERTER.convert(rowSignature.getRowOrder(), lhs);
if (rex == null || !rex.isFilterable(rowSignature)) {
return null;
}
