Better groupBy error messages and docs around resource limits. (apache#4162)

* Better groupBy error messages and docs around resource limits.

* Fix BufferGrouper test from datasketches.

* Further clarify.
gianm authored and fjy committed Apr 13, 2017
1 parent 2e95892 commit b2954d5
Showing 14 changed files with 327 additions and 99 deletions.
38 changes: 36 additions & 2 deletions docs/content/querying/groupbyquery.md
@@ -141,8 +141,11 @@ on-heap by default, but it can optionally store aggregated values off-heap.
Query API and results are compatible between the two engines; however, there are some differences from a cluster
configuration perspective:

-- groupBy v1 merges results in heap, whereas groupBy v2 merges results off-heap. As a result, optimal configuration for
-  your Druid nodes may involve less heap (-Xmx, -Xms) and more direct memory (-XX:MaxDirectMemorySize).
+- groupBy v1 controls resource usage with a row-based limit (maxResults), whereas groupBy v2 uses bytes-based limits.
+  In addition, groupBy v1 merges results on-heap, whereas groupBy v2 merges results off-heap. These factors mean that
+  memory tuning and resource limits behave differently between v1 and v2. In particular, some queries that complete
+  successfully in one engine may exceed resource limits and fail in the other. See the "Memory tuning and resource
+  limits" section for more details.
- groupBy v1 imposes no limit on the number of concurrently running queries, whereas groupBy v2 controls memory usage
by using a finite-sized merge buffer pool. By default, the number of merge buffers is 1/4 the number of processing
threads. You can adjust this as necessary to balance concurrency and memory usage.
@@ -151,6 +154,37 @@ historical nodes.
- groupBy v1 supports using [chunkPeriod](query-context.html) to parallelize merging on the broker, whereas groupBy v2
ignores chunkPeriod.

#### Memory tuning and resource limits

When using groupBy v2, three parameters control resource usage and limits:

- druid.processing.buffer.sizeBytes: size of the off-heap hash table used for aggregation, per query, in bytes. At
most druid.processing.numMergeBuffers of these will be created at once, which also serves as an upper limit on the
number of concurrently running groupBy queries.
- druid.query.groupBy.maxMergingDictionarySize: size of the on-heap dictionary used when grouping on strings, per query,
in bytes. Note that this is based on a rough estimate of the dictionary size, not the actual size.
- druid.query.groupBy.maxOnDiskStorage: amount of space on disk used for aggregation, per query, in bytes. By default,
this is 0, which means aggregation will not use disk.
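
For illustration, a hypothetical runtime.properties for a data node might combine these settings as follows (the
values are examples for discussion, not recommendations):

```properties
# Off-heap: each concurrent groupBy v2 query gets one merge buffer of this size.
druid.processing.buffer.sizeBytes=536870912
druid.processing.numMergeBuffers=2

# On-heap: rough per-query cap for the string-grouping dictionary.
druid.query.groupBy.maxMergingDictionarySize=100000000

# Disk: allow up to 1 GB of spill per query instead of failing when the
# in-memory structures fill up (0, the default, disables disk use).
druid.query.groupBy.maxOnDiskStorage=1073741824
```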

If maxOnDiskStorage is 0 (the default) then a query that exceeds either the on-heap dictionary limit, or the off-heap
aggregation table limit, will fail with a "Resource limit exceeded" error describing the limit that was exceeded.

If maxOnDiskStorage is greater than 0, queries that exceed the in-memory limits will start using disk for aggregation.
In this case, when either the on-heap dictionary or off-heap hash table fills up, partially aggregated records will be
sorted and flushed to disk. Then, both in-memory structures will be cleared out for further aggregation. Queries that
then go on to exceed maxOnDiskStorage will fail with a "Resource limit exceeded" error indicating that they ran out of
disk space.
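
To make that flow concrete, here is an editor's toy sketch in plain Java (not Druid's implementation; the class and
field names are invented) of the fill-flush-continue cycle described above:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model: count rows per key with a bounded in-memory table. When the
// table fills, its partially aggregated entries are flushed to a "spill file"
// and the table is cleared so aggregation can continue.
public class SpillSketch
{
  private final Map<String, Long> table = new HashMap<>();              // stands in for the off-heap hash table
  private final List<Map<String, Long>> spillFiles = new ArrayList<>(); // stands in for on-disk spill files
  private final int maxTableSize;                                       // stands in for buffer.sizeBytes

  public SpillSketch(final int maxTableSize)
  {
    this.maxTableSize = maxTableSize;
  }

  public void aggregate(final String key)
  {
    if (!table.containsKey(key) && table.size() >= maxTableSize) {
      // Druid sorts these records before flushing; spilled and in-memory
      // partial results are merged when results are iterated.
      spillFiles.add(new HashMap<>(table));
      table.clear();
    }
    table.merge(key, 1L, Long::sum);
    // A real grouper would return a failure here once the space used by
    // spill files exceeded maxOnDiskStorage.
  }
}
```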

With groupBy v2, cluster operators should make sure that the off-heap hash tables and on-heap merging dictionaries
will not exceed available memory for the maximum possible concurrent query load (given by
druid.processing.numMergeBuffers).
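
As a rough, hypothetical sizing example: with druid.processing.numMergeBuffers=4 and
druid.processing.buffer.sizeBytes=500000000 (500 MB), merge buffers alone can occupy up to 4 × 500 MB = 2 GB of direct
memory, and with druid.query.groupBy.maxMergingDictionarySize=100000000 (100 MB), the four concurrent queries those
buffers allow can together hold roughly 4 × 100 MB = 400 MB of on-heap dictionaries (the dictionary size is an
estimate, so actual heap use may differ).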

When using groupBy v1, all aggregation is done on-heap, and resource limits are enforced through the parameter
druid.query.groupBy.maxResults, a cap on the number of rows allowed in a result set. Queries that exceed this limit
fail with a "Resource limit exceeded" error indicating they exceeded their row limit. Cluster operators should make
sure that on-heap aggregation will not exceed available JVM heap space for the expected concurrent query load.
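
For example, under a hypothetical setting of druid.query.groupBy.maxResults=500000, a groupBy v1 query whose result
set would exceed 500,000 rows fails with this error instead of continuing to accumulate rows on heap.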

#### Alternatives

There are some situations where other query types may be a better choice than groupBy.
@@ -75,14 +75,14 @@ public void testGrowingBufferGrouper()
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.<String, Object>of("sketch", sketchHolder)));

for (int i = 0; i < expectedMaxSize; i++) {
-Assert.assertTrue(String.valueOf(i), grouper.aggregate(i));
+Assert.assertTrue(String.valueOf(i), grouper.aggregate(i).isOk());
}

updateSketch.update(3);
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.<String, Object>of("sketch", sketchHolder)));

for (int i = 0; i < expectedMaxSize; i++) {
-Assert.assertTrue(String.valueOf(i), grouper.aggregate(i));
+Assert.assertTrue(String.valueOf(i), grouper.aggregate(i).isOk());
}

Object[] holders = Lists.newArrayList(grouper.iterator(true)).get(0).getValues();
@@ -256,18 +256,4 @@ public static Map<String, ValueType> rowSignatureFor(final GroupByQuery query)
// Don't include post-aggregators since we don't know what types they are.
return types.build();
}

-/**
- * Throw a {@link ResourceLimitExceededException}. Only used by groupBy v2 when accumulation resources
- * are exceeded, triggered by false return from {@link io.druid.query.groupby.epinephelinae.Grouper#aggregate(Object)}.
- *
- * @return nothing will ever be returned; this return type is for your convenience, similar to
- * Throwables.propagate in Guava.
- */
-public static ResourceLimitExceededException throwAccumulationResourceLimitExceededException()
-{
-throw new ResourceLimitExceededException(
-"Not enough resources to execute this query. Try increasing druid.query.groupBy.maxOnDiskStorage, "
-+ "druid.query.groupBy.maxMergingDictionarySize, or druid.processing.buffer.sizeBytes.");
-}
}
@@ -0,0 +1,85 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.query.groupby.epinephelinae;

import java.util.Objects;

public class AggregateResult
{
  private static final AggregateResult OK = new AggregateResult(true, null);

  private final boolean ok;
  private final String reason;

  public static AggregateResult ok()
  {
    return OK;
  }

  public static AggregateResult failure(final String reason)
  {
    return new AggregateResult(false, reason);
  }

  private AggregateResult(final boolean ok, final String reason)
  {
    this.ok = ok;
    this.reason = reason;
  }

  public boolean isOk()
  {
    return ok;
  }

  public String getReason()
  {
    return reason;
  }

  @Override
  public boolean equals(final Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    final AggregateResult that = (AggregateResult) o;
    return ok == that.ok &&
           Objects.equals(reason, that.reason);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(ok, reason);
  }

  @Override
  public String toString()
  {
    return "AggregateResult{" +
           "ok=" + ok +
           ", reason='" + reason + '\'' +
           '}';
  }
}
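
As a usage illustration (editor's sketch, not part of this commit), the new class is consumed by checking isOk() and,
on failure, surfacing getReason() to the user, as the merging runner further down does:

```java
import io.druid.query.groupby.epinephelinae.AggregateResult;

// Minimal demo of the AggregateResult API added above (illustrative only;
// assumes the class above is on the classpath).
public class AggregateResultDemo
{
  public static void main(String[] args)
  {
    final AggregateResult ok = AggregateResult.ok();
    final AggregateResult full = AggregateResult.failure("Not enough aggregation table space");

    System.out.println(ok.isOk());         // true
    System.out.println(full.isOk());       // false
    System.out.println(full.getReason());  // prints the failure message
    System.out.println(ok.getReason());    // null for the shared OK instance
  }
}
```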
@@ -59,6 +59,16 @@
public class BufferGrouper<KeyType> implements Grouper<KeyType>
{
private static final Logger log = new Logger(BufferGrouper.class);
private static final AggregateResult DICTIONARY_FULL = AggregateResult.failure(
"Not enough dictionary space to execute this query. Try increasing "
+ "druid.query.groupBy.maxMergingDictionarySize or enable disk spilling by setting "
+ "druid.query.groupBy.maxOnDiskStorage to a positive number."
);
private static final AggregateResult HASHTABLE_FULL = AggregateResult.failure(
"Not enough aggregation table space to execute this query. Try increasing "
+ "druid.processing.buffer.sizeBytes or enable disk spilling by setting "
+ "druid.query.groupBy.maxOnDiskStorage to a positive number."
);

private static final int MIN_INITIAL_BUCKETS = 4;
private static final int DEFAULT_INITIAL_BUCKETS = 1024;
@@ -146,11 +156,13 @@ public boolean isInitialized()
}

@Override
-public boolean aggregate(KeyType key, int keyHash)
+public AggregateResult aggregate(KeyType key, int keyHash)
{
final ByteBuffer keyBuffer = keySerde.toByteBuffer(key);
if (keyBuffer == null) {
-return false;
+// This may just trigger a spill and get ignored, which is ok. If it bubbles up to the user, the message will
+// be correct.
+return DICTIONARY_FULL;
}

if (keyBuffer.remaining() != keySize) {
@@ -178,7 +190,9 @@ public boolean aggregate(KeyType key, int keyHash)
}

if (bucket < 0) {
-return false;
+// This may just trigger a spill and get ignored, which is ok. If it bubbles up to the user, the message will
+// be correct.
+return HASHTABLE_FULL;
}
}

@@ -203,11 +217,11 @@ public boolean aggregate(KeyType key, int keyHash)
aggregators[i].aggregate(tableBuffer, offset + aggregatorOffsets[i]);
}

-return true;
+return AggregateResult.ok();
}

@Override
-public boolean aggregate(final KeyType key)
+public AggregateResult aggregate(final KeyType key)
{
return aggregate(key, Groupers.hash(key));
}
@@ -145,7 +145,7 @@ public boolean isInitialized()
}

@Override
-public boolean aggregate(KeyType key, int keyHash)
+public AggregateResult aggregate(KeyType key, int keyHash)
{
if (!initialized) {
throw new ISE("Grouper is not initialized");
@@ -160,8 +160,8 @@ public boolean aggregate(KeyType key, int keyHash)

synchronized (hashBasedGrouper) {
if (!spilling) {
-if (hashBasedGrouper.aggregate(key, keyHash)) {
-return true;
+if (hashBasedGrouper.aggregate(key, keyHash).isOk()) {
+return AggregateResult.ok();
} else {
spilling = true;
}
@@ -179,7 +179,7 @@
}

@Override
-public boolean aggregate(KeyType key)
+public AggregateResult aggregate(KeyType key)
{
return aggregate(key, Groupers.hash(key));
}
@@ -52,10 +52,10 @@
import io.druid.query.QueryInterruptedException;
import io.druid.query.QueryRunner;
import io.druid.query.QueryWatcher;
+import io.druid.query.ResourceLimitExceededException;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;
-import io.druid.query.groupby.GroupByQueryHelper;
import io.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBasedKey;

import java.io.Closeable;
@@ -107,7 +107,7 @@ public GroupByMergingQueryRunnerV2(
}

@Override
-public Sequence<Row> run(final Query queryParam, final Map responseContext)
+public Sequence<Row> run(final Query<Row> queryParam, final Map<String, Object> responseContext)
{
final GroupByQuery query = (GroupByQuery) queryParam;
final GroupByQueryConfig querySpecificConfig = config.withOverrides(query);
@@ -180,7 +180,7 @@ public CloseableGrouperIterator<RowBasedKey, Row> make()
throw new QueryInterruptedException(e);
}

-Pair<Grouper<RowBasedKey>, Accumulator<Grouper<RowBasedKey>, Row>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair(
+Pair<Grouper<RowBasedKey>, Accumulator<AggregateResult, Row>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair(
query,
false,
null,
@@ -192,43 +192,46 @@ public CloseableGrouperIterator<RowBasedKey, Row> make()
combiningAggregatorFactories
);
final Grouper<RowBasedKey> grouper = pair.lhs;
-final Accumulator<Grouper<RowBasedKey>, Row> accumulator = pair.rhs;
+final Accumulator<AggregateResult, Row> accumulator = pair.rhs;
grouper.init();

final ReferenceCountingResourceHolder<Grouper<RowBasedKey>> grouperHolder =
ReferenceCountingResourceHolder.fromCloseable(grouper);
resources.add(grouperHolder);

-ListenableFuture<List<Boolean>> futures = Futures.allAsList(
+ListenableFuture<List<AggregateResult>> futures = Futures.allAsList(
Lists.newArrayList(
Iterables.transform(
queryables,
-new Function<QueryRunner<Row>, ListenableFuture<Boolean>>()
+new Function<QueryRunner<Row>, ListenableFuture<AggregateResult>>()
{
@Override
-public ListenableFuture<Boolean> apply(final QueryRunner<Row> input)
+public ListenableFuture<AggregateResult> apply(final QueryRunner<Row> input)
{
if (input == null) {
throw new ISE(
"Null queryRunner! Looks to be some segment unmapping action happening"
);
}

-ListenableFuture<Boolean> future = exec.submit(
-new AbstractPrioritizedCallable<Boolean>(priority)
+ListenableFuture<AggregateResult> future = exec.submit(
+new AbstractPrioritizedCallable<AggregateResult>(priority)
{
@Override
-public Boolean call() throws Exception
+public AggregateResult call() throws Exception
{
try (
Releaser bufferReleaser = mergeBufferHolder.increment();
Releaser grouperReleaser = grouperHolder.increment()
) {
-final Object retVal = input.run(queryForRunners, responseContext)
-.accumulate(grouper, accumulator);
+final AggregateResult retVal = input.run(queryForRunners, responseContext)
+.accumulate(
+AggregateResult.ok(),
+accumulator
+);

-// Return true if OK, false if resources were exhausted.
-return retVal == grouper;
+return retVal;
}
catch (QueryInterruptedException e) {
throw e;
@@ -295,7 +298,7 @@ public void cleanup(CloseableGrouperIterator<RowBasedKey, Row> iterFromMake)

private void waitForFutureCompletion(
GroupByQuery query,
-ListenableFuture<List<Boolean>> future,
+ListenableFuture<List<AggregateResult>> future,
long timeout
)
{
@@ -308,12 +311,12 @@ private void waitForFutureCompletion(
throw new TimeoutException();
}

-final List<Boolean> results = future.get(timeout, TimeUnit.MILLISECONDS);
+final List<AggregateResult> results = future.get(timeout, TimeUnit.MILLISECONDS);

-for (Boolean result : results) {
-if (!result) {
+for (AggregateResult result : results) {
+if (!result.isOk()) {
future.cancel(true);
-throw GroupByQueryHelper.throwAccumulationResourceLimitExceededException();
+throw new ResourceLimitExceededException(result.getReason());
}
}
}
@@ -299,7 +299,7 @@ public Row next()
// Aggregate additional grouping for this row
if (doAggregate) {
keyBuffer.rewind();
-if (!grouper.aggregate(keyBuffer)) {
+if (!grouper.aggregate(keyBuffer).isOk()) {
// Buffer full while aggregating; break out and resume later
currentRowWasPartiallyAggregated = true;
break outer;