Skip to content

Commit

Permalink
Merge pull request apache#1809 from metamx/fifoPriorityExecutorService
Browse files Browse the repository at this point in the history
Make PrioritizedExecutorService optionally FIFO
  • Loading branch information
xvrl committed Oct 27, 2015
2 parents ea2267e + ecdafa8 commit 59872bd
Show file tree
Hide file tree
Showing 9 changed files with 414 additions and 87 deletions.
1 change: 1 addition & 0 deletions docs/content/configuration/broker.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ The broker uses processing configs for nested groupBy queries. And, optionally,
|`druid.processing.formatString`|Realtime and historical nodes use this format string to name their processing threads.|processing-%s|
|`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)|
|`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)|
|`druid.processing.fifo`|If the processing queue should treat tasks of equal priority in a FIFO manner|`false`|

#### General Query Configuration

Expand Down
1 change: 1 addition & 0 deletions docs/content/configuration/historical.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ Druid uses Jetty to serve HTTP requests.
|`druid.processing.formatString`|Realtime and historical nodes use this format string to name their processing threads.|processing-%s|
|`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)|
|`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)|
|`druid.processing.fifo`|If the processing queue should treat tasks of equal priority in a FIFO manner|`false`|

#### General Query Configuration

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,10 @@ public int columnCacheSizeBytes()
{
return 0;
}

@Config(value = "${base_path}.fifo")
public boolean isFifo()
{
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@

public interface PrioritizedCallable<V> extends Callable<V>
{
public int getPriority();
int getPriority();
}
207 changes: 129 additions & 78 deletions processing/src/main/java/io/druid/query/PrioritizedExecutorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@
package io.druid.query;

import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.concurrent.ExecutorServiceConfig;
import com.metamx.common.lifecycle.Lifecycle;

import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue;
Expand All @@ -39,10 +39,11 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

public class PrioritizedExecutorService extends AbstractExecutorService implements ListeningExecutorService
{
public static PrioritizedExecutorService create(Lifecycle lifecycle, ExecutorServiceConfig config)
public static PrioritizedExecutorService create(Lifecycle lifecycle, DruidProcessingConfig config)
{
final PrioritizedExecutorService service = new PrioritizedExecutorService(
new ThreadPoolExecutor(
Expand All @@ -52,7 +53,8 @@ public static PrioritizedExecutorService create(Lifecycle lifecycle, ExecutorSer
TimeUnit.MILLISECONDS,
new PriorityBlockingQueue<Runnable>(),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat(config.getFormatString()).build()
)
),
config
);

lifecycle.addHandler(
Expand All @@ -74,28 +76,35 @@ public void stop()
return service;
}

private final AtomicLong queuePosition = new AtomicLong(Long.MAX_VALUE);
private final ListeningExecutorService delegate;
private final BlockingQueue<Runnable> delegateQueue;
private final boolean allowRegularTasks;
private final int defaultPriority;
private final DruidProcessingConfig config;
final ThreadPoolExecutor threadPoolExecutor; // Used in unit tests

public PrioritizedExecutorService(
ThreadPoolExecutor threadPoolExecutor
ThreadPoolExecutor threadPoolExecutor,
DruidProcessingConfig config
)
{
this(threadPoolExecutor, false, 0);
this(threadPoolExecutor, false, 0, config);
}

public PrioritizedExecutorService(
ThreadPoolExecutor threadPoolExecutor,
boolean allowRegularTasks,
int defaultPriority
int defaultPriority,
DruidProcessingConfig config
)
{
this.threadPoolExecutor = threadPoolExecutor;
this.delegate = MoreExecutors.listeningDecorator(Preconditions.checkNotNull(threadPoolExecutor));
this.delegateQueue = threadPoolExecutor.getQueue();
this.allowRegularTasks = allowRegularTasks;
this.defaultPriority = defaultPriority;
this.config = config;
}

@Override
Expand All @@ -109,7 +118,8 @@ protected <T> PrioritizedListenableFutureTask<T> newTaskFor(Runnable runnable, T
ListenableFutureTask.create(runnable, value),
runnable instanceof PrioritizedRunnable
? ((PrioritizedRunnable) runnable).getPriority()
: defaultPriority
: defaultPriority,
config.isFifo() ? queuePosition.decrementAndGet() : 0
);
}

Expand All @@ -121,9 +131,11 @@ protected <T> PrioritizedListenableFutureTask<T> newTaskFor(Callable<T> callable
"task does not implement PrioritizedCallable"
);
return PrioritizedListenableFutureTask.create(
ListenableFutureTask.create(callable), callable instanceof PrioritizedCallable
? ((PrioritizedCallable) callable).getPriority()
: defaultPriority
ListenableFutureTask.create(callable),
callable instanceof PrioritizedCallable
? ((PrioritizedCallable) callable).getPriority()
: defaultPriority,
config.isFifo() ? queuePosition.decrementAndGet() : 0
);
}

Expand Down Expand Up @@ -185,89 +197,128 @@ public int getQueueSize()
{
return delegateQueue.size();
}
}


