Skip to content

Commit

Permalink
Add druid.sql.approxCountDistinct.function property. (apache#11181)
Browse files Browse the repository at this point in the history
* Add druid.sql.approxCountDistinct.function property.

The new property allows admins to configure the implementation for
APPROX_COUNT_DISTINCT and COUNT(DISTINCT expr) in approximate mode.

The motivation for adding this setting is to enable site admins to
switch the default HLL implementation to DataSketches.

For example, an admin can set:

  druid.sql.approxCountDistinct.function = APPROX_COUNT_DISTINCT_DS_HLL

* Fixes

* Fix tests.

* Remove erroneous cannotVectorize.

* Remove unused import.

* Remove unused test imports.
  • Loading branch information
gianm authored Oct 25, 2021
1 parent 43383c7 commit 8276c03
Show file tree
Hide file tree
Showing 22 changed files with 378 additions and 75 deletions.
1 change: 1 addition & 0 deletions docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1822,6 +1822,7 @@ The Druid SQL server is configured through the following properties on the Broke
|`druid.sql.planner.metadataSegmentCacheEnable`|Whether to keep a cache of published segments in broker. If true, broker polls coordinator in background to get segments from metadata store and maintains a local cache. If false, coordinator's REST API will be invoked when broker needs published segments info.|false|
|`druid.sql.planner.metadataSegmentPollPeriod`|How often to poll coordinator for published segments list if `druid.sql.planner.metadataSegmentCacheEnable` is set to true. Poll period is in milliseconds. |60000|
|`druid.sql.planner.authorizeSystemTablesDirectly`|If true, Druid authorizes queries against any of the system schema tables (`sys` in SQL) as `SYSTEM_TABLE` resources which require `READ` access, in addition to permissions based content filtering.|false|
|`druid.sql.approxCountDistinct.function`|Implementation to use for the [`APPROX_COUNT_DISTINCT` function](../querying/sql.md#aggregation-functions). Without extensions loaded, the only valid value is `APPROX_COUNT_DISTINCT_BUILTIN` (a HyperLogLog, or HLL, based implementation). If the [DataSketches extension](../development/extensions-core/datasketches-extension.md) is loaded, this can also be `APPROX_COUNT_DISTINCT_DS_HLL` (alternative HLL implementation) or `APPROX_COUNT_DISTINCT_DS_THETA`.<br><br>Theta sketches use significantly more memory than HLL sketches, so you should prefer one of the two HLL implementations.|APPROX_COUNT_DISTINCT_BUILTIN|

> Previous versions of Druid had properties named `druid.sql.planner.maxQueryCount` and `druid.sql.planner.maxSemiJoinRowsInMemory`.
> These properties are no longer available. Since Druid 0.18.0, you can use `druid.server.http.maxSubqueryRows` to control the maximum
Expand Down
9 changes: 5 additions & 4 deletions docs/querying/sql.md
Original file line number Diff line number Diff line change
Expand Up @@ -328,14 +328,15 @@ Only the COUNT, ARRAY_AGG, and STRING_AGG aggregations can accept the DISTINCT k
|Function|Notes|Default|
|--------|-----|-------|
|`COUNT(*)`|Counts the number of rows.|`0`|
|`COUNT(DISTINCT expr)`|Counts distinct values of expr, which can be string, numeric, or hyperUnique. By default this is approximate, using a variant of [HyperLogLog](http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf). To get exact counts set "useApproximateCountDistinct" to "false". If you do this, expr must be string or numeric, since exact counts are not possible using hyperUnique columns. See also `APPROX_COUNT_DISTINCT(expr)`. In exact mode, only one distinct count per query is permitted unless `useGroupingSetForExactDistinct` is set to true in query contexts or broker configurations.|`0`|
|`COUNT(DISTINCT expr)`|Counts distinct values of expr.<br><br>When "useApproximateCountDistinct" is set to "true" (the default), this is an alias for APPROX_COUNT_DISTINCT. The specific algorithm that will be used depends on the value of [`druid.sql.approxCountDistinct.function`](../configuration/index.md#sql). In this mode, you can could strings, numbers, or prebuilt sketches. If counting prebuilt sketches, the prebuilt sketch type must match the selected algorithm.<br><br>When "useApproximateCountDistinct" is set to "false", the computation will be exact. In this case, expr must be string or numeric, since exact counts are not possible using prebuilt sketches. In exact mode, only one distinct count per query is permitted unless "useGroupingSetForExactDistinct" is enabled.|
|`SUM(expr)`|Sums numbers.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
|`MIN(expr)`|Takes the minimum of numbers.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `9223372036854775807` (maximum LONG value)|
|`MAX(expr)`|Takes the maximum of numbers.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `-9223372036854775808` (minimum LONG value)|
|`AVG(expr)`|Averages numbers.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
|`APPROX_COUNT_DISTINCT(expr)`|_Usage note:_ consider using `APPROX_COUNT_DISTINCT_DS_HLL` instead, which offers better accuracy in many cases.<br/><br/>Counts distinct values of expr, which can be a regular column or a hyperUnique column. This is always approximate, regardless of the value of "useApproximateCountDistinct". This uses Druid's built-in "cardinality" or "hyperUnique" aggregators. See also `COUNT(DISTINCT expr)`.|`0`|
|`APPROX_COUNT_DISTINCT_DS_HLL(expr, [lgK, tgtHllType])`|Counts distinct values of `expr`, which can be a regular column or an [HLL sketch](../development/extensions-core/datasketches-hll.md) column. Results are always approximate, regardless of the value of [`useApproximateCountDistinct`](#connection-context). The `lgK` and `tgtHllType` parameters here are, like the equivalents in the [aggregator](../development/extensions-core/datasketches-hll.md#aggregators), described in the HLL sketch documentation. The [DataSketches extension](../development/extensions-core/datasketches-extension.md) must be loaded to use this function. See also `COUNT(DISTINCT expr)`. |`0`|
|`APPROX_COUNT_DISTINCT_DS_THETA(expr, [size])`|Counts distinct values of expr, which can be a regular column or a [Theta sketch](../development/extensions-core/datasketches-theta.md) column. This is always approximate, regardless of the value of [`useApproximateCountDistinct`](#connection-context). The `size` parameter is described in the Theta sketch documentation. The [DataSketches extension](../development/extensions-core/datasketches-extension.md) must be loaded to use this function. See also `COUNT(DISTINCT expr)`. |`0`|
|`APPROX_COUNT_DISTINCT(expr)`|Counts distinct values of expr using an approximate algorithm. The expr can be a regular column or a prebuilt sketch column.<br><br>The specific algorithm that will be used depends on the value of [`druid.sql.approxCountDistinct.function`](../configuration/index.md#sql). By default, this is `APPROX_COUNT_DISTINCT_BUILTIN`. If the [DataSketches extension](../development/extensions-core/datasketches-extension.md) is loaded, this can also be set to `APPROX_COUNT_DISTINCT_DS_HLL` or `APPROX_COUNT_DISTINCT_DS_THETA`.<br><br>When run on prebuilt sketch columns, the sketch column type must match the implementation of this function. For example: when `druid.sql.approxCountDistinct.function` is set to `APPROX_COUNT_DISTINCT_BUILTIN`, this function will be able to run on prebuilt hyperUnique columns, but not on prebuilt HLLSketchBuild columns.|
|`APPROX_COUNT_DISTINCT_BUILTIN(expr)`|_Usage note:_ consider using `APPROX_COUNT_DISTINCT_DS_HLL` instead, which offers better accuracy in many cases.<br/><br/>Counts distinct values of expr using Druid's built-in "cardinality" or "hyperUnique" aggregators, which implement a variant of [HyperLogLog](http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf). The expr can be a string, number, or prebuilt hyperUnique column. This is always approximate, regardless of the value of "useApproximateCountDistinct".|
|`APPROX_COUNT_DISTINCT_DS_HLL(expr, [lgK, tgtHllType])`|Counts distinct values of expr, which can be a regular column or an [HLL sketch](../development/extensions-core/datasketches-hll.md) column. Results are always approximate, regardless of the value of [`useApproximateCountDistinct`](#connection-context). The `lgK` and `tgtHllType` parameters here are, like the equivalents in the [aggregator](../development/extensions-core/datasketches-hll.md#aggregators), described in the HLL sketch documentation. The [DataSketches extension](../development/extensions-core/datasketches-extension.md) must be loaded to use this function. See also `COUNT(DISTINCT expr)`. |`0`|
|`APPROX_COUNT_DISTINCT_DS_THETA(expr, [size])`|Counts distinct values of expr, which can be a regular column or a [Theta sketch](../development/extensions-core/datasketches-theta.md) column. Results are always approximate, regardless of the value of [`useApproximateCountDistinct`](#connection-context). The `size` parameter is described in the Theta sketch documentation. The [DataSketches extension](../development/extensions-core/datasketches-extension.md) must be loaded to use this function. See also `COUNT(DISTINCT expr)`. |`0`|
|`DS_HLL(expr, [lgK, tgtHllType])`|Creates an [HLL sketch](../development/extensions-core/datasketches-hll.md) on the values of expr, which can be a regular column or a column containing HLL sketches. The `lgK` and `tgtHllType` parameters are described in the HLL sketch documentation. The [DataSketches extension](../development/extensions-core/datasketches-extension.md) must be loaded to use this function.|`'0'` (STRING)|
|`DS_THETA(expr, [size])`|Creates a [Theta sketch](../development/extensions-core/datasketches-theta.md) on the values of expr, which can be a regular column or a column containing Theta sketches. The `size` parameter is described in the Theta sketch documentation. The [DataSketches extension](../development/extensions-core/datasketches-extension.md) must be loaded to use this function.|`'0.0'` (STRING)|
|`APPROX_QUANTILE(expr, probability, [resolution])`|_Deprecated._ Use `APPROX_QUANTILE_DS` instead, which provides a superior distribution-independent algorithm with formal error guarantees.<br/><br/>Computes approximate quantiles on numeric or [approxHistogram](../development/extensions-core/approximate-histograms.md#approximate-histogram-aggregator) exprs. The "probability" should be between 0 and 1 (exclusive). The "resolution" is the number of centroids to use for the computation. Higher resolutions will give more precise results but also have higher overhead. If not provided, the default resolution is 50. The [approximate histogram extension](../development/extensions-core/approximate-histograms.md) must be loaded to use this function.|`NaN`|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ public void configure(final Binder binder)
SqlBindings.addOperatorConversion(binder, HllSketchEstimateWithErrorBoundsOperatorConversion.class);
SqlBindings.addOperatorConversion(binder, HllSketchSetUnionOperatorConversion.class);
SqlBindings.addOperatorConversion(binder, HllSketchToStringOperatorConversion.class);

SqlBindings.addApproxCountDistinctChoice(
binder,
HllSketchApproxCountDistinctSqlAggregator.NAME,
HllSketchApproxCountDistinctSqlAggregator.class
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@

public class HllSketchApproxCountDistinctSqlAggregator extends HllSketchBaseSqlAggregator implements SqlAggregator
{
public static final String NAME = "APPROX_COUNT_DISTINCT_DS_HLL";

private static final SqlAggFunction FUNCTION_INSTANCE = new HllSketchApproxCountDistinctSqlAggFunction();
private static final String NAME = "APPROX_COUNT_DISTINCT_DS_HLL";

@Override
public SqlAggFunction calciteFunction()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ public void configure(Binder binder)
SqlBindings.addOperatorConversion(binder, ThetaSketchSetIntersectOperatorConversion.class);
SqlBindings.addOperatorConversion(binder, ThetaSketchSetUnionOperatorConversion.class);
SqlBindings.addOperatorConversion(binder, ThetaSketchSetNotOperatorConversion.class);

SqlBindings.addApproxCountDistinctChoice(
binder,
ThetaSketchApproxCountDistinctSqlAggregator.NAME,
ThetaSketchApproxCountDistinctSqlAggregator.class
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@

public class ThetaSketchApproxCountDistinctSqlAggregator extends ThetaSketchBaseSqlAggregator implements SqlAggregator
{
public static final String NAME = "APPROX_COUNT_DISTINCT_DS_THETA";

private static final SqlAggFunction FUNCTION_INSTANCE = new ThetaSketchSqlAggFunction();
private static final String NAME = "APPROX_COUNT_DISTINCT_DS_THETA";

@Override
public SqlAggFunction calciteFunction()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
import org.apache.druid.sql.calcite.aggregation.ApproxCountDistinctSqlAggregator;
import org.apache.druid.sql.calcite.aggregation.builtin.CountSqlAggregator;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.util.CalciteTests;
Expand Down Expand Up @@ -122,10 +124,17 @@ public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker() throws IOEx
@Override
public DruidOperatorTable createOperatorTable()
{
final HllSketchApproxCountDistinctSqlAggregator approxCountDistinctSqlAggregator =
new HllSketchApproxCountDistinctSqlAggregator();

return new DruidOperatorTable(
ImmutableSet.of(
new HllSketchApproxCountDistinctSqlAggregator(),
new HllSketchObjectSqlAggregator()
approxCountDistinctSqlAggregator,
new HllSketchObjectSqlAggregator(),

// Use APPROX_COUNT_DISTINCT_DS_HLL as APPROX_COUNT_DISTINCT impl for these tests.
new CountSqlAggregator(new ApproxCountDistinctSqlAggregator(approxCountDistinctSqlAggregator)),
new ApproxCountDistinctSqlAggregator(approxCountDistinctSqlAggregator)
),
ImmutableSet.of(
new HllSketchSetUnionOperatorConversion(),
Expand All @@ -142,6 +151,16 @@ public void testApproxCountDistinctHllSketch() throws Exception
// Can't vectorize due to SUBSTRING expression.
cannotVectorize();

final String sql = "SELECT\n"
+ " SUM(cnt),\n"
+ " APPROX_COUNT_DISTINCT_DS_HLL(dim2),\n" // uppercase
+ " APPROX_COUNT_DISTINCT_DS_HLL(dim2) FILTER(WHERE dim2 <> ''),\n" // lowercase; also, filtered
+ " APPROX_COUNT_DISTINCT(SUBSTRING(dim2, 1, 1)),\n" // on extractionFn, using generic A.C.D.
+ " COUNT(DISTINCT SUBSTRING(dim2, 1, 1) || 'x'),\n" // on expression, using COUNT DISTINCT
+ " APPROX_COUNT_DISTINCT_DS_HLL(hllsketch_dim1, 21, 'HLL_8'),\n" // on native HllSketch column
+ " APPROX_COUNT_DISTINCT_DS_HLL(hllsketch_dim1)\n" // on native HllSketch column
+ "FROM druid.foo";

final List<Object[]> expectedResults;

if (NullHandling.replaceWithDefault()) {
Expand All @@ -155,15 +174,7 @@ public void testApproxCountDistinctHllSketch() throws Exception
}

testQuery(
"SELECT\n"
+ " SUM(cnt),\n"
+ " APPROX_COUNT_DISTINCT_DS_HLL(dim2),\n" // uppercase
+ " APPROX_COUNT_DISTINCT_DS_HLL(dim2) FILTER(WHERE dim2 <> ''),\n" // lowercase; also, filtered
+ " APPROX_COUNT_DISTINCT_DS_HLL(SUBSTRING(dim2, 1, 1)),\n" // on extractionFn
+ " APPROX_COUNT_DISTINCT_DS_HLL(SUBSTRING(dim2, 1, 1) || 'x'),\n" // on expression
+ " APPROX_COUNT_DISTINCT_DS_HLL(hllsketch_dim1, 21, 'HLL_8'),\n" // on native HllSketch column
+ " APPROX_COUNT_DISTINCT_DS_HLL(hllsketch_dim1)\n" // on native HllSketch column
+ "FROM druid.foo",
sql,
ImmutableList.of(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
import org.apache.druid.sql.calcite.aggregation.ApproxCountDistinctSqlAggregator;
import org.apache.druid.sql.calcite.aggregation.builtin.CountSqlAggregator;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.util.CalciteTests;
Expand Down Expand Up @@ -118,10 +120,17 @@ public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker() throws IOEx
@Override
public DruidOperatorTable createOperatorTable()
{
final ThetaSketchApproxCountDistinctSqlAggregator approxCountDistinctSqlAggregator =
new ThetaSketchApproxCountDistinctSqlAggregator();

return new DruidOperatorTable(
ImmutableSet.of(
new ThetaSketchApproxCountDistinctSqlAggregator(),
new ThetaSketchObjectSqlAggregator()
new ThetaSketchObjectSqlAggregator(),

// Use APPROX_COUNT_DISTINCT_DS_THETA as APPROX_COUNT_DISTINCT impl for these tests.
new CountSqlAggregator(new ApproxCountDistinctSqlAggregator(approxCountDistinctSqlAggregator)),
new ApproxCountDistinctSqlAggregator(approxCountDistinctSqlAggregator)
),
ImmutableSet.of(
new ThetaSketchEstimateOperatorConversion(),
Expand All @@ -133,13 +142,28 @@ public DruidOperatorTable createOperatorTable()
);
}


@Test
public void testApproxCountDistinctThetaSketch() throws Exception
{
// Cannot vectorize due to SUBSTRING.
cannotVectorize();

final String sql = "SELECT\n"
+ " SUM(cnt),\n"
+ " APPROX_COUNT_DISTINCT_DS_THETA(dim2),\n"
// uppercase
+ " APPROX_COUNT_DISTINCT_DS_THETA(dim2) FILTER(WHERE dim2 <> ''),\n"
// lowercase; also, filtered
+ " APPROX_COUNT_DISTINCT(SUBSTRING(dim2, 1, 1)),\n"
// on extractionFn, using A.C.D.
+ " COUNT(DISTINCT SUBSTRING(dim2, 1, 1) || 'x'),\n"
// on expression, using COUNT DISTINCT
+ " APPROX_COUNT_DISTINCT_DS_THETA(thetasketch_dim1, 32768),\n"
// on native theta sketch column
+ " APPROX_COUNT_DISTINCT_DS_THETA(thetasketch_dim1)\n"
// on native theta sketch column
+ "FROM druid.foo";

final List<Object[]> expectedResults;

if (NullHandling.replaceWithDefault()) {
Expand Down Expand Up @@ -169,21 +193,7 @@ public void testApproxCountDistinctThetaSketch() throws Exception
}

testQuery(
"SELECT\n"
+ " SUM(cnt),\n"
+ " APPROX_COUNT_DISTINCT_DS_THETA(dim2),\n"
// uppercase
+ " APPROX_COUNT_DISTINCT_DS_THETA(dim2) FILTER(WHERE dim2 <> ''),\n"
// lowercase; also, filtered
+ " APPROX_COUNT_DISTINCT_DS_THETA(SUBSTRING(dim2, 1, 1)),\n"
// on extractionFn
+ " APPROX_COUNT_DISTINCT_DS_THETA(SUBSTRING(dim2, 1, 1) || 'x'),\n"
// on expression
+ " APPROX_COUNT_DISTINCT_DS_THETA(thetasketch_dim1, 32768),\n"
// on native theta sketch column
+ " APPROX_COUNT_DISTINCT_DS_THETA(thetasketch_dim1)\n"
// on native theta sketch column
+ "FROM druid.foo",
sql,
ImmutableList.of(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
import org.apache.druid.query.filter.OrDimFilter;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
import org.apache.druid.sql.calcite.aggregation.ApproxCountDistinctSqlAggregator;
import org.apache.druid.sql.calcite.aggregation.builtin.BuiltinApproxCountDistinctSqlAggregator;
import org.apache.druid.sql.calcite.aggregation.builtin.CountSqlAggregator;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.util.CalciteTests;
Expand All @@ -56,7 +59,9 @@ public DruidOperatorTable createOperatorTable()
{
CalciteTests.getJsonMapper().registerModule(new BloomFilterSerializersModule());
return new DruidOperatorTable(
ImmutableSet.of(),
ImmutableSet.of(
new CountSqlAggregator(new ApproxCountDistinctSqlAggregator(new BuiltinApproxCountDistinctSqlAggregator()))
),
ImmutableSet.of(new BloomFilterOperatorConversion())
);
}
Expand Down
4 changes: 4 additions & 0 deletions sql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-multibindings</artifactId>
</dependency>
<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>jsr311-api</artifactId>
Expand Down
Loading

0 comments on commit 8276c03

Please sign in to comment.