Skip to content

Commit

Permalink
Add task_internal_error signal (celery#6049)
Browse files Browse the repository at this point in the history
* Add internal_error signal

There is no special signal for an out of body error which can be the
result of a bad result backend.

* Fix syntax error.

* Document the task_internal_error signal.

Co-authored-by: Laurentiu Dragan <[email protected]>
  • Loading branch information
Omer Katz and ldragan authored Jun 2, 2020
1 parent 1561cad commit f3e31b9
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 45 deletions.
23 changes: 23 additions & 0 deletions celery/app/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,7 @@ def trace_task(uuid, args, kwargs, request=None):
except MemoryError:
raise
except Exception as exc:
_signal_internal_error(task, uuid, args, kwargs, request, exc)
if eager:
raise
R = report_internal_error(task, exc)
Expand All @@ -541,9 +542,31 @@ def trace_task(task, uuid, args, kwargs, request=None, **opts):
task.__trace__ = build_tracer(task.name, task, **opts)
return task.__trace__(uuid, args, kwargs, request)
except Exception as exc:
_signal_internal_error(task, uuid, args, kwargs, request, exc)
return trace_ok_t(report_internal_error(task, exc), None, 0.0, None)


def _signal_internal_error(task, uuid, args, kwargs, request, exc):
"""Send a special `internal_error` signal to the app for outside body errors"""
try:
_, _, tb = sys.exc_info()
einfo = ExceptionInfo()
einfo.exception = get_pickleable_exception(einfo.exception)
einfo.type = get_pickleable_etype(einfo.type)
signals.task_internal_error.send(
sender=task,
task_id=uuid,
args=args,
kwargs=kwargs,
request=request,
exception=exc,
traceback=tb,
einfo=einfo,
)
finally:
del tb


def _trace_task_ret(name, uuid, request, body, content_type,
content_encoding, loads=loads_message, app=None,
**extra_request):
Expand Down
8 changes: 7 additions & 1 deletion celery/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from .utils.dispatch import Signal

__all__ = (
'before_task_publish', 'after_task_publish',
'before_task_publish', 'after_task_publish', 'task_internal_error',
'task_prerun', 'task_postrun', 'task_success',
'task_retry', 'task_failure', 'task_revoked', 'celeryd_init',
'celeryd_after_setup', 'worker_init', 'worker_process_init',
Expand Down Expand Up @@ -65,6 +65,12 @@
'task_id', 'exception', 'args', 'kwargs', 'traceback', 'einfo',
},
)
task_internal_error = Signal(
name='task_internal_error',
providing_args={
'task_id', 'args', 'kwargs', 'request', 'exception', 'traceback', 'einfo'
}
)
task_revoked = Signal(
name='task_revoked',
providing_args={
Expand Down
43 changes: 42 additions & 1 deletion docs/userguide/signals.rst
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,46 @@ Provides arguments:

The :class:`billiard.einfo.ExceptionInfo` instance.

.. signal:: task_received
``task_internal_error``
~~~~~~~~~~~~~~~~~~~~~~~

Dispatched when an internal Celery error occurs while executing the task.

Sender is the task object executed.

.. signal:: task_internal_error

Provides arguments:

* ``task_id``

Id of the task.

* ``args``

Positional arguments the task was called with.

* ``kwargs``

Keyword arguments the task was called with.

* ``request``

The original request dictionary.
This is provided as the ``task.request`` may not be ready by the time
the exception is raised.

* ``exception``

Exception instance raised.

* ``traceback``

Stack trace object.

* ``einfo``

The :class:`billiard.einfo.ExceptionInfo` instance.

``task_received``
~~~~~~~~~~~~~~~~~
Expand All @@ -298,6 +337,8 @@ Dispatched when a task is received from the broker and is ready for execution.

Sender is the consumer object.

.. signal:: task_received

Provides arguments:

* ``request``
Expand Down
Loading

0 comments on commit f3e31b9

Please sign in to comment.