Skip to content

Commit

Permalink
Atomic merge buffer acquisition for groupBys (apache#3939)
Browse files Browse the repository at this point in the history
* Atomic merge buffer acquisition for groupBys

* documentation

* documentation

* address comments

* address comments

* fix test failure

* Addressed comments

- Add InsufficientResourcesException
- Renamed GroupByQueryBrokerResource to GroupByQueryResource

* addressed comments

* Add takeBatch() to BlockingPool
  • Loading branch information
jihoonson authored and himanshug committed Feb 22, 2017
1 parent e7d01b6 commit 7200dce
Show file tree
Hide file tree
Showing 20 changed files with 1,017 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -425,9 +425,7 @@ public String getFormatString()
factory = new GroupByQueryRunnerFactory(
strategySelector,
new GroupByQueryQueryToolChest(
configSupplier,
strategySelector,
bufferPool,
QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,9 +468,7 @@ public String getFormatString()
factory = new GroupByQueryRunnerFactory(
strategySelector,
new GroupByQueryQueryToolChest(
configSupplier,
strategySelector,
bufferPool,
QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()
)
);
Expand Down
243 changes: 220 additions & 23 deletions common/src/main/java/io/druid/collections/BlockingPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,34 +22,55 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;

import io.druid.java.util.common.logger.Logger;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.druid.java.util.common.ISE;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.ArrayDeque;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
* Pool that pre-generates objects up to a limit, then permits possibly-blocking "take" operations.
*/
public class BlockingPool<T>
{
private static final Logger log = new Logger(BlockingPool.class);
private static final TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS;

private final BlockingQueue<T> objects;
private final ArrayDeque<T> objects;
private final ReentrantLock lock;
private final Condition notEnough;
private final int maxSize;

public BlockingPool(
Supplier<T> generator,
int limit
)
{
this.objects = limit > 0 ? new ArrayBlockingQueue<T>(limit) : null;
this.objects = new ArrayDeque<>(limit);
this.maxSize = limit;

for (int i = 0; i < limit; i++) {
objects.add(generator.get());
}

this.lock = new ReentrantLock();
this.notEnough = lock.newCondition();
}

public int maxSize()
{
return maxSize;
}

@VisibleForTesting
public int getPoolSize()
{
return objects.size();
}

/**
Expand All @@ -58,31 +79,207 @@ public BlockingPool(
* @param timeout maximum time to wait for a resource, in milliseconds. Negative means do not use a timeout.
*
* @return a resource, or null if the timeout was reached
*/
public ReferenceCountingResourceHolder<T> take(final long timeout)
{
checkInitialized();
final T theObject;
try {
if (timeout > -1) {
theObject = timeout > 0 ? poll(timeout) : poll();
} else {
theObject = take();
}
return theObject == null ? null : new ReferenceCountingResourceHolder<>(
theObject,
new Closeable()
{
@Override
public void close() throws IOException
{
offer(theObject);
}
}
);
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}

private T poll()
{
final ReentrantLock lock = this.lock;
lock.lock();
try {
return objects.isEmpty() ? null : objects.pop();
} finally {
lock.unlock();
}
}

private T poll(long timeout) throws InterruptedException
{
long nanos = TIME_UNIT.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (objects.isEmpty()) {
if (nanos <= 0) {
return null;
}
nanos = notEnough.awaitNanos(nanos);
}
return objects.pop();
} finally {
lock.unlock();
}
}

private T take() throws InterruptedException
{
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (objects.isEmpty()) {
notEnough.await();
}
return objects.pop();
} finally {
lock.unlock();
}
}

/**
* Take a resource from the pool.
*
* @param elementNum number of resources to take
* @param timeout maximum time to wait for resources, in milliseconds. Negative means do not use a timeout.
*
* @throws InterruptedException if interrupted while waiting for a resource to become available
* @return a resource, or null if the timeout was reached
*/
public ReferenceCountingResourceHolder<T> take(final long timeout) throws InterruptedException
public ReferenceCountingResourceHolder<List<T>> takeBatch(final int elementNum, final long timeout)
{
Preconditions.checkState(objects != null, "Pool was initialized with limit = 0, there are no objects to take.");
final T theObject = timeout >= 0 ? objects.poll(timeout, TimeUnit.MILLISECONDS) : objects.take();
return theObject == null ? null : new ReferenceCountingResourceHolder<>(
theObject,
new Closeable()
{
@Override
public void close() throws IOException
checkInitialized();
final List<T> objects;
try {
if (timeout > -1) {
objects = timeout > 0 ? pollBatch(elementNum, timeout) : pollBatch(elementNum);
} else {
objects = takeBatch(elementNum);
}
return objects == null ? null : new ReferenceCountingResourceHolder<>(
objects,
new Closeable()
{
if (!objects.offer(theObject)) {
log.error("WTF?! Queue offer failed, uh oh...");
@Override
public void close() throws IOException
{
offerBatch(objects);
}
}
);
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}

private List<T> pollBatch(int elementNum) throws InterruptedException
{
final List<T> list = Lists.newArrayListWithCapacity(elementNum);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
if (objects.size() < elementNum) {
return null;
} else {
for (int i = 0; i < elementNum; i++) {
list.add(objects.pop());
}
);
return list;
}
} finally {
lock.unlock();
}
}

@VisibleForTesting
protected int getQueueSize()
private List<T> pollBatch(int elementNum, long timeout) throws InterruptedException
{
return objects.size();
long nanos = TIME_UNIT.toNanos(timeout);
final List<T> list = Lists.newArrayListWithCapacity(elementNum);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (objects.size() < elementNum) {
if (nanos <= 0) {
return null;
}
nanos = notEnough.awaitNanos(nanos);
}
for (int i = 0; i < elementNum; i++) {
list.add(objects.pop());
}
return list;
} finally {
lock.unlock();
}
}

private List<T> takeBatch(int elementNum) throws InterruptedException
{
final List<T> list = Lists.newArrayListWithCapacity(elementNum);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (objects.size() < elementNum) {
notEnough.await();
}
for (int i = 0; i < elementNum; i++) {
list.add(objects.pop());
}
return list;
} finally {
lock.unlock();
}
}

private void checkInitialized()
{
Preconditions.checkState(maxSize > 0, "Pool was initialized with limit = 0, there are no objects to take.");
}

private void offer(T theObject)
{
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (objects.size() < maxSize) {
objects.push(theObject);
notEnough.signal();
} else {
throw new ISE("Cannot exceed pre-configured maximum size");
}
} finally {
lock.unlock();
}
}

private void offerBatch(List<T> offers)
{
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (objects.size() + offers.size() <= maxSize) {
for (T offer : offers) {
objects.push(offer);
}
notEnough.signal();
} else {
throw new ISE("Cannot exceed pre-configured maximum size");
}
} finally {
lock.unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public static <T> CombiningSequence<T> create(
private final Ordering<T> ordering;
private final BinaryFn<T, T, T> mergeFn;

public CombiningSequence(
private CombiningSequence(
Sequence<T> baseSequence,
Ordering<T> ordering,
BinaryFn<T, T, T> mergeFn
Expand Down
Loading

0 comments on commit 7200dce

Please sign in to comment.