Allow configurable temp directory for query processing (apache#3893)
jon-wei authored and fjy committed Feb 2, 2017
1 parent a73f1c9 commit 182261f
Showing 8 changed files with 24 additions and 6 deletions.
1 change: 1 addition & 0 deletions docs/content/configuration/broker.md
@@ -61,6 +61,7 @@ The broker uses processing configs for nested groupBy queries. And, optionally,
|`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)|
|`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)|
|`druid.processing.fifo`|If the processing queue should treat tasks of equal priority in a FIFO manner|`false`|
+|`druid.processing.tmpDir`|Path where temporary files created while processing a query should be stored. If specified, this configuration takes priority over the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|

The amount of direct memory needed by Druid is at least
`druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)`. You can
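As a worked example of this formula with hypothetical values: `druid.processing.buffer.sizeBytes=536870912` (512 MiB), `druid.processing.numMergeBuffers=2`, and `druid.processing.numThreads=7` requires at least 512 MiB × (2 + 7 + 1) = 5 GiB of direct memory.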
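Note on the new property: if the default `java.io.tmpdir` points at a small or slow volume, a hypothetical override such as `druid.processing.tmpDir=/mnt/ssd/druid/processing-tmp` in the node's runtime properties redirects query-time temporary files (for example, the groupBy spill files changed below) to a more suitable disk.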
1 change: 1 addition & 0 deletions docs/content/configuration/historical.md
@@ -60,6 +60,7 @@ Druid uses Jetty to serve HTTP requests.
|`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)|
|`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)|
|`druid.processing.fifo`|If the processing queue should treat tasks of equal priority in a FIFO manner|`false`|
+|`druid.processing.tmpDir`|Path where temporary files created while processing a query should be stored. If specified, this configuration takes priority over the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|

The amount of direct memory needed by Druid is at least
`druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)`. You can
1 change: 1 addition & 0 deletions docs/content/configuration/realtime.md
@@ -44,6 +44,7 @@ The realtime node uses several of the global configs in [Configuration](../confi
|`druid.processing.numMergeBuffers`|The number of direct memory buffers available for merging query results. The buffers are sized by `druid.processing.buffer.sizeBytes`. This property is effectively a concurrency limit for queries that require merging buffers. By default, no queries use these buffers, so the default pool size is zero.|0|
|`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)|
|`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)|
+|`druid.processing.tmpDir`|Path where temporary files created while processing a query should be stored. If specified, this configuration takes priority over the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|

The amount of direct memory needed by Druid is at least
`druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)`. You can
5 changes: 5 additions & 0 deletions DruidProcessingConfig.java
@@ -63,4 +63,9 @@ public boolean isFifo()
  {
    return false;
  }
+
+  @Config(value = "${base_path}.tmpDir")
+  public String getTmpDir() {
+    return System.getProperty("java.io.tmpdir");
+  }
}
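As an aside on how `${base_path}` gets resolved: the sketch below mirrors the `Config.createFactory(...).buildWithReplacements(...)` calls visible in the test file at the end of this commit. The property value, class name, and imports are assumptions for illustration, not part of the change.

```java
import com.google.common.collect.ImmutableMap;
import java.util.Properties;

// Sketch only: assumes DruidProcessingConfig and the `Config` factory helper
// used by DruidProcessingConfigTest below are on the classpath.
public class TmpDirConfigSketch
{
  public static void main(String[] args)
  {
    Properties props = new Properties();
    props.setProperty("druid.processing.tmpDir", "/mnt/druid-tmp"); // hypothetical path

    // Substituting base_path -> druid.processing turns the annotation key
    // "${base_path}.tmpDir" into the concrete property "druid.processing.tmpDir".
    DruidProcessingConfig config = Config.createFactory(props).buildWithReplacements(
        DruidProcessingConfig.class,
        ImmutableMap.of("base_path", "druid.processing")
    );

    System.out.println(config.getTmpDir()); // -> /mnt/druid-tmp
    // With the property unset, getTmpDir() falls back to the method body above:
    // System.getProperty("java.io.tmpdir").
  }
}
```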
7 changes: 5 additions & 2 deletions GroupByMergingQueryRunnerV2.java
@@ -81,6 +81,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner
  private final int concurrencyHint;
  private final BlockingPool<ByteBuffer> mergeBufferPool;
  private final ObjectMapper spillMapper;
+  private final String processingTmpDir;

  public GroupByMergingQueryRunnerV2(
      GroupByQueryConfig config,
@@ -89,7 +90,8 @@ public GroupByMergingQueryRunnerV2(
      Iterable<QueryRunner<Row>> queryables,
      int concurrencyHint,
      BlockingPool<ByteBuffer> mergeBufferPool,
-      ObjectMapper spillMapper
+      ObjectMapper spillMapper,
+      String processingTmpDir
  )
  {
    this.config = config;
@@ -99,6 +101,7 @@ public GroupByMergingQueryRunnerV2(
    this.concurrencyHint = concurrencyHint;
    this.mergeBufferPool = mergeBufferPool;
    this.spillMapper = spillMapper;
+    this.processingTmpDir = processingTmpDir;
  }

  @Override
@@ -130,7 +133,7 @@ public Sequence<Row> run(final Query queryParam, final Map responseContext)
    }

    final File temporaryStorageDirectory = new File(
-        System.getProperty("java.io.tmpdir"),
+        processingTmpDir,
        String.format("druid-groupBy-%s_%s", UUID.randomUUID(), query.getId())
    );

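The swapped argument above only changes the root directory; the per-query naming scheme stays the same. Below is a self-contained sketch of that scheme (class and method names here are hypothetical): combining a random UUID with the query id gives each concurrent query, and each rerun of the same query, its own spill directory.

```java
import java.io.File;
import java.util.UUID;

// Hypothetical demo of the spill-directory naming used in this commit.
public class SpillDirSketch
{
  static File perQueryDir(String processingTmpDir, String queryId)
  {
    // druid-groupBy-<random uuid>_<query id> under the configured tmp dir.
    return new File(
        processingTmpDir,
        String.format("druid-groupBy-%s_%s", UUID.randomUUID(), queryId)
    );
  }

  public static void main(String[] args)
  {
    // Default root when druid.processing.tmpDir is not set:
    System.out.println(perQueryDir(System.getProperty("java.io.tmpdir"), "example-query-id"));
  }
}
```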
6 changes: 4 additions & 2 deletions GroupByRowProcessor.java
@@ -67,7 +67,8 @@ public static Sequence<Row> process(
      final Map<String, ValueType> rowSignature,
      final GroupByQueryConfig config,
      final BlockingPool<ByteBuffer> mergeBufferPool,
-      final ObjectMapper spillMapper
+      final ObjectMapper spillMapper,
+      final String processingTmpDir
  )
  {
    final GroupByQuery query = (GroupByQuery) queryParam;
@@ -78,8 +79,9 @@ public static Sequence<Row> process(
      aggregatorFactories[i] = query.getAggregatorSpecs().get(i);
    }

+
    final File temporaryStorageDirectory = new File(
-        System.getProperty("java.io.tmpdir"),
+        processingTmpDir,
        String.format("druid-groupBy-%s_%s", UUID.randomUUID(), query.getId())
    );

6 changes: 4 additions & 2 deletions GroupByStrategyV2.java
@@ -211,7 +211,8 @@ public Sequence<Row> processSubqueryResult(
        GroupByQueryHelper.rowSignatureFor(subquery),
        configSupplier.get(),
        mergeBufferPool,
-        spillMapper
+        spillMapper,
+        processingConfig.getTmpDir()
    );
    return mergeResults(new QueryRunner<Row>()
    {
@@ -236,7 +237,8 @@ public QueryRunner<Row> mergeRunners(
        queryRunners,
        processingConfig.getNumThreads(),
        mergeBufferPool,
-        spillMapper
+        spillMapper,
+        processingConfig.getTmpDir()
    );
  }

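Taken together with the earlier hunks, these two call sites show the full wiring of the new setting: `GroupByStrategyV2` reads `processingConfig.getTmpDir()` and passes it both to the static `process(...)` helper used for subquery results and to the `GroupByMergingQueryRunnerV2` constructor used when merging per-segment runners, each of which builds its `druid-groupBy-*` spill directory under that path instead of `java.io.tmpdir`.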
3 changes: 3 additions & 0 deletions DruidProcessingConfigTest.java
@@ -49,6 +49,7 @@ public void testDeserialization() throws Exception
    }
    Assert.assertEquals(0, config.columnCacheSizeBytes());
    Assert.assertFalse(config.isFifo());
+    Assert.assertEquals(System.getProperty("java.io.tmpdir"), config.getTmpDir());

    //with non-defaults
    Properties props = new Properties();
@@ -57,6 +58,7 @@ public void testDeserialization() throws Exception
    props.setProperty("druid.processing.numThreads", "5");
    props.setProperty("druid.processing.columnCache.sizeBytes", "1");
    props.setProperty("druid.processing.fifo", "true");
+    props.setProperty("druid.processing.tmpDir", "/test/path");

    factory = Config.createFactory(props);
    config = factory.buildWithReplacements(DruidProcessingConfig.class, ImmutableMap.of("base_path", "druid.processing"));
@@ -66,5 +68,6 @@ public void testDeserialization() throws Exception
    Assert.assertEquals(5, config.getNumThreads());
    Assert.assertEquals(1, config.columnCacheSizeBytes());
    Assert.assertTrue(config.isFifo());
+    Assert.assertEquals("/test/path", config.getTmpDir());
  }
}
