Skip to content

Commit

Permalink
Extract HLL related code to separate module (apache#3900)
Browse files Browse the repository at this point in the history
  • Loading branch information
DaimonPl authored and gianm committed Feb 3, 2017
1 parent 8f4394c commit 93b71e2
Show file tree
Hide file tree
Showing 28 changed files with 256 additions and 142 deletions.
86 changes: 86 additions & 0 deletions hll/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to Metamarkets Group Inc. (Metamarkets) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. Metamarkets licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.9.3-SNAPSHOT</version>
</parent>

<artifactId>druid-hll</artifactId>
<name>druid-hll</name>
<description>Druid HyperLogLog implementation</description>

<licenses>
<license>
<name>Apache License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0</url>
</license>
</licenses>

<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>java-util</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>

<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.carrotsearch</groupId>
<artifactId>junit-benchmarks</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.caliper</groupId>
<artifactId>caliper</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package io.druid.query.aggregation.hyperloglog;
package io.druid.hll;

/**
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package io.druid.query.aggregation.hyperloglog;
package io.druid.hll;

import java.nio.ByteBuffer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package io.druid.query.aggregation.hyperloglog;
package io.druid.hll;

import java.nio.ByteBuffer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package io.druid.query.aggregation.hyperloglog;
package io.druid.hll;

import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.primitives.UnsignedBytes;
Expand Down Expand Up @@ -112,6 +112,16 @@ public static HyperLogLogCollector makeCollector(ByteBuffer buffer)
return (remaining % 3 == 0 || remaining == 1027) ? new HLLCV0(buffer) : new HLLCV1(buffer);
}

/**
* Creates new collector which shares others collector buffer (by using {@link ByteBuffer#duplicate()})
*
* @param otherCollector collector which buffer will be shared
* @return collector
*/
public static HyperLogLogCollector makeCollectorSharingStorage(HyperLogLogCollector otherCollector) {
return makeCollector(otherCollector.getStorageBuffer().duplicate());
}

public static int getLatestNumBytesForDenseStorage()
{
return HLLCV1.NUM_BYTES_FOR_DENSE_STORAGE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package io.druid.query.aggregation.hyperloglog;
package io.druid.hll;

import com.google.caliper.Param;
import com.google.caliper.Runner;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package io.druid.query.aggregation.hyperloglog;
package io.druid.hll;

import com.google.common.base.Function;
import com.google.common.collect.Collections2;
Expand All @@ -34,7 +34,6 @@
import java.security.MessageDigest;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Random;

Expand Down Expand Up @@ -696,125 +695,6 @@ public void testHighBits() throws Exception
Assert.assertEquals(Double.MAX_VALUE, collector.estimateCardinality(), 1000);
}

@Test
public void testCompare1() throws Exception
{
HyperLogLogCollector collector1 = HyperLogLogCollector.makeLatestCollector();
HyperLogLogCollector collector2 = HyperLogLogCollector.makeLatestCollector();
collector1.add(fn.hashLong(0).asBytes());
HyperUniquesAggregatorFactory factory = new HyperUniquesAggregatorFactory("foo", "bar");
Comparator comparator = factory.getComparator();
for (int i = 1; i < 100; i = i + 2) {
collector1.add(fn.hashLong(i).asBytes());
collector2.add(fn.hashLong(i + 1).asBytes());
Assert.assertEquals(1, comparator.compare(collector1, collector2));
Assert.assertEquals(1, Double.compare(collector1.estimateCardinality(), collector2.estimateCardinality()));
}
}

@Test
public void testCompare2() throws Exception
{
Random rand = new Random(0);
HyperUniquesAggregatorFactory factory = new HyperUniquesAggregatorFactory("foo", "bar");
Comparator comparator = factory.getComparator();
for (int i = 1; i < 1000; ++i) {
HyperLogLogCollector collector1 = HyperLogLogCollector.makeLatestCollector();
int j = rand.nextInt(50);
for (int l = 0; l < j; ++l) {
collector1.add(fn.hashLong(rand.nextLong()).asBytes());
}

HyperLogLogCollector collector2 = HyperLogLogCollector.makeLatestCollector();
int k = j + 1 + rand.nextInt(5);
for (int l = 0; l < k; ++l) {
collector2.add(fn.hashLong(rand.nextLong()).asBytes());
}

Assert.assertEquals(
Double.compare(collector1.estimateCardinality(), collector2.estimateCardinality()),
comparator.compare(collector1, collector2)
);
}

for (int i = 1; i < 100; ++i) {
HyperLogLogCollector collector1 = HyperLogLogCollector.makeLatestCollector();
int j = rand.nextInt(500);
for (int l = 0; l < j; ++l) {
collector1.add(fn.hashLong(rand.nextLong()).asBytes());
}

HyperLogLogCollector collector2 = HyperLogLogCollector.makeLatestCollector();
int k = j + 2 + rand.nextInt(5);
for (int l = 0; l < k; ++l) {
collector2.add(fn.hashLong(rand.nextLong()).asBytes());
}

Assert.assertEquals(
Double.compare(collector1.estimateCardinality(), collector2.estimateCardinality()),
comparator.compare(collector1, collector2)
);
}

for (int i = 1; i < 10; ++i) {
HyperLogLogCollector collector1 = HyperLogLogCollector.makeLatestCollector();
int j = rand.nextInt(100000);
for (int l = 0; l < j; ++l) {
collector1.add(fn.hashLong(rand.nextLong()).asBytes());
}

HyperLogLogCollector collector2 = HyperLogLogCollector.makeLatestCollector();
int k = j + 20000 + rand.nextInt(100000);
for (int l = 0; l < k; ++l) {
collector2.add(fn.hashLong(rand.nextLong()).asBytes());
}

Assert.assertEquals(
Double.compare(collector1.estimateCardinality(), collector2.estimateCardinality()),
comparator.compare(collector1, collector2)
);
}
}

@Test
public void testCompareToShouldBehaveConsistentlyWithEstimatedCardinalitiesEvenInToughCases() throws Exception {
// given
Random rand = new Random(0);
HyperUniquesAggregatorFactory factory = new HyperUniquesAggregatorFactory("foo", "bar");
Comparator comparator = factory.getComparator();

for (int i = 0; i < 1000; ++i) {
// given
HyperLogLogCollector leftCollector = HyperLogLogCollector.makeLatestCollector();
int j = rand.nextInt(9000) + 5000;
for (int l = 0; l < j; ++l) {
leftCollector.add(fn.hashLong(rand.nextLong()).asBytes());
}

HyperLogLogCollector rightCollector = HyperLogLogCollector.makeLatestCollector();
int k = rand.nextInt(9000) + 5000;
for (int l = 0; l < k; ++l) {
rightCollector.add(fn.hashLong(rand.nextLong()).asBytes());
}

// when
final int orderedByCardinality = Double.compare(leftCollector.estimateCardinality(),
rightCollector.estimateCardinality());
final int orderedByComparator = comparator.compare(leftCollector, rightCollector);

// then, assert hyperloglog comparator behaves consistently with estimated cardinalities
Assert.assertEquals(
String.format("orderedByComparator=%d, orderedByCardinality=%d,\n" +
"Left={cardinality=%f, hll=%s},\n" +
"Right={cardinality=%f, hll=%s},\n", orderedByComparator, orderedByCardinality,
leftCollector.estimateCardinality(), leftCollector,
rightCollector.estimateCardinality(), rightCollector),
orderedByCardinality,
orderedByComparator
);
}
}

@Test
public void testMaxOverflow() {
HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.hyperloglog;
package io.druid.hll;

import com.carrotsearch.junitbenchmarks.AbstractBenchmark;
import com.carrotsearch.junitbenchmarks.BenchmarkOptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
import io.druid.data.input.InputRow;
import io.druid.data.input.Rows;
import io.druid.granularity.QueryGranularity;
import io.druid.hll.HyperLogLogCollector;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.NoneShardSpec;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
import com.google.common.collect.ImmutableMap;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.hll.HyperLogLogCollector;
import io.druid.jackson.AggregatorsModule;
import io.druid.java.util.common.parsers.ParseException;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.junit.Assert;
import org.junit.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import io.druid.data.input.Rows;
import io.druid.granularity.QueryGranularity;
import io.druid.guice.annotations.Smile;
import io.druid.hll.HyperLogLogCollector;
import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import io.druid.indexing.common.TaskLock;
Expand All @@ -60,7 +61,6 @@
import io.druid.java.util.common.logger.Logger;
import io.druid.java.util.common.parsers.ParseException;
import io.druid.query.DruidMetrics;
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.IOConfig;
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
<module>java-util</module>
<module>bytebuffer-collections</module>
<module>extendedset</module>
<module>hll</module>
<!-- Core extensions -->
<module>extensions-core/avro-extensions</module>
<module>extensions-core/datasketches</module>
Expand Down
5 changes: 5 additions & 0 deletions processing/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@
<artifactId>druid-common</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-hll</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>bytebuffer-collections</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import io.druid.hll.HyperLogLogCollector;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.ColumnSelectorPlus;
import io.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategy;
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;

import java.util.List;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import io.druid.hll.HyperLogLogCollector;
import io.druid.java.util.common.StringUtils;
import io.druid.query.ColumnSelectorPlus;
import io.druid.query.aggregation.Aggregator;
Expand All @@ -34,7 +35,6 @@
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategy;
import io.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategyFactory;
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
Expand Down
Loading

0 comments on commit 93b71e2

Please sign in to comment.