SQL: Ditch CalciteConnection layer and add DruidMeta, extension aggregators. (apache#3852)

* SQL: Ditch CalciteConnection layer and add DruidMeta, extension aggregators.

Switched from CalciteConnection to Planner, bringing benefits:

- CalciteConnection's JDBC interface no longer sits between the SQL server
  (HTTP/Avatica) and Druid's query layer. Instead, the SQL servers can use
  Druid Sequence objects directly, reducing overhead in the query return path.

- Implemented our own Planner-based Avatica Meta, letting us control
  connection timeouts and connection / statement limits. The previous
  CalciteConnection-based implementation didn't have any limits or timeouts.

- The Planner interface lets us override the operator table, opening up
  SQL language extensions. This patch includes two: APPROX_COUNT_DISTINCT
  in core, and a QUANTILE aggregator in the druid-histogram extension.

Also:

- Added INFORMATION_SCHEMA metadata schema.

- Added tests for Unicode literals and escapes.

* Verify statement is actually open before closing it.

* More detailed INFORMATION_SCHEMA docs.
gianm authored and fjy committed Jan 20, 2017
1 parent 33ae9dd commit d51f5e0
Showing 51 changed files with 4,106 additions and 1,064 deletions.
53 changes: 38 additions & 15 deletions benchmarks/src/main/java/io/druid/benchmark/query/SqlBenchmark.java
@@ -34,25 +34,33 @@
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.logger.Logger;
+import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.TableDataSource;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.GroupByQuery;
+import io.druid.segment.IndexBuilder;
+import io.druid.segment.QueryableIndex;
+import io.druid.segment.TestHelper;
import io.druid.segment.column.ValueType;
import io.druid.segment.serde.ComplexMetrics;
import io.druid.sql.calcite.planner.Calcites;
import io.druid.sql.calcite.planner.PlannerConfig;
+import io.druid.sql.calcite.planner.PlannerFactory;
+import io.druid.sql.calcite.planner.PlannerResult;
import io.druid.sql.calcite.rel.QueryMaker;
import io.druid.sql.calcite.table.DruidTable;
import io.druid.sql.calcite.util.CalciteTests;
import io.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
-import org.apache.calcite.jdbc.CalciteConnection;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.LinearShardSpec;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
+import org.apache.calcite.tools.Planner;
import org.apache.commons.io.FileUtils;
import org.joda.time.Interval;
import org.openjdk.jmh.annotations.Benchmark;
Expand All @@ -71,8 +79,6 @@
import org.openjdk.jmh.infra.Blackhole;

import java.io.File;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -96,7 +102,7 @@ public class SqlBenchmark

private File tmpDir;
private SpecificSegmentsQuerySegmentWalker walker;
-  private CalciteConnection calciteConnection;
+  private PlannerFactory plannerFactory;
private GroupByQuery groupByQuery;
private String sqlQuery;

@@ -130,7 +136,23 @@ public void setup() throws Exception
log.info("%,d/%,d rows generated.", rows.size(), rowsPerSegment);

final PlannerConfig plannerConfig = new PlannerConfig();
-    walker = CalciteTests.createWalker(tmpDir, rows);
+    final QueryRunnerFactoryConglomerate conglomerate = CalciteTests.queryRunnerFactoryConglomerate();
+    final QueryableIndex index = IndexBuilder.create()
+                                             .tmpDir(new File(tmpDir, "1"))
+                                             .indexMerger(TestHelper.getTestIndexMergerV9())
+                                             .rows(rows)
+                                             .buildMMappedIndex();
+
+    this.walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
+        DataSegment.builder()
+                   .dataSource("foo")
+                   .interval(index.getDataInterval())
+                   .version("1")
+                   .shardSpec(new LinearShardSpec(0))
+                   .build(),
+        index
+    );
+
final Map<String, Table> tableMap = ImmutableMap.<String, Table>of(
"foo",
new DruidTable(
@@ -152,15 +174,19 @@ protected Map<String, Table> getTableMap()
return tableMap;
}
};
-    calciteConnection = Calcites.jdbc(druidSchema, plannerConfig);
+    plannerFactory = new PlannerFactory(
+        Calcites.createRootSchema(druidSchema),
+        CalciteTests.createOperatorTable(),
+        plannerConfig
+    );
groupByQuery = GroupByQuery
.builder()
.setDataSource("foo")
.setInterval(new Interval(JodaUtils.MIN_INSTANT, JodaUtils.MAX_INSTANT))
.setDimensions(
Arrays.<DimensionSpec>asList(
new DefaultDimensionSpec("dimZipf", "d0"),
new DefaultDimensionSpec("dimSequential", "d1")
new DefaultDimensionSpec("dimSequential", "d1")
)
)
.setAggregatorSpecs(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("c")))
@@ -204,15 +230,12 @@ public void queryNative(Blackhole blackhole) throws Exception
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
-  public void querySql(Blackhole blackhole) throws Exception
+  public void queryPlanner(Blackhole blackhole) throws Exception
{
-    final ResultSet resultSet = calciteConnection.createStatement().executeQuery(sqlQuery);
-    final ResultSetMetaData metaData = resultSet.getMetaData();
-
-    while (resultSet.next()) {
-      for (int i = 0; i < metaData.getColumnCount(); i++) {
-        blackhole.consume(resultSet.getObject(i + 1));
-      }
+    try (final Planner planner = plannerFactory.createPlanner()) {
+      final PlannerResult plannerResult = Calcites.plan(planner, sqlQuery);
+      final ArrayList<Object[]> results = Sequences.toList(plannerResult.run(), Lists.<Object[]>newArrayList());
+      blackhole.consume(results);
}
}
}
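
The new benchmark's query path boils down to the following pattern (a sketch assembled from the diff above, assuming `plannerFactory` and `sqlQuery` are set up as in `setup()`; error handling omitted):

```java
// Plan and execute one SQL query through the Planner-based layer.
try (final Planner planner = plannerFactory.createPlanner()) {
  // Parse, validate, and plan the SQL into a Druid-native query.
  final PlannerResult plannerResult = Calcites.plan(planner, sqlQuery);
  // Results come back as a Druid Sequence of row arrays rather than a JDBC ResultSet.
  final List<Object[]> rows = Sequences.toList(plannerResult.run(), Lists.<Object[]>newArrayList());
}
```

There is no `ResultSet` or `ResultSetMetaData` in the return path anymore; callers consume the `Sequence` output directly, which is the overhead reduction described in the commit message.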
7 changes: 5 additions & 2 deletions docs/content/configuration/broker.md
@@ -92,8 +92,11 @@ The broker's [built-in SQL server](../querying/sql.html) can be configured throu
|Property|Description|Default|
|--------|-----------|-------|
|`druid.sql.enable`|Whether to enable SQL at all, including background metadata fetching. If false, this overrides all other SQL-related properties and disables SQL metadata, serving, and planning completely.|false|
-|`druid.sql.server.enableAvatica`|Whether to enable an Avatica server at `/druid/v2/sql/avatica/`.|true|
-|`druid.sql.server.enableJsonOverHttp`|Whether to enable a simple JSON over HTTP route at `/druid/v2/sql/`.|true|
+|`druid.sql.avatica.enable`|Whether to enable an Avatica server at `/druid/v2/sql/avatica/`.|true|
+|`druid.sql.avatica.connectionIdleTimeout`|Avatica client connection idle timeout.|PT30M|
+|`druid.sql.avatica.maxConnections`|Maximum number of open connections for the Avatica server. These are not HTTP connections, but are logical client connections that may span multiple HTTP connections.|25|
+|`druid.sql.avatica.maxStatementsPerConnection`|Maximum number of simultaneous open statements per Avatica client connection.|4|
+|`druid.sql.http.enable`|Whether to enable a simple JSON over HTTP route at `/druid/v2/sql/`.|true|
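
For reference, a broker `runtime.properties` snippet exercising these settings might look like the following (a sketch; the values shown are just the documented defaults):

```
druid.sql.enable=true
druid.sql.avatica.enable=true
druid.sql.avatica.connectionIdleTimeout=PT30M
druid.sql.avatica.maxConnections=25
druid.sql.avatica.maxStatementsPerConnection=4
druid.sql.http.enable=true
```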

#### SQL Planner Configuration

116 changes: 87 additions & 29 deletions docs/content/querying/sql.md
@@ -12,7 +12,10 @@ subject to change.

Druid includes a native SQL layer with an [Apache Calcite](https://calcite.apache.org/)-based parser and planner. All
parsing and planning takes place on the Broker, where SQL is converted to native Druid queries. Those native Druid
-queries are then passed down to data nodes. Each Druid dataSource appears as a table in the "druid" schema.
+queries are then passed down to data nodes. Each Druid datasource appears as a table in the "druid" schema. Datasource
+and column names are both case-sensitive and can optionally be quoted using double quotes. Literal strings should be
+quoted with single quotes, like `'foo'`. Literal strings with Unicode escapes can be written like `U&'fo\00F6'`, where
+character codes in hex are prefixed by a backslash.

Add "EXPLAIN PLAN FOR" to the beginning of any query to see how Druid will plan that query.

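To make the quoting rules concrete, here is a query sketch against a hypothetical datasource `foo` with a column `dim1` (names invented for illustration):

```sql
SELECT "dim1", COUNT(*)
FROM "druid"."foo"
WHERE "dim1" = U&'fo\00F6'
GROUP BY "dim1"
```

Prefixing the same statement with `EXPLAIN PLAN FOR` shows the native Druid query it would plan into.
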
@@ -29,7 +32,7 @@

```java
Connection connection = DriverManager.getConnection("jdbc:avatica:remote:url=http://localhost:8082/druid/v2/sql/avatica/");
ResultSet resultSet = connection.createStatement().executeQuery("SELECT COUNT(*) AS cnt FROM druid.foo");
ResultSet resultSet = connection.createStatement().executeQuery("SELECT COUNT(*) AS cnt FROM data_source");
while (resultSet.next()) {
// Do something
}
@@ -46,17 +49,17 @@

```json
{
"query" : "SELECT COUNT(*) FROM druid.ds WHERE foo = ?"
"query" : "SELECT COUNT(*) FROM data_source WHERE foo = 'bar'"
}
```

You can use _curl_ to send these queries from the command-line:

```bash
curl -XPOST -H'Content-Type: application/json' http://BROKER:8082/druid/v2/sql/ -d '{"query":"SELECT COUNT(*) FROM druid.ds"}'
curl -XPOST -H'Content-Type: application/json' http://BROKER:8082/druid/v2/sql/ -d '{"query":"SELECT COUNT(*) FROM data_source"}'
```

-Metadata is not available over the HTTP API.
+Metadata is only available over the HTTP API by querying the "INFORMATION_SCHEMA" tables (see below).

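For example, the same HTTP route can serve a metadata query (a sketch, reusing the `BROKER:8082` placeholder from above):

```bash
curl -XPOST -H'Content-Type: application/json' http://BROKER:8082/druid/v2/sql/ \
  -d '{"query":"SELECT TABLE_SCHEMA, TABLE_NAME FROM INFORMATION_SCHEMA.TABLES"}'
```
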
### Metadata

@@ -65,28 +68,28 @@ on broker startup and also periodically in the background through
[SegmentMetadata queries](../querying/segmentmetadataquery.html). Background metadata refreshing is triggered by
segments entering and exiting the cluster, and can also be throttled through configuration.

This cached metadata is queryable through the "metadata.COLUMNS" and "metadata.TABLES" tables. When
`druid.sql.planner.useFallback` is disabled (the default), only full scans of this table are possible. For example, to
retrieve column metadata, use the query:
This cached metadata is queryable through "INFORMATION_SCHEMA" tables. For example, to retrieve metadata for the Druid
datasource "foo", use the query:

```sql
-SELECT * FROM metadata.COLUMNS
+SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = 'druid' AND TABLE_NAME = 'foo'
```

-If `druid.sql.planner.useFallback` is enabled, full SQL is possible on metadata tables. However, useFallback is not
-recommended in production since it can generate unscalable query plans. The JDBC driver allows accessing
-table and column metadata through `connection.getMetaData()` even if useFallback is off.
+See the [INFORMATION_SCHEMA tables](#information_schema-tables) section below for details on the available metadata.
+
+You can also access table and column metadata through JDBC using `connection.getMetaData()`.

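A minimal JDBC metadata sketch using the standard `java.sql` API (connection URL as in the Avatica example above):

```java
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;

public class DruidMetadataExample
{
  public static void main(String[] args) throws Exception
  {
    final String url = "jdbc:avatica:remote:url=http://localhost:8082/druid/v2/sql/avatica/";
    try (Connection connection = DriverManager.getConnection(url)) {
      final DatabaseMetaData metaData = connection.getMetaData();
      // List every table visible to the connection, schema by schema.
      try (ResultSet tables = metaData.getTables(null, null, "%", null)) {
        while (tables.next()) {
          System.out.println(tables.getString("TABLE_SCHEM") + "." + tables.getString("TABLE_NAME"));
        }
      }
    }
  }
}
```
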
### Approximate queries

The following SQL queries and features may be executed using approximate algorithms:

-- `COUNT(DISTINCT col)` aggregations use [HyperLogLog](http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf), a
-fast approximate distinct counting algorithm. If you need exact distinct counts, you can instead use
-`SELECT COUNT(*) FROM (SELECT DISTINCT col FROM druid.foo)`, which will use a slower and more resource intensive exact
+- `COUNT(DISTINCT col)` and `APPROX_COUNT_DISTINCT(col)` aggregations use
+[HyperLogLog](http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf), a fast approximate distinct counting
+algorithm. If you need exact distinct counts, you can instead use
+`SELECT COUNT(*) FROM (SELECT DISTINCT col FROM data_source)`, which will use a slower and more resource intensive exact
algorithm.
- TopN-style queries with a single grouping column, like
-`SELECT col1, SUM(col2) FROM druid.foo GROUP BY col1 ORDER BY SUM(col2) DESC LIMIT 100`, by default will be executed
+`SELECT col1, SUM(col2) FROM data_source GROUP BY col1 ORDER BY SUM(col2) DESC LIMIT 100`, by default will be executed
as [TopN queries](topnquery.html), which use an approximate algorithm. To disable this behavior, and use exact
algorithms for topN-style queries, set
[druid.sql.planner.useApproximateTopN](../configuration/broker.html#sql-planner-configuration) to "false".
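
For instance, assuming a datasource `data_source` with a dimension column `user_id` (names illustrative), the approximate and exact variants look like:

```sql
-- Approximate, using HyperLogLog:
SELECT APPROX_COUNT_DISTINCT(user_id) FROM data_source

-- Exact, but slower and more resource intensive:
SELECT COUNT(*) FROM (SELECT DISTINCT user_id FROM data_source)
```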
@@ -95,9 +98,9 @@

Druid's SQL language supports a number of time operations, including:

-- `FLOOR(__time TO <granularity>)` for grouping or filtering on time buckets, like `SELECT FLOOR(__time TO MONTH), SUM(cnt) FROM druid.foo GROUP BY FLOOR(__time TO MONTH)`
-- `EXTRACT(<granularity> FROM __time)` for grouping or filtering on time parts, like `SELECT EXTRACT(HOUR FROM __time), SUM(cnt) FROM druid.foo GROUP BY EXTRACT(HOUR FROM __time)`
-- Comparisons to `TIMESTAMP '<time string>'` for time filters, like `SELECT COUNT(*) FROM druid.foo WHERE __time >= TIMESTAMP '2000-01-01 00:00:00' AND __time < TIMESTAMP '2001-01-01 00:00:00'`
+- `FLOOR(__time TO <granularity>)` for grouping or filtering on time buckets, like `SELECT FLOOR(__time TO MONTH), SUM(cnt) FROM data_source GROUP BY FLOOR(__time TO MONTH)`
+- `EXTRACT(<granularity> FROM __time)` for grouping or filtering on time parts, like `SELECT EXTRACT(HOUR FROM __time), SUM(cnt) FROM data_source GROUP BY EXTRACT(HOUR FROM __time)`
+- Comparisons to `TIMESTAMP '<time string>'` for time filters, like `SELECT COUNT(*) FROM data_source WHERE __time >= TIMESTAMP '2000-01-01 00:00:00' AND __time < TIMESTAMP '2001-01-01 00:00:00'`

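Putting these together, a time-bucketed query sketch (datasource and column names illustrative):

```sql
SELECT FLOOR(__time TO DAY), SUM(cnt)
FROM data_source
WHERE __time >= TIMESTAMP '2000-01-01 00:00:00'
  AND __time < TIMESTAMP '2001-01-01 00:00:00'
GROUP BY FLOOR(__time TO DAY)
```
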
### Subqueries

@@ -110,7 +113,7 @@ Subqueries involving `FROM (SELECT ... GROUP BY ...)` may be executed as
exact distinct count using a nested groupBy.

```sql
-SELECT COUNT(*) FROM (SELECT DISTINCT col FROM druid.foo)
+SELECT COUNT(*) FROM (SELECT DISTINCT col FROM data_source)
```

Note that groupBys require a separate merge buffer on the broker for each layer beyond the first layer of the groupBy.
@@ -126,40 +129,95 @@ Semi-join subqueries involving `WHERE ... IN (SELECT ...)`, like the following,

```sql
SELECT x, COUNT(*)
-FROM druid.foo
-WHERE x IN (SELECT x FROM druid.bar WHERE y = 'baz')
+FROM data_source_1
+WHERE x IN (SELECT x FROM data_source_2 WHERE y = 'baz')
GROUP BY x
```

-For this query, the broker will first translate the inner select on dataSource `bar` into a groupBy to find distinct
-`x` values. Then it'll use those distinct values to build an "in" filter on dataSource `foo` for the outer query. The
+For this query, the broker will first translate the inner select on data_source_2 into a groupBy to find distinct
+`x` values. Then it'll use those distinct values to build an "in" filter on data_source_1 for the outer query. The
configuration parameter `druid.sql.planner.maxSemiJoinRowsInMemory` controls the maximum number of values that will be
materialized for this kind of plan.

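Conceptually, the distinct values found by the inner query end up in a native Druid "in" filter on the outer query, along the lines of the following sketch (the actual generated plan may differ):

```json
{
  "type": "in",
  "dimension": "x",
  "values": ["value1", "value2", "value3"]
}
```
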
### Configuration

-Druid's SQL planner can be configured on the [Broker node](../configuration/broker.html#sql-planner-configuration).
+Druid's SQL layer can be configured on the [Broker node](../configuration/broker.html#sql-planner-configuration).

+### Extensions
+
+Some Druid extensions also include SQL language extensions.
+
+If the [approximate histogram extension](../development/extensions-core/approximate-histograms.html) is loaded:
+
+- `QUANTILE(column, probability)` on numeric or approximate histogram columns computes approximate quantiles. The
+"probability" should be between 0 and 1 (exclusive).
+
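A usage sketch, assuming an approximate histogram column named `latency` (column name illustrative):

```sql
-- Approximate 95th percentile of latency:
SELECT QUANTILE(latency, 0.95) FROM data_source
```
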
### Unsupported features

Druid does not support all SQL features. Most of these are due to missing features in Druid's native JSON-based query
language. Some unsupported SQL features include:

-- Grouping on functions of multiple columns, like concatenation: `SELECT COUNT(*) FROM druid.foo GROUP BY dim1 || ' ' || dim2`
+- Grouping on functions of multiple columns, like concatenation: `SELECT COUNT(*) FROM data_source GROUP BY dim1 || ' ' || dim2`
- Grouping on long and float columns.
- Filtering on float columns.
-- Filtering on non-boolean interactions between columns, like two columns equaling each other: `SELECT COUNT(*) FROM druid.foo WHERE dim1 = dim2`.
+- Filtering on non-boolean interactions between columns, like two columns equaling each other: `SELECT COUNT(*) FROM data_source WHERE dim1 = dim2`.
- A number of miscellaneous functions, like `TRIM`.
- Joins, other than semi-joins as described above.

Additionally, some Druid features are not supported by the SQL language. Some unsupported Druid features include:

- [Multi-value dimensions](multi-value-dimensions.html).
- [Query-time lookups](lookups.html).
-- Extensions, including [approximate histograms](../development/extensions-core/approximate-histograms.html) and
-[DataSketches](../development/extensions-core/datasketches-aggregators.html).
+- [DataSketches](../development/extensions-core/datasketches-aggregators.html).

## Third-party SQL libraries

A number of third parties have also released SQL libraries for Druid. Links to popular options can be found on
our [libraries](/libraries.html) page. These libraries make native Druid JSON queries and do not use Druid's SQL layer.

+## INFORMATION_SCHEMA tables
+
+Druid metadata is queryable through "INFORMATION_SCHEMA" tables described below.
+
+### SCHEMATA table
+
+|Column|Notes|
+|------|-----|
+|CATALOG_NAME|Unused|
+|SCHEMA_NAME||
+|SCHEMA_OWNER|Unused|
+|DEFAULT_CHARACTER_SET_CATALOG|Unused|
+|DEFAULT_CHARACTER_SET_SCHEMA|Unused|
+|DEFAULT_CHARACTER_SET_NAME|Unused|
+|SQL_PATH|Unused|
+
+### TABLES table
+
+|Column|Notes|
+|------|-----|
+|TABLE_CATALOG|Unused|
+|TABLE_SCHEMA||
+|TABLE_NAME||
+|TABLE_TYPE|"TABLE" or "SYSTEM_TABLE"|
+
+### COLUMNS table
+
+|Column|Notes|
+|------|-----|
+|TABLE_CATALOG|Unused|
+|TABLE_SCHEMA||
+|TABLE_NAME||
+|COLUMN_NAME||
+|ORDINAL_POSITION||
+|COLUMN_DEFAULT|Unused|
+|IS_NULLABLE||
+|DATA_TYPE||
+|CHARACTER_MAXIMUM_LENGTH|Unused|
+|CHARACTER_OCTET_LENGTH|Unused|
+|NUMERIC_PRECISION||
+|NUMERIC_PRECISION_RADIX||
+|NUMERIC_SCALE||
+|DATETIME_PRECISION||
+|CHARACTER_SET_NAME||
+|COLLATION_NAME||
+|JDBC_TYPE|Type code from java.sql.Types (Druid extension)|
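
Tying these tables together, a metadata query sketch (column names as documented above; datasource name illustrative):

```sql
SELECT COLUMN_NAME, DATA_TYPE, JDBC_TYPE
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = 'druid' AND TABLE_NAME = 'foo'
ORDER BY ORDINAL_POSITION
```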
13 changes: 13 additions & 0 deletions extensions-core/histogram/pom.xml
@@ -37,6 +37,12 @@
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
+    <dependency>
+      <groupId>io.druid</groupId>
+      <artifactId>druid-sql</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>provided</scope>
+    </dependency>

<!-- Tests -->
<dependency>
@@ -46,6 +52,13 @@
<scope>test</scope>
<type>test-jar</type>
</dependency>
+    <dependency>
+      <groupId>io.druid</groupId>
+      <artifactId>druid-sql</artifactId>
+      <version>${project.parent.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -24,7 +24,9 @@
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import io.druid.initialization.DruidModule;
+import io.druid.query.aggregation.histogram.sql.QuantileSqlAggregator;
import io.druid.segment.serde.ComplexMetrics;
+import io.druid.sql.guice.SqlBindings;

import java.util.List;

@@ -56,5 +58,10 @@ public void configure(Binder binder)
if (ComplexMetrics.getSerdeForType("approximateHistogram") == null) {
ComplexMetrics.registerSerde("approximateHistogram", new ApproximateHistogramFoldingSerde());
}
+
+    if (binder != null) {
+      // Binder is null in some tests.
+      SqlBindings.addAggregator(binder, QuantileSqlAggregator.class);
+    }
}
}
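
The pattern above is everything an extension needs in order to expose a SQL aggregator. A hypothetical module for some other aggregator might look like the following sketch (`MyModule` and `MySqlAggregator` are invented names; only `SqlBindings.addAggregator` comes from this patch):

```java
import com.fasterxml.jackson.databind.Module;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import io.druid.initialization.DruidModule;
import io.druid.sql.guice.SqlBindings;

import java.util.List;

public class MyModule implements DruidModule
{
  @Override
  public List<? extends Module> getJacksonModules()
  {
    // No custom Jackson serialization needed for this sketch.
    return ImmutableList.of();
  }

  @Override
  public void configure(Binder binder)
  {
    if (binder != null) {
      // Binder is null in some tests.
      SqlBindings.addAggregator(binder, MySqlAggregator.class);
    }
  }
}
```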