Skip to content

Commit

Permalink
Speed up filter tests with adapter cache (apache#3103)
Browse files Browse the repository at this point in the history
  • Loading branch information
jon-wei authored and gianm committed Jun 8, 2016
1 parent 4faa298 commit 37c8a8f
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 65 deletions.
152 changes: 107 additions & 45 deletions processing/src/test/java/io/druid/segment/filter/BaseFilterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.Pair;
import com.metamx.common.guava.Sequence;
Expand Down Expand Up @@ -53,15 +52,16 @@
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import org.junit.runners.Parameterized;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand All @@ -77,14 +77,31 @@ public abstract class BaseFilterTest
protected StorageAdapter adapter;
protected Closeable closeable;
protected boolean optimize;
protected final String testName;

// JUnit creates a new test instance for every test method call.
// For filter tests, the test setup creates a segment.
// Creating a new segment for every test method call is pretty slow, so cache the StorageAdapters.
// Each thread gets its own map.
protected static ThreadLocal<Map<String, Map<String, Pair<StorageAdapter, Closeable>>>> adapterCache =
new ThreadLocal<Map<String, Map<String, Pair<StorageAdapter, Closeable>>>>()
{
@Override
protected Map<String, Map<String, Pair<StorageAdapter, Closeable>>> initialValue()
{
return new HashMap<>();
}
};

public BaseFilterTest(
String testName,
List<InputRow> rows,
IndexBuilder indexBuilder,
Function<IndexBuilder, Pair<StorageAdapter, Closeable>> finisher,
boolean optimize
)
{
this.testName = testName;
this.rows = rows;
this.indexBuilder = indexBuilder;
this.finisher = finisher;
Expand All @@ -94,17 +111,37 @@ public BaseFilterTest(
@Before
public void setUp() throws Exception
{
final Pair<StorageAdapter, Closeable> pair = finisher.apply(
indexBuilder.tmpDir(temporaryFolder.newFolder()).add(rows)
);
String className = getClass().getName();
Map<String, Pair<StorageAdapter, Closeable>> adaptersForClass = adapterCache.get().get(className);
if (adaptersForClass == null) {
adaptersForClass = new HashMap<>();
adapterCache.get().put(className, adaptersForClass);
}

Pair<StorageAdapter, Closeable> pair = adaptersForClass.get(testName);
if (pair == null) {
pair = finisher.apply(
indexBuilder.tmpDir(temporaryFolder.newFolder()).add(rows)
);
adaptersForClass.put(testName, pair);
}

this.adapter = pair.lhs;
this.closeable = pair.rhs;

}

@After
public void tearDown() throws Exception
public static void tearDown(String className) throws Exception
{
closeable.close();
Map<String, Pair<StorageAdapter, Closeable>> adaptersForClass = adapterCache.get().get(className);

if (adaptersForClass != null) {
for (Map.Entry<String, Pair<StorageAdapter, Closeable>> entry : adaptersForClass.entrySet()) {
Closeable closeable = entry.getValue().rhs;
closeable.close();
}
adapterCache.get().put(className, null);
}
}

@Parameterized.Parameters(name = "{0}")
Expand Down Expand Up @@ -215,41 +252,6 @@ public void close() throws IOException
return constructors;
}

/**
* Selects elements from "selectColumn" from rows matching a filter. selectColumn must be a single valued dimension.
*/
protected List<String> selectColumnValuesMatchingFilter(final DimFilter filter, final String selectColumn)
{
final Cursor cursor = makeCursor(Filters.toFilter(maybeOptimize(filter)));
final List<String> values = Lists.newArrayList();
final DimensionSelector selector = cursor.makeDimensionSelector(
new DefaultDimensionSpec(selectColumn, selectColumn)
);

for (; !cursor.isDone(); cursor.advance()) {
final IndexedInts row = selector.getRow();
Preconditions.checkState(row.size() == 1);
values.add(selector.lookupName(row.get(0)));
}

return values;
}

protected long selectCountUsingFilteredAggregator(final DimFilter filter)
{
final Cursor cursor = makeCursor(null);
final Aggregator agg = new FilteredAggregatorFactory(
new CountAggregatorFactory("count"),
maybeOptimize(filter)
).factorize(cursor);

for (; !cursor.isDone(); cursor.advance()) {
agg.aggregate();
}

return agg.getLong();
}

private DimFilter maybeOptimize(final DimFilter dimFilter)
{
if (dimFilter == null) {
Expand All @@ -258,14 +260,74 @@ private DimFilter maybeOptimize(final DimFilter dimFilter)
return optimize ? dimFilter.optimize() : dimFilter;
}

private Cursor makeCursor(final Filter filter)
private Sequence<Cursor> makeCursorSequence(final Filter filter)
{
final Sequence<Cursor> cursors = adapter.makeCursors(
filter,
new Interval(JodaUtils.MIN_INSTANT, JodaUtils.MAX_INSTANT),
QueryGranularities.ALL,
false
);
return Iterables.getOnlyElement(Sequences.toList(cursors, Lists.<Cursor>newArrayList()));

return cursors;
}

/**
* Selects elements from "selectColumn" from rows matching a filter. selectColumn must be a single valued dimension.
*/
protected List<String> selectColumnValuesMatchingFilter(final DimFilter filter, final String selectColumn)
{
final Sequence<Cursor> cursors = makeCursorSequence(Filters.toFilter(maybeOptimize(filter)));
Sequence<List<String>> seq = Sequences.map(
cursors,
new Function<Cursor, List<String>>()
{
@Override
public List<String> apply(Cursor input)
{
final DimensionSelector selector = input.makeDimensionSelector(
new DefaultDimensionSpec(selectColumn, selectColumn)
);

final List<String> values = Lists.newArrayList();

while (!input.isDone()) {
IndexedInts row = selector.getRow();
Preconditions.checkState(row.size() == 1);
values.add(selector.lookupName(row.get(0)));
input.advance();
}

return values;
}
}
);
return Sequences.toList(seq, new ArrayList<List<String>>()).get(0);
}

protected long selectCountUsingFilteredAggregator(final DimFilter filter)
{
final Sequence<Cursor> cursors = makeCursorSequence(Filters.toFilter(maybeOptimize(filter)));
Sequence<Aggregator> aggSeq = Sequences.map(
cursors,
new Function<Cursor, Aggregator>()
{
@Override
public Aggregator apply(Cursor input)
{
Aggregator agg = new FilteredAggregatorFactory(
new CountAggregatorFactory("count"),
maybeOptimize(filter)
).factorize(input);

for (; !input.isDone(); input.advance()) {
agg.aggregate();
}

return agg;
}
}
);
return Sequences.toList(aggSeq, new ArrayList<Aggregator>()).get(0).getLong();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.druid.segment.IndexBuilder;
import io.druid.segment.StorageAdapter;
import org.joda.time.DateTime;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -74,7 +75,13 @@ public BoundFilterTest(
boolean optimize
)
{
super(ROWS, indexBuilder, finisher, optimize);
super(testName, ROWS, indexBuilder, finisher, optimize);
}

@AfterClass
public static void tearDown() throws Exception
{
BaseFilterTest.tearDown(BoundFilterTest.class.getName());
}

@Test
Expand Down
19 changes: 5 additions & 14 deletions processing/src/test/java/io/druid/segment/filter/InFilterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.druid.segment.IndexBuilder;
import io.druid.segment.StorageAdapter;
import org.joda.time.DateTime;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -79,30 +80,20 @@ public class InFilterTest extends BaseFilterTest
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "f", "dim1", "abc"))
);

@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> constructorFeeder() throws IOException
{
return makeConstructors();
}

public InFilterTest(
String testName,
IndexBuilder indexBuilder,
Function<IndexBuilder, Pair<StorageAdapter, Closeable>> finisher,
boolean optimize
)
{
super(ROWS, indexBuilder, finisher, optimize);
super(testName, ROWS, indexBuilder, finisher, optimize);
}

@Before
public void setUp() throws IOException
@AfterClass
public static void tearDown() throws Exception
{
final Pair<StorageAdapter, Closeable> pair = finisher.apply(
indexBuilder.tmpDir(temporaryFolder.newFolder()).add(ROWS)
);
this.adapter = pair.lhs;
this.closeable = pair.rhs;
BaseFilterTest.tearDown(InFilterTest.class.getName());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.druid.segment.IndexBuilder;
import io.druid.segment.StorageAdapter;
import org.joda.time.DateTime;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -80,7 +81,13 @@ public JavaScriptFilterTest(
boolean optimize
)
{
super(ROWS, indexBuilder, finisher, optimize);
super(testName, ROWS, indexBuilder, finisher, optimize);
}

@AfterClass
public static void tearDown() throws Exception
{
BaseFilterTest.tearDown(JavaScriptFilterTest.class.getName());
}

private final String jsNullFilter = "function(x) { return(x === null) }";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.druid.segment.IndexBuilder;
import io.druid.segment.StorageAdapter;
import org.joda.time.DateTime;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -72,7 +73,13 @@ public NotFilterTest(
boolean optimize
)
{
super(ROWS, indexBuilder, finisher, optimize);
super(testName, ROWS, indexBuilder, finisher, optimize);
}

@AfterClass
public static void tearDown() throws Exception
{
BaseFilterTest.tearDown(NotFilterTest.class.getName());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.druid.segment.IndexBuilder;
import io.druid.segment.StorageAdapter;
import org.joda.time.DateTime;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -78,7 +79,13 @@ public RegexFilterTest(
boolean optimize
)
{
super(ROWS, indexBuilder, finisher, optimize);
super(testName, ROWS, indexBuilder, finisher, optimize);
}

@AfterClass
public static void tearDown() throws Exception
{
BaseFilterTest.tearDown(RegexFilterTest.class.getName());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.druid.segment.IndexBuilder;
import io.druid.segment.StorageAdapter;
import org.joda.time.DateTime;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -81,7 +82,13 @@ public SearchQueryFilterTest(
boolean optimize
)
{
super(ROWS, indexBuilder, finisher, optimize);
super(testName, ROWS, indexBuilder, finisher, optimize);
}

@AfterClass
public static void tearDown() throws Exception
{
BaseFilterTest.tearDown(SearchQueryFilterTest.class.getName());
}

private SearchQuerySpec specForValue(String value)
Expand Down
Loading

0 comments on commit 37c8a8f

Please sign in to comment.