Skip to content

Commit

Permalink
More fine-grained DI for management node types. Don't allocate proces…
Browse files Browse the repository at this point in the history
…sing resources on Router (apache#4429)

* Remove DruidProcessingModule, QueryableModule and QueryRunnerFactoryModule from DI for coordinator, overlord, middle-manager. Add RouterDruidProcessing not to allocate processing resources on router

* Fix examples

* Fixes

* Revert Peon configs and add comments

* Remove qualifier
  • Loading branch information
leventov authored and gianm committed Jun 28, 2017
1 parent 82140c2 commit 2fa4b10
Show file tree
Hide file tree
Showing 54 changed files with 935 additions and 363 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import io.druid.benchmark.datagen.BenchmarkSchemas;
import io.druid.benchmark.query.QueryBenchmarkUtil;
import io.druid.collections.BlockingPool;
import io.druid.collections.DefaultBlockingPool;
import io.druid.collections.NonBlockingPool;
import io.druid.collections.StupidPool;
import io.druid.concurrent.Execs;
import io.druid.data.input.InputRow;
Expand Down Expand Up @@ -348,15 +350,15 @@ public void setup() throws IOException
}
}

StupidPool<ByteBuffer> bufferPool = new StupidPool<>(
NonBlockingPool<ByteBuffer> bufferPool = new StupidPool<>(
"GroupByBenchmark-computeBufferPool",
new OffheapBufferGenerator("compute", 250_000_000),
0,
Integer.MAX_VALUE
);

// limit of 2 is required since we simulate both historical merge and broker merge in the same process
BlockingPool<ByteBuffer> mergePool = new BlockingPool<>(
BlockingPool<ByteBuffer> mergePool = new DefaultBlockingPool<>(
new OffheapBufferGenerator("merge", 250_000_000),
2
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.base.Supplier;

import io.druid.collections.NonBlockingPool;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidPool;
import io.druid.java.util.common.logger.Logger;
Expand Down Expand Up @@ -64,7 +65,7 @@ public void teardown()
public static class BenchmarkPool
{
private final AtomicLong numPools = new AtomicLong(0L);
private final StupidPool<Object> pool = new StupidPool<>(
private final NonBlockingPool<Object> pool = new StupidPool<>(
"simpleObject pool",
new Supplier<Object>()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
import io.druid.benchmark.datagen.BenchmarkSchemas;
import io.druid.collections.BlockingPool;
import io.druid.collections.DefaultBlockingPool;
import io.druid.collections.NonBlockingPool;
import io.druid.collections.StupidPool;
import io.druid.concurrent.Execs;
import io.druid.data.input.InputRow;
Expand Down Expand Up @@ -392,15 +394,15 @@ public void setup() throws IOException
}
}

StupidPool<ByteBuffer> bufferPool = new StupidPool<>(
NonBlockingPool<ByteBuffer> bufferPool = new StupidPool<>(
"GroupByBenchmark-computeBufferPool",
new OffheapBufferGenerator("compute", 250_000_000),
0,
Integer.MAX_VALUE
);

// limit of 2 is required since we simulate both historical merge and broker merge in the same process
BlockingPool<ByteBuffer> mergePool = new BlockingPool<>(
BlockingPool<ByteBuffer> mergePool = new DefaultBlockingPool<>(
new OffheapBufferGenerator("merge", 250_000_000),
2
);
Expand Down
269 changes: 6 additions & 263 deletions common/src/main/java/io/druid/collections/BlockingPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,59 +19,11 @@

package io.druid.collections;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.druid.java.util.common.ISE;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
* Pool that pre-generates objects up to a limit, then permits possibly-blocking "take" operations.
*/
public class BlockingPool<T>
public interface BlockingPool<T>
{
private static final TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS;

private final ArrayDeque<T> objects;
private final ReentrantLock lock;
private final Condition notEnough;
private final int maxSize;

public BlockingPool(
Supplier<T> generator,
int limit
)
{
this.objects = new ArrayDeque<>(limit);
this.maxSize = limit;

for (int i = 0; i < limit; i++) {
objects.add(generator.get());
}

this.lock = new ReentrantLock();
this.notEnough = lock.newCondition();
}

public int maxSize()
{
return maxSize;
}

@VisibleForTesting
public int getPoolSize()
{
return objects.size();
}
int maxSize();

/**
* Take a resource from the pool, waiting up to the
Expand All @@ -81,91 +33,14 @@ public int getPoolSize()
*
* @return a resource, or null if the timeout was reached
*/
public ReferenceCountingResourceHolder<T> take(final long timeoutMs)
{
Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs);
checkInitialized();
try {
return wrapObject(timeoutMs > 0 ? pollObject(timeoutMs) : pollObject());
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}
ReferenceCountingResourceHolder<T> take(long timeoutMs);

/**
* Take a resource from the pool, waiting if necessary until an element becomes available.
*
* @return a resource
*/
public ReferenceCountingResourceHolder<T> take()
{
checkInitialized();
try {
return wrapObject(takeObject());
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}

private ReferenceCountingResourceHolder<T> wrapObject(T theObject)
{
return theObject == null ? null : new ReferenceCountingResourceHolder<>(
theObject,
new Closeable()
{
@Override
public void close() throws IOException
{
offer(theObject);
}
}
);
}

private T pollObject()
{
final ReentrantLock lock = this.lock;
lock.lock();
try {
return objects.isEmpty() ? null : objects.pop();
} finally {
lock.unlock();
}
}

private T pollObject(long timeoutMs) throws InterruptedException
{
long nanos = TIME_UNIT.toNanos(timeoutMs);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (objects.isEmpty()) {
if (nanos <= 0) {
return null;
}
nanos = notEnough.awaitNanos(nanos);
}
return objects.pop();
} finally {
lock.unlock();
}
}

private T takeObject() throws InterruptedException
{
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (objects.isEmpty()) {
notEnough.await();
}
return objects.pop();
} finally {
lock.unlock();
}
}
ReferenceCountingResourceHolder<T> take();

/**
* Take resources from the pool, waiting up to the
Expand All @@ -176,17 +51,7 @@ private T takeObject() throws InterruptedException
*
* @return a resource, or null if the timeout was reached
*/
public ReferenceCountingResourceHolder<List<T>> takeBatch(final int elementNum, final long timeoutMs)
{
Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs);
checkInitialized();
try {
return wrapObjects(timeoutMs > 0 ? pollObjects(elementNum, timeoutMs) : pollObjects(elementNum));
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}
ReferenceCountingResourceHolder<List<T>> takeBatch(int elementNum, long timeoutMs);

/**
* Take resources from the pool, waiting if necessary until the elements of the given number become available.
Expand All @@ -195,127 +60,5 @@ public ReferenceCountingResourceHolder<List<T>> takeBatch(final int elementNum,
*
* @return a resource
*/
public ReferenceCountingResourceHolder<List<T>> takeBatch(final int elementNum)
{
checkInitialized();
try {
return wrapObjects(takeObjects(elementNum));
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}

private ReferenceCountingResourceHolder<List<T>> wrapObjects(List<T> theObjects)
{
return theObjects == null ? null : new ReferenceCountingResourceHolder<>(
theObjects,
new Closeable()
{
@Override
public void close() throws IOException
{
offerBatch(theObjects);
}
}
);
}

private List<T> pollObjects(int elementNum) throws InterruptedException
{
final List<T> list = Lists.newArrayListWithCapacity(elementNum);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
if (objects.size() < elementNum) {
return null;
} else {
for (int i = 0; i < elementNum; i++) {
list.add(objects.pop());
}
return list;
}
} finally {
lock.unlock();
}
}

private List<T> pollObjects(int elementNum, long timeoutMs) throws InterruptedException
{
long nanos = TIME_UNIT.toNanos(timeoutMs);
final List<T> list = Lists.newArrayListWithCapacity(elementNum);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (objects.size() < elementNum) {
if (nanos <= 0) {
return null;
}
nanos = notEnough.awaitNanos(nanos);
}
for (int i = 0; i < elementNum; i++) {
list.add(objects.pop());
}
return list;
} finally {
lock.unlock();
}
}

private List<T> takeObjects(int elementNum) throws InterruptedException
{
final List<T> list = Lists.newArrayListWithCapacity(elementNum);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (objects.size() < elementNum) {
notEnough.await();
}
for (int i = 0; i < elementNum; i++) {
list.add(objects.pop());
}
return list;
} finally {
lock.unlock();
}
}

private void checkInitialized()
{
Preconditions.checkState(maxSize > 0, "Pool was initialized with limit = 0, there are no objects to take.");
}

private void offer(T theObject)
{
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (objects.size() < maxSize) {
objects.push(theObject);
notEnough.signal();
} else {
throw new ISE("Cannot exceed pre-configured maximum size");
}
} finally {
lock.unlock();
}
}

private void offerBatch(List<T> offers)
{
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (objects.size() + offers.size() <= maxSize) {
for (T offer : offers) {
objects.push(offer);
}
notEnough.signal();
} else {
throw new ISE("Cannot exceed pre-configured maximum size");
}
} finally {
lock.unlock();
}
}
ReferenceCountingResourceHolder<List<T>> takeBatch(int elementNum);
}
Loading

0 comments on commit 2fa4b10

Please sign in to comment.