Skip to content

Commit

Permalink
Finalize fields in postaggs (apache#3957)
Browse files Browse the repository at this point in the history
* initial commits for finalizeFieldAccess apache#2433

* fix some bugs to run a query

* change name of method Queries.verifyAggregations to Queries.prepareAggregations

* add UTs

* fix UT failures

* rebased to master

* address comments and add a UT for arithmetic post aggregators

* rebased to the master

* address the comment of injection within arithmetic post aggregator

* address comments and introduce decorate() in the PostAggregator interface.

* Address comments. 1. Implement getComparator in FinalizingFieldAccessPostAggregator and add UTs for it. 2. Make some minor changes, such as renaming a method.

* Fix a code style mismatch.

* Rebased to the master
  • Loading branch information
gianm authored Feb 22, 2017
1 parent a47206e commit 985203b
Show file tree
Hide file tree
Showing 30 changed files with 639 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.primitives.Doubles;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
import io.druid.query.cache.CacheKeyBuilder;
Expand Down Expand Up @@ -97,6 +98,12 @@ public String getName()
return name;
}

/**
 * Returns this post aggregator unchanged; the supplied aggregator factories are not consulted.
 *
 * @param aggregators aggregator factories of the query (unused here)
 *
 * @return this same instance
 */
@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@JsonProperty
public PostAggregator getField()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.collect.Sets;
import com.yahoo.sketches.Util;
import io.druid.java.util.common.IAE;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
import io.druid.query.cache.CacheKeyBuilder;
Expand Down Expand Up @@ -93,6 +94,12 @@ public String getName()
return name;
}

/**
 * Returns this post aggregator unchanged; the supplied aggregator factories are not consulted.
 *
 * @param aggregators aggregator factories of the query (unused here)
 *
 * @return this same instance
 */
@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@JsonProperty
public String getFunc()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Sets;
import io.druid.java.util.common.IAE;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
import io.druid.query.cache.CacheKeyBuilder;

Expand Down Expand Up @@ -65,6 +67,12 @@ public Object compute(Map<String, Object> values)
return ah.toHistogram(bucketSize, offset);
}

/**
 * Returns this post aggregator unchanged; the supplied aggregator factories are not consulted.
 *
 * @param aggregators aggregator factories of the query (unused here)
 *
 * @return this same instance
 */
@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@JsonProperty
public float getBucketSize()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
import io.druid.query.cache.CacheKeyBuilder;

Expand Down Expand Up @@ -61,6 +63,12 @@ public Object compute(Map<String, Object> values)
return ah.toHistogram(breaks);
}

/**
 * Returns this post aggregator unchanged; the supplied aggregator factories are not consulted.
 *
 * @param aggregators aggregator factories of the query (unused here)
 *
 * @return this same instance
 */
@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@JsonProperty
public float[] getBreaks()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Sets;
import io.druid.java.util.common.IAE;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
import io.druid.query.cache.CacheKeyBuilder;

Expand Down Expand Up @@ -62,6 +64,12 @@ public Object compute(Map<String, Object> values)
return ah.toHistogram(numBuckets);
}

/**
 * Returns this post aggregator unchanged; the supplied aggregator factories are not consulted.
 *
 * @param aggregators aggregator factories of the query (unused here)
 *
 * @return this same instance
 */
@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@JsonProperty
public int getNumBuckets()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
import io.druid.query.cache.CacheKeyBuilder;

Expand Down Expand Up @@ -70,6 +72,12 @@ public Object compute(Map<String, Object> values)
return ah.getMax();
}

/**
 * Returns this post aggregator unchanged; the supplied aggregator factories are not consulted.
 *
 * @param aggregators aggregator factories of the query (unused here)
 *
 * @return this same instance
 */
@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
import io.druid.query.cache.CacheKeyBuilder;

Expand Down Expand Up @@ -70,6 +72,12 @@ public Object compute(Map<String, Object> values)
return ah.getMin();
}

/**
 * Returns this post aggregator unchanged; the supplied aggregator factories are not consulted.
 *
 * @param aggregators aggregator factories of the query (unused here)
 *
 * @return this same instance
 */
@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Sets;
import io.druid.java.util.common.IAE;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
import io.druid.query.cache.CacheKeyBuilder;

