Skip to content

Commit

Permalink
consolidate different theta sketch representations into SketchHolder (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
himanshug authored and fjy committed Nov 11, 2016
1 parent 52a74cf commit ddc0789
Show file tree
Hide file tree
Showing 18 changed files with 342 additions and 401 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void reset()
@Override
public Object get()
{
return SketchOperations.EMPTY_SKETCH;
return SketchHolder.EMPTY;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void aggregate(ByteBuffer buf, int position)
@Override
public Object get(ByteBuffer buf, int position)
{
return SketchOperations.EMPTY_SKETCH;
return SketchHolder.EMPTY;
}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,16 @@
package io.druid.query.aggregation.datasketches.theta;

import com.yahoo.sketches.Family;
import com.yahoo.sketches.memory.Memory;
import com.yahoo.sketches.theta.SetOperation;
import com.yahoo.sketches.theta.Sketch;
import com.yahoo.sketches.theta.Union;

import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.aggregation.Aggregator;
import io.druid.segment.ObjectColumnSelector;

import java.util.List;

public class SketchAggregator implements Aggregator
{
private static final Logger logger = new Logger(SketchAggregator.class);

private final ObjectColumnSelector selector;

private Union union;
Expand Down Expand Up @@ -71,7 +65,7 @@ public Object get()
//however, the advantage of ordered sketches is that they are faster to "union" later
//given that results from the aggregator will be combined further, it is better
//to return the ordered sketch here
return union.getResult(true, null);
return SketchHolder.of(union.getResult(true, null));
}

@Override
Expand Down Expand Up @@ -100,12 +94,8 @@ public void close()

static void updateUnion(Union union, Object update)
{
if (update instanceof Memory) {
union.update((Memory) update);
} else if (update instanceof Sketch) {
union.update((Sketch) update);
} else if (update instanceof Union) {
union.update(((Union) update).getResult(false, null));
if (update instanceof SketchHolder) {
((SketchHolder) update).updateUnion(union);
} else if (update instanceof String) {
union.update((String) update);
} else if (update instanceof byte[]) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,9 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Doubles;
import com.google.common.primitives.Ints;
import com.yahoo.sketches.Family;
import com.yahoo.sketches.Util;
import com.yahoo.sketches.memory.Memory;
import com.yahoo.sketches.theta.SetOperation;
import com.yahoo.sketches.theta.Sketch;
import com.yahoo.sketches.theta.Union;
import io.druid.java.util.common.IAE;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.BufferAggregator;
Expand All @@ -51,19 +44,6 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory
protected final int size;
private final byte cacheId;

public static final Comparator<Object> COMPARATOR = Ordering.from(
new Comparator()
{
@Override
public int compare(Object o1, Object o2)
{
Sketch s1 = SketchAggregatorFactory.toSketch(o1);
Sketch s2 = SketchAggregatorFactory.toSketch(o2);
return Doubles.compare(s1.getEstimate(), s2.getEstimate());
}
}
).nullsFirst();

public SketchAggregatorFactory(String name, String fieldName, Integer size, byte cacheId)
{
this.name = Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
Expand Down Expand Up @@ -102,48 +82,19 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
@Override
public Object deserialize(Object object)
{
return SketchOperations.deserialize(object);
return SketchHolder.deserialize(object);
}

@Override
public Comparator<Object> getComparator()
{
return COMPARATOR;
return SketchHolder.COMPARATOR;
}

@Override
public Object combine(Object lhs, Object rhs)
{
final Union union;
if (lhs instanceof Union) {
union = (Union) lhs;
updateUnion(union, rhs);
} else if (rhs instanceof Union) {
union = (Union) rhs;
updateUnion(union, lhs);
} else {
union = (Union) SetOperation.builder().build(size, Family.UNION);
updateUnion(union, lhs);
updateUnion(union, rhs);
}


return union;
}

private void updateUnion(Union union, Object obj)
{
if (obj == null) {
return;
} else if (obj instanceof Memory) {
union.update((Memory) obj);
} else if (obj instanceof Sketch) {
union.update((Sketch) obj);
} else if (obj instanceof Union) {
union.update(((Union) obj).getResult(false, null));
} else {
throw new IAE("Object of type [%s] can not be unioned", obj.getClass().getName());
}
return SketchHolder.combine(lhs, rhs, size);
}

@Override
Expand Down Expand Up @@ -188,17 +139,6 @@ public byte[] getCacheKey()
.array();
}

public final static Sketch toSketch(Object obj)
{
if (obj instanceof Sketch) {
return (Sketch) obj;
} else if (obj instanceof Union) {
return ((Union) obj).getResult(true, null);
} else {
throw new IAE("Can't convert to Sketch object [%s]", obj.getClass());
}
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import com.yahoo.sketches.memory.NativeMemory;
import com.yahoo.sketches.theta.SetOperation;
import com.yahoo.sketches.theta.Union;

import io.druid.java.util.common.logger.Logger;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.ObjectColumnSelector;

Expand All @@ -36,8 +34,6 @@

public class SketchBufferAggregator implements BufferAggregator
{
private static final Logger logger = new Logger(SketchAggregator.class);

private final ObjectColumnSelector selector;
private final int size;
private final int maxIntermediateSize;
Expand Down Expand Up @@ -84,7 +80,7 @@ public Object get(ByteBuffer buf, int position)
//however, the advantage of ordered sketches is that they are faster to "union" later
//given that results from the aggregator will be combined further, it is better
//to return the ordered sketch here
return getUnion(buf, position).getResult(true, null);
return SketchHolder.of(getUnion(buf, position).getResult(true, null));
}

//Note that this is not threadsafe and I don't think it needs to be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,11 @@
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.primitives.Doubles;
import com.yahoo.sketches.theta.Sketch;
import io.druid.query.aggregation.PostAggregator;

import java.util.Comparator;
import java.util.Map;
import java.util.Set;

public class SketchEstimatePostAggregator implements PostAggregator
{

Expand Down Expand Up @@ -82,16 +80,11 @@ public int compare(Object o1, Object o2)
@Override
public Object compute(Map<String, Object> combinedAggregators)
{
Sketch sketch = SketchAggregatorFactory.toSketch(field.compute(combinedAggregators));
SketchHolder holder = (SketchHolder)field.compute(combinedAggregators);
if (errorBoundsStdDev != null) {
SketchEstimateWithErrorBounds result = new SketchEstimateWithErrorBounds(
sketch.getEstimate(),
sketch.getUpperBound(errorBoundsStdDev),
sketch.getLowerBound(errorBoundsStdDev),
errorBoundsStdDev);
return result;
return holder.getEstimateWithErrorBounds(errorBoundsStdDev);
} else {
return sketch.getEstimate();
return holder.getEstimate();
}
}

Expand Down
Loading

0 comments on commit ddc0789

Please sign in to comment.