Commit fcf31d8
criccomini committed Aug 7, 2017
2 parents ccf3ccc + ddc5026 commit fcf31d8
Showing 9 changed files with 511 additions and 97 deletions.
51 changes: 40 additions & 11 deletions airflow/bin/cli.py
@@ -40,6 +40,7 @@
 import traceback
 import time
 import psutil
+import re

 import airflow
 from airflow import api
@@ -49,7 +50,8 @@
 from airflow.executors import GetDefaultExecutor
 from airflow.models import (DagModel, DagBag, TaskInstance,
                             DagPickle, DagRun, Variable, DagStat,
-                            Connection)
+                            Connection, DAG)
+
 from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS)
 from airflow.utils import db as db_utils
 from airflow.utils import logging as logging_utils
@@ -127,6 +129,19 @@ def get_dag(args):
     return dagbag.dags[args.dag_id]


+def get_dags(args):
+    if not args.dag_regex:
+        return [get_dag(args)]
+    dagbag = DagBag(process_subdir(args.subdir))
+    matched_dags = [dag for dag in dagbag.dags.values() if re.search(
+        args.dag_id, dag.dag_id)]
+    if not matched_dags:
+        raise AirflowException(
+            'dag_id could not be found with regex: {}. Either the dag did not exist '
+            'or it failed to parse.'.format(args.dag_id))
+    return matched_dags
+
+
 def backfill(args, dag=None):
     logging.basicConfig(
         level=settings.LOGGING_LEVEL,
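
Note: the new get_dags helper matches dag_ids with re.search, so the pattern matches anywhere inside a dag_id rather than only the full string. A minimal sketch of that behavior, with invented dag_ids:

    import re

    dag_ids = ['etl_orders', 'etl_users', 'reporting_daily']

    # re.search matches anywhere in the id, so both etl_* dags are selected
    [d for d in dag_ids if re.search('etl', d)]           # ['etl_orders', 'etl_users']

    # anchor the pattern to select a single exact id
    [d for d in dag_ids if re.search('^etl_orders$', d)]  # ['etl_orders']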
@@ -164,7 +179,8 @@ def backfill(args, dag=None):
                           conf.getboolean('core', 'donot_pickle')),
             ignore_first_depends_on_past=args.ignore_first_depends_on_past,
             ignore_task_deps=args.ignore_dependencies,
-            pool=args.pool)
+            pool=args.pool,
+            delay_on_limit_secs=args.delay_on_limit)


 def trigger_dag(args):
@@ -599,15 +615,17 @@ def clear(args):
     logging.basicConfig(
         level=settings.LOGGING_LEVEL,
         format=settings.SIMPLE_LOG_FORMAT)
-    dag = get_dag(args)
+    dags = get_dags(args)

     if args.task_regex:
-        dag = dag.sub_dag(
-            task_regex=args.task_regex,
-            include_downstream=args.downstream,
-            include_upstream=args.upstream,
-        )
-    dag.clear(
+        for idx, dag in enumerate(dags):
+            dags[idx] = dag.sub_dag(
+                task_regex=args.task_regex,
+                include_downstream=args.downstream,
+                include_upstream=args.upstream)
+
+    DAG.clear_dags(
+        dags,
         start_date=args.start_date,
         end_date=args.end_date,
         only_failed=args.only_failed,
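
With get_dags feeding it, clear can now operate on every dag matching a pattern in a single invocation (e.g. airflow clear -dx 'etl_.*' -s 2017-08-01 -e 2017-08-07, an invented command line), and the per-dag dag.clear(...) call is replaced by one DAG.clear_dags(...) classmethod call. A sketch of the equivalent flow from Python, assuming DAG.clear_dags accepts the keyword arguments shown in this diff:

    import re
    from datetime import datetime

    from airflow.models import DAG, DagBag

    # select dags by regex, mirroring get_dags
    dagbag = DagBag()
    dags = [dag for dag in dagbag.dags.values()
            if re.search('etl_.*', dag.dag_id)]

    # clear all matched dags in one call instead of looping over dag.clear(...)
    DAG.clear_dags(
        dags,
        start_date=datetime(2017, 8, 1),
        end_date=datetime(2017, 8, 7),
        only_failed=False)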
@@ -1217,6 +1235,14 @@ class CLIFactory(object):
"DO respect depends_on_past)."),
"store_true"),
'pool': Arg(("--pool",), "Resource pool to use"),
'delay_on_limit': Arg(
("--delay_on_limit",),
help=("Amount of time in seconds to wait when the limit "
"on maximum active dag runs (max_active_runs) has "
"been reached before trying to execute a dag run "
"again."),
type=float,
default=1.0),
# list_tasks
'tree': Arg(("-t", "--tree"), "Tree view", "store_true"),
# list_dags
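
The --delay_on_limit flag reaches BackfillJob as delay_on_limit_secs via dag.run (see the backfill change above). A sketch of the programmatic equivalent, assuming an existing DAG object; the dates and the 30-second delay are illustrative:

    from datetime import datetime

    # dag is an existing airflow.models.DAG instance (assumed)
    dag.run(
        start_date=datetime(2017, 8, 1),
        end_date=datetime(2017, 8, 7),
        # wait 30s between attempts whenever max_active_runs blocks a new run
        delay_on_limit_secs=30.0)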
@@ -1237,6 +1263,9 @@
         'exclude_subdags': Arg(
             ("-x", "--exclude_subdags"),
             "Exclude subdags", "store_true"),
+        'dag_regex': Arg(
+            ("-dx", "--dag_regex"),
+            "Search dag_id as regex instead of exact string", "store_true"),
         # trigger_dag
         'run_id': Arg(("-r", "--run_id"), "Helps to identify this run"),
         'conf': Arg(
@@ -1471,7 +1500,7 @@ class CLIFactory(object):
                 'dag_id', 'task_regex', 'start_date', 'end_date',
                 'mark_success', 'local', 'donot_pickle', 'include_adhoc',
                 'bf_ignore_dependencies', 'bf_ignore_first_depends_on_past',
-                'subdir', 'pool', 'dry_run')
+                'subdir', 'pool', 'delay_on_limit', 'dry_run')
         }, {
             'func': list_tasks,
             'help': "List the tasks within a DAG",
@@ -1482,7 +1511,7 @@
             'args': (
                 'dag_id', 'task_regex', 'start_date', 'end_date', 'subdir',
                 'upstream', 'downstream', 'no_confirm', 'only_failed',
-                'only_running', 'exclude_subdags'),
+                'only_running', 'exclude_subdags', 'dag_regex'),
         }, {
             'func': pause,
             'help': "Pause a DAG",
144 changes: 86 additions & 58 deletions airflow/jobs.py
@@ -1837,26 +1837,29 @@ def __init__(self,
                  not_ready=None,
                  deadlocked=None,
                  active_runs=None,
+                 executed_dag_run_dates=None,
                  finished_runs=0,
                  total_runs=0,
                  ):
         """
         :param to_run: Tasks to run in the backfill
-        :type to_run: dict
+        :type to_run: dict[Tuple[String, String, DateTime], TaskInstance]
         :param started: Maps started task instance key to task instance object
-        :type started: dict
+        :type started: dict[Tuple[String, String, DateTime], TaskInstance]
         :param skipped: Tasks that have been skipped
-        :type skipped: set
+        :type skipped: set[Tuple[String, String, DateTime]]
         :param succeeded: Tasks that have succeeded so far
-        :type succeeded: set
+        :type succeeded: set[Tuple[String, String, DateTime]]
         :param failed: Tasks that have failed
-        :type failed: set
+        :type failed: set[Tuple[String, String, DateTime]]
         :param not_ready: Tasks not ready for execution
-        :type not_ready: set
+        :type not_ready: set[Tuple[String, String, DateTime]]
        :param deadlocked: Deadlocked tasks
-        :type deadlocked: set
-        :param active_runs: Active tasks at a certain point in time
-        :type active_runs: list
+        :type deadlocked: set[Tuple[String, String, DateTime]]
+        :param active_runs: Active dag runs at a certain point in time
+        :type active_runs: list[DagRun]
+        :param executed_dag_run_dates: Datetime objects for the executed dag runs
+        :type executed_dag_run_dates: set[Datetime]
         :param finished_runs: Number of finished runs so far
         :type finished_runs: int
         :param total_runs: Number of total dag runs able to run
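
The Tuple[String, String, DateTime] spelled out in these docstrings is the task instance key. An illustration of the assumed shape:

    from datetime import datetime

    # (dag_id, task_id, execution_date): the key the _DagRunTaskStatus
    # dicts and sets are indexed by
    ti_key = ('example_dag', 'example_task', datetime(2017, 8, 1))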
@@ -1870,6 +1873,7 @@ def __init__(self,
         self.not_ready = not_ready or set()
         self.deadlocked = deadlocked or set()
         self.active_runs = active_runs or list()
+        self.executed_dag_run_dates = executed_dag_run_dates or set()
         self.finished_runs = finished_runs
         self.total_runs = total_runs

@@ -1884,6 +1888,7 @@ def __init__(
             ignore_first_depends_on_past=False,
             ignore_task_deps=False,
             pool=None,
+            delay_on_limit_secs=1.0,
             *args, **kwargs):
         self.dag = dag
         self.dag_id = dag.dag_id
@@ -1895,6 +1900,7 @@ def __init__(
         self.ignore_first_depends_on_past = ignore_first_depends_on_past
         self.ignore_task_deps = ignore_task_deps
         self.pool = pool
+        self.delay_on_limit_secs = delay_on_limit_secs
         super(BackfillJob, self).__init__(*args, **kwargs)

     def _update_counters(self, ti_status):
@@ -1975,32 +1981,51 @@ def _manage_executor_state(self, started):
     def _get_dag_run(self, run_date, session=None):
         """
         Returns a dag run for the given run date, which will be matched to an existing
-        dag run if available or create a new dag run otherwise.
+        dag run if available or create a new dag run otherwise. If the max_active_runs
+        limit is reached, this function will return None.
         :param run_date: the execution date for the dag run
         :type run_date: datetime
         :param session: the database session object
         :type session: Session
-        :return: the dag run for the run date
+        :return: a DagRun in state RUNNING or None
         """
         run_id = BackfillJob.ID_FORMAT_PREFIX.format(run_date.isoformat())

+        # consider max_active_runs but ignore when running subdags
+        respect_dag_max_active_limit = (True
+                                        if (self.dag.schedule_interval and
+                                            not self.dag.is_subdag)
+                                        else False)
+
+        current_active_dag_count = self.dag.get_num_active_runs(external_trigger=False)
+
         # check if we are scheduling on top of a already existing dag_run
         # we could find a "scheduled" run instead of a "backfill"
         run = DagRun.find(dag_id=self.dag.dag_id,
                           execution_date=run_date,
                           session=session)

-        if run is None or len(run) == 0:
-            run = self.dag.create_dagrun(
-                run_id=run_id,
-                execution_date=run_date,
-                start_date=datetime.now(),
-                state=State.RUNNING,
-                external_trigger=False,
-                session=session
-            )
-        else:
+        if run is not None and len(run) > 0:
             run = run[0]
+            if run.state == State.RUNNING:
+                respect_dag_max_active_limit = False
+        else:
+            run = None
+
+        # enforce max_active_runs limit for dag, special cases already
+        # handled by respect_dag_max_active_limit
+        if (respect_dag_max_active_limit and
+                current_active_dag_count >= self.dag.max_active_runs):
+            return None
+
+        run = run or self.dag.create_dagrun(
+            run_id=run_id,
+            execution_date=run_date,
+            start_date=datetime.now(),
+            state=State.RUNNING,
+            external_trigger=False,
+            session=session
+        )

         # set required transient field
         run.dag = self.dag
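
The gate above only enforces max_active_runs for scheduled, non-subdag dags, and an existing RUNNING dag run is reused without counting against the limit. A condensed restatement of that decision, as a sketch rather than the committed code:

    def may_start_run(dag, existing_run):
        # the limit applies only to scheduled, non-subdag dags
        enforce_limit = bool(dag.schedule_interval) and not dag.is_subdag
        # a run already in RUNNING state is simply reused
        if existing_run is not None and existing_run.state == 'running':
            return True
        if (enforce_limit and
                dag.get_num_active_runs(external_trigger=False) >= dag.max_active_runs):
            return False  # _get_dag_run returns None; the backfill will wait
        return True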
@@ -2308,23 +2333,25 @@ def _execute_for_run_dates(self, run_dates, ti_status, executor, pickle_id,
         :type start_date: datetime
         :param session: the current session object
         :type session: Session
+        :return: list of execution dates of the dag runs that were executed.
+        :rtype: list
         """
         for next_run_date in run_dates:
             dag_run = self._get_dag_run(next_run_date, session=session)
             tis_map = self._task_instances_for_dag_run(dag_run,
                                                        session=session)
+            if dag_run is None:
+                continue

             ti_status.active_runs.append(dag_run)
             ti_status.to_run.update(tis_map or {})

-        ti_status.total_runs = len(ti_status.active_runs)
+        processed_dag_run_dates = self._process_backfill_task_instances(
+            ti_status=ti_status,
+            executor=executor,
+            pickle_id=pickle_id,
+            start_date=start_date,
+            session=session)

-        return self._process_backfill_task_instances(ti_status=ti_status,
-                                                     executor=executor,
-                                                     pickle_id=pickle_id,
-                                                     start_date=start_date,
-                                                     session=session)
+        ti_status.executed_dag_run_dates.update(processed_dag_run_dates)

     def _execute(self):
         """
@@ -2334,22 +2361,6 @@ def _execute(self):
         session = settings.Session()
         ti_status = BackfillJob._DagRunTaskStatus()

-        # consider max_active_runs but ignore when running subdags
-        # "parent.child" as a dag_id is by convention a subdag
-        if self.dag.schedule_interval and not self.dag.is_subdag:
-            all_active_runs = DagRun.find(
-                dag_id=self.dag.dag_id,
-                state=State.RUNNING,
-                external_trigger=False,
-                session=session
-            )
-
-            # return if already reached maximum active runs
-            if len(all_active_runs) >= self.dag.max_active_runs:
-                self.logger.info("Dag {} has reached maximum amount of {} dag runs"
-                                 .format(self.dag.dag_id, self.dag.max_active_runs))
-                return
-
         start_date = self.bf_start_date

         # Get intervals between the start/end dates, which will turn into dag runs
@@ -2372,20 +2383,37 @@
         executor = self.executor
         executor.start()

-        self._execute_for_run_dates(run_dates=run_dates,
-                                    ti_status=ti_status,
-                                    executor=executor,
-                                    pickle_id=pickle_id,
-                                    start_date=start_date,
-                                    session=session)
+        ti_status.total_runs = len(run_dates)  # total dag runs in backfill

-        executor.end()
-        session.commit()
-        session.close()
-
-        err = self._collect_errors(ti_status=ti_status, session=session)
-        if err:
-            raise AirflowException(err)
+        try:
+            remaining_dates = ti_status.total_runs
+            while remaining_dates > 0:
+                dates_to_process = [run_date for run_date in run_dates
+                                    if run_date not in ti_status.executed_dag_run_dates]
+
+                self._execute_for_run_dates(run_dates=dates_to_process,
+                                            ti_status=ti_status,
+                                            executor=executor,
+                                            pickle_id=pickle_id,
+                                            start_date=start_date,
+                                            session=session)
+
+                remaining_dates = (
+                    ti_status.total_runs - len(ti_status.executed_dag_run_dates)
+                )
+                err = self._collect_errors(ti_status=ti_status, session=session)
+                if err:
+                    raise AirflowException(err)
+
+                if remaining_dates > 0:
+                    self.logger.info(("max_active_runs limit for dag {} has been reached "
+                                      " - waiting for other dag runs to finish")
+                                     .format(self.dag_id))
+                    time.sleep(self.delay_on_limit_secs)
+        finally:
+            executor.end()
+            session.commit()
+            session.close()

         self.logger.info("Backfill done. Exiting.")
