Skip to content

Commit

Permalink
Make timeout behavior consistent with the documentation (apache#4134)
Browse files Browse the repository at this point in the history
* Make timeout behavior consistent with the documentation

* Refactoring BlockingPool and add more methods to QueryContexts

* remove unused imports

* Addressed comments

* Address comments

* remove unused method

* Make default query timeout configurable

* Fix test failure

* Change timeout from period to millis
  • Loading branch information
jihoonson authored and gianm committed Apr 19, 2017
1 parent db656c5 commit 5b69f2e
Show file tree
Hide file tree
Showing 37 changed files with 521 additions and 255 deletions.
132 changes: 84 additions & 48 deletions common/src/main/java/io/druid/collections/BlockingPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,40 +74,57 @@ public int getPoolSize()
}

/**
* Take a resource from the pool.
* Take a resource from the pool, waiting up to the
* specified wait time if necessary for an element to become available.
*
* @param timeout maximum time to wait for a resource, in milliseconds. Negative means do not use a timeout.
* @param timeoutMs maximum time to wait for a resource, in milliseconds.
*
* @return a resource, or null if the timeout was reached
*/
public ReferenceCountingResourceHolder<T> take(final long timeout)
public ReferenceCountingResourceHolder<T> take(final long timeoutMs)
{
Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs);
checkInitialized();
final T theObject;
try {
if (timeout > -1) {
theObject = timeout > 0 ? poll(timeout) : poll();
} else {
theObject = take();
}
return theObject == null ? null : new ReferenceCountingResourceHolder<>(
theObject,
new Closeable()
{
@Override
public void close() throws IOException
{
offer(theObject);
}
}
);
return wrapObject(timeoutMs > 0 ? pollObject(timeoutMs) : pollObject());
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}

private T poll()
/**
 * Take a resource from the pool, waiting if necessary until an element becomes available.
 *
 * @return a resource holder; closing the holder returns the resource to the pool
 */
public ReferenceCountingResourceHolder<T> take()
{
  checkInitialized();
  try {
    return wrapObject(takeObject());
  }
  catch (InterruptedException e) {
    // Restore the interrupt status before propagating so callers up the
    // stack can still observe that this thread was interrupted.
    Thread.currentThread().interrupt();
    throw Throwables.propagate(e);
  }
}

/**
 * Wrap a pooled object in a reference-counting holder whose close() hands the
 * object back to this pool. Passing null (e.g. a poll that timed out) yields null.
 */
private ReferenceCountingResourceHolder<T> wrapObject(final T theObject)
{
  if (theObject == null) {
    return null;
  }
  // Returning the object to the pool is deferred until the holder is closed.
  final Closeable returnToPool = new Closeable()
  {
    @Override
    public void close() throws IOException
    {
      offer(theObject);
    }
  };
  return new ReferenceCountingResourceHolder<>(theObject, returnToPool);
}

private T pollObject()
{
final ReentrantLock lock = this.lock;
lock.lock();
Expand All @@ -118,9 +135,9 @@ private T poll()
}
}

private T poll(long timeout) throws InterruptedException
private T pollObject(long timeoutMs) throws InterruptedException
{
long nanos = TIME_UNIT.toNanos(timeout);
long nanos = TIME_UNIT.toNanos(timeoutMs);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
Expand All @@ -136,7 +153,7 @@ private T poll(long timeout) throws InterruptedException
}
}

private T take() throws InterruptedException
private T takeObject() throws InterruptedException
{
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
Expand All @@ -151,41 +168,60 @@ private T take() throws InterruptedException
}

/**
* Take a resource from the pool.
* Take resources from the pool, waiting up to the
* specified wait time if necessary for elements of the given number to become available.
*
* @param elementNum number of resources to take
* @param timeout maximum time to wait for resources, in milliseconds. Negative means do not use a timeout.
* @param timeoutMs maximum time to wait for resources, in milliseconds.
*
* @return a resource, or null if the timeout was reached
*/
public ReferenceCountingResourceHolder<List<T>> takeBatch(final int elementNum, final long timeout)
public ReferenceCountingResourceHolder<List<T>> takeBatch(final int elementNum, final long timeoutMs)
{
Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs);
checkInitialized();
final List<T> objects;
try {
if (timeout > -1) {
objects = timeout > 0 ? pollBatch(elementNum, timeout) : pollBatch(elementNum);
} else {
objects = takeBatch(elementNum);
}
return objects == null ? null : new ReferenceCountingResourceHolder<>(
objects,
new Closeable()
{
@Override
public void close() throws IOException
{
offerBatch(objects);
}
}
);
return wrapObjects(timeoutMs > 0 ? pollObjects(elementNum, timeoutMs) : pollObjects(elementNum));
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}

private List<T> pollBatch(int elementNum) throws InterruptedException
/**
 * Take resources from the pool, waiting if necessary until the requested number
 * of elements become available.
 *
 * @param elementNum number of resources to take
 *
 * @return a holder containing a list of resources; closing the holder returns them to the pool
 */
