Skip to content

Commit

Permalink
Ignore chunkPeriod for groupBy v2, fix chunkPeriod for irregular peri…
Browse files Browse the repository at this point in the history
…ods. (apache#4004)

* Ignore chunkPeriod for groupBy v2, fix chunkPeriod for irregular periods.

Includes two fixes:
- groupBy v2 now ignores chunkPeriod, since it wouldn't have helped anyway (its mergeResults
returns a lazy sequence) and it generates incorrect results.
- Fix chunkPeriod handling for periods of irregular length, like "P1M" or "P1Y".

Also includes doc and test fixes:
- groupBy v1 was no longer being tested by GroupByQueryRunnerTest since apache#3953, now it
  is once again.
- chunkPeriod documentation was misleading due to its checkered past. Updated it to
  be more accurate.

* Remove unused import.

* Restore buffer size.
  • Loading branch information
gianm authored and himanshug committed Mar 6, 2017
1 parent 7b9e6c2 commit 4ca5270
Show file tree
Hide file tree
Showing 13 changed files with 410 additions and 85 deletions.
15 changes: 15 additions & 0 deletions docs/content/querying/groupbyquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,21 @@ merging is always single-threaded. Because the broker merges results using the i
the full result set before returning any results. On both the data nodes and the broker, the merging index is fully
on-heap by default, but it can optionally store aggregated values off-heap.

#### Differences between v1 and v2

Query API and results are compatible between the two engines; however, there are some differences from a cluster
configuration perspective:

- groupBy v1 merges results in heap, whereas groupBy v2 merges results off-heap. As a result, optimal configuration for
your Druid nodes may involve less heap (-Xmx, -Xms) and more direct memory (-XX:MaxDirectMemorySize).
- groupBy v1 imposes no limit on the number of concurrently running queries, whereas groupBy v2 controls memory usage
by using a finite-sized merge buffer pool. By default, the number of merge buffers is 1/4 the number of processing
threads. You can adjust this as necessary to balance concurrency and memory usage.
- groupBy v1 supports caching on either the broker or historical nodes, whereas groupBy v2 only supports caching on
historical nodes.
- groupBy v1 supports using [chunkPeriod](query-context.html) to parallelize merging on the broker, whereas groupBy v2
ignores chunkPeriod.

#### Alternatives

