
Commit ca6053d

SQL: Resolve column type conflicts in favor of newer segments. (apache#3930)

* SQL: Resolve column type conflicts in favor of newer segments.

Helps with schema evolution, e.g. from long -> float, which is supported
on the query side.

* Take columns from highest timestamp instead of max segment id.

* Fixes and docs.
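
In effect, the broker now merges column metadata across all analyzed segments and lets the segment whose latest interval ends most recently define a column's type. Below is a simplified, standalone sketch of that merge rule using plain strings and longs in place of Druid's SegmentAnalysis/ColumnAnalysis/ValueType classes; it mirrors the logic added to DruidSchema.computeTable but is not the committed code.

```java
// Sketch of the "newest segment wins" type-resolution rule added in this commit.
// Simplified stand-ins are used for Druid's analysis classes.
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class NewestSegmentWinsSketch
{
  // One analyzed segment: the end of its latest interval plus its column types.
  static final class SegmentColumns
  {
    final long intervalEndMillis;
    final Map<String, String> columnTypes;

    SegmentColumns(long intervalEndMillis, Map<String, String> columnTypes)
    {
      this.intervalEndMillis = intervalEndMillis;
      this.columnTypes = columnTypes;
    }
  }

  static Map<String, String> mergeColumnTypes(List<SegmentColumns> segments)
  {
    final Map<String, String> merged = new LinkedHashMap<>();
    long maxTimestamp = Long.MIN_VALUE;

    for (SegmentColumns segment : segments) {
      for (Map.Entry<String, String> column : segment.columnTypes.entrySet()) {
        // Keep the union of all columns, but let a segment at least as new as the
        // newest one seen so far overwrite a previously recorded type.
        if (!merged.containsKey(column.getKey()) || segment.intervalEndMillis >= maxTimestamp) {
          merged.put(column.getKey(), column.getValue());
          maxTimestamp = segment.intervalEndMillis;
        }
      }
    }

    return merged;
  }
}
```

Applied to the test fixtures further down, the 2000 segment registers m1 through a doubleSum aggregator while the newer 2001 segment registers it through a longSum, so the merged signature reports m1 as a long (BIGINT in SQL), which is what the updated assertions expect.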
gianm authored and jon-wei committed Feb 16, 2017
1 parent 16ef513 commit ca6053d
Showing 3 changed files with 136 additions and 40 deletions.
4 changes: 2 additions & 2 deletions docs/content/querying/sql.md
@@ -84,8 +84,8 @@ You can provide [connection context parameters](#connection-context) by adding a

### Metadata

Druid brokers cache column type metadata for each dataSource and use it to plan SQL queries. This cache is updated
on broker startup and also periodically in the background through
Druid brokers infer table and column metadata for each dataSource from segments loaded in the cluster, and use this to
plan SQL queries. This metadata is cached on broker startup and also updated periodically in the background through
[SegmentMetadata queries](../querying/segmentmetadataquery.html). Background metadata refreshing is triggered by
segments entering and exiting the cluster, and can also be throttled through configuration.
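
For orientation (not part of this change): the cached per-column value types are what the SQL planner ultimately exposes as SQL types. The sketch below illustrates the mapping that the test assertions further down exercise; the mapping is inferred from those assertions (STRING -> VARCHAR, LONG -> BIGINT, FLOAT -> FLOAT, COMPLEX -> OTHER) rather than taken from Druid's actual Calcite integration code.

```java
// Rough illustration only: map simplified Druid value types to the SQL types
// that the DruidSchemaTest assertions expect. The real mapping lives in
// Druid's Calcite integration and may handle more cases.
import org.apache.calcite.sql.type.SqlTypeName;

class ValueTypeToSqlTypeSketch
{
  enum SimpleValueType { STRING, LONG, FLOAT, COMPLEX }

  static SqlTypeName toSqlType(SimpleValueType valueType)
  {
    switch (valueType) {
      case STRING:
        return SqlTypeName.VARCHAR;
      case LONG:
        return SqlTypeName.BIGINT;
      case FLOAT:
        return SqlTypeName.FLOAT;
      case COMPLEX:
      default:
        return SqlTypeName.OTHER;
    }
  }
}
```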

59 changes: 40 additions & 19 deletions sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java
@@ -23,7 +23,6 @@
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -34,6 +33,7 @@
import io.druid.client.DruidServer;
import io.druid.client.ServerView;
import io.druid.client.TimelineServerView;
import io.druid.common.utils.JodaUtils;
import io.druid.guice.ManageLifecycle;
import io.druid.java.util.common.concurrent.ScheduledExecutors;
import io.druid.java.util.common.guava.Sequence;
@@ -280,9 +280,9 @@ private DruidTable computeTable(final String dataSource)
new TableDataSource(dataSource),
null,
null,
true,
null,
EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class),
false,
ImmutableMap.<String, Object>of("useCache", false, "populateCache", false),
EnumSet.of(SegmentMetadataQuery.AnalysisType.INTERVAL),
null,
true
);
@@ -293,26 +293,47 @@ private DruidTable computeTable(final String dataSource)
return null;
}

final Map<String, ColumnAnalysis> columnMetadata = Iterables.getOnlyElement(results).getColumns();
final RowSignature.Builder rowSignature = RowSignature.builder();
final Map<String, ValueType> columnTypes = Maps.newLinkedHashMap();

for (Map.Entry<String, ColumnAnalysis> entry : columnMetadata.entrySet()) {
if (entry.getValue().isError()) {
// Ignore columns with metadata consistency errors.
continue;
}
// Resolve conflicts by taking the latest metadata. This aids in gradual schema evolution.
long maxTimestamp = JodaUtils.MIN_INSTANT;

for (SegmentAnalysis analysis : results) {
final long timestamp;

ValueType valueType;
try {
valueType = ValueType.valueOf(entry.getValue().getType().toUpperCase());
if (analysis.getIntervals() != null && analysis.getIntervals().size() > 0) {
timestamp = analysis.getIntervals().get(analysis.getIntervals().size() - 1).getEndMillis();
} else {
timestamp = JodaUtils.MIN_INSTANT;
}
catch (IllegalArgumentException e) {
// Assume unrecognized types are some flavor of COMPLEX. This throws away information about exactly
// what kind of complex column it is, which we may want to preserve some day.
valueType = ValueType.COMPLEX;

for (Map.Entry<String, ColumnAnalysis> entry : analysis.getColumns().entrySet()) {
if (entry.getValue().isError()) {
// Skip columns with analysis errors.
continue;
}

if (!columnTypes.containsKey(entry.getKey()) || timestamp >= maxTimestamp) {
ValueType valueType;
try {
valueType = ValueType.valueOf(entry.getValue().getType().toUpperCase());
}
catch (IllegalArgumentException e) {
// Assume unrecognized types are some flavor of COMPLEX. This throws away information about exactly
// what kind of complex column it is, which we may want to preserve some day.
valueType = ValueType.COMPLEX;
}

columnTypes.put(entry.getKey(), valueType);

maxTimestamp = timestamp;
}
}
}

rowSignature.add(entry.getKey(), valueType);
final RowSignature.Builder rowSignature = RowSignature.builder();
for (Map.Entry<String, ValueType> entry : columnTypes.entrySet()) {
rowSignature.add(entry.getKey(), entry.getValue());
}

return new DruidTable(
113 changes: 94 additions & 19 deletions sql/src/test/java/io/druid/sql/calcite/schema/DruidSchemaTest.java
@@ -19,62 +19,138 @@

package io.druid.sql.calcite.schema;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.druid.data.input.InputRow;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.segment.IndexBuilder;
import io.druid.segment.QueryableIndex;
import io.druid.segment.TestHelper;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.sql.calcite.planner.Calcites;
import io.druid.sql.calcite.planner.PlannerConfig;
import io.druid.sql.calcite.table.DruidTable;
import io.druid.sql.calcite.util.CalciteTests;
import io.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import io.druid.sql.calcite.util.TestServerInventoryView;
import org.apache.calcite.jdbc.CalciteConnection;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.LinearShardSpec;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.schema.Table;
import org.apache.calcite.sql.type.SqlTypeName;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.sql.Connection;
import java.sql.DriverManager;
import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class DruidSchemaTest
{
private static final PlannerConfig PLANNER_CONFIG_DEFAULT = new PlannerConfig();

public static final List<InputRow> ROWS1 = ImmutableList.of(
CalciteTests.createRow(ImmutableMap.of("t", "2000-01-01", "m1", "1.0", "dim1", "")),
CalciteTests.createRow(ImmutableMap.of("t", "2000-01-02", "m1", "2.0", "dim1", "10.1")),
CalciteTests.createRow(ImmutableMap.of("t", "2000-01-03", "m1", "3.0", "dim1", "2"))
);

public static final List<InputRow> ROWS2 = ImmutableList.of(
CalciteTests.createRow(ImmutableMap.of("t", "2001-01-01", "m1", "4.0", "dim2", ImmutableList.of("a"))),
CalciteTests.createRow(ImmutableMap.of("t", "2001-01-02", "m1", "5.0", "dim2", ImmutableList.of("abc"))),
CalciteTests.createRow(ImmutableMap.of("t", "2001-01-03", "m1", "6.0"))
);

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();

private SpecificSegmentsQuerySegmentWalker walker = null;
private DruidSchema schema = null;
private Connection connection = null;

@Before
public void setUp() throws Exception
{
Calcites.setSystemProperties();
walker = CalciteTests.createMockWalker(temporaryFolder.newFolder());

Properties props = new Properties();
props.setProperty("caseSensitive", "true");
props.setProperty("unquotedCasing", "UNCHANGED");
connection = DriverManager.getConnection("jdbc:calcite:", props);
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
final File tmpDir = temporaryFolder.newFolder();
final QueryableIndex index1 = IndexBuilder.create()
.tmpDir(new File(tmpDir, "1"))
.indexMerger(TestHelper.getTestIndexMergerV9())
.schema(
new IncrementalIndexSchema.Builder()
.withMetrics(
new AggregatorFactory[]{
new CountAggregatorFactory("cnt"),
new DoubleSumAggregatorFactory("m1", "m1"),
new HyperUniquesAggregatorFactory("unique_dim1", "dim1")
}
)
.withRollup(false)
.build()
)
.rows(ROWS1)
.buildMMappedIndex();

final QueryableIndex index2 = IndexBuilder.create()
.tmpDir(new File(tmpDir, "2"))
.indexMerger(TestHelper.getTestIndexMergerV9())
.schema(
new IncrementalIndexSchema.Builder()
.withMetrics(
new AggregatorFactory[]{
new LongSumAggregatorFactory("m1", "m1")
}
)
.withRollup(false)
.build()
)
.rows(ROWS2)
.buildMMappedIndex();

walker = new SpecificSegmentsQuerySegmentWalker(CalciteTests.queryRunnerFactoryConglomerate()).add(
DataSegment.builder()
.dataSource(CalciteTests.DATASOURCE1)
.interval(new Interval("2000/P1Y"))
.version("1")
.shardSpec(new LinearShardSpec(0))
.build(),
index1
).add(
DataSegment.builder()
.dataSource(CalciteTests.DATASOURCE1)
.interval(new Interval("2001/P1Y"))
.version("1")
.shardSpec(new LinearShardSpec(0))
.build(),
index2
).add(
DataSegment.builder()
.dataSource(CalciteTests.DATASOURCE2)
.interval(index2.getDataInterval())
.version("1")
.shardSpec(new LinearShardSpec(0))
.build(),
index2
);

schema = new DruidSchema(
walker,
new TestServerInventoryView(walker.getSegments()),
PLANNER_CONFIG_DEFAULT
);

calciteConnection.getRootSchema().add("s", schema);
schema.start();
schema.awaitInitialization();
}
@@ -84,7 +160,6 @@ public void tearDown() throws Exception
{
schema.stop();
walker.close();
connection.close();
}

@Test
@@ -110,13 +185,13 @@ public void testGetTableMap()
Assert.assertEquals("dim1", fields.get(2).getName());
Assert.assertEquals(SqlTypeName.VARCHAR, fields.get(2).getType().getSqlTypeName());

Assert.assertEquals("dim2", fields.get(3).getName());
Assert.assertEquals(SqlTypeName.VARCHAR, fields.get(3).getType().getSqlTypeName());
Assert.assertEquals("m1", fields.get(3).getName());
Assert.assertEquals(SqlTypeName.BIGINT, fields.get(3).getType().getSqlTypeName());

Assert.assertEquals("m1", fields.get(4).getName());
Assert.assertEquals(SqlTypeName.FLOAT, fields.get(4).getType().getSqlTypeName());
Assert.assertEquals("unique_dim1", fields.get(4).getName());
Assert.assertEquals(SqlTypeName.OTHER, fields.get(4).getType().getSqlTypeName());

Assert.assertEquals("unique_dim1", fields.get(5).getName());
Assert.assertEquals(SqlTypeName.OTHER, fields.get(5).getType().getSqlTypeName());
Assert.assertEquals("dim2", fields.get(5).getName());
Assert.assertEquals(SqlTypeName.VARCHAR, fields.get(5).getType().getSqlTypeName());
}
}
