Skip to content

Commit

Permalink
Monomorphic processing of TopN queries with 1 and 2 aggregators (key …
Browse files Browse the repository at this point in the history
…part of apache#3798) (apache#3889)

* Monomorphic processing: add HotLoopCallee, CalledFromHotLoop, RuntimeShapeInspector, SpecializationService. Specialize topN queries with 1 or 2 aggregators. Add Cursor.advanceUninterruptibly() and isDoneOrInterrupted() for exception-free query processing.

* Use Execs.singleThreaded()

* RuntimeShapeInspector to support nullable fields

* Make CalledFromHotLoop annotation Inherited

* Remove unnecessary conversion of array of ColumnSelectorPluses to list and back to array in CardinalityAggregatorFactory

* Close InputStream in SpecializationService

* Formatting

* Test specialized PooledTopNScanners

* Set flags in PooledTopNAlgorithm directly

* Fix tests, dependent on CountAggragatorFactory toString() form

* Fix

* Revert CountAggregatorFactory changes

* Implement inspectRuntimeShape() for LongWrappingDimensionSelector and FloatWrappingDimensionSelector

* Remove duplicate RoaringBitmap dependency in the extendedset pom.xml

* Fix

* Treat ByteBuffers specially in StringRuntimeShape

* Doc fix

* Annotate BufferAggregator.init() with CalledFromHotLoop

* Make triggerSpecializationIterationsThreshold an int

* Remove SpecializationService.PerPrototypeClassState.of()

* Add comments

* Limit the amount of specializations that SpecializationService could make

* Add default implementation for BufferAggregator.inspectRuntimeShape(), for compatibility with extensions

* Use more efficient ConcurrentMap's idioms in SpecializationService
  • Loading branch information
leventov authored and himanshug committed Mar 17, 2017
1 parent 3ec1877 commit 84fe91b
Show file tree
Hide file tree
Showing 104 changed files with 2,497 additions and 580 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnFactory)
{
DimensionSelector selector = makeDimensionSelector(columnFactory);
if (selector == null) {
return new EmptyDistinctCountBufferAggregator();
return EmptyDistinctCountBufferAggregator.instance();
} else {
return new DistinctCountBufferAggregator(makeDimensionSelector(columnFactory));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@
import io.druid.collections.bitmap.MutableBitmap;
import io.druid.collections.bitmap.WrappedRoaringBitmap;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.DimensionSelector;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class DistinctCountBufferAggregator implements BufferAggregator
{
private final DimensionSelector selector;
private final Map<Integer, MutableBitmap> mutableBitmapCollection = new HashMap<>();
private final Int2ObjectMap<MutableBitmap> mutableBitmapCollection = new Int2ObjectOpenHashMap<>();

public DistinctCountBufferAggregator(
DimensionSelector selector
Expand Down Expand Up @@ -89,4 +90,10 @@ public void close()
{
mutableBitmapCollection.clear();
}

@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,24 @@
package io.druid.query.aggregation.distinctcount;

import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;

import java.nio.ByteBuffer;

public class EmptyDistinctCountBufferAggregator implements BufferAggregator
/**
* The difference from {@link io.druid.query.aggregation.NoopBufferAggregator} is that
* EmptyDistinctCountBufferAggregator returns 0 instead of null from {@link #get(ByteBuffer, int)}.
*/
public final class EmptyDistinctCountBufferAggregator implements BufferAggregator
{
private static final EmptyDistinctCountBufferAggregator INSTANCE = new EmptyDistinctCountBufferAggregator();

static EmptyDistinctCountBufferAggregator instance()
{
return INSTANCE;
}

public EmptyDistinctCountBufferAggregator()
private EmptyDistinctCountBufferAggregator()
{
}

Expand Down Expand Up @@ -62,4 +73,9 @@ public long getLong(ByteBuffer buf, int position)
public void close()
{
}

@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package io.druid.query.aggregation;

import io.druid.data.input.impl.TimestampSpec;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.ObjectColumnSelector;

import java.nio.ByteBuffer;
Expand Down Expand Up @@ -79,7 +80,14 @@ public long getLong(ByteBuffer buf, int position)
}

@Override
public void close() {
public void close()
{
}

@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
inspector.visit("comparator", comparator);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,20 @@
package io.druid.query.aggregation.datasketches.theta;

import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;

import java.nio.ByteBuffer;

public class EmptySketchBufferAggregator implements BufferAggregator
public final class EmptySketchBufferAggregator implements BufferAggregator
{
public EmptySketchBufferAggregator()
private static final EmptySketchBufferAggregator INSTANCE = new EmptySketchBufferAggregator();

public static EmptySketchBufferAggregator instance()
{
return INSTANCE;
}

private EmptySketchBufferAggregator()
{
}

Expand Down Expand Up @@ -61,4 +69,9 @@ public long getLong(ByteBuffer buf, int position)
public void close()
{
}

@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName);
if (selector == null) {
return new EmptySketchBufferAggregator();
return EmptySketchBufferAggregator.instance();
} else {
return new SketchBufferAggregator(selector, size, getMaxIntermediateSize());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.yahoo.sketches.theta.SetOperation;
import com.yahoo.sketches.theta.Union;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.ObjectColumnSelector;

import java.nio.ByteBuffer;
Expand Down Expand Up @@ -113,4 +114,9 @@ public void close()
unions.clear();
}

@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package io.druid.query.aggregation.histogram;

import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.FloatColumnSelector;

import java.nio.ByteBuffer;
Expand Down Expand Up @@ -99,4 +100,10 @@ public void close()
{
// no resources to cleanup
}

@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package io.druid.query.aggregation.histogram;

import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.ObjectColumnSelector;

import java.nio.ByteBuffer;
Expand Down Expand Up @@ -102,4 +103,10 @@ public void close()
{
// no resources to cleanup
}

@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;

import io.druid.java.util.common.IAE;
import io.druid.java.util.common.StringUtils;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.Aggregators;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.NoopAggregator;
import io.druid.query.aggregation.NoopBufferAggregator;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.ObjectColumnSelector;
import org.apache.commons.codec.binary.Base64;
Expand Down Expand Up @@ -95,7 +95,7 @@ public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName);
if (selector == null) {
return Aggregators.noopAggregator();
return NoopAggregator.instance();
}

if ("float".equalsIgnoreCase(inputType)) {
Expand All @@ -115,7 +115,7 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName);
if (selector == null) {
return Aggregators.noopBufferAggregator();
return NoopBufferAggregator.instance();
}
if ("float".equalsIgnoreCase(inputType)) {
return new VarianceBufferAggregator.FloatVarianceAggregator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.primitives.Doubles;
import com.google.common.primitives.Longs;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.ObjectColumnSelector;
Expand Down Expand Up @@ -102,6 +103,12 @@ public void aggregate(ByteBuffer buf, int position)
buf.putDouble(position + NVARIANCE_OFFSET, variance);
}
}

@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
}

public static final class LongVarianceAggregator extends VarianceBufferAggregator
Expand All @@ -128,6 +135,12 @@ public void aggregate(ByteBuffer buf, int position)
buf.putDouble(position + NVARIANCE_OFFSET, variance);
}
}

