Skip to content

Commit

Permalink
Update the randomize_start argument to randomize_start_duration to ac…
Browse files Browse the repository at this point in the history
…cept an integer specifying the maximum number of seconds to delay the start of each task. (#199)
  • Loading branch information
justHungryMan authored May 28, 2024
1 parent 6f6deeb commit 0f2c69f
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 13 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ Some options common to all executors:
- `pipeline` a list consisting of the pipeline steps that should be run
- `logging_dir` a datafolder where log files, statistics and more should be saved. Do not reuse folders for different pipelines/jobs as this will overwrite your stats, logs and completions.
- `skip_completed` (_bool_, `True` by default) datatrove keeps track of completed tasks so that when you relaunch a job they can be skipped. Set this to `False` to disable this behaviour
- `randomize_start` (_bool_, `False` by default) randomizes the start time of each task within a job by approximately 3 minutes to prevent all tasks from starting simultaneously and potentially overloading the system.
- `randomize_start_duration` (_int_, `0` by default) the maximum number of seconds to delay the start of each task to prevent all tasks from starting simultaneously and potentially overloading the system.

Call an executor's `run` method to execute its pipeline.

Expand Down
10 changes: 5 additions & 5 deletions src/datatrove/executor/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class PipelineExecutor(ABC):
logging_dir: where to save logs, stats, etc. Should be parsable into a datatrove.io.DataFolder
skip_completed: whether to skip tasks that were completed in
previous runs. default: True
randomize_start: randomize the start of each task in a job in a ~3 min window
randomize_start_duration: the maximum number of seconds to delay the start of each task.
"""

@abstractmethod
Expand All @@ -38,12 +38,12 @@ def __init__(
pipeline: list[PipelineStep | Callable],
logging_dir: DataFolderLike = None,
skip_completed: bool = True,
randomize_start: bool = False,
randomize_start_duration: int = 0,
):
self.pipeline: list[PipelineStep | Callable] = pipeline
self.logging_dir = get_datafolder(logging_dir if logging_dir else f"logs/{get_timestamp()}_{get_random_str()}")
self.skip_completed = skip_completed
self.randomize_start = randomize_start
self.randomize_start_duration = randomize_start_duration

@abstractmethod
def run(self):
Expand Down Expand Up @@ -80,8 +80,8 @@ def _run_for_rank(self, rank: int, local_rank: int = 0) -> PipelineStats:
logfile = add_task_logger(self.logging_dir, rank, local_rank)
log_pipeline(self.pipeline)

if self.randomize_start:
time.sleep(random.randint(0, 60 * 3))
if self.randomize_start_duration > 0:
time.sleep(random.randint(0, self.randomize_start_duration))
try:
# pipe data from one step to the next
pipelined_data = None
Expand Down
6 changes: 3 additions & 3 deletions src/datatrove/executor/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class LocalPipelineExecutor(PipelineExecutor):
Tasks [local_rank_offset, local_rank_offset + local_tasks] will be run.
depends: another LocalPipelineExecutor that should run
before this one
randomize_start: randomize the start of each task in a job in a ~3 min window
randomize_start_duration: the maximum number of seconds to delay the start of each task.
"""

def __init__(
Expand All @@ -44,9 +44,9 @@ def __init__(
start_method: str = "forkserver",
local_tasks: int = -1,
local_rank_offset: int = 0,
randomize_start: bool = False,
randomize_start_duration: int = 0,
):
super().__init__(pipeline, logging_dir, skip_completed, randomize_start)
super().__init__(pipeline, logging_dir, skip_completed, randomize_start_duration)
self.tasks = tasks
self.workers = workers if workers != -1 else tasks
self.start_method = start_method
Expand Down
8 changes: 4 additions & 4 deletions src/datatrove/executor/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class SlurmPipelineExecutor(PipelineExecutor):
stagger_max_array_jobs: when max_array_launch_parallel is True, this determines how many seconds to wait
between launching each of the parallel jobs
run_on_dependency_fail: start executing when a job we depend on finishes even if it has failed
randomize_start: randomize the start of each task in a job in a ~3 min window
randomize_start_duration: the maximum number of seconds to delay the start of each task.
requeue_signals: requeue the job and exit when one of these signals is received. Useful for when an instance
is being reclaimed and jobs must be stopped for example. Set to None to disable
mail_type: see https://slurm.schedmd.com/sbatch.html. Common values are (NONE, BEGIN, END, FAIL, REQUEUE, ALL)
Expand Down Expand Up @@ -105,15 +105,15 @@ def __init__(
max_array_launch_parallel: bool = False,
stagger_max_array_jobs: int = 0,
run_on_dependency_fail: bool = False,
randomize_start: bool = False,
randomize_start_duration: int = 0,
requeue_signals: tuple[str] | None = ("SIGUSR1",),
mail_type: str = "ALL",
mail_user: str = None,
requeue: bool = True,
srun_args: dict = None,
tasks_per_job: int = 1,
):
super().__init__(pipeline, logging_dir, skip_completed, randomize_start)
super().__init__(pipeline, logging_dir, skip_completed, randomize_start_duration)
self.tasks = tasks
self.workers = workers
self.partition = partition
Expand All @@ -133,7 +133,7 @@ def __init__(
self.max_array_launch_parallel = max_array_launch_parallel
self.stagger_max_array_jobs = stagger_max_array_jobs
self.run_on_dependency_fail = run_on_dependency_fail
self.randomize_start = randomize_start
self.randomize_start_duration = randomize_start_duration
self.job_id = None
self.requeue_signals = requeue_signals
self.mail_type = mail_type
Expand Down

0 comments on commit 0f2c69f

Please sign in to comment.