There are some situations where other query types may be a better choice than groupBy.
Expand Down
2 changes: 1 addition & 1 deletion docs/content/querying/query-context.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ The query context is used for various query configuration parameters. The follow
|populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. This may be overriden in the broker or historical node configuration |
|bySegment | `false` | Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from |
|finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` |
|chunkPeriod | `0` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries, reducing the impact on resources. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. All the query chunks will be processed asynchronously inside query processing executor service. Make sure "druid.processing.numThreads" is configured appropriately on the broker. |
|chunkPeriod | `P0D` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries to parallelize merging more than normal. Broken up queries will use a larger share of cluster resources, but may be able to complete faster as a result. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. The broker uses its query processing executor service to initiate processing for query chunks, so make sure "druid.processing.numThreads" is configured appropriately on the broker. [groupBy queries](groupbyquery.html) do not support chunkPeriod by default, although they do if using the legacy "v1" engine. |

In addition, some query types offer context parameters specific to that query type.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,29 +53,41 @@ public class ApproximateHistogramGroupByQueryTest
private GroupByQueryRunnerFactory factory;
private String testName;

@Parameterized.Parameters(name="{0}")
@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> constructorFeeder() throws IOException
{
final GroupByQueryConfig defaultConfig = new GroupByQueryConfig()
final GroupByQueryConfig v1Config = new GroupByQueryConfig()
{
@Override
public String getDefaultStrategy()
{
return GroupByStrategySelector.STRATEGY_V1;
}

@Override
public String toString()
{
return "default";
return "v1";
}
};
final GroupByQueryConfig singleThreadedConfig = new GroupByQueryConfig()
final GroupByQueryConfig v1SingleThreadedConfig = new GroupByQueryConfig()
{
@Override
public boolean isSingleThreaded()
{
return true;
}

@Override
public String getDefaultStrategy()
{
return GroupByStrategySelector.STRATEGY_V1;
}

@Override
public String toString()
{
return "singleThreaded";
return "v1SingleThreaded";
}
};
final GroupByQueryConfig v2Config = new GroupByQueryConfig()
Expand All @@ -93,13 +105,13 @@ public String toString()
}
};

defaultConfig.setMaxIntermediateRows(10000);
singleThreadedConfig.setMaxIntermediateRows(10000);
v1Config.setMaxIntermediateRows(10000);
v1SingleThreadedConfig.setMaxIntermediateRows(10000);

final List<Object[]> constructors = Lists.newArrayList();
final List<GroupByQueryConfig> configs = ImmutableList.of(
defaultConfig,
singleThreadedConfig,
v1Config,
v1SingleThreadedConfig,
v2Config
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;

Expand All @@ -42,6 +43,8 @@
*/
public class IntervalChunkingQueryRunner<T> implements QueryRunner<T>
{
private static final DateTime EPOCH = new DateTime(0L);

private final QueryRunner<T> baseRunner;

private final QueryToolChest<T, Query<T>> toolChest;
Expand All @@ -65,7 +68,9 @@ public IntervalChunkingQueryRunner(
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{
final Period chunkPeriod = getChunkPeriod(query);
if (chunkPeriod.toStandardDuration().getMillis() == 0) {

// Check for non-empty chunkPeriod, avoiding toStandardDuration since that cannot handle periods like P1M.
if (EPOCH.plus(chunkPeriod).getMillis() == EPOCH.getMillis()) {
return baseRunner.run(query, responseContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,47 +316,52 @@ public TypeReference<Row> getResultTypeReference()
public QueryRunner<Row> preMergeQueryDecoration(final QueryRunner<Row> runner)
{
return new SubqueryQueryRunner<>(
intervalChunkingQueryRunnerDecorator.decorate(
new QueryRunner<Row>()
{
@Override
public Sequence<Row> run(Query<Row> query, Map<String, Object> responseContext)
{
GroupByQuery groupByQuery = (GroupByQuery) query;
if (groupByQuery.getDimFilter() != null) {
groupByQuery = groupByQuery.withDimFilter(groupByQuery.getDimFilter().optimize());
}
final GroupByQuery delegateGroupByQuery = groupByQuery;
ArrayList<DimensionSpec> dimensionSpecs = new ArrayList<>();
Set<String> optimizedDimensions = ImmutableSet.copyOf(
Iterables.transform(
extractionsToRewrite(delegateGroupByQuery),
new Function<DimensionSpec, String>()
{
@Override
public String apply(DimensionSpec input)
{
return input.getDimension();
}
}
)
);
for (DimensionSpec dimensionSpec : delegateGroupByQuery.getDimensions()) {
if (optimizedDimensions.contains(dimensionSpec.getDimension())) {
dimensionSpecs.add(
new DefaultDimensionSpec(dimensionSpec.getDimension(), dimensionSpec.getOutputName())
);
} else {
dimensionSpecs.add(dimensionSpec);
}
}
return runner.run(
delegateGroupByQuery.withDimensionSpecs(dimensionSpecs),
responseContext
new QueryRunner<Row>()
{
@Override
public Sequence<Row> run(Query<Row> query, Map<String, Object> responseContext)
{
GroupByQuery groupByQuery = (GroupByQuery) query;
if (groupByQuery.getDimFilter() != null) {
groupByQuery = groupByQuery.withDimFilter(groupByQuery.getDimFilter().optimize());
}
final GroupByQuery delegateGroupByQuery = groupByQuery;
ArrayList<DimensionSpec> dimensionSpecs = new ArrayList<>();
Set<String> optimizedDimensions = ImmutableSet.copyOf(
Iterables.transform(
extractionsToRewrite(delegateGroupByQuery),
new Function<DimensionSpec, String>()
{
@Override
public String apply(DimensionSpec input)
{
return input.getDimension();
}
}
)
);
for (DimensionSpec dimensionSpec : delegateGroupByQuery.getDimensions()) {
if (optimizedDimensions.contains(dimensionSpec.getDimension())) {
dimensionSpecs.add(
new DefaultDimensionSpec(dimensionSpec.getDimension(), dimensionSpec.getOutputName())
);
} else {
dimensionSpecs.add(dimensionSpec);
}
}, this
)
}

return strategySelector.strategize(delegateGroupByQuery)
.createIntervalChunkingRunner(
intervalChunkingQueryRunnerDecorator,
runner,
GroupByQueryQueryToolChest.this
)
.run(
delegateGroupByQuery.withDimensionSpecs(dimensionSpecs),
responseContext
);
}
}
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import com.google.common.util.concurrent.ListeningExecutorService;
import io.druid.data.input.Row;
import io.druid.java.util.common.guava.Sequence;
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryQueryToolChest;
import io.druid.query.groupby.resource.GroupByQueryResource;
import io.druid.segment.StorageAdapter;

Expand All @@ -51,6 +53,15 @@ public interface GroupByStrategy
*/
boolean isCacheable(boolean willMergeRunners);

/**
* Decorate a runner with an interval chunking decorator.
*/
QueryRunner<Row> createIntervalChunkingRunner(
final IntervalChunkingQueryRunnerDecorator decorator,
final QueryRunner<Row> runner,
final GroupByQueryQueryToolChest toolChest
);

Sequence<Row> mergeResults(
QueryRunner<Row> baseRunner,
GroupByQuery query,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.GroupByMergedQueryRunner;
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
import io.druid.query.QueryRunner;
import io.druid.query.QueryWatcher;
import io.druid.query.aggregation.AggregatorFactory;
Expand Down Expand Up @@ -90,6 +91,16 @@ public boolean isCacheable(boolean willMergeRunners)
return true;
}

@Override
public QueryRunner<Row> createIntervalChunkingRunner(
final IntervalChunkingQueryRunnerDecorator decorator,
final QueryRunner<Row> runner,
final GroupByQueryQueryToolChest toolChest
)
{
return decorator.decorate(runner, toolChest);
}

@Override
public Sequence<Row> mergeResults(
final QueryRunner<Row> baseRunner,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.druid.query.DataSource;
import io.druid.query.DruidProcessingConfig;
import io.druid.query.InsufficientResourcesException;
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
import io.druid.query.Query;
import io.druid.query.QueryContextKeys;
import io.druid.query.QueryDataSource;
Expand All @@ -56,6 +57,7 @@
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryHelper;
import io.druid.query.groupby.GroupByQueryQueryToolChest;
import io.druid.query.groupby.epinephelinae.GroupByBinaryFnV2;
import io.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2;
import io.druid.query.groupby.epinephelinae.GroupByQueryEngineV2;
Expand Down Expand Up @@ -177,6 +179,21 @@ public boolean isCacheable(boolean willMergeRunners)
return willMergeRunners;
}

@Override
public QueryRunner<Row> createIntervalChunkingRunner(
final IntervalChunkingQueryRunnerDecorator decorator,
final QueryRunner<Row> runner,
final GroupByQueryQueryToolChest toolChest
)
{
// No chunkPeriod-based interval chunking for groupBy v2.
// 1) It concats query chunks for consecutive intervals, which won't generate correct results.
// 2) Merging instead of concating isn't a good idea, since it requires all chunks to run simultaneously,
// which may take more resources than the cluster has.
// See also https://github.com/druid-io/druid/pull/4004
return runner;
}

@Override
public Sequence<Row> mergeResults(
final QueryRunner<Row> baseRunner,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,23 @@ public void testChunking() {

EasyMock.replay(executors);
EasyMock.replay(toolChest);


QueryRunner runner = decorator.decorate(baseRunner, toolChest);
runner.run(query, Collections.EMPTY_MAP);

EasyMock.verify(executors);
}

@Test
public void testChunkingOnMonths() {
Query query = queryBuilder.intervals("2015-01-01T00:00:00.000/2015-02-11T00:00:00.000").context(ImmutableMap.<String, Object>of("chunkPeriod", "P1M")).build();

executors.execute(EasyMock.anyObject(Runnable.class));
EasyMock.expectLastCall().times(2);

EasyMock.replay(executors);
EasyMock.replay(toolChest);

QueryRunner runner = decorator.decorate(baseRunner, toolChest);
runner.run(query, Collections.EMPTY_MAP);

Expand Down
12 changes: 12 additions & 0 deletions processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.emitter.core.NoopEmitter;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.java.util.common.UOE;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.granularity.Granularity;
Expand Down Expand Up @@ -574,6 +577,15 @@ public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
};
}

public static IntervalChunkingQueryRunnerDecorator sameThreadIntervalChunkingQueryRunnerDecorator()
{
return new IntervalChunkingQueryRunnerDecorator(
MoreExecutors.sameThreadExecutor(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
new ServiceEmitter("dummy", "dummy", new NoopEmitter())
);
}

public static Map<String, Object> of(Object... keyvalues)
{
ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import io.druid.java.util.common.guava.Yielder;
import io.druid.java.util.common.guava.YieldingAccumulator;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
Expand Down Expand Up @@ -588,23 +587,6 @@ private List readQueryResultArrayFromString(String str) throws Exception
return result;
}

public static IntervalChunkingQueryRunnerDecorator NoopIntervalChunkingQueryRunnerDecorator()
{
return new IntervalChunkingQueryRunnerDecorator(null, null, null) {
@Override
public <T> QueryRunner<T> decorate(final QueryRunner<T> delegate,
QueryToolChest<T, ? extends Query<T>> toolChest) {
return new QueryRunner<T>() {
@Override
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
{
return delegate.run(query, responseContext);
}
};
}
};
}

public ObjectMapper getObjectMapper()
{
return mapper;
Expand Down
Loading

0 comments on commit 4ca5270

Please sign in to comment.