@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
}

public static final class ObjectVarianceAggregator extends VarianceBufferAggregator
Expand Down Expand Up @@ -167,5 +180,11 @@ public void aggregate(ByteBuffer buf, int position)
buf.putDouble(position + SUM_OFFSET, sum);
buf.putDouble(position + NVARIANCE_OFFSET, nvariance);
}

@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@
package io.druid.query.aggregation.variance;

import com.google.common.collect.Lists;

import io.druid.java.util.common.Pair;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.TestFloatColumnSelector;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -142,7 +141,7 @@ public void testVariance()
}
}

private static class FloatHandOver implements FloatColumnSelector
private static class FloatHandOver extends TestFloatColumnSelector
{
float v;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class VarianceTopNQueryTest
@Parameterized.Parameters(name="{0}")
public static Iterable<Object[]> constructorFeeder() throws IOException
{
return TopNQueryRunnerTest.constructorFeeder();
return QueryRunnerTestHelper.transformToConstructionFeeder(TopNQueryRunnerTest.queryRunners());
}

private final QueryRunner runner;
Expand Down
13 changes: 12 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,16 @@
<artifactId>RoaringBitmap</artifactId>
<version>0.5.18</version>
</dependency>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
<version>5.2</version>
</dependency>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm-commons</artifactId>
<version>5.2</version>
</dependency>
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
Expand Down Expand Up @@ -815,10 +825,11 @@
<version>1.0</version>
</signature>
<ignores>
<!-- Some of our code uses DirectBuffer & Cleaner directly, which are not part of
<!-- Some of our code uses sun.* classes directly, which are not part of
the JDK signature (although they are there anyway). -->
<ignore>sun.nio.ch.DirectBuffer</ignore>
<ignore>sun.misc.Cleaner</ignore>
<ignore>sun.misc.Unsafe</ignore>
</ignores>
</configuration>
</execution>
Expand Down
8 changes: 8 additions & 0 deletions processing/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
</dependency>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm-commons</artifactId>
</dependency>

<!-- Tests -->
<dependency>
Expand Down
7 changes: 7 additions & 0 deletions processing/src/main/java/io/druid/query/BaseQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ private static <T> boolean parseBoolean(Query<T> query, String key, boolean defa
}
}

public static void checkInterrupted()
{
if (Thread.interrupted()) {
throw new QueryInterruptedException(new InterruptedException());
}
}

public static final String QUERYID = "queryId";
private final DataSource dataSource;
private final boolean descending;
Expand Down
Loading

0 comments on commit 84fe91b

Please sign in to comment.