Skip to content

Commit

Permalink
Built-in SQL. (apache#3682)
Browse files Browse the repository at this point in the history
  • Loading branch information
gianm authored and fjy committed Dec 17, 2016
1 parent 2bfcc8a commit dd63f54
Show file tree
Hide file tree
Showing 75 changed files with 12,132 additions and 11 deletions.
6 changes: 6 additions & 0 deletions NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,9 @@ This product contains a modified version of Metamarkets bytebuffer-collections l
* https://github.com/metamx/bytebuffer-collections
* COMMIT TAG:
* https://github.com/metamx/bytebuffer-collections/commit/3d1e7c8

This product contains SQL query planning code adapted from Apache Calcite
* LICENSE:
* https://github.com/apache/calcite/blob/master/LICENSE (Apache License, Version 2.0)
* HOMEPAGE:
* https://calcite.apache.org/
17 changes: 17 additions & 0 deletions benchmarks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,23 @@
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-sql</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-sql</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>com.github.wnameless</groupId>
<artifactId>json-flattener</artifactId>
Expand Down
237 changes: 237 additions & 0 deletions benchmarks/src/main/java/io/druid/benchmark/query/SqlBenchmark.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.benchmark.query;

/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.hash.Hashing;
import com.google.common.io.Files;
import io.druid.benchmark.datagen.BenchmarkDataGenerator;
import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
import io.druid.benchmark.datagen.BenchmarkSchemas;
import io.druid.common.utils.JodaUtils;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.granularity.QueryGranularities;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.TableDataSource;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.GroupByQuery;
import io.druid.segment.column.ValueType;
import io.druid.segment.serde.ComplexMetrics;
import io.druid.sql.calcite.planner.Calcites;
import io.druid.sql.calcite.planner.PlannerConfig;
import io.druid.sql.calcite.table.DruidTable;
import io.druid.sql.calcite.util.CalciteTests;
import io.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.commons.io.FileUtils;
import org.joda.time.Interval;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

import java.io.File;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
* Benchmark that compares the same groupBy query through the native query layer and through the SQL layer.
*/
@State(Scope.Benchmark)
@Fork(jvmArgsPrepend = "-server", value = 1)
@Warmup(iterations = 15)
@Measurement(iterations = 30)
public class SqlBenchmark
{
@Param({"10000", "100000", "200000"})
private int rowsPerSegment;

private static final Logger log = new Logger(SqlBenchmark.class);
private static final int RNG_SEED = 9999;

private File tmpDir;
private SpecificSegmentsQuerySegmentWalker walker;
private CalciteConnection calciteConnection;
private GroupByQuery groupByQuery;
private String sqlQuery;

@Setup(Level.Trial)
public void setup() throws Exception
{
tmpDir = Files.createTempDir();
log.info("Starting benchmark setup using tmpDir[%s], rows[%,d].", tmpDir, rowsPerSegment);

if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
}

final BenchmarkSchemaInfo schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get("basic");
final BenchmarkDataGenerator dataGenerator = new BenchmarkDataGenerator(
schemaInfo.getColumnSchemas(),
RNG_SEED + 1,
schemaInfo.getDataInterval(),
rowsPerSegment
);

final List<InputRow> rows = Lists.newArrayList();
for (int i = 0; i < rowsPerSegment; i++) {
final InputRow row = dataGenerator.nextRow();
if (i % 20000 == 0) {
log.info("%,d/%,d rows generated.", i, rowsPerSegment);
}
rows.add(row);
}

log.info("%,d/%,d rows generated.", rows.size(), rowsPerSegment);

final PlannerConfig plannerConfig = new PlannerConfig();
walker = CalciteTests.createWalker(tmpDir, rows);
final Map<String, Table> tableMap = ImmutableMap.<String, Table>of(
"foo",
new DruidTable(
walker,
new TableDataSource("foo"),
plannerConfig,
ImmutableMap.of(
"__time", ValueType.LONG,
"dimSequential", ValueType.STRING,
"dimZipf", ValueType.STRING,
"dimUniform", ValueType.STRING
)
)
);
final Schema druidSchema = new AbstractSchema()
{
@Override
protected Map<String, Table> getTableMap()
{
return tableMap;
}
};
calciteConnection = Calcites.jdbc(druidSchema, plannerConfig);
groupByQuery = GroupByQuery
.builder()
.setDataSource("foo")
.setInterval(new Interval(JodaUtils.MIN_INSTANT, JodaUtils.MAX_INSTANT))
.setDimensions(
Arrays.<DimensionSpec>asList(
new DefaultDimensionSpec("dimZipf", "d0"),
new DefaultDimensionSpec("dimSequential", "d1")
)
)
.setAggregatorSpecs(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("c")))
.setGranularity(QueryGranularities.ALL)
.build();

sqlQuery = "SELECT\n"
+ " dimZipf AS d0,"
+ " dimSequential AS d1,\n"
+ " COUNT(*) AS c\n"
+ "FROM druid.foo\n"
+ "GROUP BY dimZipf, dimSequential";
}