public ReferenceCountingResourceHolder<List<T>> takeBatch(final int elementNum)
{
  checkInitialized();
  try {
    return wrapObjects(takeObjects(elementNum));
  }
  catch (InterruptedException e) {
    // Restore the interrupt status before propagating so callers up the
    // stack can still observe that this thread was interrupted.
    Thread.currentThread().interrupt();
    throw Throwables.propagate(e);
  }
}

/**
 * Wrap a batch of pooled objects in a reference-counting holder whose close()
 * hands every object back to this pool. Passing null (e.g. a batch poll that
 * timed out) yields null.
 */
private ReferenceCountingResourceHolder<List<T>> wrapObjects(final List<T> theObjects)
{
  if (theObjects == null) {
    return null;
  }
  // Returning the whole batch to the pool is deferred until the holder is closed.
  final Closeable returnToPool = new Closeable()
  {
    @Override
    public void close() throws IOException
    {
      offerBatch(theObjects);
    }
  };
  return new ReferenceCountingResourceHolder<>(theObjects, returnToPool);
}

private List<T> pollObjects(int elementNum) throws InterruptedException
{
final List<T> list = Lists.newArrayListWithCapacity(elementNum);
final ReentrantLock lock = this.lock;
Expand All @@ -204,9 +240,9 @@ private List<T> pollBatch(int elementNum) throws InterruptedException
}
}

private List<T> pollBatch(int elementNum, long timeout) throws InterruptedException
private List<T> pollObjects(int elementNum, long timeoutMs) throws InterruptedException
{
long nanos = TIME_UNIT.toNanos(timeout);
long nanos = TIME_UNIT.toNanos(timeoutMs);
final List<T> list = Lists.newArrayListWithCapacity(elementNum);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
Expand All @@ -226,7 +262,7 @@ private List<T> pollBatch(int elementNum, long timeout) throws InterruptedExcept
}
}

private List<T> takeBatch(int elementNum) throws InterruptedException
private List<T> takeObjects(int elementNum) throws InterruptedException
{
final List<T> list = Lists.newArrayListWithCapacity(elementNum);
final ReentrantLock lock = this.lock;
Expand Down
1 change: 1 addition & 0 deletions docs/content/configuration/broker.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Druid uses Jetty to serve HTTP requests.
|--------|-----------|-------|
|`druid.server.http.numThreads`|Number of threads for HTTP requests.|10|
|`druid.server.http.maxIdleTime`|The Jetty max idle time for a connection.|PT5m|
|`druid.server.http.defaultQueryTimeout`|Query timeout in milliseconds, beyond which unfinished queries will be cancelled.|300000|
|`druid.broker.http.numConnections`|Size of connection pool for the Broker to connect to historical and real-time processes. If there are more queries than this number that all need to speak to the same node, then they will queue up.|20|
|`druid.broker.http.compressionCodec`|Compression codec the Broker uses to communicate with historical and real-time processes. May be "gzip" or "identity".|gzip|
|`druid.broker.http.readTimeout`|The timeout for data reads from historical and real-time processes.|PT15M|
Expand Down
1 change: 1 addition & 0 deletions docs/content/configuration/historical.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ Druid uses Jetty to serve HTTP requests.
|--------|-----------|-------|
|`druid.server.http.numThreads`|Number of threads for HTTP requests.|10|
|`druid.server.http.maxIdleTime`|The Jetty max idle time for a connection.|PT5m|
|`druid.server.http.defaultQueryTimeout`|Query timeout in milliseconds, beyond which unfinished queries will be cancelled.|300000|

#### Processing

Expand Down
20 changes: 10 additions & 10 deletions docs/content/querying/query-context.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@ Query Context

The query context is used for various query configuration parameters. The following parameters apply to all queries.

|property |default | description |
|-----------------|---------------------|----------------------|
|timeout | `0` (no timeout) | Query timeout in milliseconds, beyond which unfinished queries will be cancelled. |
|priority | `0` | Query Priority. Queries with higher priority get precedence for computational resources.|
|queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query |
|useCache | `true` | Flag indicating whether to leverage the query cache for this query. This may be overridden in the broker or historical node configuration |
|populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. This may be overridden in the broker or historical node configuration |
|bySegment | `false` | Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from |
|finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` |
|chunkPeriod | `P0D` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries to parallelize merging more than normal. Broken up queries will use a larger share of cluster resources, but may be able to complete faster as a result. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. The broker uses its query processing executor service to initiate processing for query chunks, so make sure "druid.processing.numThreads" is configured appropriately on the broker. [groupBy queries](groupbyquery.html) do not support chunkPeriod by default, although they do if using the legacy "v1" engine. |
|property |default | description |
|-----------------|----------------------------------------|----------------------|
|timeout | `druid.server.http.defaultQueryTimeout`| Query timeout in milliseconds, beyond which unfinished queries will be cancelled. A timeout of 0 means no timeout. To set the default timeout, see [broker configuration](broker.html) |
|priority | `0` | Query Priority. Queries with higher priority get precedence for computational resources.|
|queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query |
|useCache | `true` | Flag indicating whether to leverage the query cache for this query. This may be overridden in the broker or historical node configuration |
|populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. This may be overridden in the broker or historical node configuration |
|bySegment | `false` | Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from |
|finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` |
|chunkPeriod | `P0D` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries to parallelize merging more than normal. Broken up queries will use a larger share of cluster resources, but may be able to complete faster as a result. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. The broker uses its query processing executor service to initiate processing for query chunks, so make sure "druid.processing.numThreads" is configured appropriately on the broker. [groupBy queries](groupbyquery.html) do not support chunkPeriod by default, although they do if using the legacy "v1" engine. |

In addition, some query types offer context parameters specific to that query type.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.ColumnSelectorPlus;
import io.druid.query.QueryContexts;
import io.druid.query.QueryInterruptedException;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
Expand Down Expand Up @@ -65,6 +66,7 @@ public Sequence<ScanResultValue> process(
return Sequences.empty();
}
}
final boolean hasTimeout = QueryContexts.hasTimeout(query);
final Long timeoutAt = (long) responseContext.get(ScanQueryRunnerFactory.CTX_TIMEOUT_AT);
final long start = System.currentTimeMillis();
final StorageAdapter adapter = segment.asStorageAdapter();
Expand Down Expand Up @@ -156,7 +158,7 @@ public boolean hasNext()
@Override
public ScanResultValue next()
{
if (System.currentTimeMillis() >= timeoutAt) {
if (hasTimeout && System.currentTimeMillis() >= timeoutAt) {
throw new QueryInterruptedException(new TimeoutException());
}
long lastOffset = offset;
Expand All @@ -173,10 +175,12 @@ public ScanResultValue next()
ScanQueryRunnerFactory.CTX_COUNT,
(long) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) + (offset - lastOffset)
);
responseContext.put(
ScanQueryRunnerFactory.CTX_TIMEOUT_AT,
timeoutAt - (System.currentTimeMillis() - start)
);
if (hasTimeout) {
responseContext.put(
ScanQueryRunnerFactory.CTX_TIMEOUT_AT,
timeoutAt - (System.currentTimeMillis() - start)
);
}
return new ScanResultValue(segmentId, allColumns, events);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.Query;
import io.druid.query.QueryContextKeys;
import io.druid.query.QueryContexts;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
Expand All @@ -36,6 +36,8 @@

