Skip to content

Commit

Permalink
Support filtering on long columns (including __time) (apache#3180)
Browse files Browse the repository at this point in the history
* Support filtering on __time column

* Rename DruidPredicate

* Add docs for ValueMatcherFactory, add comment on getColumnCapabilities

* Combine ValueMatcherFactory predicate methods to accept DruidCompositePredicate

* Address PR comments (support filter on all long columns)

* Use predicate factory instead of composite predicate

* Address PR comments

* Lazily initialize long handling in selector/in filter

* Move long value parsing from InFilter to InDimFilter, make long value parsing thread-safe

* Add multithreaded selector/in filter test

* Fix non-final lock object in SelectorDimFilter
  • Loading branch information
jon-wei authored and fjy committed Jul 21, 2016
1 parent cd7337f commit a42ccb6
Show file tree
Hide file tree
Showing 34 changed files with 2,091 additions and 152 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import com.metamx.collections.bitmap.RoaringBitmapFactory;
import com.metamx.collections.spatial.ImmutableRTree;
import io.druid.query.filter.BitmapIndexSelector;
import io.druid.query.filter.DruidLongPredicate;
import io.druid.query.filter.DruidPredicateFactory;
import io.druid.segment.column.BitmapIndex;
import io.druid.segment.data.BitmapSerdeFactory;
import io.druid.segment.data.GenericIndexed;
Expand Down Expand Up @@ -63,16 +65,35 @@ public class DimensionPredicateFilterBenchmark

private static final DimensionPredicateFilter IS_EVEN = new DimensionPredicateFilter(
"foo",
new Predicate<String>()
new DruidPredicateFactory()
{
@Override
public boolean apply(String input)
public Predicate<String> makeStringPredicate()
{
if (input == null) {
return false;
}
return new Predicate<String>()
{
@Override
public boolean apply(String input)
{
if (input == null) {
return false;
}
return Integer.parseInt(input.toString()) % 2 == 0;
}
};
}

return Integer.parseInt(input) % 2 == 0;
@Override
public DruidLongPredicate makeLongPredicate()
{
return new DruidLongPredicate()
{
@Override
public boolean applyLong(long input)
{
return false;
}
};
}
},
null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@
import io.druid.query.extraction.JavaScriptExtractionFn;
import io.druid.query.filter.AndDimFilter;
import io.druid.query.filter.BitmapIndexSelector;
import io.druid.query.filter.BoundDimFilter;
import io.druid.query.filter.DimFilter;
import io.druid.query.filter.DruidLongPredicate;
import io.druid.query.filter.DruidPredicateFactory;
import io.druid.query.filter.Filter;
import io.druid.query.filter.OrDimFilter;
import io.druid.query.filter.SelectorDimFilter;
Expand All @@ -56,9 +59,11 @@
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexStorageAdapter;
import io.druid.segment.StorageAdapter;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.filter.AndFilter;
import io.druid.segment.filter.BoundFilter;
import io.druid.segment.filter.DimensionPredicateFilter;
import io.druid.segment.filter.Filters;
import io.druid.segment.filter.OrFilter;
Expand All @@ -67,6 +72,7 @@
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import org.joda.time.Interval;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
Expand Down Expand Up @@ -109,6 +115,10 @@ public class FilterPartitionBenchmark
private QueryableIndex qIndex;
private File indexFile;

// Filters on the time column used by the timeFilter* benchmarks; assigned in setup().
// NOTE(review): names suggest they match none / roughly half / all of the data interval — confirm against BoundDimFilter semantics.
private Filter timeFilterNone;
private Filter timeFilterHalf;
private Filter timeFilterAll;

private BenchmarkSchemaInfo schemaInfo;

private static String JS_FN = "function(str) { return 'super-' + str; }";
Expand Down Expand Up @@ -168,6 +178,38 @@ public void setup() throws IOException
new IndexSpec()
);
qIndex = INDEX_IO.loadIndex(indexFile);

Interval interval = schemaInfo.getDataInterval();
timeFilterNone = new BoundFilter(new BoundDimFilter(
Column.TIME_COLUMN_NAME,
String.valueOf(Long.MAX_VALUE),
String.valueOf(Long.MAX_VALUE),
true,
true,
true,
null
));

long halfEnd = (interval.getEndMillis() + interval.getStartMillis()) / 2;
timeFilterHalf = new BoundFilter(new BoundDimFilter(
Column.TIME_COLUMN_NAME,
String.valueOf(interval.getStartMillis()),
String.valueOf(halfEnd),
true,
true,
true,
null
));

timeFilterAll = new BoundFilter(new BoundDimFilter(
Column.TIME_COLUMN_NAME,
String.valueOf(interval.getStartMillis()),
String.valueOf(interval.getEndMillis()),
true,
true,
true,
null
));
}

private IncrementalIndex makeIncIndex()
Expand Down Expand Up @@ -215,6 +257,51 @@ public void longRead(Blackhole blackhole) throws Exception
}
}