@TearDown(Level.Trial)
public void tearDown() throws Exception
{
if (walker != null) {
walker.close();
walker = null;
}

if (tmpDir != null) {
FileUtils.deleteDirectory(tmpDir);
}
}

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void queryNative(Blackhole blackhole) throws Exception
{
final Sequence<Row> resultSequence = groupByQuery.run(walker, Maps.<String, Object>newHashMap());
final ArrayList<Row> resultList = Sequences.toList(resultSequence, Lists.<Row>newArrayList());

for (Row row : resultList) {
blackhole.consume(row);
}
}

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void querySql(Blackhole blackhole) throws Exception
{
final ResultSet resultSet = calciteConnection.createStatement().executeQuery(sqlQuery);
final ResultSetMetaData metaData = resultSet.getMetaData();

while (resultSet.next()) {
for (int i = 0; i < metaData.getColumnCount(); i++) {
blackhole.consume(resultSet.getObject(i + 1));
}
}
}
}
24 changes: 24 additions & 0 deletions docs/content/configuration/broker.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,30 @@ See [groupBy server configuration](../querying/groupbyquery.html#server-configur
|--------|-----------|-------|
|`druid.query.segmentMetadata.defaultHistory`|When no interval is specified in the query, use a default interval of defaultHistory before the end time of the most recent segment, specified in ISO8601 format. This property also controls the duration of the default interval used by GET /druid/v2/datasources/{dataSourceName} interactions for retrieving datasource dimensions/metrics.|P1W|

#### SQL Server Configuration

The broker's [built-in SQL server](../querying/sql.html) can be configured through the following properties.

|Property|Description|Default|
|--------|-----------|-------|
|`druid.sql.enable`|Whether to enable SQL at all, including background metadata fetching. If false, this overrides all other SQL-related properties and disables SQL metadata, serving, and planning completely.|false|
|`druid.sql.server.enableAvatica`|Whether to enable an Avatica server at `/druid/v2/sql/avatica/`.|false|
|`druid.sql.server.enableJsonOverHttp`|Whether to enable a simple JSON over HTTP route at `/druid/v2/sql/`.|true|

#### SQL Planner Configuration

The broker's [SQL planner](../querying/sql.html) can be configured through the following properties.

|Property|Description|Default|
|--------|-----------|-------|
|`druid.sql.planner.maxSemiJoinRowsInMemory`|Maximum number of rows to keep in memory for executing two-stage semi-join queries like `SELECT * FROM Employee WHERE DeptName IN (SELECT DeptName FROM Dept)`.|100000|
|`druid.sql.planner.maxTopNLimit`|Maximum threshold for a [TopN query](../querying/topnquery.html). Higher limits will be planned as [GroupBy queries](../querying/groupbyquery.html) instead.|100000|
|`druid.sql.planner.metadataRefreshPeriod`|Throttle for metadata refreshes.|PT1M|
|`druid.sql.planner.selectPageSize`|Page size threshold for [Select queries](../querying/select-query.html). Select queries for larger resultsets will be issued back-to-back using pagination.|1000|
|`druid.sql.planner.useApproximateCountDistinct`|Whether to use an approximate cardinalty algorithm for `COUNT(DISTINCT foo)`.|true|
|`druid.sql.planner.useApproximateTopN`|Whether to use approximate [TopN queries](../querying/topnquery.html) when a SQL query could be expressed as such. If false, exact [GroupBy queries](../querying/groupbyquery.html) will be used instead.|true|
|`druid.sql.planner.useFallback`|Whether to evaluate operations on the broker when they cannot be expressed as Druid queries. This option is not recommended for production since it can generate unscalable query plans. If false, SQL queries that cannot be translated to Druid queries will fail.|false|

### Caching

You can optionally only configure caching to be enabled on the broker by setting caching configs here.
Expand Down
Loading

0 comments on commit dd63f54

Please sign in to comment.