Skip to content

Commit

Permalink
StampingVisitor on_signature() required returning a key with the li…
Browse files Browse the repository at this point in the history
…st of stamped header

keys. It will now implicity assume all given keys are the stamped header keys, if not
overriden by an explicit "stamped_headers" key in the returned value (like it required
before this patch)
  • Loading branch information
Nusnus authored and auvipy committed Oct 30, 2022
1 parent 914efb0 commit 53dd65e
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 6 deletions.
5 changes: 4 additions & 1 deletion celery/canvas.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,10 @@ def stamp(self, visitor=None, **headers):
"""
headers = headers.copy()
if visitor is not None:
headers.update(visitor.on_signature(self, **headers))
visitor_headers = visitor.on_signature(self, **headers)
if "stamped_headers" not in visitor_headers:
visitor_headers["stamped_headers"] = list(visitor_headers.keys())
headers.update(visitor_headers)
else:
headers["stamped_headers"] = [header for header in headers.keys() if header not in self.options]
_merge_dictionaries(headers, self.options)
Expand Down
17 changes: 15 additions & 2 deletions docs/userguide/canvas.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1232,9 +1232,22 @@ the external monitoring system.
class MonitoringIdStampingVisitor(StampingVisitor):
def on_signature(self, sig, **headers) -> dict:
return {'monitoring_id': uuid4(), 'stamped_headers': ['monitoring_id']}
return {'monitoring_id': uuid4().hex}
Next, lets see how to use the ``MonitoringIdStampingVisitor`` stamping visitor.
.. note::

The ``stamped_headers`` key returned in ``on_signature`` is used to specify the headers that will be
stamped on the task. If this key is not specified, the stamping visitor will assume all keys in the
returned dictionary are the stamped headers from the visitor.
This means the following code block will result in the same behavior as the previous example.

.. code-block:: python
class MonitoringIdStampingVisitor(StampingVisitor):
def on_signature(self, sig, **headers) -> dict:
return {'monitoring_id': uuid4().hex, 'stamped_headers': ['monitoring_id']}
Next, lets see how to use the ``MonitoringIdStampingVisitor`` example stamping visitor.

.. code-block:: python
Expand Down
2 changes: 1 addition & 1 deletion t/integration/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ def test_revoked_by_headers_simple_canvas(self, manager):

class MonitoringIdStampingVisitor(StampingVisitor):
def on_signature(self, sig, **headers) -> dict:
return {'monitoring_id': target_monitoring_id, 'stamped_headers': ['monitoring_id']}
return {'monitoring_id': target_monitoring_id}

for monitoring_id in [target_monitoring_id, uuid4().hex, 4242, None]:
stamped_task = add.si(1, 1)
Expand Down
43 changes: 41 additions & 2 deletions t/unit/tasks/test_canvas.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import pytest_subtests # noqa: F401

from celery._state import _task_stack
from celery.canvas import (Signature, _chain, _maybe_group, chain, chord, chunks, group, maybe_signature,
maybe_unroll_group, signature, xmap, xstarmap)
from celery.canvas import (Signature, StampingVisitor, _chain, _maybe_group, chain, chord, chunks, group,
maybe_signature, maybe_unroll_group, signature, xmap, xstarmap)
from celery.result import AsyncResult, EagerResult, GroupResult

SIG = Signature({
Expand Down Expand Up @@ -190,6 +190,45 @@ def test_manual_stamping(self):
sig_1.apply()
assert sorted(sig_1_res._get_task_meta()['groups']) == sorted(stamps)

def test_custom_stamping_visitor(self, subtests):
"""
Test manual signature stamping with a custom visitor class.
"""
self.app.conf.task_always_eager = True
self.app.conf.task_store_eager_result = True
self.app.conf.result_extended = True

class CustomStampingVisitor1(StampingVisitor):
def on_signature(self, sig, **headers) -> dict:
# without using stamped_headers key explicitly
# the key will be calculated from the headers implicitly
return {'header': 'value'}

class CustomStampingVisitor2(StampingVisitor):
def on_signature(self, sig, **headers) -> dict:
return {'header': 'value', 'stamped_headers': ['header']}

sig_1 = self.add.s(2, 2)
sig_1.stamp(visitor=CustomStampingVisitor1())
sig_1_res = sig_1.freeze()
sig_1.apply()
sig_2 = self.add.s(2, 2)
sig_2.stamp(visitor=CustomStampingVisitor2())
sig_2_res = sig_2.freeze()
sig_2.apply()

with subtests.test("sig_1 is stamped with custom visitor", stamped_headers=["header", "groups"]):
assert sorted(sig_1_res._get_task_meta()["stamped_headers"]) == sorted(["header", "groups"])

with subtests.test("sig_2 is stamped with custom visitor", stamped_headers=["header", "groups"]):
assert sorted(sig_2_res._get_task_meta()["stamped_headers"]) == sorted(["header", "groups"])

with subtests.test("sig_1 is stamped with custom visitor", header=["value"]):
assert sig_1_res._get_task_meta()["header"] == ["value"]

with subtests.test("sig_2 is stamped with custom visitor", header=["value"]):
assert sig_2_res._get_task_meta()["header"] == ["value"]

def test_getitem_property_class(self):
assert Signature.task
assert Signature.args
Expand Down

0 comments on commit 53dd65e

Please sign in to comment.