Support limit push down for GroupBy (apache#3873)
* Support limit push down for GroupBy V2

* Use orderBy spec ordering when applying limit push down

* PR Comments

* Remove unused var

* Checkstyle fixes

* Fix test

* Add comment on non-final variables, fix checkstyle

* Address PR comments

* PR comments

* Remove unnecessary buffer reset

* Fix missing @JsonProperty annotation
jon-wei authored Jun 2, 2017
1 parent 6daddf9 commit b90c28e
Showing 23 changed files with 3,969 additions and 455 deletions.
1 change: 1 addition & 0 deletions docs/content/querying/groupbyquery.md
@@ -245,6 +245,7 @@ When using the "v2" strategy, the following query context parameters apply:
|`maxMergingDictionarySize`|Can be used to lower the value of `druid.query.groupBy.maxMergingDictionarySize` for this query.|
|`maxOnDiskStorage`|Can be used to lower the value of `druid.query.groupBy.maxOnDiskStorage` for this query.|
|`sortByDimsFirst`|Sort the results first by dimension values and then by timestamp.|
|`forceLimitPushDown`|When all fields in the orderby are part of the grouping key, the broker will push limit application down to the historical nodes. When the sorting order uses fields that are not in the grouping key, applying this optimization can result in approximate results with unknown accuracy, so it is disabled by default in that case. Enabling this context flag turns on limit push down for limit/orderby specs that contain non-grouping-key columns.|
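
For concreteness, here is a hedged sketch of what forcing push down looks like from the query side. This example is not part of the patch: the datasource (`events`), interval, and column names are invented, and the builder and spec signatures are assumed from Druid test code of this era rather than taken from the diff below.

```java
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.orderby.DefaultLimitSpec;
import io.druid.query.groupby.orderby.OrderByColumnSpec;

public class ForceLimitPushDownExample
{
  public static GroupByQuery buildQuery()
  {
    // Ordering by "clicks" (an aggregator, not a grouping key), so push down only
    // happens because forceLimitPushDown is set; results may be approximate.
    return GroupByQuery.builder()
        .setDataSource("events")                       // hypothetical datasource
        .setInterval("2017-01-01/2017-06-01")          // hypothetical interval
        .setGranularity(Granularities.ALL)
        .setDimensions(ImmutableList.<DimensionSpec>of(new DefaultDimensionSpec("country", "country")))
        .setAggregatorSpecs(ImmutableList.<AggregatorFactory>of(new LongSumAggregatorFactory("clicks", "clicks")))
        .setLimitSpec(new DefaultLimitSpec(
            ImmutableList.of(new OrderByColumnSpec("clicks", OrderByColumnSpec.Direction.DESCENDING)),
            10 // a finite limit is required when forcing push down
        ))
        .setContext(ImmutableMap.<String, Object>of("forceLimitPushDown", true))
        .build();
  }
}
```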

When using the "v1" strategy, the following query context parameters apply:

266 changes: 265 additions & 1 deletion processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
@@ -25,6 +25,7 @@
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
@@ -53,6 +54,9 @@
import io.druid.query.groupby.orderby.LimitSpec;
import io.druid.query.groupby.orderby.NoopLimitSpec;
import io.druid.query.groupby.orderby.OrderByColumnSpec;
import io.druid.query.groupby.strategy.GroupByStrategyV2;
import io.druid.query.ordering.StringComparator;
import io.druid.query.ordering.StringComparators;
import io.druid.query.spec.LegacySegmentSpec;
import io.druid.query.spec.QuerySegmentSpec;
import io.druid.segment.VirtualColumn;
@@ -65,6 +69,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -97,6 +102,8 @@ public static Builder builder()
private final List<AggregatorFactory> aggregatorSpecs;
private final List<PostAggregator> postAggregatorSpecs;

private final Function<Sequence<Row>, Sequence<Row>> limitFn;
private final boolean applyLimitPushDown;
private final Function<Sequence<Row>, Sequence<Row>> postProcessingFn;

@JsonCreator
@@ -190,6 +197,45 @@ private GroupByQuery(
verifyOutputNames(this.dimensions, this.aggregatorSpecs, this.postAggregatorSpecs);

this.postProcessingFn = postProcessingFn != null ? postProcessingFn : makePostProcessingFn();

// Check if limit push down configuration is valid and check if limit push down will be applied
this.applyLimitPushDown = determineApplyLimitPushDown();

// On an inner query, we may sometimes get a LimitSpec so that row orderings can be determined for limit push down
// However, it's not necessary to build the real limitFn from it at this stage.
Function<Sequence<Row>, Sequence<Row>> postProcFn;
if (getContextBoolean(GroupByStrategyV2.CTX_KEY_OUTERMOST, true)) {
postProcFn = this.limitSpec.build(this.dimensions, this.aggregatorSpecs, this.postAggregatorSpecs);
} else {
postProcFn = NoopLimitSpec.INSTANCE.build(this.dimensions, this.aggregatorSpecs, this.postAggregatorSpecs);
}

if (havingSpec != null) {
postProcFn = Functions.compose(
postProcFn,
new Function<Sequence<Row>, Sequence<Row>>()
{
@Override
public Sequence<Row> apply(Sequence<Row> input)
{
GroupByQuery.this.havingSpec.setRowSignature(GroupByQueryHelper.rowSignatureFor(GroupByQuery.this));
return Sequences.filter(
input,
new Predicate<Row>()
{
@Override
public boolean apply(Row input)
{
return GroupByQuery.this.havingSpec.eval(input);
}
}
);
}
}
);
}

limitFn = postProcFn;
}
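
A note on the composition order in the constructor above: Guava's `Functions.compose(g, f)` produces `g(f(input))`, so the having filter is applied to each row before the limit/ordering function runs. A minimal standalone demonstration of that ordering, plain Guava with nothing Druid-specific:

```java
import com.google.common.base.Function;
import com.google.common.base.Functions;

public class ComposeOrderDemo
{
  public static void main(String[] args)
  {
    // f is the "inner" function (applied first), g the "outer" one (applied second),
    // mirroring how the having filter runs before the limit function above.
    Function<Integer, Integer> f = new Function<Integer, Integer>()
    {
      @Override
      public Integer apply(Integer x)
      {
        return x + 1;
      }
    };
    Function<Integer, Integer> g = new Function<Integer, Integer>()
    {
      @Override
      public Integer apply(Integer x)
      {
        return x * 10;
      }
    };
    // compose(g, f).apply(2) == g(f(2)) == 30
    System.out.println(Functions.compose(g, f).apply(2));
  }
}
```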

@JsonProperty
@@ -264,6 +310,12 @@ public boolean getContextSortByDimsFirst()
return getContextBoolean(CTX_KEY_SORT_BY_DIMS_FIRST, false);
}

@JsonIgnore
public boolean isApplyLimitPushDown()
{
return applyLimitPushDown;
}

@Override
public Ordering getResultOrdering()
{
@@ -281,10 +333,177 @@ public Ordering getResultOrdering()
);
}

private boolean validateAndGetForceLimitPushDown()
{
final boolean forcePushDown = getContextBoolean(GroupByQueryConfig.CTX_KEY_FORCE_LIMIT_PUSH_DOWN, false);
if (forcePushDown) {
if (!(limitSpec instanceof DefaultLimitSpec)) {
throw new IAE("When forcing limit push down, a limit spec must be provided.");
}

if (((DefaultLimitSpec) limitSpec).getLimit() == Integer.MAX_VALUE) {
throw new IAE("When forcing limit push down, the provided limit spec must have a limit.");
}

for (OrderByColumnSpec orderBySpec : ((DefaultLimitSpec) limitSpec).getColumns()) {
if (OrderByColumnSpec.getPostAggIndexForOrderBy(orderBySpec, postAggregatorSpecs) > -1) {
throw new UnsupportedOperationException("Limit push down when sorting by a post aggregator is not supported.");
}
}
}
return forcePushDown;
}
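
And the failure mode when the flag is forced without a limit: a sketch only, reusing the hypothetical names from the earlier docs example; `IAE` here is assumed to be io.druid.java.util.common.IAE, which the validation above throws.

```java
// Sketch: no setLimitSpec(...) call, but forceLimitPushDown is set, so construction
// is expected to fail fast via validateAndGetForceLimitPushDown().
try {
  GroupByQuery.builder()
      .setDataSource("events")                       // hypothetical, as before
      .setInterval("2017-01-01/2017-06-01")
      .setGranularity(Granularities.ALL)
      .setDimensions(ImmutableList.<DimensionSpec>of(new DefaultDimensionSpec("country", "country")))
      .setAggregatorSpecs(ImmutableList.<AggregatorFactory>of(new LongSumAggregatorFactory("clicks", "clicks")))
      .setContext(ImmutableMap.<String, Object>of("forceLimitPushDown", true))
      .build();
} catch (IAE e) {
  // Expected: "When forcing limit push down, a limit spec must be provided."
}
```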

public boolean determineApplyLimitPushDown()
{
final boolean forceLimitPushDown = validateAndGetForceLimitPushDown();

if (limitSpec instanceof DefaultLimitSpec) {
DefaultLimitSpec defaultLimitSpec = (DefaultLimitSpec) limitSpec;

// If only applying an orderby without a limit, don't try to push down
if (defaultLimitSpec.getLimit() == Integer.MAX_VALUE) {
return false;
}

if (forceLimitPushDown) {
return true;
}

// If the sorting order only uses columns in the grouping key, we can always push the limit down
// to the buffer grouper without affecting result accuracy
boolean sortHasNonGroupingFields = DefaultLimitSpec.sortingOrderHasNonGroupingFields(
(DefaultLimitSpec) limitSpec,
getDimensions()
);

return !sortHasNonGroupingFields;
}

return false;
}

/**
* When limit push down is applied, the partial results are sorted by the ordering specified by the
* limit/order spec (unlike the non-push-down case, where results always use the default natural ascending order),
* so when merging these partial result streams, the merge must use the same ordering to get correct results.
*/
private Ordering<Row> getRowOrderingForPushDown(
final boolean granular,
final DefaultLimitSpec limitSpec
)
{
final boolean sortByDimsFirst = getContextSortByDimsFirst();

final List<String> orderedFieldNames = new ArrayList<>();
final Set<Integer> dimsInOrderBy = new HashSet<>();
final List<Boolean> needsReverseList = new ArrayList<>();
final List<Boolean> isNumericField = new ArrayList<>();
final List<StringComparator> comparators = new ArrayList<>();

for (OrderByColumnSpec orderSpec : limitSpec.getColumns()) {
boolean needsReverse = orderSpec.getDirection() != OrderByColumnSpec.Direction.ASCENDING;
int dimIndex = OrderByColumnSpec.getDimIndexForOrderBy(orderSpec, dimensions);
if (dimIndex >= 0) {
DimensionSpec dim = dimensions.get(dimIndex);
orderedFieldNames.add(dim.getOutputName());
dimsInOrderBy.add(dimIndex);
needsReverseList.add(needsReverse);
final ValueType type = dimensions.get(dimIndex).getOutputType();
isNumericField.add(type == ValueType.LONG || type == ValueType.FLOAT);
comparators.add(orderSpec.getDimensionComparator());
}
}

for (int i = 0; i < dimensions.size(); i++) {
if (!dimsInOrderBy.contains(i)) {
orderedFieldNames.add(dimensions.get(i).getOutputName());
needsReverseList.add(false);
final ValueType type = dimensions.get(i).getOutputType();
isNumericField.add(type == ValueType.LONG || type == ValueType.FLOAT);
comparators.add(StringComparators.LEXICOGRAPHIC);
}
}

final Comparator<Row> timeComparator = getTimeComparator(granular);

if (timeComparator == null) {
return Ordering.from(
new Comparator<Row>()
{
@Override
public int compare(Row lhs, Row rhs)
{
return compareDimsForLimitPushDown(
orderedFieldNames,
needsReverseList,
isNumericField,
comparators,
lhs,
rhs
);
}
}
);
} else if (sortByDimsFirst) {
return Ordering.from(
new Comparator<Row>()
{
@Override
public int compare(Row lhs, Row rhs)
{
final int cmp = compareDimsForLimitPushDown(
orderedFieldNames,
needsReverseList,
isNumericField,
comparators,
lhs,
rhs
);
if (cmp != 0) {
return cmp;
}

return timeComparator.compare(lhs, rhs);
}
}
);
} else {
return Ordering.from(
new Comparator<Row>()
{
@Override
public int compare(Row lhs, Row rhs)
{
final int timeCompare = timeComparator.compare(lhs, rhs);

if (timeCompare != 0) {
return timeCompare;
}

return compareDimsForLimitPushDown(
orderedFieldNames,
needsReverseList,
isNumericField,
comparators,
lhs,
rhs
);
}
}
);
}
}
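
The Javadoc above describes the standard sorted-merge invariant: every partial stream must already be ordered by exactly the comparator the merge uses, or the merged output is wrong. A tiny standalone Guava illustration, with integers standing in for rows:

```java
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import java.util.List;

public class MergeOrderingDemo
{
  public static void main(String[] args)
  {
    // Two "partial result" streams, each pre-sorted descending, the way historical
    // nodes would pre-sort by the push-down ordering.
    Ordering<Integer> ordering = Ordering.<Integer>natural().reverse();
    List<Integer> part1 = ImmutableList.of(9, 5, 1);
    List<Integer> part2 = ImmutableList.of(8, 4, 2);

    // mergeSorted is only correct when given the same comparator the inputs were sorted with.
    Iterable<Integer> merged = Iterables.mergeSorted(ImmutableList.of(part1, part2), ordering);
    System.out.println(ImmutableList.copyOf(merged)); // [9, 8, 5, 4, 2, 1]
  }
}
```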

public Ordering<Row> getRowOrdering(final boolean granular)
{
if (applyLimitPushDown) {
if (!DefaultLimitSpec.sortingOrderHasNonGroupingFields((DefaultLimitSpec) limitSpec, dimensions)) {
return getRowOrderingForPushDown(granular, (DefaultLimitSpec) limitSpec);
}
}

final boolean sortByDimsFirst = getContextSortByDimsFirst();
final Comparator<Row> timeComparator = getTimeComparator(granular);

if (timeComparator == null) {
@@ -357,6 +576,51 @@ private static int compareDims(List<DimensionSpec> dimensions, Row lhs, Row rhs)
return 0;
}

private static int compareDimsForLimitPushDown(
final List<String> fields,
final List<Boolean> needsReverseList,
final List<Boolean> isNumericField,
final List<StringComparator> comparators,
Row lhs,
Row rhs
)
{
for (int i = 0; i < fields.size(); i++) {
final String fieldName = fields.get(i);
final StringComparator comparator = comparators.get(i);

final int dimCompare;

Object lhsObj;
Object rhsObj;
if (needsReverseList.get(i)) {
lhsObj = rhs.getRaw(fieldName);
rhsObj = lhs.getRaw(fieldName);
} else {
lhsObj = lhs.getRaw(fieldName);
rhsObj = rhs.getRaw(fieldName);
}

if (isNumericField.get(i)) {
if (comparator == StringComparators.NUMERIC) {
// Compare the direction-adjusted values; re-reading rhs/lhs directly here
// would ignore the needsReverse swap performed above.
dimCompare = NATURAL_NULLS_FIRST.compare(lhsObj, rhsObj);
} else {
dimCompare = comparator.compare(String.valueOf(lhsObj), String.valueOf(rhsObj));
}
} else {
dimCompare = comparator.compare((String) lhsObj, (String) rhsObj);
}

if (dimCompare != 0) {
return dimCompare;
}
}
return 0;
}
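
The `isNumericField` branch matters because lexicographic and numeric orderings of the same strings genuinely disagree. A standalone check against the `StringComparators` already imported by this patch:

```java
import io.druid.query.ordering.StringComparators;

public class ComparatorDemo
{
  public static void main(String[] args)
  {
    // Lexicographically "10" < "9" (it compares the first characters, '1' < '9')...
    System.out.println(StringComparators.LEXICOGRAPHIC.compare("10", "9")); // negative
    // ...but numerically 10 > 9, which is what a numeric dimension needs.
    System.out.println(StringComparators.NUMERIC.compare("10", "9"));       // positive
  }
}
```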

/**
* Apply the havingSpec and limitSpec. Because havingSpecs are not thread safe, and because they are applied during
* accumulation of the returned sequence, callers must take care to avoid accumulating two different Sequences
processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java
@@ -27,6 +27,7 @@
public class GroupByQueryConfig
{
public static final String CTX_KEY_STRATEGY = "groupByStrategy";
public static final String CTX_KEY_FORCE_LIMIT_PUSH_DOWN = "forceLimitPushDown";
private static final String CTX_KEY_IS_SINGLE_THREADED = "groupByIsSingleThreaded";
private static final String CTX_KEY_MAX_INTERMEDIATE_ROWS = "maxIntermediateRows";
private static final String CTX_KEY_MAX_RESULTS = "maxResults";
@@ -66,6 +67,12 @@ public class GroupByQueryConfig
// Max on-disk temporary storage, per-query; when exceeded, the query fails
private long maxOnDiskStorage = 0L;

@JsonProperty
private boolean forcePushDownLimit = false;

@JsonProperty
private Class<? extends GroupByQueryMetricsFactory> queryMetricsFactory;

public String getDefaultStrategy()
{
return defaultStrategy;
@@ -126,6 +133,21 @@ public long getMaxOnDiskStorage()
return maxOnDiskStorage;
}

public boolean isForcePushDownLimit()
{
return forcePushDownLimit;
}

public Class<? extends GroupByQueryMetricsFactory> getQueryMetricsFactory()
{
return queryMetricsFactory != null ? queryMetricsFactory : DefaultGroupByQueryMetricsFactory.class;
}

public void setQueryMetricsFactory(Class<? extends GroupByQueryMetricsFactory> queryMetricsFactory)
{
this.queryMetricsFactory = queryMetricsFactory;
}

public GroupByQueryConfig withOverrides(final GroupByQuery query)
{
final GroupByQueryConfig newConfig = new GroupByQueryConfig();
@@ -159,6 +181,7 @@ public GroupByQueryConfig withOverrides(final GroupByQuery query)
((Number) query.getContextValue(CTX_KEY_MAX_MERGING_DICTIONARY_SIZE, getMaxMergingDictionarySize())).longValue(),
getMaxMergingDictionarySize()
);
newConfig.forcePushDownLimit = query.getContextBoolean(CTX_KEY_FORCE_LIMIT_PUSH_DOWN, isForcePushDownLimit());
return newConfig;
}
}
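
The net effect of the override above, sketched under the assumption of a `query` whose context contains `"forceLimitPushDown": true`:

```java
// Sketch: a per-query context flag overrides the service-level default without mutating it.
GroupByQueryConfig base = new GroupByQueryConfig();       // forcePushDownLimit defaults to false
GroupByQueryConfig effective = base.withOverrides(query); // query context sets forceLimitPushDown=true
assert !base.isForcePushDownLimit();
assert effective.isForcePushDownLimit();
```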