
Commit

Defines what happens when a chord task fails. Closes celery#1172
ask committed Jan 31, 2013
1 parent 0ac62c1 commit e8515ed
Showing 5 changed files with 104 additions and 16 deletions.
25 changes: 18 additions & 7 deletions celery/app/builtins.py
@@ -68,18 +68,29 @@ def add_unlock_chord_task(app):
     """
     from celery.canvas import subtask
-    from celery import result as _res
+    from celery.exceptions import ChordError
 
     @app.task(name='celery.chord_unlock', max_retries=None,
               default_retry_delay=1, ignore_result=True, _force_evaluate=True)
-    def unlock_chord(group_id, callback, interval=None, propagate=False,
+    def unlock_chord(group_id, callback, interval=None, propagate=True,
                      max_retries=None, result=None):
         if interval is None:
             interval = unlock_chord.default_retry_delay
-        result = _res.GroupResult(group_id, map(_res.AsyncResult, result))
-        j = result.join_native if result.supports_native_join else result.join
-        if result.ready():
-            subtask(callback).delay(j(propagate=propagate))
+        deps = app.GroupResult(group_id, map(app.AsyncResult, result))
+        j = deps.join_native if deps.supports_native_join else deps.join
+        if deps.ready():
+            callback = subtask(callback)
+            try:
+                ret = j(propagate=propagate)
+            except Exception, exc:
+                culprit = deps._failed_join_report().next()
+
+                app._tasks[callback.task].backend.fail_from_current_stack(
+                    callback.id, exc=ChordError('Dependency %s raised %r' % (
+                        culprit.id, exc)),
+                )
+            else:
+                callback.delay(ret)
         else:
             return unlock_chord.retry(countdown=interval,
                                       max_retries=max_retries)
@@ -274,7 +285,7 @@ class Chord(app.Task):
     ignore_result = False
 
     def run(self, header, body, partial_args=(), interval=1, countdown=1,
-            max_retries=None, propagate=False, eager=False, **kwargs):
+            max_retries=None, propagate=True, eager=False, **kwargs):
         group_id = uuid()
         AsyncResult = self.app.AsyncResult
         prepare_member = self._prepare_member
41 changes: 35 additions & 6 deletions celery/backends/base.py
@@ -20,13 +20,14 @@
 
 from datetime import timedelta
 
+from billiard.einfo import ExceptionInfo
 from kombu import serialization
 from kombu.utils.encoding import bytes_to_str, ensure_bytes, from_utf8
 
 from celery import states
 from celery.app import current_task
 from celery.datastructures import LRUCache
-from celery.exceptions import TimeoutError, TaskRevokedError
+from celery.exceptions import ChordError, TaskRevokedError, TimeoutError
 from celery.result import from_serializable, GroupResult
 from celery.utils import timeutils
 from celery.utils.serialization import (
@@ -112,6 +113,16 @@ def mark_as_failure(self, task_id, exc, traceback=None):
         return self.store_result(task_id, exc, status=states.FAILURE,
                                  traceback=traceback)
 
+    def fail_from_current_stack(self, task_id, exc=None):
+        type_, real_exc, tb = sys.exc_info()
+        try:
+            exc = real_exc if exc is None else exc
+            ei = ExceptionInfo((type_, exc, tb))
+            self.mark_as_failure(task_id, exc, ei.traceback)
+            return ei
+        finally:
+            del(tb)
+
     def mark_as_retry(self, task_id, exc, traceback=None):
         """Mark task as being retries. Stores the current
         exception (if any)."""
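
``fail_from_current_stack()`` reads the active exception via ``sys.exc_info()``,
so it is meant to be called from inside an ``except`` block, as the updated
``unlock_chord`` task does. A minimal sketch of the calling pattern
(``backend``, ``callback_id`` and ``deps`` are illustrative placeholders, not
names from this commit):

.. code-block:: python

    from celery.exceptions import ChordError

    try:
        ret = deps.join(propagate=True)
    except Exception, exc:  # Python 2 syntax, matching this code base
        # Record the failure (with the current traceback) against the
        # callback's task id, so waiting on the callback raises a ChordError.
        backend.fail_from_current_stack(
            callback_id, exc=ChordError('Dependency raised %r' % (exc,)))
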
@@ -226,7 +237,7 @@ def reload_group_result(self, task_id):
         raise NotImplementedError(
             'reload_group_result is not supported by this backend.')
 
-    def on_chord_part_return(self, task, propagate=False):
+    def on_chord_part_return(self, task, propagate=True):
         pass
 
     def fallback_chord_unlock(self, group_id, body, result=None,
@@ -245,6 +256,9 @@ def current_task_children(self):
     def __reduce__(self, args=(), kwargs={}):
         return (unpickle_backend, (self.__class__, args, kwargs))
 
+    def is_cached(self, task_id):
+        return False
+
 
 class BaseDictBackend(BaseBackend):
 
@@ -253,6 +267,9 @@ def __init__(self, *args, **kwargs):
         self._cache = LRUCache(limit=kwargs.get('max_cached_results') or
                                self.app.conf.CELERY_MAX_CACHED_RESULTS)
 
+    def is_cached(self, task_id):
+        return task_id in self._cache
+
     def store_result(self, task_id, result, status, traceback=None, **kwargs):
         """Store task result and status."""
         result = self.encode_result(result, status)
@@ -463,7 +480,7 @@ def on_chord_apply(self, group_id, body, result=None, **kwargs):
         else:
             self.fallback_chord_unlock(group_id, body, result, **kwargs)
 
-    def on_chord_part_return(self, task, propagate=False):
+    def on_chord_part_return(self, task, propagate=True):
         if not self.implements_incr:
             return
         from celery import subtask
@@ -475,9 +492,21 @@ def on_chord_part_return(self, task, propagate=False):
         deps = GroupResult.restore(gid, backend=task.backend)
         val = self.incr(key)
         if val >= len(deps):
-            subtask(task.request.chord).delay(deps.join(propagate=propagate))
-            deps.delete()
-            self.client.delete(key)
+            j = deps.join_native if deps.supports_native_join else deps.join
+            callback = subtask(task.request.chord)
+            try:
+                ret = j(propagate=propagate)
+            except Exception, exc:
+                culprit = deps._failed_join_report().next()
+                self.app._tasks[callback.task].backend.fail_from_current_stack(
+                    callback.id, exc=ChordError('Dependency %s raised %r' % (
+                        culprit.id, exc))
+                )
+            else:
+                callback.delay(ret)
+            finally:
+                deps.delete()
+                self.client.delete(key)
         else:
             self.expire(key, 86400)
 
4 changes: 4 additions & 0 deletions celery/exceptions.py
@@ -124,3 +124,7 @@ class CDeprecationWarning(DeprecationWarning):
 
 class IncompleteStream(Exception):
     """Found the end of a stream of data, but the data is not yet complete."""
+
+
+class ChordError(Exception):
+    """A task part of the chord raised an exception."""
6 changes: 6 additions & 0 deletions celery/result.py
@@ -535,6 +535,12 @@ def join_native(self, timeout=None, propagate=True, interval=0.5):
             acc[results.index(task_id)] = meta['result']
         return acc
 
+    def _failed_join_report(self):
+        for res in self.results:
+            if (res.backend.is_cached(res.id) and
+                    res.state in states.PROPAGATE_STATES):
+                yield res
+
     def __len__(self):
         return len(self.results)
 
44 changes: 41 additions & 3 deletions docs/userguide/canvas.rst
@@ -692,8 +692,8 @@ get the sum of the resulting numbers::
     >>> from celery import chord
     >>> from tasks import add, tsum
 
-    >>> chord(add.subtask((i, i))
-    ...       for i in xrange(100))(tsum.subtask()).get()
+    >>> chord(add.s(i, i)
+    ...       for i in xrange(100))(tsum.s()).get()
     9900
 
 
@@ -706,7 +706,9 @@ The synchronization step is costly, so you should avoid using chords as much
 as possible. Still, the chord is a powerful primitive to have in your toolbox
 as synchronization is a required step for many parallel algorithms.
 
-Let's break the chord expression down::
+Let's break the chord expression down:
+
+.. code-block:: python
 
     >>> callback = tsum.subtask()
     >>> header = [add.subtask((i, i)) for i in xrange(100)]
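
Applying the two pieces ties them back to the earlier one-liner; a minimal
sketch, assuming the same ``add`` and ``tsum`` tasks:

.. code-block:: python

    >>> result = chord(header)(callback)
    >>> result.get()
    9900
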
@@ -722,6 +724,42 @@
 the return value of each task in the header. The task id returned by
 :meth:`chord` is the id of the callback, so you can wait for it to complete
 and get the final return value (but remember to :ref:`never have a task wait
 for other tasks <task-synchronous-subtasks>`)
 
+Error handling
+~~~~~~~~~~~~~~
+
+.. versionadded:: 3.0.14
+
+So what happens if one of the tasks raises an exception?
+
+Errors will propagate to the callback, so the callback will not be executed;
+instead the callback changes to the failure state, and the error is set
+to the :exc:`~celery.exceptions.ChordError` exception:
+
+.. code-block:: python
+
+    >>> c = chord([add.s(4, 4), raising_task.s(), add.s(8, 8)])
+    >>> result = c()
+    >>> result.get()
+    Traceback (most recent call last):
+      File "<stdin>", line 1, in <module>
+      File "*/celery/result.py", line 120, in get
+        interval=interval)
+      File "*/celery/backends/amqp.py", line 150, in wait_for
+        raise self.exception_to_python(meta['result'])
+    celery.exceptions.ChordError: Dependency 97de6f3f-ea67-4517-a21c-d867c61fcb47
+        raised ValueError('something something',)
+
+While the traceback may differ depending on which result backend is used,
+you can see that the error description includes the id of the task that failed
+and a string representation of the original exception. You can also
+find the original traceback in ``result.traceback``.
+
+Note that the rest of the tasks will still execute, so the third task
+(``add.s(8, 8)``) is still executed even though the middle task failed.
+Also, the :exc:`~celery.exceptions.ChordError` only shows the task that failed
+first (in time); it does not respect the ordering of the header group.
+
 .. _chord-important-notes:
 
 Important Notes
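
As a complement to the documentation added above, the new exception can also
be caught explicitly by the caller; a minimal sketch, with ``result`` as in
the example above:

.. code-block:: python

    from celery.exceptions import ChordError

    try:
        result.get()
    except ChordError as exc:
        # Describes the first dependency (in time) that failed.
        print('chord callback failed: %s' % (exc,))
        # The original traceback of the failing task:
        print(result.traceback)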