Expand Down Expand Up @@ -81,6 +83,12 @@ public Object compute(Map<String, Object> values)
return ah.getQuantiles(new float[]{probability})[0];
}

/**
 * Returns this post aggregator unchanged; the supplied aggregator factories are not consulted.
 *
 * @param aggregators aggregator factories of the query (unused here)
 *
 * @return this same instance
 */
@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@JsonProperty
public float getProbability()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Sets;
import io.druid.java.util.common.IAE;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
import io.druid.query.cache.CacheKeyBuilder;

Expand Down Expand Up @@ -74,6 +76,12 @@ public Object compute(Map<String, Object> values)
return new Quantiles(probabilities, ah.getQuantiles(probabilities), ah.getMin(), ah.getMax());
}

/**
 * Returns this post aggregator unchanged; the supplied aggregator factories are not consulted.
 *
 * @param aggregators aggregator factories of the query (unused here)
 *
 * @return this same instance
 */
@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@JsonProperty
public float[] getProbabilities()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
Expand Down Expand Up @@ -82,6 +83,12 @@ public String getName()
return name;
}

/**
 * Returns this post aggregator unchanged; the supplied aggregator factories are not consulted.
 *
 * @param aggregators aggregator factories of the query (unused here)
 *
 * @return this same instance
 */
@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@JsonProperty("fieldName")
public String getFieldName()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import io.druid.query.aggregation.post.DoubleLeastPostAggregator;
import io.druid.query.aggregation.post.ExpressionPostAggregator;
import io.druid.query.aggregation.post.FieldAccessPostAggregator;
import io.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
import io.druid.query.aggregation.post.JavaScriptPostAggregator;
import io.druid.query.aggregation.post.LongGreatestPostAggregator;
import io.druid.query.aggregation.post.LongLeastPostAggregator;
Expand Down Expand Up @@ -103,6 +104,7 @@ public static interface AggregatorFactoryMixin
@JsonSubTypes.Type(name = "expression", value = ExpressionPostAggregator.class),
@JsonSubTypes.Type(name = "arithmetic", value = ArithmeticPostAggregator.class),
@JsonSubTypes.Type(name = "fieldAccess", value = FieldAccessPostAggregator.class),
@JsonSubTypes.Type(name = "finalizingFieldAccess", value = FinalizingFieldAccessPostAggregator.class),
@JsonSubTypes.Type(name = "constant", value = ConstantPostAggregator.class),
@JsonSubTypes.Type(name = "javascript", value = JavaScriptPostAggregator.class),
@JsonSubTypes.Type(name = "hyperUniqueCardinality", value = HyperUniqueFinalizingPostAggregator.class),
Expand Down
34 changes: 28 additions & 6 deletions processing/src/main/java/io/druid/query/Queries.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,42 +20,64 @@
package io.druid.query;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;

import java.util.List;
import java.util.Map;
import java.util.Set;

