SQL: Add context and contextual functions to planner. (apache#3919)
* SQL: Add context and contextual functions to planner.

Added support for context parameters specified as JDBC connection properties
or a JSON object for SQL-over-JSON-over-HTTP.

Also added features that depend on context functionality:

- Added CURRENT_DATE, CURRENT_TIME, CURRENT_TIMESTAMP functions.
- Added support for time zones other than UTC via a "timeZone" context.
- Pass down query context to Druid queries too.

Also some bug fixes:

- Fix DATE handling, it was largely done incorrectly before.
- Fix CAST(__time TO DATE) which should do a floor-to-day.
- Fix non-equality comparisons to FLOOR(__time TO X).
- Fix maxQueryCount property.

* Pass down context to nested queries too.
gianm authored and jon-wei committed Feb 15, 2017
1 parent 3c54fc9 commit 16ef513
Showing 45 changed files with 1,936 additions and 570 deletions.
@@ -48,10 +48,10 @@
import io.druid.segment.column.ValueType;
import io.druid.segment.serde.ComplexMetrics;
import io.druid.sql.calcite.planner.Calcites;
import io.druid.sql.calcite.planner.DruidPlanner;
import io.druid.sql.calcite.planner.PlannerConfig;
import io.druid.sql.calcite.planner.PlannerFactory;
import io.druid.sql.calcite.planner.PlannerResult;
import io.druid.sql.calcite.rel.QueryMaker;
import io.druid.sql.calcite.table.DruidTable;
import io.druid.sql.calcite.table.RowSignature;
import io.druid.sql.calcite.util.CalciteTests;
@@ -61,7 +61,6 @@
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.tools.Planner;
import org.apache.commons.io.FileUtils;
import org.joda.time.Interval;
import org.openjdk.jmh.annotations.Benchmark;
@@ -157,7 +156,6 @@ public void setup() throws Exception
final Map<String, Table> tableMap = ImmutableMap.<String, Table>of(
"foo",
new DruidTable(
new QueryMaker(walker, plannerConfig),
new TableDataSource("foo"),
RowSignature.builder()
.add("__time", ValueType.LONG)
@@ -177,6 +175,7 @@ protected Map<String, Table> getTableMap()
};
plannerFactory = new PlannerFactory(
Calcites.createRootSchema(druidSchema),
walker,
CalciteTests.createOperatorTable(),
plannerConfig
);
@@ -233,8 +232,8 @@ public void queryNative(Blackhole blackhole) throws Exception
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void queryPlanner(Blackhole blackhole) throws Exception
{
try (final Planner planner = plannerFactory.createPlanner()) {
final PlannerResult plannerResult = Calcites.plan(planner, sqlQuery);
try (final DruidPlanner planner = plannerFactory.createPlanner(null)) {
final PlannerResult plannerResult = planner.plan(sqlQuery);
final ArrayList<Object[]> results = Sequences.toList(plannerResult.run(), Lists.<Object[]>newArrayList());
blackhole.consume(results);
}
22 changes: 18 additions & 4 deletions docs/content/querying/query-context.md
@@ -5,7 +5,7 @@ layout: doc_page
Query Context
=============

The query context is used for various query configuration parameters. The following parameters apply to all queries.

|property |default | description |
|-----------------|---------------------|----------------------|
@@ -17,7 +17,21 @@ The query context is used for various query configuration parameters.
|bySegment | `false` | Return "by segment" results. Primarily used for debugging; setting it to `true` returns results associated with the data segments they came from |
|finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` |
|chunkPeriod | `0` (off) | At the broker node level, long-interval queries (of any type) may be broken into shorter interval queries, reducing the impact on resources. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. All query chunks are processed asynchronously inside the query processing executor service. Make sure "druid.processing.numThreads" is configured appropriately on the broker. |
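
To show how these parameters are actually attached to a query, here is a minimal Java sketch (not part of the original page) that POSTs a native timeseries query with a `context` map to a broker's `/druid/v2/` endpoint. The broker address, dataSource name, intervals, and the chosen context values are assumptions for illustration.

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class QueryContextExample
{
  public static void main(String[] args) throws Exception
  {
    // A native timeseries query with a "context" map setting a per-query timeout,
    // priority, and cache behavior. The dataSource and intervals are placeholders.
    final String query = "{\n"
        + "  \"queryType\": \"timeseries\",\n"
        + "  \"dataSource\": \"data_source\",\n"
        + "  \"granularity\": \"day\",\n"
        + "  \"intervals\": [\"2000-01-01/2001-01-01\"],\n"
        + "  \"aggregations\": [{\"type\": \"count\", \"name\": \"rows\"}],\n"
        + "  \"context\": {\"timeout\": 60000, \"priority\": 1, \"useCache\": true}\n"
        + "}";

    // Assumes a broker running on localhost:8082.
    final HttpURLConnection conn =
        (HttpURLConnection) new URL("http://localhost:8082/druid/v2/").openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setDoOutput(true);
    try (OutputStream out = conn.getOutputStream()) {
      out.write(query.getBytes(StandardCharsets.UTF_8));
    }
    System.out.println("HTTP " + conn.getResponseCode());
  }
}
```

The same `context` map works for any native query type; the type-specific parameters described below are attached in exactly the same way.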

In addition, some query types offer context parameters specific to that query type.

### TopN queries

|property |default | description |
|-----------------|---------------------|----------------------|
|minTopNThreshold | `1000` | The top minTopNThreshold local results from each segment are returned for merging to determine the global topN. |

### Timeseries queries

|property |default | description |
|-----------------|---------------------|----------------------|
|skipEmptyBuckets | `false` | Disable timeseries zero-filling behavior, so only buckets with results will be returned. |

### GroupBy queries

See [GroupBy query context](groupbyquery.html#query-context).
56 changes: 48 additions & 8 deletions docs/content/querying/sql.md
@@ -31,14 +31,24 @@ jdbc:avatica:remote:url=http://BROKER:8082/druid/v2/sql/avatica/
Example code:

```java
// Connect to /druid/v2/sql/avatica/ on your broker.
String url = "jdbc:avatica:remote:url=http://localhost:8082/druid/v2/sql/avatica/";

// Set any connection context parameters you need here (see "Connection context" below).
// Or leave empty for default behavior.
Properties connectionProperties = new Properties();

try (Connection connection = DriverManager.getConnection(url, connectionProperties)) {
  try (ResultSet resultSet = connection.createStatement().executeQuery("SELECT COUNT(*) AS cnt FROM data_source")) {
    while (resultSet.next()) {
      // Do something
    }
  }
}
```

Table metadata is available over JDBC using `connection.getMetaData()` or by querying the "INFORMATION_SCHEMA" tables
(see below).

Parameterized queries don't work properly, so avoid those.
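
Since parameterized queries aren't supported, one workaround is to build the SQL text yourself before handing it to a plain `Statement`. A minimal sketch follows; the `sqlStringLiteral` helper is a hypothetical illustration, not part of Druid, using standard SQL escaping (single quotes doubled).

```java
// Hypothetical helper, not part of Druid: inline a string value as a SQL literal
// instead of binding it as a PreparedStatement parameter.
static String sqlStringLiteral(final String value)
{
  // Standard SQL escaping: single quotes inside the value are doubled.
  return "'" + value.replace("'", "''") + "'";
}

// Usage: build the complete SQL text, then execute it with a plain Statement.
final String fooValue = "bar";
final String sql = "SELECT COUNT(*) AS cnt FROM data_source WHERE foo = "
    + sqlStringLiteral(fooValue);
```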

@@ -61,6 +71,17 @@ curl -XPOST -H'Content-Type: application/json' http://BROKER:8082/druid/v2/sql/

Metadata is only available over the HTTP API by querying the "INFORMATION_SCHEMA" tables (see below).

You can provide [connection context parameters](#connection-context) by adding a "context" map, like:

```json
{
  "query" : "SELECT COUNT(*) FROM data_source WHERE foo = 'bar' AND __time > TIMESTAMP '2000-01-01 00:00:00'",
  "context" : {
    "sqlTimeZone" : "America/Los_Angeles"
  }
}
```

### Metadata

Druid brokers cache column type metadata for each dataSource and use it to plan SQL queries. This cache is updated
@@ -77,7 +98,7 @@ SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE SCHEMA_NAME = 'druid' AND TABLE_N

See the [INFORMATION_SCHEMA tables](#information_schema-tables) section below for details on the available metadata.

You can access table and column metadata through JDBC using `connection.getMetaData()`.

### Approximate queries

@@ -91,8 +112,8 @@ algorithm.
- TopN-style queries with a single grouping column, like
`SELECT col1, SUM(col2) FROM data_source GROUP BY col1 ORDER BY SUM(col2) DESC LIMIT 100`, by default will be executed
as [TopN queries](topnquery.html), which use an approximate algorithm. To disable this behavior, and use exact
algorithms for topN-style queries, set "useApproximateTopN" to "false", either through query context or through broker
configuration.

### Time functions

@@ -101,6 +122,10 @@ Druid's SQL language supports a number of time operations, including:
- `FLOOR(__time TO <granularity>)` for grouping or filtering on time buckets, like `SELECT FLOOR(__time TO MONTH), SUM(cnt) FROM data_source GROUP BY FLOOR(__time TO MONTH)`
- `EXTRACT(<granularity> FROM __time)` for grouping or filtering on time parts, like `SELECT EXTRACT(HOUR FROM __time), SUM(cnt) FROM data_source GROUP BY EXTRACT(HOUR FROM __time)`
- Comparisons to `TIMESTAMP '<time string>'` for time filters, like `SELECT COUNT(*) FROM data_source WHERE __time >= TIMESTAMP '2000-01-01 00:00:00' AND __time < TIMESTAMP '2001-01-01 00:00:00'`
- `CURRENT_TIMESTAMP` for the current time, usable in filters like `SELECT COUNT(*) FROM data_source WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '1' HOUR`

By default, time operations use the UTC time zone. You can change the time zone for time operations by setting the
connection context parameter "sqlTimeZone" to the name of the time zone, like "America/Los_Angeles".

### Subqueries

@@ -138,6 +163,21 @@ For this query, the broker will first translate the inner select on data_source_
configuration parameter `druid.sql.planner.maxSemiJoinRowsInMemory` controls the maximum number of values that will be
materialized for this kind of plan.

### Connection context

Druid's SQL layer supports a connection context that influences SQL query planning and Druid native query execution.
The parameters in the table below affect SQL planning. All other context parameters you provide will be attached to
Druid queries and can affect how they run. See [Query context](query-context.html) for details on the possible options.

|Parameter|Description|Default value|
|---------|-----------|-------------|
|`sqlTimeZone`|Sets the time zone for this connection. Should be a time zone name like "America/Los_Angeles".|UTC|
|`useApproximateCountDistinct`|Whether to use an approximate cardinality algorithm for `COUNT(DISTINCT foo)`.|druid.sql.planner.useApproximateCountDistinct on the broker|
|`useApproximateTopN`|Whether to use approximate [TopN queries](topnquery.html) when a SQL query could be expressed as such. If false, exact [GroupBy queries](groupbyquery.html) will be used instead.|druid.sql.planner.useApproximateTopN on the broker|
|`useFallback`|Whether to evaluate operations on the broker when they cannot be expressed as Druid queries. This option is not recommended for production since it can generate unscalable query plans. If false, SQL queries that cannot be translated to Druid queries will fail.|druid.sql.planner.useFallback on the broker|

Connection context can be specified as JDBC connection properties or as a "context" object in the JSON API.
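
For the JDBC route, here is a minimal sketch of supplying these parameters as connection properties; the broker address, table name, and chosen parameter values are placeholders for illustration.

```java
// Requires java.sql.Connection, java.sql.DriverManager, java.sql.ResultSet and java.util.Properties.
// Connection context parameters are passed as ordinary JDBC connection properties.
final Properties connectionProperties = new Properties();
connectionProperties.setProperty("sqlTimeZone", "America/Los_Angeles");
connectionProperties.setProperty("useApproximateTopN", "false");

final String url = "jdbc:avatica:remote:url=http://localhost:8082/druid/v2/sql/avatica/";

try (Connection connection = DriverManager.getConnection(url, connectionProperties);
     ResultSet resultSet = connection.createStatement().executeQuery(
         "SELECT FLOOR(__time TO DAY), COUNT(*) FROM data_source GROUP BY FLOOR(__time TO DAY)")) {
  while (resultSet.next()) {
    // Time buckets are computed in America/Los_Angeles rather than UTC.
  }
}
```

With `sqlTimeZone` set, time functions such as `FLOOR(__time TO DAY)` and `CURRENT_TIMESTAMP` are evaluated in the connection's time zone instead of UTC.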

### Configuration

Druid's SQL layer can be configured on the [Broker node](../configuration/broker.html#sql-planner-configuration).
@@ -33,6 +33,7 @@
import io.druid.sql.calcite.aggregation.SqlAggregator;
import io.druid.sql.calcite.expression.Expressions;
import io.druid.sql.calcite.expression.RowExtraction;
import io.druid.sql.calcite.planner.PlannerContext;
import io.druid.sql.calcite.table.RowSignature;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Project;
@@ -63,13 +64,15 @@ public SqlAggFunction calciteFunction()
public Aggregation toDruidAggregation(
final String name,
final RowSignature rowSignature,
final PlannerContext plannerContext,
final List<Aggregation> existingAggregations,
final Project project,
final AggregateCall aggregateCall,
final DimFilter filter
)
{
final RowExtraction rex = Expressions.toRowExtraction(
plannerContext,
rowSignature.getRowOrder(),
Expressions.fromFieldAccess(
rowSignature,
@@ -20,6 +20,7 @@
package io.druid.query.aggregation.histogram.sql;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import io.druid.granularity.QueryGranularities;
@@ -41,11 +42,11 @@
import io.druid.segment.QueryableIndex;
import io.druid.segment.TestHelper;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.sql.calcite.CalciteQueryTest;
import io.druid.sql.calcite.aggregation.SqlAggregator;
import io.druid.sql.calcite.filtration.Filtration;
import io.druid.sql.calcite.planner.Calcites;
import io.druid.sql.calcite.planner.DruidOperatorTable;
import io.druid.sql.calcite.planner.DruidPlanner;
import io.druid.sql.calcite.planner.PlannerConfig;
import io.druid.sql.calcite.planner.PlannerFactory;
import io.druid.sql.calcite.planner.PlannerResult;
@@ -55,7 +56,6 @@
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.LinearShardSpec;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.tools.Planner;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -134,7 +134,7 @@ public void setUp() throws Exception
new QuantileSqlAggregator()
)
);
plannerFactory = new PlannerFactory(rootSchema, operatorTable, plannerConfig);
plannerFactory = new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig);
}

@After
@@ -147,7 +147,7 @@ public void tearDown() throws Exception
@Test
public void testQuantileOnFloatAndLongs() throws Exception
{
try (final Planner planner = plannerFactory.createPlanner()) {
try (final DruidPlanner planner = plannerFactory.createPlanner(null)) {
final String sql = "SELECT\n"
+ "APPROX_QUANTILE(m1, 0.01),\n"
+ "APPROX_QUANTILE(m1, 0.5, 50),\n"
@@ -159,7 +159,7 @@ public void testQuantileOnFloatAndLongs() throws Exception
+ "APPROX_QUANTILE(cnt, 0.5)\n"
+ "FROM foo";

final PlannerResult plannerResult = Calcites.plan(planner, sql);
final PlannerResult plannerResult = planner.plan(sql);

// Verify results
final List<Object[]> results = Sequences.toList(plannerResult.run(), new ArrayList<Object[]>());
@@ -200,7 +200,7 @@ public void testQuantileOnFloatAndLongs() throws Exception
new QuantilePostAggregator("a6", "a4:agg", 0.999f),
new QuantilePostAggregator("a7", "a7:agg", 0.50f)
))
.context(CalciteQueryTest.TIMESERIES_CONTEXT)
.context(ImmutableMap.<String, Object>of("skipEmptyBuckets", true))
.build(),
Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
);
@@ -210,7 +210,7 @@ public void testQuantileOnComplexColumn() throws Exception
@Test
public void testQuantileOnComplexColumn() throws Exception
{
try (final Planner planner = plannerFactory.createPlanner()) {
try (final DruidPlanner planner = plannerFactory.createPlanner(null)) {
final String sql = "SELECT\n"
+ "APPROX_QUANTILE(hist_m1, 0.01),\n"
+ "APPROX_QUANTILE(hist_m1, 0.5, 50),\n"
@@ -221,7 +221,7 @@ public void testQuantileOnComplexColumn() throws Exception
+ "APPROX_QUANTILE(hist_m1, 0.999) FILTER(WHERE dim1 = 'abc')\n"
+ "FROM foo";

final PlannerResult plannerResult = Calcites.plan(planner, sql);
final PlannerResult plannerResult = planner.plan(sql);

// Verify results
final List<Object[]> results = Sequences.toList(plannerResult.run(), new ArrayList<Object[]>());
@@ -260,7 +260,7 @@ public void testQuantileOnComplexColumn() throws Exception
new QuantilePostAggregator("a5", "a5:agg", 0.999f),
new QuantilePostAggregator("a6", "a4:agg", 0.999f)
))
.context(CalciteQueryTest.TIMESERIES_CONTEXT)
.context(ImmutableMap.<String, Object>of("skipEmptyBuckets", true))
.build(),
Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
);
11 changes: 10 additions & 1 deletion sql/src/main/java/io/druid/sql/avatica/DruidConnection.java
@@ -19,6 +19,8 @@

package io.druid.sql.avatica;

import com.google.common.collect.ImmutableMap;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
@@ -28,14 +30,21 @@
*/
public class DruidConnection
{
private final Map<String, Object> context;
private final Map<Integer, DruidStatement> statements;
private Future<?> timeoutFuture;

public DruidConnection()
public DruidConnection(final Map<String, Object> context)
{
this.context = ImmutableMap.copyOf(context);
this.statements = new HashMap<>();
}

public Map<String, Object> context()
{
return context;
}

public Map<Integer, DruidStatement> statements()
{
return statements;