Fine grained buffer management for groupby (apache#3863)
* Fine-grained buffer management for group by queries

* Remove maxQueryCount from GroupByRules

* Fix code style

* Merge master

* Fix compilation failure

* Address comments

* Address comments

- Revert Sequence
- Add isInitialized() to Grouper
- Initialize the grouper in RowBasedGrouperHelper.Accumulator
- Simple refactoring RowBasedGrouperHelper.Accumulator
- Add tests for checking the number of used merge buffers
- Improve docs

* Revert unnecessary changes

* change to visible to testing

* fix misspelling
jihoonson authored and gianm committed Feb 14, 2017
1 parent 78b0d13 commit a459db6
Showing 19 changed files with 1,185 additions and 188 deletions.
7 changes: 7 additions & 0 deletions common/src/main/java/io/druid/collections/BlockingPool.java
@@ -19,6 +19,7 @@

package io.druid.collections;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;

@@ -78,4 +79,10 @@ public void close() throws IOException
}
);
}

@VisibleForTesting
protected int getQueueSize()
{
return objects.size();
}
}
11 changes: 6 additions & 5 deletions docs/content/querying/groupbyquery.md
@@ -157,10 +157,10 @@ inner query's results stream with off-heap fact map and on-heap string dictionar
strategy perform the outer query on the broker in a single-threaded fashion.

Note that groupBys require a separate merge buffer on the broker for each layer beyond the first layer of the groupBy.
With the v2 groupBy strategy, this can potentially lead to deadlocks for groupBys nested beyond two layers, since the
merge buffers are limited in number and are acquired one-by-one and not as a complete set. At this time we recommend
that you avoid deeply-nested groupBys with the v2 strategy. Doubly-nested groupBys (groupBy -> groupBy -> table) are
safe and do not suffer from this issue.
Each merge buffer is released as soon as it is no longer needed during query processing. However, deeply nested
groupBys (those with two or more groupBy layers beyond the first one) can still lead to deadlocks, since the merge
buffers are limited in number and are acquired one by one rather than as a complete set. At this time, we recommend
that you avoid running too many deeply nested groupBys concurrently with the v2 strategy.
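
The deadlock risk described above can be illustrated with a small model. This is only a sketch, not Druid's actual pool: each `Semaphore` permit stands in for one merge buffer, and the class and method names are hypothetical. It contrasts taking buffers one at a time with taking the complete set in a single step.

```java
import java.util.concurrent.Semaphore;

// Hypothetical model: each permit stands for one merge buffer in a fixed-size pool.
final class MergeBufferDeadlockSketch
{
  /**
   * Two queries each take one buffer and then each tries to take a second one.
   * Returns true when both hold one buffer and neither can obtain another,
   * which is the state that would deadlock if the second acquisition blocked.
   */
  static boolean oneByOneGetsStuck(int poolSize)
  {
    final Semaphore pool = new Semaphore(poolSize);
    final boolean aFirst = pool.tryAcquire();
    final boolean bFirst = pool.tryAcquire();
    final boolean aSecond = pool.tryAcquire();
    final boolean bSecond = pool.tryAcquire();
    return aFirst && bFirst && !aSecond && !bSecond;
  }

  /**
   * Each query asks for its complete set at once. A query that cannot get the
   * whole set holds nothing, so the other query can finish and release.
   */
  static boolean allAtOnceMakesProgress(int poolSize, int needed)
  {
    final Semaphore pool = new Semaphore(poolSize);
    final boolean a = pool.tryAcquire(needed); // query A gets every buffer it needs
    final boolean b = pool.tryAcquire(needed); // query B gets none and holds none
    if (a) {
      pool.release(needed);                    // query A finishes
    }
    final boolean bRetry = pool.tryAcquire(needed);
    return a && !b && bRetry;
  }

  public static void main(String[] args)
  {
    System.out.println("one-by-one stuck: " + oneByOneGetsStuck(2));
    System.out.println("all-at-once progresses: " + allAtOnceMakesProgress(2, 2));
  }
}
```

With a pool of two buffers and two queries that each need two, the one-by-one pattern leaves both queries holding one buffer each, while the all-at-once pattern lets the queries run one after the other.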

#### Server configuration

@@ -185,7 +185,8 @@ When using the "v2" strategy, the following runtime properties apply:

Additionally, the "v2" strategy uses merging buffers for merging. It is currently the only query implementation that
does so. By default, Druid is configured without any merging buffer pool, so to use the "v2" strategy you must also
set `druid.processing.numMergeBuffers` to some non-zero number.
set `druid.processing.numMergeBuffers` to some non-zero number. Furthermore, if you want to execute deeply nested groupBys,
you must set `druid.processing.numMergeBuffers` to at least 2.
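
Raising `druid.processing.numMergeBuffers` also raises the direct memory lower bound that this section of the documentation gives as `druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)`. A minimal worked example of that arithmetic follows; the class name and the sample values (1 GiB buffers, 2 merge buffers, 7 processing threads) are assumptions chosen for illustration, not recommended settings.

```java
// Hypothetical helper that evaluates the documented lower bound on direct memory.
final class DirectMemoryEstimate
{
  static long minDirectMemoryBytes(long bufferSizeBytes, int numMergeBuffers, int numThreads)
  {
    // Use long arithmetic so large buffer sizes do not overflow an int.
    return bufferSizeBytes * (numMergeBuffers + numThreads + 1L);
  }

  public static void main(String[] args)
  {
    final long oneGiB = 1L << 30;
    // 2 merge buffers + 7 threads + 1 = 10 buffers of 1 GiB each.
    System.out.println(minDirectMemoryBytes(oneGiB, 2, 7));
  }
}
```

For these sample values the bound is 10 GiB (10737418240 bytes).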

This may require allocating more direct memory. The amount of direct memory needed by Druid is at least
`druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)`. You can
9 changes: 4 additions & 5 deletions docs/content/querying/sql.md
@@ -117,11 +117,10 @@ SELECT COUNT(*) FROM (SELECT DISTINCT col FROM data_source)
```

Note that groupBys require a separate merge buffer on the broker for each layer beyond the first layer of the groupBy.
With the v2 groupBy strategy, this can potentially lead to deadlocks for groupBys nested beyond two layers, since the
merge buffers are limited in number and are acquired one-by-one and not as a complete set. At this time we recommend
that you avoid deeply-nested groupBys with the v2 strategy. Doubly-nested groupBys (groupBy -> groupBy -> table) are
safe and do not suffer from this issue. If you like, you can forbid deeper nesting by setting
`druid.sql.planner.maxQueryCount = 2`.
Each merge buffer is released as soon as it is no longer needed during query processing. However, deeply nested
groupBys (those with two or more groupBy layers beyond the first one) can still lead to deadlocks, since the merge
buffers are limited in number and are acquired one by one rather than as a complete set. At this time, we recommend
that you avoid running too many deeply nested groupBys concurrently with the v2 strategy.

#### Semi-joins

@@ -20,20 +20,41 @@
package io.druid.java.util.common.guava;

/**
* A Sequence represents an iterable sequence of elements. Unlike normal Iterators however, it doesn't expose
* A Sequence represents an iterable sequence of elements. Unlike normal Iterators however, it doesn't expose
* a way for you to extract values from it, instead you provide it with a worker (an Accumulator) and that defines
* what happens with the data.
*
* This inversion of control is in place to allow the Sequence to do resource management. It can enforce that close()
* methods get called and other resources get cleaned up whenever processing is complete. Without this inversion
* <p>
* This inversion of control is in place to allow the Sequence to do resource management. It can enforce that close()
* methods get called and other resources get cleaned up whenever processing is complete. Without this inversion
* it is very easy to unintentionally leak resources when iterating over something that is backed by a resource.
*
* <p>
* Sequences also expose {@link Yielder} objects, which allow you to implement a
* continuation over the Sequence. Yielder do not offer the same guarantees of automagic resource management
* continuation over the Sequence. Yielders do not offer the same guarantees of automatic resource management
* as the accumulate method, but they are Closeable and will do the proper cleanup when close() is called on them.
*/
public interface Sequence<T>
{
public <OutType> OutType accumulate(OutType initValue, Accumulator<OutType, T> accumulator);
public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator);
/**
* Accumulate this sequence using the given accumulator.
*
* @param initValue the initial value to pass along to start the accumulation.
* @param accumulator the accumulator which is responsible for accumulating input values.
* @param <OutType> the type of accumulated value.
*
* @return accumulated value.
*/
<OutType> OutType accumulate(OutType initValue, Accumulator<OutType, T> accumulator);

/**
* Return a Yielder for the accumulated sequence.
*
* @param initValue the initial value to pass along to start the accumulation.
* @param accumulator the accumulator which is responsible for accumulating input values.
* @param <OutType> the type of accumulated value.
*
* @return a Yielder for the accumulated sequence.
*
* @see Yielder
*/
<OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator);
}
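
The inversion of control described in the Javadoc above can be sketched with simplified stand-ins. `MiniSequence` and `MiniAccumulator` below are hypothetical types written for this illustration and are not Druid's `Sequence` and `Accumulator`. The caller supplies only the accumulation logic, and the sequence decides when cleanup runs.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-ins for the Sequence/Accumulator pair.
final class SequenceSketch
{
  interface MiniAccumulator<OutType, InType>
  {
    OutType accumulate(OutType accumulated, InType in);
  }

  interface MiniSequence<T>
  {
    <OutType> OutType accumulate(OutType initValue, MiniAccumulator<OutType, T> accumulator);
  }

  /** Wraps a list; the cleanup callback always runs when accumulation ends. */
  static <T> MiniSequence<T> fromList(final List<T> values, final Runnable cleanup)
  {
    return new MiniSequence<T>()
    {
      @Override
      public <OutType> OutType accumulate(OutType initValue, MiniAccumulator<OutType, T> accumulator)
      {
        OutType result = initValue;
        try {
          for (T value : values) {
            result = accumulator.accumulate(result, value);
          }
          return result;
        }
        finally {
          cleanup.run(); // resource management stays inside the sequence
        }
      }
    };
  }

  static int sum(List<Integer> values, Runnable cleanup)
  {
    return fromList(values, cleanup).accumulate(0, (acc, in) -> acc + in);
  }

  public static void main(String[] args)
  {
    System.out.println(sum(Arrays.asList(1, 2, 3), () -> System.out.println("cleaned up")));
  }
}
```

Because the loop lives inside `accumulate`, the caller never has to remember to close anything, which is the guarantee the Javadoc contrasts with Yielders.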
@@ -19,6 +19,7 @@

package io.druid.query.groupby.epinephelinae;

import com.google.common.base.Supplier;
import com.google.common.primitives.Ints;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
@@ -64,17 +65,19 @@ public class BufferGrouper<KeyType> implements Grouper<KeyType>
private static final float DEFAULT_MAX_LOAD_FACTOR = 0.7f;
private static final int HASH_SIZE = Ints.BYTES;

private final ByteBuffer buffer;
private final Supplier<ByteBuffer> bufferSupplier;
private final KeySerde<KeyType> keySerde;
private final int keySize;
private final BufferAggregator[] aggregators;
private final int[] aggregatorOffsets;
private final int initialBuckets;
private final int bucketSize;
private final int tableArenaSize;
private final int bufferGrouperMaxSize; // Integer.MAX_VALUE in production, only used for unit tests
private final float maxLoadFactor;

private ByteBuffer buffer;
private int tableArenaSize = -1;

// Buffer pointing to the current table (it moves around as the table grows)
private ByteBuffer tableBuffer;

@@ -90,8 +93,10 @@ public class BufferGrouper<KeyType> implements Grouper<KeyType>
// Maximum number of elements in the table before it must be resized
private int maxSize;

private boolean initialized = false;

public BufferGrouper(
final ByteBuffer buffer,
final Supplier<ByteBuffer> bufferSupplier,
final KeySerde<KeyType> keySerde,
final ColumnSelectorFactory columnSelectorFactory,
final AggregatorFactory[] aggregatorFactories,
@@ -100,7 +105,7 @@ public BufferGrouper(
final int initialBuckets
)
{
this.buffer = buffer;
this.bufferSupplier = bufferSupplier;
this.keySerde = keySerde;
this.keySize = keySerde.keySize();
this.aggregators = new BufferAggregator[aggregatorFactories.length];
@@ -121,9 +126,23 @@ public BufferGrouper(
}

this.bucketSize = offset;
this.tableArenaSize = (buffer.capacity() / (bucketSize + Ints.BYTES)) * bucketSize;
}

@Override
public void init()
{
if (!initialized) {
this.buffer = bufferSupplier.get();
this.tableArenaSize = (buffer.capacity() / (bucketSize + Ints.BYTES)) * bucketSize;
reset();
initialized = true;
}
}

reset();
@Override
public boolean isInitialized()
{
return initialized;
}

@Override
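
The change above defers taking the buffer from construction time to the first `init()` call. The sketch below shows that pattern in isolation. It is an assumption-laden illustration: `LazyBufferHolderSketch` is a hypothetical class, and it uses `java.util.function.Supplier` instead of the Guava `Supplier` that the commit imports, so that it runs without extra dependencies.

```java
import java.nio.ByteBuffer;
import java.util.function.Supplier;

// Hypothetical holder that acquires its buffer lazily, on the first init() call.
final class LazyBufferHolderSketch
{
  private final Supplier<ByteBuffer> bufferSupplier;
  private ByteBuffer buffer;
  private boolean initialized = false;

  LazyBufferHolderSketch(Supplier<ByteBuffer> bufferSupplier)
  {
    // Nothing is acquired here; constructing the holder is cheap.
    this.bufferSupplier = bufferSupplier;
  }

  void init()
  {
    if (!initialized) {
      buffer = bufferSupplier.get(); // the resource is taken only now
      initialized = true;
    }
  }

  boolean isInitialized()
  {
    return initialized;
  }

  int capacity()
  {
    if (!initialized) {
      throw new IllegalStateException("Holder is not initialized");
    }
    return buffer.capacity();
  }

  public static void main(String[] args)
  {
    final LazyBufferHolderSketch holder = new LazyBufferHolderSketch(() -> ByteBuffer.allocate(16));
    System.out.println(holder.isInitialized());
    holder.init();
    System.out.println(holder.capacity());
  }
}
```

Calling `init()` more than once is harmless in this sketch, since the supplier is consulted only on the first call. Like the single-threaded variant it mirrors, it is not safe for concurrent callers.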
@@ -21,6 +21,8 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import io.druid.java.util.common.ISE;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.ColumnSelectorFactory;
@@ -34,7 +36,7 @@

/**
* Grouper based around a set of underlying {@link SpillingGrouper} instances. Thread-safe.
*
* <p>
* The passed-in buffer is cut up into concurrencyHint slices, and each slice is passed to a different underlying
* grouper. Access to each slice is separately synchronized. As long as the result set fits in memory, keys are
* partitioned between buffers based on their hash, and multiple threads can write into the same buffer. When
@@ -50,8 +52,21 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
private volatile boolean closed = false;
private final Comparator<KeyType> keyObjComparator;

private final Supplier<ByteBuffer> bufferSupplier;
private final ColumnSelectorFactory columnSelectorFactory;
private final AggregatorFactory[] aggregatorFactories;
private final int bufferGrouperMaxSize;
private final float bufferGrouperMaxLoadFactor;
private final int bufferGrouperInitialBuckets;
private final LimitedTemporaryStorage temporaryStorage;
private final ObjectMapper spillMapper;
private final int concurrencyHint;
private final KeySerdeFactory<KeyType> keySerdeFactory;

private volatile boolean initialized = false;

public ConcurrentGrouper(
final ByteBuffer buffer,
final Supplier<ByteBuffer> bufferSupplier,
final KeySerdeFactory<KeyType> keySerdeFactory,
final ColumnSelectorFactory columnSelectorFactory,
final AggregatorFactory[] aggregatorFactories,
@@ -75,34 +90,67 @@ protected SpillingGrouper<KeyType> initialValue()
}
};

final int sliceSize = (buffer.capacity() / concurrencyHint);

for (int i = 0; i < concurrencyHint; i++) {
final ByteBuffer slice = buffer.duplicate();
slice.position(sliceSize * i);
slice.limit(slice.position() + sliceSize);
groupers.add(
new SpillingGrouper<>(
slice.slice(),
keySerdeFactory,
columnSelectorFactory,
aggregatorFactories,
bufferGrouperMaxSize,
bufferGrouperMaxLoadFactor,
bufferGrouperInitialBuckets,
temporaryStorage,
spillMapper,
false
)
);
this.bufferSupplier = bufferSupplier;
this.columnSelectorFactory = columnSelectorFactory;
this.aggregatorFactories = aggregatorFactories;
this.bufferGrouperMaxSize = bufferGrouperMaxSize;
this.bufferGrouperMaxLoadFactor = bufferGrouperMaxLoadFactor;
this.bufferGrouperInitialBuckets = bufferGrouperInitialBuckets;
this.temporaryStorage = temporaryStorage;
this.spillMapper = spillMapper;
this.concurrencyHint = concurrencyHint;
this.keySerdeFactory = keySerdeFactory;
this.keyObjComparator = keySerdeFactory.objectComparator();
}

@Override
public void init()
{
if (!initialized) {
synchronized (bufferSupplier) {
if (!initialized) {
final ByteBuffer buffer = bufferSupplier.get();
final int sliceSize = (buffer.capacity() / concurrencyHint);

for (int i = 0; i < concurrencyHint; i++) {
final ByteBuffer slice = buffer.duplicate();
slice.position(sliceSize * i);
slice.limit(slice.position() + sliceSize);
final SpillingGrouper<KeyType> grouper = new SpillingGrouper<>(
Suppliers.ofInstance(slice.slice()),
keySerdeFactory,
columnSelectorFactory,
aggregatorFactories,
bufferGrouperMaxSize,
bufferGrouperMaxLoadFactor,
bufferGrouperInitialBuckets,
temporaryStorage,
spillMapper,
false
);
grouper.init();
groupers.add(grouper);
}

initialized = true;
}
}
}
}

this.keyObjComparator = keySerdeFactory.objectComparator();
@Override
public boolean isInitialized()
{
return initialized;
}

@Override
public boolean aggregate(KeyType key, int keyHash)
{
if (!initialized) {
throw new ISE("Grouper is not initialized");
}

if (closed) {
throw new ISE("Grouper is closed");
}
@@ -139,6 +187,10 @@ public boolean aggregate(KeyType key)
@Override
public void reset()
{
if (!initialized) {
throw new ISE("Grouper is not initialized");
}

if (closed) {
throw new ISE("Grouper is closed");
}
@@ -153,6 +205,10 @@ public void reset()
@Override
public Iterator<Entry<KeyType>> iterator(final boolean sorted)
{
if (!initialized) {
throw new ISE("Grouper is not initialized");
}

if (closed) {
throw new ISE("Grouper is closed");
}
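
The thread-safe `init()` above combines two ideas: double-checked locking on a `volatile` flag, and cutting one buffer into `concurrencyHint` equally sized slices. The following is a minimal sketch of both under stated assumptions: `SlicedBufferSketch` is a hypothetical class, it locks on `this` rather than on the supplier, and it stores raw `ByteBuffer` slices instead of groupers.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical class: lazily slices one buffer into equal parts, at most once.
final class SlicedBufferSketch
{
  private final Supplier<ByteBuffer> bufferSupplier;
  private final int concurrencyHint;
  private final List<ByteBuffer> slices = new ArrayList<>();
  private volatile boolean initialized = false;

  SlicedBufferSketch(Supplier<ByteBuffer> bufferSupplier, int concurrencyHint)
  {
    this.bufferSupplier = bufferSupplier;
    this.concurrencyHint = concurrencyHint;
  }

  void init()
  {
    if (!initialized) {           // fast path without taking the lock
      synchronized (this) {
        if (!initialized) {       // re-check now that the lock is held
          final ByteBuffer buffer = bufferSupplier.get();
          final int sliceSize = buffer.capacity() / concurrencyHint;
          for (int i = 0; i < concurrencyHint; i++) {
            // duplicate() shares the bytes but has its own position and limit
            final ByteBuffer slice = buffer.duplicate();
            slice.position(sliceSize * i);
            slice.limit(slice.position() + sliceSize);
            slices.add(slice.slice());
          }
          initialized = true;     // volatile write publishes the filled list
        }
      }
    }
  }

  ByteBuffer slice(int i)
  {
    if (!initialized) {
      throw new IllegalStateException("Not initialized");
    }
    return slices.get(i);
  }

  public static void main(String[] args)
  {
    final SlicedBufferSketch sketch = new SlicedBufferSketch(() -> ByteBuffer.allocate(100), 4);
    sketch.init();
    System.out.println(sketch.slice(0).capacity());
  }
}
```

Each slice is a view onto the same underlying bytes, so a write at index 0 of slice `i` lands at offset `sliceSize * i` of the parent buffer.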
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Predicates;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
@@ -69,7 +70,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class GroupByMergingQueryRunnerV2 implements QueryRunner
public class GroupByMergingQueryRunnerV2 implements QueryRunner<Row>
{
private static final Logger log = new Logger(GroupByMergingQueryRunnerV2.class);
private static final String CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION = "mergeRunnersUsingChainedExecution";
@@ -181,14 +182,15 @@ public CloseableGrouperIterator<RowBasedKey, Row> make()
false,
null,
config,
mergeBufferHolder.get(),
Suppliers.ofInstance(mergeBufferHolder.get()),
concurrencyHint,
temporaryStorage,
spillMapper,
combiningAggregatorFactories
);
final Grouper<RowBasedKey> grouper = pair.lhs;
final Accumulator<Grouper<RowBasedKey>, Row> accumulator = pair.rhs;
grouper.init();

final ReferenceCountingResourceHolder<Grouper<RowBasedKey>> grouperHolder =
ReferenceCountingResourceHolder.fromCloseable(grouper);
@@ -21,6 +21,7 @@

import com.google.common.base.Function;
import com.google.common.base.Strings;
import com.google.common.base.Suppliers;
import com.google.common.collect.Maps;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidPool;
@@ -259,7 +260,7 @@ public Row next()
}

final Grouper<ByteBuffer> grouper = new BufferGrouper<>(
buffer,
Suppliers.ofInstance(buffer),
keySerde,
cursor,
query.getAggregatorSpecs()
@@ -268,6 +269,7 @@ public Row next()
querySpecificConfig.getBufferGrouperMaxLoadFactor(),
querySpecificConfig.getBufferGrouperInitialBuckets()
);
grouper.init();

outer:
while (!cursor.isDone()) {