Make the tasks run with only a single directory (apache#14063)
* Make the tasks run with only a single directory

There was a change that tried to get indexing to run on multiple disks. It made
a number of changes to how tasks run, effectively hiding the "safe" directory
that tasks write files into from the task code itself, making it extremely
difficult to do anything correctly inside of a task.

This change reverts those changes inside of the tasks and makes the task
runners the only components that decide which mount points should be used for
storing task-related files.

It adds the config druid.worker.baseTaskDirs, which the task runners can use to
determine which directories they should schedule tasks inside of. The
TaskConfig remains the authoritative source of configuration for where and how
an individual task should operate.
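
For illustration (the mount paths below are hypothetical, not part of this
commit), the new property could be set in a MiddleManager or Indexer
runtime.properties to spread tasks across two disks:

druid.worker.baseTaskDirs=[\"/mnt/disk1/druid/task\",\"/mnt/disk2/druid/task\"]

The task runner then assigns each new task a working directory under one of the
listed paths in round-robin order; if the property is unset, it falls back to
druid.indexer.task.baseTaskDir.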
imply-cheddar authored Apr 13, 2023
1 parent 179e2e8 commit aaa6cc1
Showing 64 changed files with 823 additions and 1,127 deletions.
10 changes: 5 additions & 5 deletions docs/configuration/index.md
@@ -1457,6 +1457,7 @@ Middle managers pass their configurations down to their child peons. The MiddleM
|`druid.worker.ip`|The IP of the worker.|localhost|
|`druid.worker.version`|Version identifier for the MiddleManager. The version number is a string. This affects the expected behavior during certain operations like comparison against `druid.indexer.runner.minWorkerVersion`. Specifically, the version comparison follows dictionary order. Use ISO8601 date format for the version to accommodate date comparisons.|0|
|`druid.worker.capacity`|Maximum number of tasks the MiddleManager can accept.|Number of CPUs on the machine - 1|
|`druid.worker.baseTaskDirs`|List of base temporary working directories, one of which is assigned per task in a round-robin fashion. This property can be used to allow usage of multiple disks for indexing. This property is recommended in place of and takes precedence over `${druid.indexer.task.baseTaskDir}`. If this configuration is not set, `${druid.indexer.task.baseTaskDir}` is used. Example: `druid.worker.baseTaskDirs=[\"PATH1\",\"PATH2\",...]`.|null|
|`druid.worker.category`|A string to name the category that the MiddleManager node belongs to.|`_default_worker_category`|
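
The round-robin assignment described above is a task-runner responsibility. As
a minimal sketch of the idea, with invented class and method names (this is
illustrative only, not Druid's actual implementation):

import java.io.File;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper that mirrors the documented behavior of
// druid.worker.baseTaskDirs: each new task gets a working directory
// under one of the configured base paths, chosen in round-robin order.
public class RoundRobinTaskDirPicker
{
  private final List<File> baseTaskDirs;
  private final AtomicLong counter = new AtomicLong(0);

  public RoundRobinTaskDirPicker(List<File> baseTaskDirs)
  {
    if (baseTaskDirs == null || baseTaskDirs.isEmpty()) {
      throw new IllegalArgumentException("at least one base task directory is required");
    }
    this.baseTaskDirs = baseTaskDirs;
  }

  // Returns the per-task working directory, e.g. <base>/<taskId>.
  public File pickTaskDir(String taskId)
  {
    int idx = (int) (counter.getAndIncrement() % baseTaskDirs.size());
    return new File(baseTaskDirs.get(idx), taskId);
  }
}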

#### Peon Processing
@@ -1510,8 +1511,7 @@ Additional peon configs include:
|--------|-----------|-------|
|`druid.peon.mode`|Choices are "local" and "remote". Setting this to local means you intend to run the peon as a standalone process (Not recommended).|remote|
|`druid.indexer.task.baseDir`|Base temporary working directory.|`System.getProperty("java.io.tmpdir")`|
|`druid.indexer.task.baseTaskDir`|Deprecated. Base temporary working directory for tasks.|`${druid.indexer.task.baseDir}/persistent/task`|
|`druid.indexer.task.baseTaskDirPaths`|List of base temporary working directories, one of which is assigned per task in a round-robin fashion. This property can be used to allow usage of multiple disks for indexing. This property is recommended in place of `${druid.indexer.task.baseTaskDir}`. If a null or empty value is provided, `baseTaskDir` is used. Otherwise, it overrides the value of `baseTaskDir`. Example: `druid.indexer.task.baseTaskDirPaths=[\"PATH1\",\"PATH2\",...]`.|null|
|`druid.indexer.task.baseTaskDir`|Base temporary working directory for tasks.|`${druid.indexer.task.baseDir}/persistent/task`|
|`druid.indexer.task.batchProcessingMode`|Batch ingestion tasks have three operating modes to control construction and tracking for intermediary segments: `OPEN_SEGMENTS`, `CLOSED_SEGMENTS`, and `CLOSED_SEGMENTS_SINKS`. `OPEN_SEGMENTS` uses the streaming ingestion code path and performs a `mmap` on intermediary segments to build a timeline to make these segments available to realtime queries. Batch ingestion doesn't require intermediary segments, so the default mode, `CLOSED_SEGMENTS`, eliminates `mmap` of intermediary segments. `CLOSED_SEGMENTS` mode still tracks the entire set of segments in heap. The `CLOSED_SEGMENTS_SINKS` mode is the most aggressive configuration and should have the smallest memory footprint. It eliminates in-memory tracking and `mmap` of intermediary segments produced during segment creation. `CLOSED_SEGMENTS_SINKS` mode isn't as well tested as the other modes, so it is currently considered experimental. You can use `OPEN_SEGMENTS` mode if problems occur with the two newer modes. |`CLOSED_SEGMENTS`|
|`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.8.5|
|`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. Used for indexing generating tasks.|75000|
@@ -1536,7 +1536,7 @@ If the peon is running in remote mode, there must be an Overlord up and running.
When new segments are created, Druid temporarily stores some preprocessed data in buffers. Currently three types of
*medium* exist for those buffers: *temporary files*, *off-heap memory*, and *on-heap memory*.

*Temporary files* (`tmpFile`) are stored under the task working directory (see `druid.indexer.task.baseTaskDirPaths`
*Temporary files* (`tmpFile`) are stored under the task working directory (see `druid.worker.baseTaskDirs`
configuration above) and thus share its mounting properties, e.g. they could be backed by HDD, SSD or memory (tmpfs).
This type of medium may do unnecessary disk I/O and requires some disk space to be available.
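
For example, assuming the standard Druid property name for selecting the medium
(the key does not appear in this diff, so treat it as an assumption):

druid.peon.defaultSegmentWriteOutMediumFactory.type=offHeapMemory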

@@ -1577,11 +1577,11 @@ then the value from the configuration below is used:
|--------|-----------|-------|
|`druid.worker.version`|Version identifier for the Indexer.|0|
|`druid.worker.capacity`|Maximum number of tasks the Indexer can accept.|Number of available processors - 1|
|`druid.worker.baseTaskDirs`|List of base temporary working directories, one of which is assigned per task in a round-robin fashion. This property can be used to allow usage of multiple disks for indexing. This property is recommended in place of and takes precedence over `${druid.indexer.task.baseTaskDir}`. If this configuration is not set, `${druid.indexer.task.baseTaskDir}` is used. Example: `druid.worker.baseTaskDirs=[\"PATH1\",\"PATH2\",...]`.|null|
|`druid.worker.globalIngestionHeapLimitBytes`|Total amount of heap available for ingestion processing. This is applied by automatically setting the `maxBytesInMemory` property on tasks.|60% of configured JVM heap|
|`druid.worker.numConcurrentMerges`|Maximum number of segment persist or merge operations that can run concurrently across all tasks.|`druid.worker.capacity` / 2, rounded down|
|`druid.indexer.task.baseDir`|Base temporary working directory.|`System.getProperty("java.io.tmpdir")`|
|`druid.indexer.task.baseTaskDir`|Deprecated. Base temporary working directory for tasks.|`${druid.indexer.task.baseDir}/persistent/tasks`|
|`druid.indexer.task.baseTaskDirPaths`|List of base temporary working directories, one of which is assigned per task in a round-robin fashion. This property can be used to allow usage of multiple disks for indexing. This property is recommended in place of `${druid.indexer.task.baseTaskDir}`. If a null or empty value is provided, `baseTaskDir` is used. Otherwise, it overrides the value of `baseTaskDir`. Example: `druid.indexer.task.baseTaskDirPaths=[\"PATH1\",\"PATH2\",...]`.|null|
|`druid.indexer.task.baseTaskDir`|Base temporary working directory for tasks.|`${druid.indexer.task.baseDir}/persistent/tasks`|
|`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.8.5|
|`druid.indexer.task.gracefulShutdownTimeout`|Wait this long on Indexer restart for restorable tasks to gracefully exit.|PT5M|
|`druid.indexer.task.hadoopWorkingPath`|Temporary working directory for Hadoop tasks.|`/tmp/druid-indexing`|
2 changes: 1 addition & 1 deletion docs/ingestion/tasks.md
@@ -400,7 +400,7 @@ Logs are created by ingestion tasks as they run. You can configure Druid to pus

Once the task has been submitted to the Overlord, it remains `WAITING` for locks to be acquired. Worker slot allocation is then `PENDING` until the task can actually start executing.

The task then starts creating logs in a local directory of the middle manager (or indexer) in a `log` directory for the specific `taskId` at [`druid.indexer.task.baseTaskDirPaths`] (../configuration/index.md#additional-peon-configuration).
The task then starts creating logs in a local directory of the middle manager (or indexer) in a `log` directory for the specific `taskId` at [`druid.worker.baseTaskDirs`](../configuration/index.md#middlemanager-configuration).
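
For example, with druid.worker.baseTaskDirs=[\"var/druid/task\"] (the value
used in the example configurations in this commit), the log for a given task
would sit at a path like var/druid/task/<taskId>/log while the task runs; this
layout is illustrative and assumes the per-task directory is named after the
taskId.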

When the task completes - whether it succeeds or fails - the middle manager (or indexer) will push the task log file into the location specified in [`druid.indexer.logs`](../configuration/index.md#task-logging).

2 changes: 1 addition & 1 deletion examples/conf/druid/auto/indexer/runtime.properties
@@ -24,7 +24,7 @@ druid.plaintextPort=8091
# determined based on available processor.

# Task launch parameters
druid.indexer.task.baseTaskDirPaths=[\"var/druid/task\"]
druid.worker.baseTaskDirs=[\"var/druid/task\"]

# Processing threads and buffers on Indexer
# Determined automatically based on available memory. For details on how to manually set parameters:
2 changes: 1 addition & 1 deletion examples/conf/druid/auto/middleManager/runtime.properties
@@ -26,7 +26,7 @@ druid.plaintextPort=8091

# Task launch parameters
druid.indexer.runner.javaCommand=bin/run-java
druid.indexer.task.baseTaskDirPaths=[\"var/druid/task\"]
druid.worker.baseTaskDirs=[\"var/druid/task\"]

# Processing threads and buffers on Peons
# Determined automatically based on available memory. For details on how to manually set parameters:
@@ -24,7 +24,7 @@ druid.plaintextPort=8091
druid.worker.capacity=4

# Task launch parameters
druid.indexer.task.baseTaskDirPaths=[\"var/druid/task\"]
druid.worker.baseTaskDirs=[\"var/druid/task\"]

# HTTP server threads
druid.server.http.numThreads=60
@@ -22,11 +22,11 @@ druid.plaintextPort=8091

# Number of tasks per middleManager
druid.worker.capacity=4
druid.worker.baseTaskDirs=[\"var/druid/task\"]

# Task launch parameters
druid.indexer.runner.javaCommand=bin/run-java
druid.indexer.runner.javaOptsArray=["-server","-Xms1g","-Xmx1g","-XX:MaxDirectMemorySize=1g","-Duser.timezone=UTC","-Dfile.encoding=UTF-8","-XX:+ExitOnOutOfMemoryError","-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
druid.indexer.task.baseTaskDirPaths=[\"var/druid/task\"]

# HTTP server threads
druid.server.http.numThreads=60
@@ -22,11 +22,11 @@ druid.plaintextPort=8091

# Number of tasks per middleManager
druid.worker.capacity=8
druid.worker.baseTaskDirs=[\"var/druid/task\"]

# Task launch parameters
druid.indexer.runner.javaCommand=bin/run-java
druid.indexer.runner.javaOptsArray=["-server","-Xms1g","-Xmx1g","-XX:MaxDirectMemorySize=1g","-Duser.timezone=UTC","-Dfile.encoding=UTF-8","-XX:+ExitOnOutOfMemoryError","-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
druid.indexer.task.baseTaskDirPaths=[\"var/druid/task\"]

# HTTP server threads
druid.server.http.numThreads=60
@@ -22,11 +22,11 @@ druid.plaintextPort=8091

# Number of tasks per middleManager
druid.worker.capacity=4
druid.worker.baseTaskDirs=[\"var/druid/task\"]

# Task launch parameters
druid.indexer.runner.javaCommand=bin/run-java
druid.indexer.runner.javaOptsArray=["-server","-Xms1g","-Xmx1g","-XX:MaxDirectMemorySize=1g","-Duser.timezone=UTC","-Dfile.encoding=UTF-8","-XX:+ExitOnOutOfMemoryError","-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
druid.indexer.task.baseTaskDirPaths=[\"var/druid/task\"]

# HTTP server threads
druid.server.http.numThreads=60
@@ -22,11 +22,11 @@ druid.plaintextPort=8091

# Number of tasks per middleManager
druid.worker.capacity=2
druid.worker.baseTaskDirs=[\"var/druid/task\"]

# Task launch parameters
druid.indexer.runner.javaCommand=bin/run-java
druid.indexer.runner.javaOptsArray=["-server","-Xms1g","-Xmx1g","-XX:MaxDirectMemorySize=1g","-Duser.timezone=UTC","-Dfile.encoding=UTF-8","-XX:+ExitOnOutOfMemoryError","-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
druid.indexer.task.baseTaskDirPaths=[\"var/druid/task\"]

# HTTP server threads
druid.server.http.numThreads=12
@@ -22,11 +22,11 @@ druid.plaintextPort=8091

# Number of tasks per middleManager
druid.worker.capacity=2
druid.worker.baseTaskDirs=[\"var/druid/task\"]

# Task launch parameters
druid.indexer.runner.javaCommand=bin/run-java
druid.indexer.runner.javaOptsArray=["-server","-Xms256m","-Xmx256m","-XX:MaxDirectMemorySize=300m","-Duser.timezone=UTC","-Dfile.encoding=UTF-8","-XX:+ExitOnOutOfMemoryError","-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
druid.indexer.task.baseTaskDirPaths=[\"var/druid/task\"]

# HTTP server threads
druid.server.http.numThreads=6
@@ -22,11 +22,11 @@ druid.plaintextPort=8091

# Number of tasks per middleManager
druid.worker.capacity=3
druid.worker.baseTaskDirs=[\"var/druid/task\"]

# Task launch parameters
druid.indexer.runner.javaCommand=bin/run-java
druid.indexer.runner.javaOptsArray=["-server","-Xms1g","-Xmx1g","-XX:MaxDirectMemorySize=1g","-Duser.timezone=UTC","-Dfile.encoding=UTF-8","-XX:+ExitOnOutOfMemoryError","-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
druid.indexer.task.baseTaskDirPaths=[\"var/druid/task\"]

# HTTP server threads
druid.server.http.numThreads=50
@@ -22,11 +22,11 @@ druid.plaintextPort=8091

# Number of tasks per middleManager
druid.worker.capacity=16
druid.worker.baseTaskDirs=[\"var/druid/task\"]

# Task launch parameters
druid.indexer.runner.javaCommand=bin/run-java
druid.indexer.runner.javaOptsArray=["-server","-Xms1g","-Xmx1g","-XX:MaxDirectMemorySize=1g","-Duser.timezone=UTC","-Dfile.encoding=UTF-8","-XX:+ExitOnOutOfMemoryError","-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
druid.indexer.task.baseTaskDirPaths=[\"var/druid/task\"]

# HTTP server threads
druid.server.http.numThreads=60
@@ -52,7 +52,6 @@
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -109,7 +108,7 @@ public Job fromTask(Task task) throws IOException
PeonCommandContext context = new PeonCommandContext(
generateCommand(task),
javaOpts(task),
new File(taskConfig.getBaseTaskDirPaths().get(0)),
taskConfig.getBaseTaskDir(),
node.isEnableTlsPort()
);
PodSpec podSpec = pod.getSpec();
@@ -371,7 +370,7 @@ private List<String> generateCommand(Task task)
{
final List<String> command = new ArrayList<>();
command.add("/peon.sh");
command.add(new File(taskConfig.getBaseTaskDirPaths().get(0)).getAbsolutePath());
command.add(taskConfig.getBaseTaskDir().getAbsolutePath());
command.add("1"); // the attemptId is always 1, we never run the task twice on the same pod.

String nodeType = task.getNodeType();
@@ -209,7 +209,7 @@ private Collection<EnvVar> getEnv(Task task)
return ImmutableList.of(
new EnvVarBuilder()
.withName(DruidK8sConstants.TASK_DIR_ENV)
.withValue(new File(taskConfig.getBaseTaskDirPaths().get(0)).getAbsolutePath())
.withValue(taskConfig.getBaseDir())
.build(),
new EnvVarBuilder()
.withName(DruidK8sConstants.TASK_ID_ENV)
@@ -20,9 +20,9 @@
package org.apache.druid.k8s.overlord;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.k8s.overlord.common.MultiContainerTaskAdapter;
@@ -72,23 +72,7 @@ public void setup()
true,
false
);
taskConfig = new TaskConfig(
"/tmp",
null,
null,
null,
null,
false,
null,
null,
null,
false,
false,
null,
null,
false,
ImmutableList.of("/tmp")
);
taskConfig = new TaskConfigBuilder().setBaseDir("/tmp").build();
properties = new Properties();
}

@@ -22,7 +22,6 @@
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodSpec;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
@@ -31,6 +30,7 @@
import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
@@ -93,23 +93,7 @@ public void setup()
false
);
startupLoggingConfig = new StartupLoggingConfig();
taskConfig = new TaskConfig(
"src/test/resources",
null,
null,
null,
null,
false,
null,
null,
null,
false,
false,
null,
null,
false,
ImmutableList.of("src/test/resources")
);
taskConfig = new TaskConfigBuilder().setBaseDir("src/test/resources").build();
}

@Disabled
@@ -23,13 +23,13 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.api.client.util.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerBuilder;
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.EnvVarBuilder;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodSpec;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
@@ -40,6 +40,7 @@
import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
@@ -92,29 +93,30 @@ public K8sTaskAdapterTest()
false
);
startupLoggingConfig = new StartupLoggingConfig();
taskConfig = new TaskConfig(
"src/test/resources",
null,
null,
null,
null,
false,
null,
null,
null,
false,
false,
null,
null,
false,
ImmutableList.of("src/test/resources")
);
taskConfig = new TaskConfigBuilder().setBaseDir("src/test/resources").build();
}

@Test
void testAddingLabelsAndAnnotations() throws IOException
{
TestKubernetesClient testClient = new TestKubernetesClient(client);
final PodSpec podSpec = K8sTestUtils.getDummyPodSpec();
TestKubernetesClient testClient = new TestKubernetesClient(client)
{
@SuppressWarnings("unchecked")
@Override
public <T> T executeRequest(KubernetesExecutor<T> executor) throws KubernetesResourceNotFoundException
{
return (T) new Pod()
{
@Override
public PodSpec getSpec()
{
return podSpec;
}
};
}
};

KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig();
config.namespace = "test";
config.annotations.put("annotation_key", "annotation_value");
@@ -128,11 +130,8 @@ void testAddingLabelsAndAnnotations() throws IOException
jsonMapper
);
Task task = K8sTestUtils.getTask();
Job jobFromSpec = adapter.createJobFromPodSpec(
K8sTestUtils.getDummyPodSpec(),
task,
new PeonCommandContext(new ArrayList<>(), new ArrayList<>(), new File("/tmp/"))
);
Job jobFromSpec = adapter.fromTask(task);

assertTrue(jobFromSpec.getMetadata().getAnnotations().containsKey("annotation_key"));
assertTrue(jobFromSpec.getMetadata().getAnnotations().containsKey(DruidK8sConstants.TASK_ID));
assertFalse(jobFromSpec.getMetadata().getAnnotations().containsKey("label_key"));