Use DimensionSpec in CardinalityAggregatorFactory (apache#3406)
* Use DimensionSpec in CardinalityAggregatorFactory

* Address PR comments

* Fix requiredFields()
jon-wei authored and gianm committed Aug 30, 2016
1 parent 4c2b8d2 commit 4e91330
Showing 7 changed files with 363 additions and 50 deletions.
docs/content/querying/aggregations.md (27 changes: 24 additions & 3 deletions)
@@ -127,11 +127,13 @@ instead of the cardinality aggregator if you do not care about the individual values
{
"type": "cardinality",
"name": "<output_name>",
"fieldNames": [ <dimension1>, <dimension2>, ... ],
"fields": [ <dimension1>, <dimension2>, ... ],
"byRow": <false | true> # (optional, defaults to false)
}
```

Each individual element of the "fields" list can be a String or [DimensionSpec](../querying/dimensionspecs.html). A String dimension in the fields list is equivalent to a DefaultDimensionSpec (no transformations).
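
For instance, the two forms can be mixed in a single aggregator. A minimal sketch (the `city` and `country` dimension names are illustrative) that counts a plain dimension together with an extracted one:

```json
{
  "type": "cardinality",
  "name": "distinct_city_and_country_prefix",
  "fields": [
    "city",
    {
      "type": "extraction",
      "dimension": "country",
      "outputName": "country_prefix",
      "extractionFn": { "type": "substring", "index": 0, "length": 2 }
    }
  ],
  "byRow": true
}
```

The deprecated `fieldNames` property is still accepted for backwards compatibility (plain names are wrapped in DefaultDimensionSpecs internally), but it cannot be combined with `fields` in the same aggregator.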

#### Cardinality by value

When setting `byRow` to `false` (the default) it computes the cardinality of the set composed of the union of all dimension values for all the given dimensions.
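
As a quick illustration (the sample values are hypothetical): given three rows whose `country_of_origin` / `country_of_residence` values are (`fr`, `us`), (`us`, `us`), and (`fr`, `fr`), `byRow: false` estimates the cardinality of the union of all values, {`fr`, `us`}, i.e. 2, while `byRow: true` (described below) estimates the number of distinct value combinations, i.e. 3.
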
@@ -171,7 +173,7 @@ Determine the number of distinct countries people are living in or have come from
{
"type": "cardinality",
"name": "distinct_countries",
"fieldNames": [ "coutry_of_origin", "country_of_residence" ]
"fields": [ "country_of_origin", "country_of_residence" ]
}
```

@@ -181,11 +183,30 @@ Determine the number of distinct people (i.e. combinations of first and last name)
{
"type": "cardinality",
"name": "distinct_people",
"fieldNames": [ "first_name", "last_name" ],
"fields": [ "first_name", "last_name" ],
"byRow" : true
}
```

Determine the number of distinct starting characters of last names

```json
{
"type": "cardinality",
"name": "distinct_last_name_first_char",
"fields": [
{
"type" : "extraction",
"dimension" : "last_name",
"outputName" : "last_name_first_char",
"extractionFn" : { "type" : "substring", "index" : 0, "length" : 1 }
}
],
"byRow" : true
}
```


### HyperUnique aggregator

Uses [HyperLogLog](http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf) to compute the estimated cardinality of a dimension that has been aggregated as a "hyperUnique" metric at indexing time.
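
A minimal sketch of a hyperUnique spec (assuming the standard form, where `fieldName` refers to the metric defined at ingestion time):

```json
{
  "type": "hyperUnique",
  "name": "<output_name>",
  "fieldName": "<metric_name>"
}
```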
CardinalityAggregatorFactory.java
@@ -22,8 +22,9 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.StringUtils;
@@ -35,17 +36,53 @@
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DimensionSelector;
import org.apache.commons.codec.binary.Base64;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class CardinalityAggregatorFactory extends AggregatorFactory
{
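// Helpers for converting between plain column-name lists (the deprecated 'fieldNames' form) and DimensionSpec lists.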
private static List<String> makeRequiredFieldNamesFromFields(List<DimensionSpec> fields)
{
return ImmutableList.copyOf(
Lists.transform(
fields,
new Function<DimensionSpec, String>()
{
@Override
public String apply(DimensionSpec input)
{
return input.getDimension();
}
}
)
);
}

private static List<DimensionSpec> makeFieldsFromFieldNames(List<String> fieldNames)
{
return ImmutableList.copyOf(
Lists.transform(
fieldNames,
new Function<String, DimensionSpec>()
{
@Override
public DimensionSpec apply(String input)
{
return new DefaultDimensionSpec(input, input);
}
}
)
);
}

public static Object estimateCardinality(Object object)
{
if (object == null) {
@@ -56,23 +93,43 @@ public static Object estimateCardinality(Object object)
}

private static final byte CACHE_TYPE_ID = (byte) 0x8;
private static final byte CACHE_KEY_SEPARATOR = (byte) 0xFF;

private final String name;
private final List<String> fieldNames;
private final List<DimensionSpec> fields;
private final boolean byRow;

@JsonCreator
public CardinalityAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldNames") final List<String> fieldNames,
@Deprecated @JsonProperty("fieldNames") final List<String> fieldNames,
@JsonProperty("fields") final List<DimensionSpec> fields,
@JsonProperty("byRow") final boolean byRow
)
{
this.name = name;
this.fieldNames = fieldNames;
// 'fieldNames' is deprecated, since CardinalityAggregatorFactory now accepts DimensionSpecs instead of Strings.
// The old 'fieldNames' is still supported for backwards compatibility, but the user is not allowed to specify both
// 'fields' and 'fieldNames'.
if (fields == null) {
Preconditions.checkArgument(fieldNames != null, "Must provide 'fieldNames' if 'fields' is null.");
this.fields = makeFieldsFromFieldNames(fieldNames);
} else {
Preconditions.checkArgument(fieldNames == null, "Cannot specify both 'fieldNames' and 'fields'.");
this.fields = fields;
}
this.byRow = byRow;
}

public CardinalityAggregatorFactory(
String name,
final List<DimensionSpec> fields,
final boolean byRow
)
{
this(name, null, fields, byRow);
}

@Override
public Aggregator factorize(final ColumnSelectorFactory columnFactory)
{
@@ -103,13 +160,12 @@ private List<DimensionSelector> makeDimensionSelectors(final ColumnSelectorFactory columnFactory)
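// Resolve each dimension spec to a selector via the column selector factory, dropping any for which no selector is returned.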
return Lists.newArrayList(
Iterables.filter(
Iterables.transform(
fieldNames, new Function<String, DimensionSelector>()
fields, new Function<DimensionSpec, DimensionSelector>()
{
@Nullable
@Override
public DimensionSelector apply(@Nullable String input)
public DimensionSelector apply(DimensionSpec input)
{
return columnFactory.makeDimensionSelector(new DefaultDimensionSpec(input, input));
return columnFactory.makeDimensionSelector(input);
}
}
), Predicates.notNull()
@@ -158,13 +214,13 @@ public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
public List<AggregatorFactory> getRequiredColumns()
{
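// Decompose into one single-field cardinality aggregator per dimension spec, named after the spec's output name.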
return Lists.transform(
fieldNames,
new Function<String, AggregatorFactory>()
fields,
new Function<DimensionSpec, AggregatorFactory>()
{
@Override
public AggregatorFactory apply(String input)
public AggregatorFactory apply(DimensionSpec input)
{
return new CardinalityAggregatorFactory(input, fieldNames, byRow);
return new CardinalityAggregatorFactory(input.getOutputName(), Collections.singletonList(input), byRow);
}
}
);
@@ -206,13 +262,13 @@ public String getName()
@Override
public List<String> requiredFields()
{
return fieldNames;
return makeRequiredFieldNamesFromFields(fields);
}

@JsonProperty
public List<String> getFieldNames()
public List<DimensionSpec> getFields()
{
return fieldNames;
return fields;
}

@JsonProperty
@@ -224,13 +280,22 @@ public boolean isByRow()
@Override
public byte[] getCacheKey()
{
byte[] fieldNameBytes = StringUtils.toUtf8(Joiner.on("\u0001").join(fieldNames));
List<byte[]> dimSpecKeys = new ArrayList<>();
int dimSpecKeysLength = fields.size();
for (DimensionSpec dimSpec : fields) {
byte[] dimSpecKey = dimSpec.getCacheKey();
dimSpecKeysLength += dimSpecKey.length;
dimSpecKeys.add(dimSpec.getCacheKey());
}

return ByteBuffer.allocate(2 + fieldNameBytes.length)
.put(CACHE_TYPE_ID)
.put(fieldNameBytes)
.put((byte)(byRow ? 1 : 0))
.array();
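// Cache key layout: CACHE_TYPE_ID, then each dimension spec's cache key followed by a 0xFF separator, then the byRow flag.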
ByteBuffer retBuf = ByteBuffer.allocate(2 + dimSpecKeysLength);
retBuf.put(CACHE_TYPE_ID);
for (byte[] dimSpecKey : dimSpecKeys) {
retBuf.put(dimSpecKey);
retBuf.put(CACHE_KEY_SEPARATOR);
}
retBuf.put((byte) (byRow ? 1 : 0));
return retBuf.array();
}

@Override
@@ -263,25 +328,22 @@ public boolean equals(Object o)

CardinalityAggregatorFactory that = (CardinalityAggregatorFactory) o;

if (byRow != that.byRow) {
return false;
}
if (fieldNames != null ? !fieldNames.equals(that.fieldNames) : that.fieldNames != null) {
if (isByRow() != that.isByRow()) {
return false;
}
if (name != null ? !name.equals(that.name) : that.name != null) {
if (!getName().equals(that.getName())) {
return false;
}
return getFields().equals(that.getFields());

return true;
}

@Override
public int hashCode()
{
int result = name != null ? name.hashCode() : 0;
result = 31 * result + (fieldNames != null ? fieldNames.hashCode() : 0);
result = 31 * result + (byRow ? 1 : 0);
int result = getName().hashCode();
result = 31 * result + getFields().hashCode();
result = 31 * result + (isByRow() ? 1 : 0);
return result;
}

@@ -290,7 +352,7 @@ public String toString()
{
return "CardinalityAggregatorFactory{" +
"name='" + name + '\'' +
", fieldNames='" + fieldNames + '\'' +
", fields='" + fields + '\'' +
'}';
}
}
@@ -43,6 +43,8 @@
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
import io.druid.query.aggregation.post.ConstantPostAggregator;
import io.druid.query.aggregation.post.FieldAccessPostAggregator;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.query.spec.QuerySegmentSpec;
import io.druid.query.spec.SpecificSegmentSpec;
@@ -158,7 +160,7 @@ public TableDataSource apply(@Nullable String input)
);
public static final CardinalityAggregatorFactory qualityCardinality = new CardinalityAggregatorFactory(
"cardinality",
Arrays.asList("quality"),
Arrays.<DimensionSpec>asList(new DefaultDimensionSpec("quality", "quality")),
false
);
public static final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L);
@@ -28,6 +28,8 @@
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.DimensionSelector;

import java.nio.ByteBuffer;
@@ -75,7 +77,7 @@ public String[] apply(Integer input)


final CardinalityAggregatorTest.TestDimensionSelector dim1 =
new CardinalityAggregatorTest.TestDimensionSelector(values);
new CardinalityAggregatorTest.TestDimensionSelector(values, null);

selectorList = Lists.newArrayList(
(DimensionSelector) dim1
Expand All @@ -88,7 +90,7 @@ public String[] apply(Integer input)

CardinalityAggregatorFactory factory = new CardinalityAggregatorFactory(
"billy",
Lists.newArrayList("dim1"),
Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("dim1", "dim1")),
byRow
);
