Skip to content

Commit

Permalink
Calcite upgrade 1.35 (apache#14510)
Browse files Browse the repository at this point in the history
* Update to Calcite 1.35.0
* Update from.ftl for Calcite 1.35.0.
* Fixed tests in Calcite upgrade by doing the following:
1. Added a new rule, CoreRules.PROJECT_FILTER_TRANSPOSE_WHOLE_PROJECT_EXPRESSIONS, to Base rules
2. Refactored the CorrelateUnnestRule
3. Updated CorrelateUnnestRel accordingly
4. Fixed a case with selector filters on the left where Calcite was eliding the virtual column
5. Additional test cases for fixes in 2,3,4
6. Update to StringListAggregator to fail a query if separators are not propagated appropriately
* Refactored for testcases to pass after the upgrade, introduced 2 new data sources for handling filters and select projects
* Added a literalSqlAggregator as the upgraded Calcite involved changes to subquery remove rule. This corrected plans for 2 queries with joins and subqueries by replacing an useless literal dimension with a post agg. Additionally a test with COUNT DISTINCT and FILTER which was failing with Calcite 1.21 is added here which passes with 1.35
* Updated to latest avatica and updated code as SqlUnknownTimeStamp is now used in Calcite which needs to be resolved to a timestamp literal
* Added a wrapper segment ref to use for unnest and filter segment reference
  • Loading branch information
somu-imply authored Aug 11, 2023
1 parent c8a1170 commit afe2290
Show file tree
Hide file tree
Showing 106 changed files with 3,444 additions and 2,098 deletions.
55 changes: 39 additions & 16 deletions docs/api-reference/sql-jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,44 @@ sidebar_label: SQL JDBC driver
> This document describes the SQL language.

You can make [Druid SQL](../querying/sql.md) queries using the [Avatica JDBC driver](https://calcite.apache.org/avatica/downloads/). We recommend using Avatica JDBC driver version 1.17.0 or later. Note that as of the time of this writing, Avatica 1.17.0, the latest version, does not support passing connection string parameters from the URL to Druid, so you must pass them using a `Properties` object. Once you've downloaded the Avatica client jar, add it to your classpath and use the connect string `jdbc:avatica:remote:url=http://BROKER:8082/druid/v2/sql/avatica/`.
You can make [Druid SQL](../querying/sql.md) queries using the [Avatica JDBC driver](https://calcite.apache.org/avatica/downloads/).
We recommend using Avatica JDBC driver version 1.22.0 or later.
Once you've downloaded the Avatica client jar, add it to your classpath.

When using the JDBC connector for the [examples](#examples) or in general, it's helpful to understand the parts of the connect string stored in the `url` variable:
Example connection string:

- `jdbc:avatica:remote:url=` is prepended to the hostname and port.
- The hostname and port number for your Druid deployment depends on whether you want to connect to the Router or a specific Broker. For more information, see [Connection stickiness](#connection-stickiness). In the case of the quickstart deployment, the hostname and port are `http://localhost:8888`, which connects to the Router running on your local machine.
- The SQL endpoint in Druid for the Avatica driver is `/druid/v2/sql/avatica/`.
```
jdbc:avatica:remote:url=http://localhost:8888/druid/v2/sql/avatica/;transparent_reconnect=true
```

Or, to use the protobuf protocol instead of JSON:

```
jdbc:avatica:remote:url=http://localhost:8888/druid/v2/sql/avatica-protobuf/;transparent_reconnect=true;serialization=protobuf
```

Example code:
The `url` is the `/druid/v2/sql/avatica/` endpoint on the Router, which routes JDBC connections to a consistent Broker.
For more information, see [Connection stickiness](#connection-stickiness).

Set `transparent_reconnect` to `true` so your connection is not interrupted if the pool of Brokers changes membership,
or if a Broker is restarted.

Set `serialization` to `protobuf` if using the protobuf endpoint.

Note that as of the time of this writing, Avatica 1.23.0, the latest version, does not support passing
[connection context parameters](../querying/sql-query-context.md) from the JDBC connection string to Druid. These context parameters
must be passed using a `Properties` object instead. Refer to the Java code below for an example.

Example Java code:

```java
// Connect to /druid/v2/sql/avatica/ on your Broker.
String url = "jdbc:avatica:remote:url=http://localhost:8082/druid/v2/sql/avatica/";
String url = "jdbc:avatica:remote:url=http://localhost:8888/druid/v2/sql/avatica/;transparent_reconnect=true";

// Set any connection context parameters you need here
// Or leave empty for default behavior.
// Set any connection context parameters you need here.
// Any property from https://druid.apache.org/docs/latest/querying/sql-query-context.html can go here.
Properties connectionProperties = new Properties();
connectionProperties.setProperty("sqlTimeZone", "Etc/UTC");

try (Connection connection = DriverManager.getConnection(url, connectionProperties)) {
try (
Expand All @@ -62,7 +83,7 @@ For a runnable example that includes a query that you might run, see [Examples](
It is also possible to use a protocol buffers JDBC connection with Druid, this offer reduced bloat and potential performance
improvements for larger result sets. To use it apply the following connection URL instead, everything else remains the same
```
String url = "jdbc:avatica:remote:url=http://localhost:8082/druid/v2/sql/avatica-protobuf/;serialization=protobuf";
String url = "jdbc:avatica:remote:url=http://localhost:8888/druid/v2/sql/avatica-protobuf/;transparent_reconnect=true;serialization=protobuf";
```

> The protobuf endpoint is also known to work with the official [Golang Avatica driver](https://github.com/apache/calcite-avatica-go)
Expand Down Expand Up @@ -130,11 +151,12 @@ public class JdbcListColumns {
{
// Connect to /druid/v2/sql/avatica/ on your Router.
// You can connect to a Broker but must configure connection stickiness if you do.
String url = "jdbc:avatica:remote:url=http://localhost:8888/druid/v2/sql/avatica/";
String url = "jdbc:avatica:remote:url=http://localhost:8888/druid/v2/sql/avatica/;transparent_reconnect=true";

String query = "SELECT COLUMN_NAME,* FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'wikipedia' and TABLE_SCHEMA='druid'";
// Set any connection context parameters you need here
// Or leave empty for default behavior.

// Set any connection context parameters you need here.
// Any property from https://druid.apache.org/docs/latest/querying/sql-query-context.html can go here.
Properties connectionProperties = new Properties();

try (Connection connection = DriverManager.getConnection(url, connectionProperties)) {
Expand Down Expand Up @@ -169,12 +191,13 @@ public class JdbcCountryAndTime {
{
// Connect to /druid/v2/sql/avatica/ on your Router.
// You can connect to a Broker but must configure connection stickiness if you do.
String url = "jdbc:avatica:remote:url=http://localhost:8888/druid/v2/sql/avatica/";
String url = "jdbc:avatica:remote:url=http://localhost:8888/druid/v2/sql/avatica/;transparent_reconnect=true";

//The query you want to run.
String query = "SELECT __time, isRobot, countryName, comment FROM wikipedia WHERE countryName='Japan'";
// Set any connection context parameters you need here
// Or leave empty for default behavior.

// Set any connection context parameters you need here.
// Any property from https://druid.apache.org/docs/latest/querying/sql-query-context.html can go here.
Properties connectionProperties = new Properties();
connectionProperties.setProperty("sqlTimeZone", "America/Los_Angeles");

Expand Down
4 changes: 2 additions & 2 deletions docs/querying/sql-aggregations.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ In the aggregation functions supported by Druid, only `COUNT`, `ARRAY_AGG`, and
|`ARRAY_AGG(DISTINCT expr, [size])`|Collects all distinct values of `expr` into an ARRAY, including null values, with `size` in bytes limit on aggregation size (default of 1024 bytes) per aggregate. If the aggregated array grows larger than the maximum size in bytes, the query will fail. Use of `ORDER BY` within the `ARRAY_AGG` expression is not currently supported, and the ordering of results will be based on the default for the element type.|`null`|
|`ARRAY_CONCAT_AGG(expr, [size])`|Concatenates all array `expr` into a single ARRAY, with `size` in bytes limit on aggregation size (default of 1024 bytes). Input `expr` _must_ be an array. Null `expr` will be ignored, but any null values within an `expr` _will_ be included in the resulting array. If the aggregated array grows larger than the maximum size in bytes, the query will fail. Use of `ORDER BY` within the `ARRAY_CONCAT_AGG` expression is not currently supported, and the ordering of results within the output array may vary depending on processing order.|`null`|
|`ARRAY_CONCAT_AGG(DISTINCT expr, [size])`|Concatenates all distinct values of all array `expr` into a single ARRAY, with `size` in bytes limit on aggregation size (default of 1024 bytes) per aggregate. Input `expr` _must_ be an array. Null `expr` will be ignored, but any null values within an `expr` _will_ be included in the resulting array. If the aggregated array grows larger than the maximum size in bytes, the query will fail. Use of `ORDER BY` within the `ARRAY_CONCAT_AGG` expression is not currently supported, and the ordering of results will be based on the default for the element type.|`null`|
|`STRING_AGG(expr, separator, [size])`|Collects all values of `expr` into a single STRING, ignoring null values. Each value is joined by the `separator` which must be a literal STRING. An optional `size` in bytes can be supplied to limit aggregation size (default of 1024 bytes). If the aggregated string grows larger than the maximum size in bytes, the query will fail. Use of `ORDER BY` within the `STRING_AGG` expression is not currently supported, and the ordering of results within the output string may vary depending on processing order.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
|`STRING_AGG(DISTINCT expr, separator, [size])`|Collects all distinct values of `expr` into a single STRING, ignoring null values. Each value is joined by the `separator` which must be a literal STRING. An optional `size` in bytes can be supplied to limit aggregation size (default of 1024 bytes). If the aggregated string grows larger than the maximum size in bytes, the query will fail. Use of `ORDER BY` within the `STRING_AGG` expression is not currently supported, and the ordering of results will be based on the default `STRING` ordering.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
|`STRING_AGG([DISTINCT] expr, [separator, [size]])`|Collects all values (or all distinct values) of `expr` into a single STRING, ignoring null values. Each value is joined by an optional `separator`, which must be a literal STRING. If the `separator` is not provided, strings are concatenated without a separator.<br /><br />An optional `size` in bytes can be supplied to limit aggregation size (default of 1024 bytes). If the aggregated string grows larger than the maximum size in bytes, the query will fail. Use of `ORDER BY` within the `STRING_AGG` expression is not currently supported, and the ordering of results within the output string may vary depending on processing order.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
|`LISTAGG([DISTINCT] expr, [separator, [size]])`|Synonym for `STRING_AGG`.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
|`BIT_AND(expr)`|Performs a bitwise AND operation on all input values.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
|`BIT_OR(expr)`|Performs a bitwise OR operation on all input values.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
|`BIT_XOR(expr)`|Performs a bitwise XOR operation on all input values.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ public class DoublesSketchApproxQuantileSqlAggregator implements SqlAggregator
private static final String NAME = "APPROX_QUANTILE_DS";
private static final SqlAggFunction FUNCTION_INSTANCE =
OperatorConversions.aggregatorBuilder(NAME)
.operandNames("column", "probability", "k")
.operandTypes(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC, SqlTypeFamily.EXACT_NUMERIC)
.returnTypeNonNull(SqlTypeName.DOUBLE)
.requiredOperandCount(2)
.literalOperands(1, 2)
.functionCategory(SqlFunctionCategory.NUMERIC)
.build();
.operandNames("column", "probability", "k")
.operandTypes(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC, SqlTypeFamily.EXACT_NUMERIC)
.returnTypeNonNull(SqlTypeName.DOUBLE)
.requiredOperandCount(2)
.literalOperands(1, 2)
.functionCategory(SqlFunctionCategory.NUMERIC)
.build();

@Override
public SqlAggFunction calciteFunction()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.expression.BasicOperandTypeChecker;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor;
Expand Down Expand Up @@ -143,7 +144,7 @@ public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFail

// Verify that 'operand' is a literal number.
if (!SqlUtil.isLiteral(operand)) {
return OperatorConversions.throwOrReturn(
return BasicOperandTypeChecker.throwOrReturn(
throwOnFailure,
callBinding,
cb -> cb.getValidator()
Expand All @@ -155,7 +156,7 @@ public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFail
}

if (!SqlTypeFamily.NUMERIC.contains(operandType)) {
return OperatorConversions.throwOrReturn(
return BasicOperandTypeChecker.throwOrReturn(
throwOnFailure,
callBinding,
SqlCallBinding::newValidationSignatureError
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,12 @@ public void testQuantileOnInnerQuery()
.setGranularity(Granularities.ALL)
.setAggregatorSpecs(
new DoubleSumAggregatorFactory("_a0:sum", "a0"),
new CountAggregatorFactory("_a0:count"),
NullHandling.replaceWithDefault() ?
new CountAggregatorFactory("_a0:count") :
new FilteredAggregatorFactory(
new CountAggregatorFactory("_a0:count"),
notNull("a0")
),
new DoublesSketchAggregatorFactory(
"_a1:agg",
"a0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,12 @@ public void testQuantileOnInnerQuery()
.setGranularity(Granularities.ALL)
.setAggregatorSpecs(
new DoubleSumAggregatorFactory("_a0:sum", "a0"),
new CountAggregatorFactory("_a0:count"),
NullHandling.replaceWithDefault() ?
new CountAggregatorFactory("_a0:count") :
new FilteredAggregatorFactory(
new CountAggregatorFactory("_a0:count"),
notNull("a0")
),
new FixedBucketsHistogramAggregatorFactory(
"_a1:agg",
"a0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,12 @@ public void testQuantileOnInnerQuery()
.setGranularity(Granularities.ALL)
.setAggregatorSpecs(
new DoubleSumAggregatorFactory("_a0:sum", "a0"),
new CountAggregatorFactory("_a0:count"),
NullHandling.replaceWithDefault() ?
new CountAggregatorFactory("_a0:count") :
new FilteredAggregatorFactory(
new CountAggregatorFactory("_a0:count"),
notNull("a0")
),
new ApproximateHistogramAggregatorFactory(
"_a1:agg",
"a0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,7 @@ public void testReplaceIncorrectSyntax()
.setExpectedDataSource("foo1")
.setQueryContext(context)
.setExpectedValidationErrorMatcher(invalidSqlContains(
"Missing time chunk information in OVERWRITE clause for REPLACE. "
+ "Use OVERWRITE WHERE <__time based condition> or OVERWRITE ALL to overwrite the entire table."
"Incorrect syntax near the keyword 'OVERWRITE'"
))
.verifyPlanningErrors();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2051,26 +2051,32 @@ public void testJoinUsesDifferentAlgorithm()
new QueryDataSource(
GroupByQuery.builder()
.setDataSource("foo")
.setVirtualColumns(expressionVirtualColumn("v0", "1", ColumnType.LONG))
.setDimensions(
new DefaultDimensionSpec("m1", "d0", ColumnType.FLOAT),
new DefaultDimensionSpec("v0", "d1", ColumnType.LONG)
new DefaultDimensionSpec("m1", "d0", ColumnType.FLOAT)
)
.setContext(queryContext)
.setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
.setGranularity(Granularities.ALL)
.setPostAggregatorSpecs(
ImmutableList.of(new ExpressionPostAggregator(
"a0",
"1",
null, ExprMacroTable.nil()
)
)
)
.build()

),
"j0.",
"(floor(100) == \"j0.d0\")",
"(CAST(floor(100), 'DOUBLE') == \"j0.d0\")",
JoinType.LEFT
)
)
.setAggregatorSpecs(
new FilteredAggregatorFactory(
new CountAggregatorFactory("a0"),
isNull("j0.d1"),
isNull("j0.a0"),
"a0"
)
)
Expand Down
Loading

0 comments on commit afe2290

Please sign in to comment.