Skip to content

Commit

Permalink
Merge pull request apache#1822 from himanshug/outer_query_multiple_in…
Browse files Browse the repository at this point in the history
…terval_fix

support multiple non-consecutive intervals in outer query of nested group-by
  • Loading branch information
xvrl committed Oct 13, 2015
2 parents 273ec21 + 490de1f commit febf264
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,11 @@
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.extraction.ExtractionFn;
import io.druid.query.filter.DimFilter;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -173,14 +175,30 @@ private Sequence<Row> mergeGroupByResults(
final GroupByQuery outerQuery = new GroupByQuery.Builder(query)
.setLimitSpec(query.getLimitSpec().merge(subquery.getLimitSpec()))
.build();
IncrementalIndex index = makeIncrementalIndex(innerQuery, subqueryResult);
final IncrementalIndex index = makeIncrementalIndex(innerQuery, subqueryResult);

//Outer query might have multiple intervals, but they are expected to be non-overlapping and sorted which
//is ensured by QuerySegmentSpec.
//GroupByQueryEngine can only process one interval at a time, so we need to call it once per interval
//and concatenate the results.
return new ResourceClosingSequence<>(
outerQuery.applyLimit(
engine.process(
outerQuery,
new IncrementalIndexStorageAdapter(
index
Sequences.concat(
Sequences.map(
Sequences.simple(outerQuery.getIntervals()),
new Function<Interval, Sequence<Row>>()
{
@Override
public Sequence<Row> apply(Interval interval)
{
return engine.process(
outerQuery.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(ImmutableList.of(interval))
),
new IncrementalIndexStorageAdapter(index)
);
}
}
)
)
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2398,6 +2398,72 @@ public void testIdenticalSubquery()
TestHelper.assertExpectedObjects(expectedResults, results, "");
}

@Test
public void testSubqueryWithMultipleIntervalsInOuterQuery()
{
GroupByQuery subquery = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setDimFilter(new JavaScriptDimFilter("quality", "function(dim){ return true; }"))
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx", "index")
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();

GroupByQuery query = GroupByQuery
.builder()
.setDataSource(subquery)
.setQuerySegmentSpec(
new MultipleIntervalSegmentSpec(
ImmutableList.of(
new Interval("2011-04-01T00:00:00.000Z/2011-04-01T23:58:00.000Z"),
new Interval("2011-04-02T00:00:00.000Z/2011-04-03T00:00:00.000Z")
)
)
)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("alias", "alias")))
.setAggregatorSpecs(
Arrays.<AggregatorFactory>asList(
new LongSumAggregatorFactory("rows", "rows"),
new LongSumAggregatorFactory("idx", "idx")
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();

List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "business", "rows", 1L, "idx", 118L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 158L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 120L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 3L, "idx", 2870L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 121L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "premium", "rows", 3L, "idx", 2900L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 78L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 119L),

GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "automotive", "rows", 1L, "idx", 147L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "business", "rows", 1L, "idx", 112L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 1L, "idx", 166L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "health", "rows", 1L, "idx", 113L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 2447L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "news", "rows", 1L, "idx", 114L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "premium", "rows", 3L, "idx", 2505L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "technology", "rows", 1L, "idx", 97L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 126L)
);

// Subqueries are handled by the ToolChest
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}

@Test
public void testDifferentGroupingSubquery()
{
Expand Down

0 comments on commit febf264

Please sign in to comment.