/**
 * Measures a cursor scan over the queryable index with {@code timeFilterNone} applied,
 * reading long values from the first cursor and feeding each one to the blackhole.
 *
 * @param blackhole JMH sink that keeps the read values from being optimized away
 * @throws Exception if building or reading the cursors fails
 */
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void timeFilterNone(Blackhole blackhole) throws Exception
{
  final StorageAdapter adapter = new QueryableIndexStorageAdapter(qIndex);
  final Sequence<Cursor> cursorSeq = adapter.makeCursors(
      timeFilterNone,
      schemaInfo.getDataInterval(),
      QueryGranularities.ALL,
      false
  );

  // Only the first cursor's values are materialized.
  final Sequence<List<Long>> firstCursorOnly = Sequences.limit(readCursorsLong(cursorSeq, blackhole), 1);
  final List<List<Long>> collected = Sequences.toList(firstCursorOnly, Lists.<List<Long>>newArrayList());
  for (Long value : collected.get(0)) {
    blackhole.consume(value);
  }
}

/**
 * Measures a cursor scan over the queryable index with {@code timeFilterHalf} applied,
 * reading long values from the first cursor and feeding each one to the blackhole.
 *
 * @param blackhole JMH sink that keeps the read values from being optimized away
 * @throws Exception if building or reading the cursors fails
 */
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void timeFilterHalf(Blackhole blackhole) throws Exception
{
  final StorageAdapter adapter = new QueryableIndexStorageAdapter(qIndex);
  final Sequence<Cursor> cursorSeq = adapter.makeCursors(
      timeFilterHalf,
      schemaInfo.getDataInterval(),
      QueryGranularities.ALL,
      false
  );

  // Only the first cursor's values are materialized.
  final Sequence<List<Long>> firstCursorOnly = Sequences.limit(readCursorsLong(cursorSeq, blackhole), 1);
  final List<List<Long>> collected = Sequences.toList(firstCursorOnly, Lists.<List<Long>>newArrayList());
  for (Long value : collected.get(0)) {
    blackhole.consume(value);
  }
}

/**
 * Measures a cursor scan over the queryable index with {@code timeFilterAll} applied,
 * reading long values from the first cursor and feeding each one to the blackhole.
 *
 * @param blackhole JMH sink that keeps the read values from being optimized away
 * @throws Exception if building or reading the cursors fails
 */
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void timeFilterAll(Blackhole blackhole) throws Exception
{
  final StorageAdapter adapter = new QueryableIndexStorageAdapter(qIndex);
  final Sequence<Cursor> cursorSeq = adapter.makeCursors(
      timeFilterAll,
      schemaInfo.getDataInterval(),
      QueryGranularities.ALL,
      false
  );

  // Only the first cursor's values are materialized.
  final Sequence<List<Long>> firstCursorOnly = Sequences.limit(readCursorsLong(cursorSeq, blackhole), 1);
  final List<List<Long>> collected = Sequences.toList(firstCursorOnly, Lists.<List<Long>>newArrayList());
  for (Long value : collected.get(0)) {
    blackhole.consume(value);
  }
}

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
Expand Down Expand Up @@ -442,7 +529,6 @@ private Sequence<List<Long>> readCursorsLong(Sequence<Cursor> cursors, final Bla
public List<Long> apply(Cursor input)
{
List<Long> longvals = new ArrayList<Long>();

LongColumnSelector selector = input.makeLongColumnSelector("sumLongSequential");
while (!input.isDone()) {
long rowval = selector.get();
Expand Down Expand Up @@ -476,11 +562,11 @@ private class NoBitmapDimensionPredicateFilter extends DimensionPredicateFilter
{
public NoBitmapDimensionPredicateFilter(
final String dimension,
final Predicate<String> predicate,
final DruidPredicateFactory predicateFactory,
final ExtractionFn extractionFn
)
{
super(dimension, predicate, extractionFn);
super(dimension, predicateFactory, extractionFn);
}

@Override
Expand Down Expand Up @@ -510,21 +596,37 @@ public Filter toFilter()
return new NoBitmapSelectorFilter(dimension, value);
} else {
final String valueOrNull = Strings.emptyToNull(value);
final Predicate<String> predicate = new Predicate<String>()

final DruidPredicateFactory predicateFactory = new DruidPredicateFactory()
{
@Override
public boolean apply(String input)
public Predicate<String> makeStringPredicate()
{
return Objects.equals(valueOrNull, input);
return new Predicate<String>()
{
@Override
public boolean apply(String input)
{
return Objects.equals(valueOrNull, input);
}
};
}

@Override
public String toString()
public DruidLongPredicate makeLongPredicate()
{
return value;
return new DruidLongPredicate()
{
@Override
public boolean applyLong(long input)
{
return false;
}
};
}
};
return new NoBitmapDimensionPredicateFilter(dimension, predicate, extractionFn);

return new NoBitmapDimensionPredicateFilter(dimension, predicateFactory, extractionFn);
}
}
}
Expand Down
Loading

0 comments on commit a42ccb6

Please sign in to comment.