/**
*/
public class Queries
{
public static void verifyAggregations(
/**
 * Decorates every post aggregator in the given list, preserving the input order.
 *
 * @param postAggs     post aggregators to decorate; each one has {@code decorate} invoked on it
 * @param aggFactories map of aggregator factories handed to each {@code decorate} call
 *
 * @return a new list holding the result of {@code decorate} for each input element
 */
public static List<PostAggregator> decoratePostAggregators(List<PostAggregator> postAggs,
                                                           Map<String, AggregatorFactory> aggFactories)
{
  final List<PostAggregator> result = Lists.newArrayListWithExpectedSize(postAggs.size());
  for (final PostAggregator postAgg : postAggs) {
    final PostAggregator decoratedPostAgg = postAgg.decorate(aggFactories);
    result.add(decoratedPostAgg);
  }
  return result;
}

/**
 * Validates the aggregator factories and post aggregators of a query and decorates the post aggregators.
 *
 * Aggregator names must be unique. Each post aggregator may only depend on fields that are aggregator names or
 * names of post aggregators that precede it in the list, and its own name must not collide with any of those.
 *
 * @param aggFactories aggregator factories of the query; must not be null
 * @param postAggs     post aggregators of the query; may be null or empty
 *
 * @return the decorated post aggregators, or {@code postAggs} itself (possibly null) when it is null or empty
 */
public static List<PostAggregator> prepareAggregations(
List<AggregatorFactory> aggFactories,
List<PostAggregator> postAggs
)
{
Preconditions.checkNotNull(aggFactories, "aggregations cannot be null");

final Set<String> aggNames = Sets.newHashSet();
final Map<String, AggregatorFactory> aggsFactoryMap = Maps.newHashMap();
for (AggregatorFactory aggFactory : aggFactories) {
Preconditions.checkArgument(aggNames.add(aggFactory.getName()), "[%s] already defined", aggFactory.getName());
Preconditions.checkArgument(!aggsFactoryMap.containsKey(aggFactory.getName()),
"[%s] already defined", aggFactory.getName());
aggsFactoryMap.put(aggFactory.getName(), aggFactory);
}

if (postAggs != null && !postAggs.isEmpty()) {
// Names visible to a post aggregator: all aggregator names plus the post aggregators seen so far.
final Set<String> combinedAggNames = Sets.newHashSet(aggNames);
final Set<String> combinedAggNames = Sets.newHashSet(aggsFactoryMap.keySet());

for (PostAggregator postAgg : postAggs) {
List<PostAggregator> decorated = Lists.newArrayListWithExpectedSize(postAggs.size());
for (final PostAggregator postAgg : postAggs) {
final Set<String> dependencies = postAgg.getDependentFields();
final Set<String> missing = Sets.difference(dependencies, combinedAggNames);

Preconditions.checkArgument(
missing.isEmpty(),
"Missing fields [%s] for postAggregator [%s]", missing, postAgg.getName()
);
Preconditions.checkArgument(combinedAggNames.add(postAgg.getName()), "[%s] already defined", postAgg.getName());
Preconditions.checkArgument(combinedAggNames.add(postAgg.getName()),
"[%s] already defined", postAgg.getName());

// Decoration only receives the aggregator factories, not the earlier post aggregators.
decorated.add(postAgg.decorate(aggsFactoryMap));
}
return decorated;
}

return postAggs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,13 @@ public interface PostAggregator extends Cacheable
Object compute(Map<String, Object> combinedAggregators);

String getName();

/**
 * Returns a richer post aggregator that is built from the given aggregators (keyed by name) and from any
 * environmental state accessible to the implementation, such as values in the object scope.
 *
 * @param aggregators a map from aggregator name to aggregator factory
 *
 * @return the decorated post aggregator; implementations may return {@code this}
 */
PostAggregator decorate(Map<String, AggregatorFactory> aggregators);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
import io.druid.query.cache.CacheKeyBuilder;
Expand Down Expand Up @@ -86,6 +87,12 @@ public String getName()
return name;
}

/**
 * Returns this post aggregator unchanged; the supplied aggregator factories are not consulted.
 *
 * @param aggregators aggregator factories of the query (unused here)
 *
 * @return this same instance
 */
@Override
public HyperUniqueFinalizingPostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@JsonProperty("fieldName")
public String getFieldName()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.druid.java.util.common.IAE;
import io.druid.query.Queries;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.cache.CacheKeyBuilder;

Expand Down Expand Up @@ -53,6 +55,7 @@ public int compare(Object o, Object o1)
private final Ops op;
private final Comparator comparator;
private final String ordering;
private Map<String, AggregatorFactory> aggFactoryMap;

public ArithmeticPostAggregator(
String name,
Expand Down Expand Up @@ -124,6 +127,12 @@ public String getName()
return name;
}

/**
 * Builds a new arithmetic post aggregator with the same name, function name and ordering as this one, whose
 * fields are the decorated versions of this instance's fields.
 *
 * @param aggregators aggregator factories passed on to each field's {@code decorate} call
 *
 * @return a new {@code ArithmeticPostAggregator} over the decorated fields
 */
@Override
public ArithmeticPostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
  return new ArithmeticPostAggregator(
      name,
      fnName,
      Queries.decoratePostAggregators(fields, aggregators),
      ordering
  );
}

@Override
public byte[] getCacheKey()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.cache.CacheKeyBuilder;

Expand Down Expand Up @@ -80,6 +81,12 @@ public String getName()
return name;
}

/**
 * Returns this post aggregator unchanged; the supplied aggregator factories are not consulted.
 *
 * @param aggregators aggregator factories of the query (unused here)
 *
 * @return this same instance
 */
@Override
public ConstantPostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@JsonProperty("value")
public Number getConstantValue()
{
Expand Down
Loading

0 comments on commit 985203b

Please sign in to comment.