diff --git a/airflow/jobs.py b/airflow/jobs.py
index e2f8c945010d4..668973e3f282f 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1817,6 +1817,62 @@ class BackfillJob(BaseJob):
'polymorphic_identity': 'BackfillJob'
}
+ class _DagRunTaskStatus(object):
+ """
+ Internal status of the backfill job. This class is intended to be instantiated
+ only within a BackfillJob instance and will track the execution of tasks,
+ e.g. started, skipped, succeeded, failed, etc. Information about the dag runs
+ related to the backfill job, e.g. finished runs, is also tracked in this
+ structure. Any other status information related to the execution of dag runs
+ or tasks can be included here, since a single structure makes it easier to
+ pass around.
+ """
+ # TODO(edgarRd): AIRFLOW-1444: Add consistency check on counts
+ def __init__(self,
+ to_run=None,
+ started=None,
+ skipped=None,
+ succeeded=None,
+ failed=None,
+ not_ready=None,
+ deadlocked=None,
+ active_runs=None,
+ finished_runs=0,
+ total_runs=0,
+ ):
+ """
+ :param to_run: Tasks to run in the backfill
+ :type to_run: dict
+ :param started: Maps started task instance key to task instance object
+ :type started: dict
+ :param skipped: Tasks that have been skipped
+ :type skipped: set
+ :param succeeded: Tasks that have succeeded so far
+ :type succeeded: set
+ :param failed: Tasks that have failed
+ :type failed: set
+ :param not_ready: Tasks not ready for execution
+ :type not_ready: set
+ :param deadlocked: Deadlocked tasks
+ :type deadlocked: set
+ :param active_runs: Active dag runs at a certain point in time
+ :type active_runs: list
+ :param finished_runs: Number of finished runs so far
+ :type finished_runs: int
+ :param total_runs: Total number of dag runs that can run
+ :type total_runs: int
+ """
+ self.to_run = to_run or dict()
+ self.started = started or dict()
+ self.skipped = skipped or set()
+ self.succeeded = succeeded or set()
+ self.failed = failed or set()
+ self.not_ready = not_ready or set()
+ self.deadlocked = deadlocked or set()
+ self.active_runs = active_runs or list()
+ self.finished_runs = finished_runs
+ self.total_runs = total_runs
+
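
As a rough standalone sketch of the role this container plays (not Airflow code; the class name and the task key below are made up), a single mutable status object lets the backfill helpers share one piece of state instead of passing several collections around:

    # Minimal sketch mirroring _DagRunTaskStatus: one mutable container that
    # every backfill helper reads from and writes to.
    class BackfillStatus(object):
        def __init__(self):
            self.to_run = {}       # task key -> task instance waiting to run
            self.started = {}      # task key -> task instance handed to the executor
            self.succeeded = set()
            self.skipped = set()
            self.failed = set()
            self.not_ready = set()
            self.deadlocked = set()
            self.active_runs = []  # dag runs currently being backfilled
            self.finished_runs = 0
            self.total_runs = 0


    status = BackfillStatus()
    status.to_run[("example_dag", "example_task", "2017-01-01")] = object()
    status.total_runs = 1
    print(len(status.to_run), status.finished_runs, status.total_runs)  # 1 0 1
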
def __init__(
self,
dag,
@@ -1841,41 +1897,38 @@ def __init__(
self.pool = pool
super(BackfillJob, self).__init__(*args, **kwargs)
- def _update_counters(self, started, succeeded, skipped, failed, tasks_to_run):
+ def _update_counters(self, ti_status):
"""
Updates the counters per state of the tasks that were running. Can re-add
to tasks to run in case required.
- :param started:
- :param succeeded:
- :param skipped:
- :param failed:
- :param tasks_to_run:
+ :param ti_status: the internal status of the backfill job tasks
+ :type ti_status: BackfillJob._DagRunTaskStatus
"""
- for key, ti in list(started.items()):
+ for key, ti in list(ti_status.started.items()):
ti.refresh_from_db()
if ti.state == State.SUCCESS:
- succeeded.add(key)
+ ti_status.succeeded.add(key)
self.logger.debug("Task instance {} succeeded. "
"Don't rerun.".format(ti))
- started.pop(key)
+ ti_status.started.pop(key)
continue
elif ti.state == State.SKIPPED:
- skipped.add(key)
+ ti_status.skipped.add(key)
self.logger.debug("Task instance {} skipped. "
"Don't rerun.".format(ti))
- started.pop(key)
+ ti_status.started.pop(key)
continue
elif ti.state == State.FAILED:
self.logger.error("Task instance {} failed".format(ti))
- failed.add(key)
- started.pop(key)
+ ti_status.failed.add(key)
+ ti_status.started.pop(key)
continue
# special case: if the task needs to run again put it back
elif ti.state == State.UP_FOR_RETRY:
self.logger.warning("Task instance {} is up for retry"
.format(ti))
- started.pop(key)
- tasks_to_run[key] = ti
+ ti_status.started.pop(key)
+ ti_status.to_run[key] = ti
# special case: The state of the task can be set to NONE by the task itself
# when it reaches concurrency limits. It could also happen when the state
# is changed externally, e.g. by clearing tasks from the ui. We need to cover
@@ -1888,8 +1941,8 @@ def _update_counters(self, started, succeeded, skipped, failed, tasks_to_run):
session = settings.Session()
ti.set_state(State.SCHEDULED, session=session)
session.close()
- started.pop(key)
- tasks_to_run[key] = ti
+ ti_status.started.pop(key)
+ ti_status.to_run[key] = ti
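
The state handling above can be pictured in isolation; the sketch below uses plain dicts and strings rather than Airflow objects, and mirrors how terminal states leave the started map while retryable states are re-queued:

    # Standalone sketch of the bookkeeping done by _update_counters: terminal
    # states move a key out of "started" into the matching set, while
    # UP_FOR_RETRY and NONE put the task back into the to-run queue. Plain
    # strings stand in for airflow.utils.state.State values.
    def update_counters(status, current_states):
        for key, state in list(current_states.items()):
            if state in ("success", "skipped", "failed"):
                status[state].add(key)
                status["started"].pop(key)
            elif state in ("up_for_retry", "none"):
                # in the real job the NONE case is also reset to SCHEDULED
                status["started"].pop(key)
                status["to_run"][key] = True


    status = {"started": {"t1": True, "t2": True}, "to_run": {},
              "success": set(), "skipped": set(), "failed": set()}
    update_counters(status, {"t1": "success", "t2": "up_for_retry"})
    print(status["success"], status["to_run"])  # {'t1'} {'t2': True}
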
def _manage_executor_state(self, started):
"""
@@ -1918,119 +1971,141 @@ def _manage_executor_state(self, started):
self.logger.error(msg)
ti.handle_failure(msg)
- def _execute(self):
+ @provide_session
+ def _get_dag_run(self, run_date, session=None):
"""
- Runs a dag for a specified date range.
+ Returns a dag run for the given run date, reusing an existing dag run if one
+ is found or creating a new dag run otherwise.
+ :param run_date: the execution date for the dag run
+ :type run_date: datetime
+ :param session: the database session object
+ :type session: Session
+ :return: the dag run for the run date
"""
- session = settings.Session()
- DagRun = models.DagRun
-
- # consider max_active_runs but ignore when running subdags
- # "parent.child" as a dag_id is by convention a subdag
- if self.dag.schedule_interval and not self.dag.is_subdag:
- active_runs = DagRun.find(
- dag_id=self.dag.dag_id,
+ run_id = BackfillJob.ID_FORMAT_PREFIX.format(run_date.isoformat())
+
+ # check if we are scheduling on top of an already existing dag_run
+ # we could find a "scheduled" run instead of a "backfill"
+ run = DagRun.find(dag_id=self.dag.dag_id,
+ execution_date=run_date,
+ session=session)
+
+ if run is None or len(run) == 0:
+ run = self.dag.create_dagrun(
+ run_id=run_id,
+ execution_date=run_date,
+ start_date=datetime.now(),
state=State.RUNNING,
external_trigger=False,
session=session
)
+ else:
+ run = run[0]
- # return if already reached maximum active runs
- if len(active_runs) >= self.dag.max_active_runs:
- self.logger.info("Dag {} has reached maximum amount of {} dag runs"
- .format(self.dag.dag_id, self.dag.max_active_runs))
- return
-
- start_date = self.bf_start_date
- end_date = self.bf_end_date
-
- # picklin'
- pickle_id = None
- if not self.donot_pickle and self.executor.__class__ not in (
- executors.LocalExecutor, executors.SequentialExecutor):
- pickle = models.DagPickle(self.dag)
- session.add(pickle)
- session.commit()
- pickle_id = pickle.id
+ # set required transient field
+ run.dag = self.dag
- executor = self.executor
- executor.start()
+ # explicitly mark as backfill and running
+ run.state = State.RUNNING
+ run.run_id = run_id
+ run.verify_integrity(session=session)
+ return run
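
The find-or-create logic above follows a common idempotency pattern; a minimal sketch under the assumption of an in-memory store (the "backfill_" run_id prefix is only illustrative, standing in for BackfillJob.ID_FORMAT_PREFIX):

    # Sketch of the find-or-create behaviour of _get_dag_run: reuse the run
    # for the execution date if one already exists (e.g. a "scheduled" run),
    # otherwise create one, and in both cases force it to RUNNING with a
    # backfill run_id. The dict is a stand-in for the metadata database.
    from datetime import datetime

    existing_runs = {}  # execution_date -> run dict


    def get_or_create_run(dag_id, execution_date):
        run = existing_runs.get(execution_date)
        if run is None:
            run = {"dag_id": dag_id, "execution_date": execution_date}
            existing_runs[execution_date] = run
        run["state"] = "running"
        run["run_id"] = "backfill_{}".format(execution_date.isoformat())
        return run


    print(get_or_create_run("example_dag", datetime(2017, 1, 1)))
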
- # Build a list of all instances to run
+ @provide_session
+ def _task_instances_for_dag_run(self, dag_run, session=None):
+ """
+ Returns a map of task instance key to task instance object for the tasks to
+ run in the given dag run.
+ :param dag_run: the dag run to get the tasks from
+ :type dag_run: models.DagRun
+ :param session: the database session object
+ :type session: Session
+ """
tasks_to_run = {}
- failed = set()
- succeeded = set()
- started = {}
- skipped = set()
- not_ready = set()
- deadlocked = set()
-
- # create dag runs
- dr_start_date = start_date or min([t.start_date for t in self.dag.tasks])
- end_date = end_date or datetime.now()
- # next run date for a subdag isn't relevant (schedule_interval for subdags
- # is ignored) so we use the dag run's start date in the case of a subdag
- next_run_date = (self.dag.normalize_schedule(dr_start_date)
- if not self.dag.is_subdag else dr_start_date)
-
- active_dag_runs = []
- while next_run_date and next_run_date <= end_date:
- run_id = BackfillJob.ID_FORMAT_PREFIX.format(next_run_date.isoformat())
-
- # check if we are scheduling on top of a already existing dag_run
- # we could find a "scheduled" run instead of a "backfill"
- run = DagRun.find(dag_id=self.dag.dag_id,
- execution_date=next_run_date,
- session=session)
- if not run:
- run = self.dag.create_dagrun(
- run_id=run_id,
- execution_date=next_run_date,
- start_date=datetime.now(),
- state=State.RUNNING,
- external_trigger=False,
- session=session,
- )
- else:
- run = run[0]
- # set required transient field
- run.dag = self.dag
-
- # explictely mark running as we can fill gaps
- run.state = State.RUNNING
- run.run_id = run_id
- run.verify_integrity(session=session)
+ if dag_run is None:
+ return tasks_to_run
- self.reset_state_for_orphaned_tasks(filter_by_dag_run=run, session=session)
+ # check if we have orphaned tasks
+ self.reset_state_for_orphaned_tasks(filter_by_dag_run=dag_run, session=session)
- # for some reason if we dont refresh the reference to run is lost
- run.refresh_from_db()
- make_transient(run)
- active_dag_runs.append(run)
+ # for some reason, if we don't refresh, the reference to run is lost
+ dag_run.refresh_from_db()
+ make_transient(dag_run)
- for ti in run.get_task_instances():
- # all tasks part of the backfill are scheduled to run
- if ti.state == State.NONE:
- ti.set_state(State.SCHEDULED, session=session)
- tasks_to_run[ti.key] = ti
+ # TODO(edgarRd): AIRFLOW-1464 change to batch query to improve perf
+ for ti in dag_run.get_task_instances():
+ # all tasks part of the backfill are scheduled to run
+ if ti.state == State.NONE:
+ ti.set_state(State.SCHEDULED, session=session)
+ tasks_to_run[ti.key] = ti
+
+ return tasks_to_run
+
+ def _log_progress(self, ti_status):
+ msg = ' | '.join([
+ "[backfill progress]",
+ "finished run {0} of {1}",
+ "tasks waiting: {2}",
+ "succeeded: {3}",
+ "kicked_off: {4}",
+ "failed: {5}",
+ "skipped: {6}",
+ "deadlocked: {7}",
+ "not ready: {8}"
+ ]).format(
+ ti_status.finished_runs,
+ ti_status.total_runs,
+ len(ti_status.to_run),
+ len(ti_status.succeeded),
+ len(ti_status.started),
+ len(ti_status.failed),
+ len(ti_status.skipped),
+ len(ti_status.deadlocked),
+ len(ti_status.not_ready))
+ self.logger.info(msg)
+
+ self.logger.debug("Finished dag run loop iteration. "
+ "Remaining tasks {}"
+ .format(ti_status.to_run.values()))
- next_run_date = self.dag.following_schedule(next_run_date)
+ @provide_session
+ def _process_backfill_task_instances(self,
+ ti_status,
+ executor,
+ pickle_id,
+ start_date=None, session=None):
+ """
+ Process a set of task instances from a set of dag runs. Special handling is done
+ to account for different task instance states that could be present when running
+ them in a backfill process.
+ :param ti_status: the internal status of the job
+ :type ti_status: BackfillJob._DagRunTaskStatus
+ :param executor: the executor to run the task instances
+ :type executor: BaseExecutor
+ :param pickle_id: the pickle_id if dag is pickled, None otherwise
+ :type pickle_id: int
+ :param start_date: the start date of the backfill job
+ :type start_date: datetime
+ :param session: the current session object
+ :type session: Session
+ :return: the list of execution_dates for the finished dag runs
+ :rtype: list
+ """
- finished_runs = 0
- total_runs = len(active_dag_runs)
+ executed_run_dates = []
- # Triggering what is ready to get triggered
- while (len(tasks_to_run) > 0 or len(started) > 0) and not deadlocked:
+ while ((len(ti_status.to_run) > 0 or len(ti_status.started) > 0) and
+ len(ti_status.deadlocked) == 0):
self.logger.debug("*** Clearing out not_ready list ***")
- not_ready.clear()
+ ti_status.not_ready.clear()
# we need to execute the tasks bottom to top
# or leaf to root, as otherwise tasks might be
# determined deadlocked while they are actually
# waiting for their upstream to finish
for task in self.dag.topological_sort():
- for key, ti in list(tasks_to_run.items()):
+ for key, ti in list(ti_status.to_run.items()):
if task.task_id != ti.task_id:
continue
@@ -2055,35 +2130,36 @@ def _execute(self):
# The task was already marked successful or skipped by a
# different Job. Don't rerun it.
if ti.state == State.SUCCESS:
- succeeded.add(key)
+ ti_status.succeeded.add(key)
self.logger.debug("Task instance {} succeeded. "
"Don't rerun.".format(ti))
- tasks_to_run.pop(key)
- if key in started:
- started.pop(key)
+ ti_status.to_run.pop(key)
+ if key in ti_status.started:
+ ti_status.started.pop(key)
continue
elif ti.state == State.SKIPPED:
- skipped.add(key)
+ ti_status.skipped.add(key)
self.logger.debug("Task instance {} skipped. "
"Don't rerun.".format(ti))
- tasks_to_run.pop(key)
- if key in started:
- started.pop(key)
+ ti_status.to_run.pop(key)
+ if key in ti_status.started:
+ ti_status.started.pop(key)
continue
elif ti.state == State.FAILED:
self.logger.error("Task instance {} failed".format(ti))
- failed.add(key)
- tasks_to_run.pop(key)
- if key in started:
- started.pop(key)
+ ti_status.failed.add(key)
+ ti_status.to_run.pop(key)
+ if key in ti_status.started:
+ ti_status.started.pop(key)
continue
elif ti.state == State.UPSTREAM_FAILED:
self.logger.error("Task instance {} upstream failed".format(ti))
- failed.add(key)
- tasks_to_run.pop(key)
- if key in started:
- started.pop(key)
+ ti_status.failed.add(key)
+ ti_status.to_run.pop(key)
+ if key in ti_status.started:
+ ti_status.started.pop(key)
continue
+
backfill_context = DepContext(
deps=RUN_DEPS,
ignore_depends_on_past=ignore_depends_on_past,
@@ -2113,31 +2189,31 @@ def _execute(self):
ignore_task_deps=self.ignore_task_deps,
ignore_depends_on_past=ignore_depends_on_past,
pool=self.pool)
- started[key] = ti
- tasks_to_run.pop(key)
+ ti_status.started[key] = ti
+ ti_status.to_run.pop(key)
session.commit()
continue
if ti.state == State.UPSTREAM_FAILED:
self.logger.error("Task instance {} upstream failed".format(ti))
- failed.add(key)
- tasks_to_run.pop(key)
- if key in started:
- started.pop(key)
+ ti_status.failed.add(key)
+ ti_status.to_run.pop(key)
+ if key in ti_status.started:
+ ti_status.started.pop(key)
continue
# special case
if ti.state == State.UP_FOR_RETRY:
self.logger.debug("Task instance {} retry period not expired yet"
.format(ti))
- if key in started:
- started.pop(key)
- tasks_to_run[key] = ti
+ if key in ti_status.started:
+ ti_status.started.pop(key)
+ ti_status.to_run[key] = ti
continue
# all remaining tasks
self.logger.debug('Adding {} to not_ready'.format(ti))
- not_ready.add(key)
+ ti_status.not_ready.add(key)
# execute the tasks in the queue
self.heartbeat()
@@ -2146,68 +2222,45 @@ def _execute(self):
# If the set of tasks that aren't ready ever equals the set of
# tasks to run and there are no running tasks then the backfill
# is deadlocked
- if not_ready and not_ready == set(tasks_to_run) and len(started) == 0:
- self.logger.warning("Deadlock discovered for tasks_to_run={}"
- .format(tasks_to_run.values()))
- deadlocked.update(tasks_to_run.values())
- tasks_to_run.clear()
+ if (ti_status.not_ready and
+ ti_status.not_ready == set(ti_status.to_run) and
+ len(ti_status.started) == 0):
+ self.logger.warning("Deadlock discovered for ti_status.to_run={}"
+ .format(ti_status.to_run.values()))
+ ti_status.deadlocked.update(ti_status.to_run.values())
+ ti_status.to_run.clear()
# check executor state
- self._manage_executor_state(started)
+ self._manage_executor_state(ti_status.started)
# update the task counters
- self._update_counters(started=started, succeeded=succeeded,
- skipped=skipped, failed=failed,
- tasks_to_run=tasks_to_run)
+ self._update_counters(ti_status=ti_status)
# update dag run state
- _dag_runs = active_dag_runs[:]
+ _dag_runs = ti_status.active_runs[:]
for run in _dag_runs:
run.update_state(session=session)
if run.state in State.finished():
- finished_runs += 1
- active_dag_runs.remove(run)
+ ti_status.finished_runs += 1
+ ti_status.active_runs.remove(run)
+ executed_run_dates.append(run.execution_date)
if run.dag.is_paused:
models.DagStat.update([run.dag_id], session=session)
- msg = ' | '.join([
- "[backfill progress]",
- "finished run {0} of {1}",
- "tasks waiting: {2}",
- "succeeded: {3}",
- "kicked_off: {4}",
- "failed: {5}",
- "skipped: {6}",
- "deadlocked: {7}",
- "not ready: {8}"
- ]).format(
- finished_runs,
- total_runs,
- len(tasks_to_run),
- len(succeeded),
- len(started),
- len(failed),
- len(skipped),
- len(deadlocked),
- len(not_ready))
- self.logger.info(msg)
-
- self.logger.debug("Finished dag run loop iteration. "
- "Remaining tasks {}"
- .format(tasks_to_run.values()))
+ self._log_progress(ti_status)
- executor.end()
-
- session.commit()
- session.close()
+ # return the execution dates of the finished dag runs
+ return executed_run_dates
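
The deadlock check inside the loop above reduces to a simple set comparison; a standalone sketch, with plain strings standing in for task instance keys:

    # Illustration of the deadlock condition: if every task still queued was
    # marked not ready in this iteration and nothing is running, no task can
    # ever become runnable, so the backfill gives up.
    def is_deadlocked(to_run, started, not_ready):
        return bool(not_ready) and not_ready == set(to_run) and len(started) == 0


    to_run = {"task_a": object(), "task_b": object()}
    started = {}
    print(is_deadlocked(to_run, started, {"task_a", "task_b"}))  # True
    print(is_deadlocked(to_run, started, {"task_a"}))            # False
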
+ @provide_session
+ def _collect_errors(self, ti_status, session=None):
err = ''
- if failed:
+ if ti_status.failed:
err += (
"---------------------------------------------------\n"
- "Some task instances failed:\n{}\n".format(failed))
- if deadlocked:
+ "Some task instances failed:\n{}\n".format(ti_status.failed))
+ if ti_status.deadlocked:
err += (
'---------------------------------------------------\n'
'BackfillJob is deadlocked.')
@@ -2220,7 +2273,7 @@ def _execute(self):
dep_context=DepContext(ignore_depends_on_past=True),
session=session,
verbose=True)
- for t in deadlocked)
+ for t in ti_status.deadlocked)
if deadlocked_depends_on_past:
err += (
'Some of the deadlocked tasks were unable to run because '
@@ -2228,11 +2281,109 @@ def _execute(self):
'backfill with the option '
'"ignore_first_depends_on_past=True" or passing "-I" at '
'the command line.')
- err += ' These tasks have succeeded:\n{}\n'.format(succeeded)
- err += ' These tasks have started:\n{}\n'.format(started)
- err += ' These tasks have failed:\n{}\n'.format(failed)
- err += ' These tasks are skipped:\n{}\n'.format(skipped)
- err += ' These tasks are deadlocked:\n{}\n'.format(deadlocked)
+ err += ' These tasks have succeeded:\n{}\n'.format(ti_status.succeeded)
+ err += ' These tasks have started:\n{}\n'.format(ti_status.started)
+ err += ' These tasks have failed:\n{}\n'.format(ti_status.failed)
+ err += ' These tasks are skipped:\n{}\n'.format(ti_status.skipped)
+ err += ' These tasks are deadlocked:\n{}\n'.format(ti_status.deadlocked)
+
+ return err
+
+ @provide_session
+ def _execute_for_run_dates(self, run_dates, ti_status, executor, pickle_id,
+ start_date, session=None):
+ """
+ Computes the dag runs and their respective task instances for
+ the given run dates and executes the task instances.
+ Returns a list of execution dates of the dag runs that were executed.
+ :param run_dates: Execution dates for dag runs
+ :type run_dates: list
+ :param ti_status: internal BackfillJob status structure to track task instance progress
+ :type ti_status: BackfillJob._DagRunTaskStatus
+ :param executor: the executor to use; it must have been started previously
+ :type executor: BaseExecutor
+ :param pickle_id: numeric id of the pickled dag, None if not pickled
+ :type pickle_id: int
+ :param start_date: backfill start date
+ :type start_date: datetime
+ :param session: the current session object
+ :type session: Session
+ :return: list of execution dates of the dag runs that were executed.
+ :rtype: list
+ """
+ for next_run_date in run_dates:
+ dag_run = self._get_dag_run(next_run_date, session=session)
+ tis_map = self._task_instances_for_dag_run(dag_run,
+ session=session)
+ ti_status.active_runs.append(dag_run)
+ ti_status.to_run.update(tis_map or {})
+
+ ti_status.total_runs = len(ti_status.active_runs)
+
+ return self._process_backfill_task_instances(ti_status=ti_status,
+ executor=executor,
+ pickle_id=pickle_id,
+ start_date=start_date,
+ session=session)
+
+ def _execute(self):
+ """
+ Initializes all components required to run a dag for a specified date range and
+ calls the helper method to execute the task instances.
+ """
+ session = settings.Session()
+ ti_status = BackfillJob._DagRunTaskStatus()
+
+ # consider max_active_runs but ignore when running subdags
+ # "parent.child" as a dag_id is by convention a subdag
+ if self.dag.schedule_interval and not self.dag.is_subdag:
+ all_active_runs = DagRun.find(
+ dag_id=self.dag.dag_id,
+ state=State.RUNNING,
+ external_trigger=False,
+ session=session
+ )
+
+ # return if already reached maximum active runs
+ if len(all_active_runs) >= self.dag.max_active_runs:
+ self.logger.info("Dag {} has reached maximum amount of {} dag runs"
+ .format(self.dag.dag_id, self.dag.max_active_runs))
+ return
+
+ start_date = self.bf_start_date
+
+ # Get intervals between the start/end dates, which will turn into dag runs
+ run_dates = self.dag.get_run_dates(start_date=start_date,
+ end_date=self.bf_end_date)
+ if len(run_dates) == 0:
+ self.logger.info("No run dates were found for the given dates and dag "
+ "interval.")
+ return
+
+ # picklin'
+ pickle_id = None
+ if not self.donot_pickle and self.executor.__class__ not in (
+ executors.LocalExecutor, executors.SequentialExecutor):
+ pickle = models.DagPickle(self.dag)
+ session.add(pickle)
+ session.commit()
+ pickle_id = pickle.id
+
+ executor = self.executor
+ executor.start()
+
+ self._execute_for_run_dates(run_dates=run_dates,
+ ti_status=ti_status,
+ executor=executor,
+ pickle_id=pickle_id,
+ start_date=start_date,
+ session=session)
+
+ executor.end()
+ session.commit()
+ session.close()
+
+ err = self._collect_errors(ti_status=ti_status, session=session)
if err:
raise AirflowException(err)
diff --git a/airflow/models.py b/airflow/models.py
index d1f8e59fe3430..cc54f36a3c981 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2883,6 +2883,37 @@ def previous_schedule(self, dttm):
elif isinstance(self._schedule_interval, timedelta):
return dttm - self._schedule_interval
+ def get_run_dates(self, start_date, end_date=None):
+ """
+ Returns a list of dates within the interval defined by the given start and
+ end dates, following this dag's schedule interval. The returned dates can be
+ used as execution dates.
+ :param start_date: the start date of the interval
+ :type start_date: datetime
+ :param end_date: the end date of the interval, defaults to datetime.now()
+ :type end_date: datetime
+ :return: a list of dates within the interval following the dag's schedule
+ :rtype: list
+ """
+ run_dates = []
+
+ using_start_date = start_date
+ using_end_date = end_date
+
+ # dates for dag runs
+ using_start_date = using_start_date or min([t.start_date for t in self.tasks])
+ using_end_date = using_end_date or datetime.now()
+
+ # next run date for a subdag isn't relevant (schedule_interval for subdags
+ # is ignored) so we use the dag run's start date in the case of a subdag
+ next_run_date = (self.normalize_schedule(using_start_date)
+ if not self.is_subdag else using_start_date)
+
+ while next_run_date and next_run_date <= using_end_date:
+ run_dates.append(next_run_date)
+ next_run_date = self.following_schedule(next_run_date)
+
+ return run_dates
+
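
For a fixed timedelta schedule the expansion above reduces to a simple loop; a minimal sketch using only datetime (cron schedules handled by following_schedule and the subdag case are deliberately omitted):

    # Standalone sketch of the date expansion performed by get_run_dates for a
    # plain timedelta schedule.
    from datetime import datetime, timedelta


    def run_dates_for(start_date, end_date, interval):
        run_dates = []
        next_run_date = start_date
        while next_run_date <= end_date:
            run_dates.append(next_run_date)
            next_run_date += interval
        return run_dates


    print(run_dates_for(datetime(2017, 1, 1, 0),
                        datetime(2017, 1, 1, 3),
                        timedelta(hours=1)))  # four hourly execution dates
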
def normalize_schedule(self, dttm):
"""
Returns dttm + interval unless dttm is first interval then it returns dttm
diff --git a/docs/integration.rst b/docs/integration.rst
index c31d96417c725..30150ac41a3e4 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -3,8 +3,10 @@ Integration
- :ref:`Azure`
- :ref:`AWS`
+- :ref:`Databricks`
- :ref:`GCP`
+
.. _Azure:
Azure: Microsoft Azure
@@ -54,26 +56,33 @@ WasbHook
.. autoclass:: airflow.contrib.hooks.wasb_hook.WasbHook
+
+
.. _AWS:
AWS: Amazon Webservices
-----------------------
----
+To be filled.
+
+
.. _Databricks:
Databricks
---------------------------
+----------
+
`Databricks `_ has contributed an Airflow operator which enables
submitting runs to the Databricks platform. Internally the operator talks to the
``api/2.0/jobs/runs/submit`` `endpoint `_.
DatabricksSubmitRunOperator
-''''''''''''''''''''''''''''
+'''''''''''''''''''''''''''
.. autoclass:: airflow.contrib.operators.databricks_operator.DatabricksSubmitRunOperator
+
+
.. _GCP:
GCP: Google Cloud Platform
@@ -87,7 +96,7 @@ BigQuery
''''''''
BigQuery Operators
-^^^^^^^^^^^^^^^^^^
+""""""""""""""""""
- :ref:`BigQueryCheckOperator` : Performs checks against a SQL query that will return a single row with different values.
- :ref:`BigQueryValueCheckOperator` : Performs a simple value check using SQL code.
@@ -100,48 +109,48 @@ BigQuery Operators
.. _BigQueryCheckOperator:
BigQueryCheckOperator
-"""""""""""""""""""""
+^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: airflow.contrib.operators.bigquery_check_operator.BigQueryCheckOperator
.. _BigQueryValueCheckOperator:
BigQueryValueCheckOperator
-""""""""""""""""""""""""""
+^^^^^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: airflow.contrib.operators.bigquery_check_operator.BigQueryValueCheckOperator
.. _BigQueryIntervalCheckOperator:
BigQueryIntervalCheckOperator
-"""""""""""""""""""""""""""""
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: airflow.contrib.operators.bigquery_check_operator.BigQueryIntervalCheckOperator
.. _BigQueryOperator:
BigQueryOperator
-""""""""""""""""
+^^^^^^^^^^^^^^^^
.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryOperator
.. _BigQueryToBigQueryOperator:
BigQueryToBigQueryOperator
-""""""""""""""""""""""""""
+^^^^^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: airflow.contrib.operators.bigquery_to_bigquery.BigQueryToBigQueryOperator
.. _BigQueryToCloudStorageOperator:
BigQueryToCloudStorageOperator
-""""""""""""""""""""""""""""""
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator
BigQueryHook
-^^^^^^^^^^^^
+""""""""""""
.. autoclass:: airflow.contrib.hooks.bigquery_hook.BigQueryHook
:members:
@@ -151,14 +160,15 @@ Cloud DataFlow
''''''''''''''
DataFlow Operators
-^^^^^^^^^^^^^^^^^^
+""""""""""""""""""
-- :ref:`DataFlowJavaOperator` :
+- :ref:`DataFlowJavaOperator` : launching Cloud Dataflow jobs written in Java.
+- :ref:`DataFlowPythonOperator` : launching Cloud Dataflow jobs written in Python.
.. _DataFlowJavaOperator:
DataFlowJavaOperator
-""""""""""""""""""""
+^^^^^^^^^^^^^^^^^^^^
.. autoclass:: airflow.contrib.operators.dataflow_operator.DataFlowJavaOperator
@@ -196,8 +206,16 @@ DataFlowJavaOperator
},
dag=dag)
+.. _DataFlowPythonOperator:
+
+DataFlowPythonOperator
+^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.dataflow_operator.DataFlowPythonOperator
+
+
DataFlowHook
-^^^^^^^^^^^^
+""""""""""""
.. autoclass:: airflow.contrib.hooks.gcp_dataflow_hook.DataFlowHook
:members:
@@ -208,7 +226,7 @@ Cloud DataProc
''''''''''''''
DataProc Operators
-^^^^^^^^^^^^^^^^^^
+""""""""""""""""""
- :ref:`DataProcPigOperator` : Start a Pig query Job on a Cloud DataProc cluster.
- :ref:`DataProcHiveOperator` : Start a Hive query Job on a Cloud DataProc cluster.
@@ -220,35 +238,35 @@ DataProc Operators
.. _DataProcPigOperator:
DataProcPigOperator
-"""""""""""""""""""
+^^^^^^^^^^^^^^^^^^^
.. autoclass:: airflow.contrib.operators.dataproc_operator.DataProcPigOperator
.. _DataProcHiveOperator:
DataProcHiveOperator
-""""""""""""""""""""
+^^^^^^^^^^^^^^^^^^^^
.. autoclass:: airflow.contrib.operators.dataproc_operator.DataProcHiveOperator
.. _DataProcSparkSqlOperator:
DataProcSparkSqlOperator
-""""""""""""""""""""""""
+^^^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: airflow.contrib.operators.dataproc_operator.DataProcSparkSqlOperator
.. _DataProcSparkOperator:
DataProcSparkOperator
-"""""""""""""""""""""
+^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: airflow.contrib.operators.dataproc_operator.DataProcSparkOperator
.. _DataProcHadoopOperator:
DataProcHadoopOperator
-""""""""""""""""""""""
+^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: airflow.contrib.operators.dataproc_operator.DataProcHadoopOperator
@@ -264,20 +282,18 @@ DataProcPySparkOperator
Cloud Datastore
'''''''''''''''
-Datastore Operators
-^^^^^^^^^^^^^^^^^^^
-
DatastoreHook
-~~~~~~~~~~~~~
+"""""""""""""
.. autoclass:: airflow.contrib.hooks.datastore_hook.DatastoreHook
:members:
+
Cloud ML Engine
'''''''''''''''
Cloud ML Engine Operators
-^^^^^^^^^^^^^^^^^^^^^^^^^
+"""""""""""""""""""""""""
- :ref:`CloudMLBatchPredictionOperator` : Start a Cloud ML Engine batch prediction job.
- :ref:`CloudMLModelOperator` : Manages a Cloud ML Engine model.
@@ -287,7 +303,7 @@ Cloud ML Engine Operators
.. _CloudMLBatchPredictionOperator:
CloudMLBatchPredictionOperator
-""""""""""""""""""""""""""""""
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: airflow.contrib.operators.cloudml_operator.CloudMLBatchPredictionOperator
:members:
@@ -295,7 +311,7 @@ CloudMLBatchPredictionOperator
.. _CloudMLModelOperator:
CloudMLModelOperator
-""""""""""""""""""""
+^^^^^^^^^^^^^^^^^^^^
.. autoclass:: airflow.contrib.operators.cloudml_operator.CloudMLModelOperator
:members:
@@ -303,7 +319,7 @@ CloudMLModelOperator
.. _CloudMLTrainingOperator:
CloudMLTrainingOperator
-"""""""""""""""""""""""
+^^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: airflow.contrib.operators.cloudml_operator.CloudMLTrainingOperator
:members:
@@ -311,16 +327,28 @@ CloudMLTrainingOperator
.. _CloudMLVersionOperator:
CloudMLVersionOperator
-"""""""""""""""""""""""
+^^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: airflow.contrib.operators.cloudml_operator.CloudMLVersionOperator
:members:
+Cloud ML Engine Hook
+""""""""""""""""""""
+
+.. _CloudMLHook:
+
+CloudMLHook
+^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.hooks.gcp_cloudml_hook.CloudMLHook
+ :members:
+
+
Cloud Storage
'''''''''''''
Storage Operators
-^^^^^^^^^^^^^^^^^
+"""""""""""""""""
- :ref:`GoogleCloudStorageDownloadOperator` : Downloads a file from Google Cloud Storage.
- :ref:`GoogleCloudStorageToBigQueryOperator` : Loads files from Google cloud storage into BigQuery.
@@ -328,7 +356,7 @@ Storage Operators
.. _GoogleCloudStorageDownloadOperator:
GoogleCloudStorageDownloadOperator
-""""""""""""""""""""""""""""""""""
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: airflow.contrib.operators.gcs_download_operator.GoogleCloudStorageDownloadOperator
:members:
@@ -336,15 +364,14 @@ GoogleCloudStorageDownloadOperator
.. _GoogleCloudStorageToBigQueryOperator:
GoogleCloudStorageToBigQueryOperator
-""""""""""""""""""""""""""""""""""""
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator
:members:
GoogleCloudStorageHook
-^^^^^^^^^^^^^^^^^^^^^^
+""""""""""""""""""""""
.. autoclass:: airflow.contrib.hooks.gcs_hook.GoogleCloudStorageHook
:members:
-
diff --git a/tests/jobs.py b/tests/jobs.py
index c9ab742c1cf16..fa27b4681d373 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -71,6 +71,7 @@
TEST_DAGS_FOLDER = os.path.join(
os.path.dirname(os.path.realpath(__file__)), 'dags')
+
class BackfillJobTest(unittest.TestCase):
def setUp(self):
@@ -302,6 +303,72 @@ def test_cli_backfill_depends_on_past(self):
self.assertEqual(ti.state, State.SUCCESS)
dag.clear()
+ def _get_dag_test_max_active_limits(self, dag_id, max_active_runs=1):
+ dag = DAG(
+ dag_id=dag_id,
+ start_date=DEFAULT_DATE,
+ schedule_interval="@hourly",
+ max_active_runs=max_active_runs
+ )
+
+ with dag:
+ op1 = DummyOperator(task_id='leave1')
+ op2 = DummyOperator(task_id='leave2')
+ op3 = DummyOperator(task_id='upstream_level_1')
+ op4 = DummyOperator(task_id='upstream_level_2')
+
+ op1 >> op2 >> op3
+ op4 >> op3
+
+ dag.clear()
+ return dag
+
+ def test_backfill_max_limit_check_within_limit(self):
+ dag = self._get_dag_test_max_active_limits(
+ 'test_backfill_max_limit_check_within_limit',
+ max_active_runs=16)
+
+ start_date = DEFAULT_DATE - datetime.timedelta(hours=3)
+ end_date = DEFAULT_DATE
+
+ executor = TestExecutor(do_update=True)
+ job = BackfillJob(dag=dag,
+ start_date=start_date,
+ end_date=end_date,
+ executor=executor,
+ donot_pickle=True)
+ job.run()
+
+ # all dag runs should have executed since max_active_runs was not reached
+ dagruns = DagRun.find(dag_id=dag.dag_id)
+ self.assertEqual(4, len(dagruns))
+ self.assertTrue(all([run.state == State.SUCCESS for run in dagruns]))
+
+ def test_backfill_max_limit_check(self):
+ dag = self._get_dag_test_max_active_limits('test_backfill_max_limit_check')
+
+ start_date = DEFAULT_DATE - datetime.timedelta(hours=3)
+ end_date = DEFAULT_DATE
+
+ # Existing dagrun that is not within the backfill range
+ dr = dag.create_dagrun(run_id="test_dagrun",
+ state=State.RUNNING,
+ execution_date=DEFAULT_DATE + datetime.timedelta(hours=1),
+ start_date=DEFAULT_DATE)
+
+ executor = TestExecutor(do_update=True)
+ job = BackfillJob(dag=dag,
+ start_date=start_date,
+ end_date=end_date,
+ executor=executor,
+ donot_pickle=True)
+ job.run()
+
+ # the backfill could not create any dag runs since max_active_runs was already reached
+ dagruns = DagRun.find(dag_id=dag.dag_id)
+ self.assertEqual(1, len(dagruns))
+ self.assertEqual(dagruns[0].run_id, dr.run_id)
+
def test_sub_set_subdag(self):
dag = DAG(
'test_sub_set_subdag',
@@ -467,70 +534,84 @@ def test_update_counters(self):
ti = TI(task1, dr.execution_date)
ti.refresh_from_db()
- started = {}
- tasks_to_run = {}
- failed = set()
- succeeded = set()
- started = {}
- skipped = set()
+ ti_status = BackfillJob._DagRunTaskStatus()
# test for success
ti.set_state(State.SUCCESS, session)
- started[ti.key] = ti
- job._update_counters(started=started, succeeded=succeeded,
- skipped=skipped, failed=failed,
- tasks_to_run=tasks_to_run)
- self.assertTrue(len(started) == 0)
- self.assertTrue(len(succeeded) == 1)
- self.assertTrue(len(skipped) == 0)
- self.assertTrue(len(failed) == 0)
- self.assertTrue(len(tasks_to_run) == 0)
-
- succeeded.clear()
+ ti_status.started[ti.key] = ti
+ job._update_counters(ti_status=ti_status)
+ self.assertTrue(len(ti_status.started) == 0)
+ self.assertTrue(len(ti_status.succeeded) == 1)
+ self.assertTrue(len(ti_status.skipped) == 0)
+ self.assertTrue(len(ti_status.failed) == 0)
+ self.assertTrue(len(ti_status.to_run) == 0)
+
+ ti_status.succeeded.clear()
# test for skipped
ti.set_state(State.SKIPPED, session)
- started[ti.key] = ti
- job._update_counters(started=started, succeeded=succeeded,
- skipped=skipped, failed=failed,
- tasks_to_run=tasks_to_run)
- self.assertTrue(len(started) == 0)
- self.assertTrue(len(succeeded) == 0)
- self.assertTrue(len(skipped) == 1)
- self.assertTrue(len(failed) == 0)
- self.assertTrue(len(tasks_to_run) == 0)
-
- skipped.clear()
+ ti_status.started[ti.key] = ti
+ job._update_counters(ti_status=ti_status)
+ self.assertTrue(len(ti_status.started) == 0)
+ self.assertTrue(len(ti_status.succeeded) == 0)
+ self.assertTrue(len(ti_status.skipped) == 1)
+ self.assertTrue(len(ti_status.failed) == 0)
+ self.assertTrue(len(ti_status.to_run) == 0)
+
+ ti_status.skipped.clear()
# test for failed
ti.set_state(State.FAILED, session)
- started[ti.key] = ti
- job._update_counters(started=started, succeeded=succeeded,
- skipped=skipped, failed=failed,
- tasks_to_run=tasks_to_run)
- self.assertTrue(len(started) == 0)
- self.assertTrue(len(succeeded) == 0)
- self.assertTrue(len(skipped) == 0)
- self.assertTrue(len(failed) == 1)
- self.assertTrue(len(tasks_to_run) == 0)
-
- failed.clear()
+ ti_status.started[ti.key] = ti
+ job._update_counters(ti_status=ti_status)
+ self.assertTrue(len(ti_status.started) == 0)
+ self.assertTrue(len(ti_status.succeeded) == 0)
+ self.assertTrue(len(ti_status.skipped) == 0)
+ self.assertTrue(len(ti_status.failed) == 1)
+ self.assertTrue(len(ti_status.to_run) == 0)
+
+ ti_status.failed.clear()
# test for reschedule
# test for failed
ti.set_state(State.NONE, session)
- started[ti.key] = ti
- job._update_counters(started=started, succeeded=succeeded,
- skipped=skipped, failed=failed,
- tasks_to_run=tasks_to_run)
- self.assertTrue(len(started) == 0)
- self.assertTrue(len(succeeded) == 0)
- self.assertTrue(len(skipped) == 0)
- self.assertTrue(len(failed) == 0)
- self.assertTrue(len(tasks_to_run) == 1)
+ ti_status.started[ti.key] = ti
+ job._update_counters(ti_status=ti_status)
+ self.assertTrue(len(ti_status.started) == 0)
+ self.assertTrue(len(ti_status.succeeded) == 0)
+ self.assertTrue(len(ti_status.skipped) == 0)
+ self.assertTrue(len(ti_status.failed) == 0)
+ self.assertTrue(len(ti_status.to_run) == 1)
session.close()
+ def test_dag_get_run_dates(self):
+
+ def get_test_dag_for_backfill(schedule_interval=None):
+ dag = DAG(
+ dag_id='test_get_dates',
+ start_date=DEFAULT_DATE,
+ schedule_interval=schedule_interval)
+ DummyOperator(
+ task_id='dummy',
+ dag=dag,
+ owner='airflow')
+ return dag
+
+ test_dag = get_test_dag_for_backfill()
+ self.assertEqual([DEFAULT_DATE], test_dag.get_run_dates(
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE))
+
+ test_dag = get_test_dag_for_backfill(schedule_interval="@hourly")
+ self.assertEqual([DEFAULT_DATE - datetime.timedelta(hours=3),
+ DEFAULT_DATE - datetime.timedelta(hours=2),
+ DEFAULT_DATE - datetime.timedelta(hours=1),
+ DEFAULT_DATE],
+ test_dag.get_run_dates(
+ start_date=DEFAULT_DATE - datetime.timedelta(hours=3),
+ end_date=DEFAULT_DATE,))
+
class LocalTaskJobTest(unittest.TestCase):
def setUp(self):
@@ -558,7 +639,9 @@ def test_localtaskjob_heartbeat(self, is_descendant):
ti.hostname = "blablabla"
session.commit()
- job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
+ job1 = LocalTaskJob(task_instance=ti,
+ ignore_ti_state=True,
+ executor=SequentialExecutor())
self.assertRaises(AirflowException, job1.heartbeat_callback)
is_descendant.return_value = True
@@ -597,7 +680,9 @@ def test_localtaskjob_double_trigger(self):
session.commit()
ti_run = TI(task=task, execution_date=DEFAULT_DATE)
- job1 = LocalTaskJob(task_instance=ti_run, ignore_ti_state=True, executor=SequentialExecutor())
+ job1 = LocalTaskJob(task_instance=ti_run,
+ ignore_ti_state=True,
+ executor=SequentialExecutor())
self.assertRaises(AirflowException, job1.run)
ti = dr.get_task_instance(task_id=task.task_id, session=session)
@@ -755,7 +840,7 @@ def test_find_executable_task_instances_backfill_nodagrun(self):
res_keys = map(lambda x: x.key, res)
self.assertIn(ti_no_dagrun.key, res_keys)
self.assertIn(ti_with_dagrun.key, res_keys)
-
+
def test_find_executable_task_instances_pool(self):
dag_id = 'SchedulerJobTest.test_find_executable_task_instances_pool'
task_id_1 = 'dummy'
@@ -2323,7 +2408,7 @@ def test_list_py_file_paths(self):
for file_path in list_py_file_paths(TEST_DAGS_FOLDER):
detected_files.append(file_path)
self.assertEqual(sorted(detected_files), sorted(expected_files))
-
+
def test_reset_orphaned_tasks_nothing(self):
"""Try with nothing. """
scheduler = SchedulerJob(**self.default_scheduler_args)