public static class PrioritizedListenableFutureTask<V> implements RunnableFuture<V>,
ListenableFuture<V>,
PrioritizedRunnable,
Comparable<PrioritizedListenableFutureTask>
class PrioritizedListenableFutureTask<V> implements RunnableFuture<V>,
ListenableFuture<V>,
PrioritizedRunnable,
Comparable<PrioritizedListenableFutureTask>
{
// NOTE: For priority HIGHER numeric value means more priority. As such we swap left and right in the compares
private static final Comparator<PrioritizedListenableFutureTask> PRIORITY_COMPARATOR = new Ordering<PrioritizedListenableFutureTask>()
{
public static <V> PrioritizedListenableFutureTask<V> create(PrioritizedRunnable task, @Nullable V result)
@Override
public int compare(
PrioritizedListenableFutureTask left, PrioritizedListenableFutureTask right
)
{
return new PrioritizedListenableFutureTask<>(ListenableFutureTask.create(task, result), task.getPriority());
return Integer.compare(right.getPriority(), left.getPriority());
}
}.compound(
new Ordering<PrioritizedListenableFutureTask>()
{
@Override
public int compare(PrioritizedListenableFutureTask left, PrioritizedListenableFutureTask right)
{
return Long.compare(right.getInsertionPlace(), left.getInsertionPlace());
}
}
);

public static <V> PrioritizedListenableFutureTask<?> create(PrioritizedCallable<V> callable)
{
return new PrioritizedListenableFutureTask<>(ListenableFutureTask.create(callable), callable.getPriority());
}
public static <V> PrioritizedListenableFutureTask<V> create(
PrioritizedRunnable task,
@Nullable V result,
long position
)
{
return new PrioritizedListenableFutureTask<>(
ListenableFutureTask.create(task, result),
task.getPriority(),
position
);
}

public static <V> PrioritizedListenableFutureTask<V> create(ListenableFutureTask<V> task, int priority)
{
return new PrioritizedListenableFutureTask<>(task, priority);
}
public static <V> PrioritizedListenableFutureTask<?> create(PrioritizedCallable<V> callable, long position)
{
return new PrioritizedListenableFutureTask<>(
ListenableFutureTask.create(callable),
callable.getPriority(),
position
);
}

private final ListenableFutureTask<V> delegate;
private final int priority;
public static <V> PrioritizedListenableFutureTask<V> create(ListenableFutureTask<V> task, int priority, long position)
{
return new PrioritizedListenableFutureTask<>(task, priority, position);
}

PrioritizedListenableFutureTask(ListenableFutureTask<V> delegate, int priority)
{
this.delegate = delegate;
this.priority = priority;
}
private final ListenableFutureTask<V> delegate;
private final int priority;
private final long insertionPlace;

@Override
public void run()
{
delegate.run();
}
PrioritizedListenableFutureTask(ListenableFutureTask<V> delegate, int priority, long position)
{
this.delegate = delegate;
this.priority = priority;
this.insertionPlace = position; // Long.MAX_VALUE will always be "highest"
}

@Override
public boolean cancel(boolean mayInterruptIfRunning)
{
return delegate.cancel(mayInterruptIfRunning);
}
@Override
public void run()
{
delegate.run();
}

@Override
public boolean isCancelled()
{
return delegate.isCancelled();
}
@Override
public boolean cancel(boolean mayInterruptIfRunning)
{
return delegate.cancel(mayInterruptIfRunning);
}

@Override
public boolean isDone()
{
return delegate.isDone();
}
@Override
public boolean isCancelled()
{
return delegate.isCancelled();
}

@Override
public V get() throws InterruptedException, ExecutionException
{
return delegate.get();
}
@Override
public boolean isDone()
{
return delegate.isDone();
}

@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
{
return delegate.get(timeout, unit);
}
@Override
public V get() throws InterruptedException, ExecutionException
{
return delegate.get();
}

@Override
public void addListener(Runnable listener, Executor executor)
{
delegate.addListener(listener, executor);
}
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
{
return delegate.get(timeout, unit);
}

@Override
public int getPriority()
{
return priority;
}
@Override
public void addListener(Runnable listener, Executor executor)
{
delegate.addListener(listener, executor);
}

@Override
public int compareTo(PrioritizedListenableFutureTask otherTask)
{
return Ints.compare(otherTask.getPriority(), getPriority());
}
@Override
public int getPriority()
{
return priority;
}

protected long getInsertionPlace()
{
return insertionPlace;
}

@Override
public int compareTo(PrioritizedListenableFutureTask otherTask)
{
return PRIORITY_COMPARATOR.compare(this, otherTask);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@

public interface PrioritizedRunnable extends Runnable
{
public int getPriority();
int getPriority();
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void setup()
public void testQueryCancellation() throws Exception
{
ExecutorService exec = PrioritizedExecutorService.create(
new Lifecycle(), new ExecutorServiceConfig()
new Lifecycle(), new DruidProcessingConfig()
{
@Override
public String getFormatString()
Expand Down Expand Up @@ -189,7 +189,7 @@ public void run()
public void testQueryTimeout() throws Exception
{
ExecutorService exec = PrioritizedExecutorService.create(
new Lifecycle(), new ExecutorServiceConfig()
new Lifecycle(), new DruidProcessingConfig()
{
@Override
public String getFormatString()
Expand Down
Loading

0 comments on commit 59872bd

Please sign in to comment.