public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
{
// This variable indicates when a running query should be expired,
// and is effective only when 'timeout' of queryContext has a positive value.
public static final String CTX_TIMEOUT_AT = "timeoutAt";
public static final String CTX_COUNT = "count";
private final ScanQueryQueryToolChest toolChest;
Expand Down Expand Up @@ -71,9 +73,9 @@ public Sequence<ScanResultValue> run(
final Query<ScanResultValue> query, final Map<String, Object> responseContext
)
{
final Number queryTimeout = query.getContextValue(QueryContextKeys.TIMEOUT, null);
final long timeoutAt = (queryTimeout == null || queryTimeout.longValue() == 0L)
? JodaUtils.MAX_INSTANT : System.currentTimeMillis() + queryTimeout.longValue();
// Note: this variable is effective only when queryContext has a timeout.
// See the comment of CTX_TIMEOUT_AT.
final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(query);
responseContext.put(CTX_TIMEOUT_AT, timeoutAt);
return Sequences.concat(
Sequences.map(
Expand Down Expand Up @@ -122,7 +124,7 @@ public Sequence<ScanResultValue> run(
final Number timeoutAt = (Number) responseContext.get(CTX_TIMEOUT_AT);
if (timeoutAt == null || timeoutAt.longValue() == 0L) {
responseContext.put(CTX_TIMEOUT_AT, JodaUtils.MAX_INSTANT);
};
}
return engine.process((ScanQuery) query, segment, responseContext);
}
}
Expand Down
9 changes: 4 additions & 5 deletions processing/src/main/java/io/druid/query/AsyncQueryRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public AsyncQueryRunner(QueryRunner<T> baseRunner, ExecutorService executor, Que
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{
final int priority = BaseQuery.getContextPriority(query, 0);
final int priority = QueryContexts.getPriority(query);
final ListenableFuture<Sequence<T>> future = executor.submit(new AbstractPrioritizedCallable<Sequence<T>>(priority)
{
@Override
Expand All @@ -68,11 +68,10 @@ public Sequence<T> call() throws Exception
public Sequence<T> get()
{
try {
Number timeout = query.getContextValue(QueryContextKeys.TIMEOUT);
if (timeout == null) {
return future.get();
if (QueryContexts.hasTimeout(query)) {
return future.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS);
} else {
return future.get(timeout.longValue(), TimeUnit.MILLISECONDS);
return future.get();
}
} catch (ExecutionException | InterruptedException | TimeoutException ex) {
throw Throwables.propagate(ex);
Expand Down
Loading

0 comments on commit 5b69f2e

Please sign in to comment.