Skip to content

Commit

Permalink
Add virtual columns to timeseries, topN, and groupBy. (apache#3941)
Browse files Browse the repository at this point in the history
* Add virtual columns to timeseries, topN, and groupBy.

* Fix GroupByTimeseriesQueryRunnerTest.

* Updates from review comments.
  • Loading branch information
gianm authored and fjy committed Feb 22, 2017
1 parent 7200dce commit 372b849
Show file tree
Hide file tree
Showing 27 changed files with 478 additions and 186 deletions.
23 changes: 21 additions & 2 deletions processing/src/main/java/io/druid/query/Druids.java
Original file line number Diff line number Diff line change
Expand Up @@ -331,18 +331,20 @@ public static class TimeseriesQueryBuilder
{
private DataSource dataSource;
private QuerySegmentSpec querySegmentSpec;
private boolean descending;
private VirtualColumns virtualColumns;
private DimFilter dimFilter;
private QueryGranularity granularity;
private List<AggregatorFactory> aggregatorSpecs;
private List<PostAggregator> postAggregatorSpecs;
private Map<String, Object> context;

private boolean descending;

private TimeseriesQueryBuilder()
{
dataSource = null;
querySegmentSpec = null;
descending = false;
virtualColumns = null;
dimFilter = null;
granularity = QueryGranularities.ALL;
aggregatorSpecs = Lists.newArrayList();
Expand All @@ -356,6 +358,7 @@ public TimeseriesQuery build()
dataSource,
querySegmentSpec,
descending,
virtualColumns,
dimFilter,
granularity,
aggregatorSpecs,
Expand Down Expand Up @@ -460,6 +463,22 @@ public TimeseriesQueryBuilder intervals(List<Interval> l)
return this;
}

/**
 * Stores the given virtual columns on this builder, replacing whatever was set before.
 * No validation is performed here: the argument is assigned as-is, including {@code null}
 * (which is also the value the builder's constructor starts with).
 *
 * @param virtualColumns the virtual columns to hand to the query when it is built
 * @return this builder, to allow call chaining
 */
public TimeseriesQueryBuilder virtualColumns(VirtualColumns virtualColumns)
{
this.virtualColumns = virtualColumns;
return this;
}

/**
 * Convenience overload that accepts the individual columns as a list. The list is turned
 * into a {@link VirtualColumns} via {@code VirtualColumns.create(...)} and then forwarded
 * to {@link #virtualColumns(VirtualColumns)}.
 *
 * @param virtualColumns the individual virtual columns
 * @return this builder, to allow call chaining
 */
public TimeseriesQueryBuilder virtualColumns(List<VirtualColumn> virtualColumns)
{
  final VirtualColumns wrapped = VirtualColumns.create(virtualColumns);
  return virtualColumns(wrapped);
}

/**
 * Varargs convenience overload. The array is viewed as a list with
 * {@link Arrays#asList(Object[])}, converted with {@code VirtualColumns.create(...)}, and
 * forwarded to {@link #virtualColumns(VirtualColumns)}.
 *
 * @param virtualColumns the individual virtual columns
 * @return this builder, to allow call chaining
 */
public TimeseriesQueryBuilder virtualColumns(VirtualColumn... virtualColumns)
{
  final List<VirtualColumn> columnList = Arrays.asList(virtualColumns);
  final VirtualColumns wrapped = VirtualColumns.create(columnList);
  return virtualColumns(wrapped);
}

public TimeseriesQueryBuilder filters(String dimensionName, String value)
{
dimFilter = new SelectorDimFilter(dimensionName, value, null);
Expand Down
105 changes: 67 additions & 38 deletions processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,15 @@
import io.druid.query.groupby.orderby.OrderByColumnSpec;
import io.druid.query.spec.LegacySegmentSpec;
import io.druid.query.spec.QuerySegmentSpec;
import io.druid.segment.VirtualColumn;
import io.druid.segment.VirtualColumns;
import org.joda.time.Interval;

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
Expand Down Expand Up @@ -88,6 +92,7 @@ public static Builder builder()
return new Builder();
}

private final VirtualColumns virtualColumns;
private final LimitSpec limitSpec;
private final HavingSpec havingSpec;
private final DimFilter dimFilter;
Expand All @@ -102,6 +107,7 @@ public static Builder builder()
public GroupByQuery(
@JsonProperty("dataSource") DataSource dataSource,
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
@JsonProperty("virtualColumns") VirtualColumns virtualColumns,
@JsonProperty("filter") DimFilter dimFilter,
@JsonProperty("granularity") QueryGranularity granularity,
@JsonProperty("dimensions") List<DimensionSpec> dimensions,
Expand All @@ -113,6 +119,7 @@ public GroupByQuery(
)
{
super(dataSource, querySegmentSpec, false, context);
this.virtualColumns = VirtualColumns.nullToEmpty(virtualColumns);
this.dimFilter = dimFilter;
this.granularity = granularity;
this.dimensions = dimensions == null ? ImmutableList.<DimensionSpec>of() : dimensions;
Expand Down Expand Up @@ -173,6 +180,7 @@ public boolean apply(Row input)
private GroupByQuery(
DataSource dataSource,
QuerySegmentSpec querySegmentSpec,
VirtualColumns virtualColumns,
DimFilter dimFilter,
QueryGranularity granularity,
List<DimensionSpec> dimensions,
Expand All @@ -186,6 +194,7 @@ private GroupByQuery(
{
super(dataSource, querySegmentSpec, false, context);

this.virtualColumns = virtualColumns;
this.dimFilter = dimFilter;
this.granularity = granularity;
this.dimensions = dimensions;
Expand All @@ -196,6 +205,12 @@ private GroupByQuery(
this.limitFn = limitFn;
}

/**
 * Returns the virtual columns held by this query. The getter is annotated with
 * {@code @JsonProperty}; the JSON-creating constructor reads the corresponding
 * {@code "virtualColumns"} property.
 *
 * @return the value of the {@code virtualColumns} field
 */
@JsonProperty
public VirtualColumns getVirtualColumns()
{
return virtualColumns;
}

@JsonProperty("filter")
public DimFilter getDimFilter()
{
Expand Down Expand Up @@ -393,6 +408,7 @@ public GroupByQuery withOverriddenContext(Map<String, Object> contextOverride)
return new GroupByQuery(
getDataSource(),
getQuerySegmentSpec(),
virtualColumns,
dimFilter,
granularity,
dimensions,
Expand All @@ -411,6 +427,7 @@ public GroupByQuery withQuerySegmentSpec(QuerySegmentSpec spec)
return new GroupByQuery(
getDataSource(),
spec,
virtualColumns,
dimFilter,
granularity,
dimensions,
Expand All @@ -428,6 +445,7 @@ public GroupByQuery withDimFilter(final DimFilter dimFilter)
return new GroupByQuery(
getDataSource(),
getQuerySegmentSpec(),
virtualColumns,
dimFilter,
getGranularity(),
getDimensions(),
Expand All @@ -446,6 +464,7 @@ public Query<Row> withDataSource(DataSource dataSource)
return new GroupByQuery(
dataSource,
getQuerySegmentSpec(),
virtualColumns,
dimFilter,
granularity,
dimensions,
Expand All @@ -463,6 +482,7 @@ public GroupByQuery withDimensionSpecs(final List<DimensionSpec> dimensionSpecs)
return new GroupByQuery(
getDataSource(),
getQuerySegmentSpec(),
virtualColumns,
getDimFilter(),
getGranularity(),
dimensionSpecs,
Expand All @@ -480,6 +500,7 @@ public GroupByQuery withLimitSpec(final LimitSpec limitSpec)
return new GroupByQuery(
getDataSource(),
getQuerySegmentSpec(),
virtualColumns,
getDimFilter(),
getGranularity(),
getDimensions(),
Expand All @@ -496,6 +517,7 @@ public GroupByQuery withAggregatorSpecs(final List<AggregatorFactory> aggregator
return new GroupByQuery(
getDataSource(),
getQuerySegmentSpec(),
virtualColumns,
getDimFilter(),
getGranularity(),
getDimensions(),
Expand All @@ -513,6 +535,7 @@ public GroupByQuery withPostAggregatorSpecs(final List<PostAggregator> postAggre
return new GroupByQuery(
getDataSource(),
getQuerySegmentSpec(),
virtualColumns,
getDimFilter(),
getGranularity(),
getDimensions(),
Expand Down Expand Up @@ -555,6 +578,7 @@ public static class Builder
{
private DataSource dataSource;
private QuerySegmentSpec querySegmentSpec;
private VirtualColumns virtualColumns;
private DimFilter dimFilter;
private QueryGranularity granularity;
private List<DimensionSpec> dimensions;
Expand All @@ -576,6 +600,7 @@ public Builder(GroupByQuery query)
{
dataSource = query.getDataSource();
querySegmentSpec = query.getQuerySegmentSpec();
virtualColumns = query.getVirtualColumns();
limitSpec = query.getLimitSpec();
dimFilter = query.getDimFilter();
granularity = query.getGranularity();
Expand All @@ -590,6 +615,7 @@ public Builder(Builder builder)
{
dataSource = builder.dataSource;
querySegmentSpec = builder.querySegmentSpec;
virtualColumns = builder.virtualColumns;
limitSpec = builder.limitSpec;
dimFilter = builder.dimFilter;
granularity = builder.granularity;
Expand Down Expand Up @@ -640,6 +666,24 @@ public Builder setInterval(String interval)
return setQuerySegmentSpec(new LegacySegmentSpec(interval));
}

/**
 * Sets the virtual columns for the query under construction. The argument is passed
 * through {@code Preconditions.checkNotNull} (labelled {@code "virtualColumns"}) before
 * being stored, so a {@code null} argument is rejected rather than assigned.
 *
 * @param virtualColumns the virtual columns to use; must not be {@code null}
 * @return this builder, to allow call chaining
 */
public Builder setVirtualColumns(VirtualColumns virtualColumns)
{
  // Validate first; the field is only written once the check has passed.
  Preconditions.checkNotNull(virtualColumns, "virtualColumns");
  this.virtualColumns = virtualColumns;
  return this;
}

/**
 * Convenience overload taking the individual columns as a list. The list is converted
 * with {@code VirtualColumns.create(...)} and the result is stored directly on this builder.
 *
 * @param virtualColumns the individual virtual columns
 * @return this builder, to allow call chaining
 */
public Builder setVirtualColumns(List<VirtualColumn> virtualColumns)
{
  final VirtualColumns created = VirtualColumns.create(virtualColumns);
  this.virtualColumns = created;
  return this;
}

/**
 * Varargs convenience overload. The array is viewed as a list with
 * {@link Arrays#asList(Object[])}, converted with {@code VirtualColumns.create(...)}, and
 * the result is stored directly on this builder.
 *
 * @param virtualColumns the individual virtual columns
 * @return this builder, to allow call chaining
 */
public Builder setVirtualColumns(VirtualColumn... virtualColumns)
{
  final List<VirtualColumn> columnList = Arrays.asList(virtualColumns);
  this.virtualColumns = VirtualColumns.create(columnList);
  return this;
}

public Builder limit(int limit)
{
ensureExplicitLimitNotSet();
Expand Down Expand Up @@ -802,6 +846,7 @@ public GroupByQuery build()
return new GroupByQuery(
dataSource,
querySegmentSpec,
virtualColumns,
dimFilter,
granularity,
dimensions,
Expand All @@ -820,6 +865,7 @@ public String toString()
return "GroupByQuery{" +
"dataSource='" + getDataSource() + '\'' +
", querySegmentSpec=" + getQuerySegmentSpec() +
", virtualColumns=" + virtualColumns +
", limitSpec=" + limitSpec +
", dimFilter=" + dimFilter +
", granularity=" + granularity +
Expand All @@ -831,7 +877,7 @@ public String toString()
}

@Override
public boolean equals(Object o)
public boolean equals(final Object o)
{
if (this == o) {
return true;
Expand All @@ -842,47 +888,30 @@ public boolean equals(Object o)
if (!super.equals(o)) {
return false;
}

GroupByQuery that = (GroupByQuery) o;

if (aggregatorSpecs != null ? !aggregatorSpecs.equals(that.aggregatorSpecs) : that.aggregatorSpecs != null) {
return false;
}
if (dimFilter != null ? !dimFilter.equals(that.dimFilter) : that.dimFilter != null) {
return false;
}
if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) {
return false;
}
if (granularity != null ? !granularity.equals(that.granularity) : that.granularity != null) {
return false;
}
if (havingSpec != null ? !havingSpec.equals(that.havingSpec) : that.havingSpec != null) {
return false;
}
if (limitSpec != null ? !limitSpec.equals(that.limitSpec) : that.limitSpec != null) {
return false;
}
if (postAggregatorSpecs != null
? !postAggregatorSpecs.equals(that.postAggregatorSpecs)
: that.postAggregatorSpecs != null) {
return false;
}

return true;
final GroupByQuery that = (GroupByQuery) o;
return Objects.equals(virtualColumns, that.virtualColumns) &&
Objects.equals(limitSpec, that.limitSpec) &&
Objects.equals(havingSpec, that.havingSpec) &&
Objects.equals(dimFilter, that.dimFilter) &&
Objects.equals(granularity, that.granularity) &&
Objects.equals(dimensions, that.dimensions) &&
Objects.equals(aggregatorSpecs, that.aggregatorSpecs) &&
Objects.equals(postAggregatorSpecs, that.postAggregatorSpecs);
}

@Override
public int hashCode()
{
int result = super.hashCode();
result = 31 * result + (limitSpec != null ? limitSpec.hashCode() : 0);
result = 31 * result + (havingSpec != null ? havingSpec.hashCode() : 0);
result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0);
result = 31 * result + (granularity != null ? granularity.hashCode() : 0);
result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0);
result = 31 * result + (aggregatorSpecs != null ? aggregatorSpecs.hashCode() : 0);
result = 31 * result + (postAggregatorSpecs != null ? postAggregatorSpecs.hashCode() : 0);
return result;
return Objects.hash(
super.hashCode(),
virtualColumns,
limitSpec,
havingSpec,
dimFilter,
granularity,
dimensions,
aggregatorSpecs,
postAggregatorSpecs
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.StorageAdapter;
import io.druid.segment.VirtualColumns;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.filter.Filters;
Expand Down Expand Up @@ -102,7 +101,7 @@ public Sequence<Row> process(final GroupByQuery query, final StorageAdapter stor
final Sequence<Cursor> cursors = storageAdapter.makeCursors(
filter,
intervals.get(0),
VirtualColumns.EMPTY,
query.getVirtualColumns(),
query.getGranularity(),
false
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ public byte[] computeCacheKey(GroupByQuery query)
.appendCacheable(query.getDimFilter())
.appendCacheablesIgnoringOrder(query.getAggregatorSpecs())
.appendCacheablesIgnoringOrder(query.getDimensions())
.appendCacheable(query.getVirtualColumns())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import io.druid.segment.StorageAdapter;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ValueType;
import io.druid.segment.VirtualColumns;
import io.druid.segment.filter.Filters;
import org.joda.time.DateTime;
import org.joda.time.Interval;
Expand Down Expand Up @@ -106,7 +105,7 @@ public static Sequence<Row> process(
final Sequence<Cursor> cursors = storageAdapter.makeCursors(
Filters.toFilter(query.getDimFilter()),
intervals.get(0),
VirtualColumns.EMPTY,
query.getVirtualColumns(),
query.getGranularity(),
false
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,11 @@ public static Pair<Grouper<RowBasedKey>, Accumulator<Grouper<RowBasedKey>, Row>>
valueTypes
);
final ThreadLocal<Row> columnSelectorRow = new ThreadLocal<>();
final ColumnSelectorFactory columnSelectorFactory = RowBasedColumnSelectorFactory.create(
columnSelectorRow,
rawInputRowSignature
final ColumnSelectorFactory columnSelectorFactory = query.getVirtualColumns().wrap(
RowBasedColumnSelectorFactory.create(
columnSelectorRow,
rawInputRowSignature
)
);
final Grouper<RowBasedKey> grouper;
if (concurrencyHint == -1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public Sequence<Row> mergeResults(
new GroupByQuery(
query.getDataSource(),
query.getQuerySegmentSpec(),
query.getVirtualColumns(),
query.getDimFilter(),
query.getGranularity(),
query.getDimensions(),
Expand Down
Loading

0 comments on commit 372b849

Please sign in to comment.