Skip to content

Commit

Permalink
Canvas Header Stamping (celery#7384)
Browse files Browse the repository at this point in the history
* Strip down the header-stamping PR to the basics.

* Serialize groups.

* Add groups to result backend meta data.

* Fix spelling mistake.

* Revert changes to canvas.py

* Revert changes to app/base.py

* Add stamping implementation to canvas.py

* Send task to AMQP with groups.

* Successfully pass single group to result.

* _freeze_gid dict merge fixed

* First draft of the visitor API.

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* OptionsVisitor created

* Fixed canvas.py

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Added test for simple test for chord and fixed chord implementation

* Changed _IMMUTABLE_OPTIONS

* Fixed chord interface

* Fixed chord interface

* Fixed chord interface

* Fixed chord interface

* Fixed list order

* Fixed tests (stamp test and chord test), fixed order in groups

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Fixed lint and elements

* Changed implementation of stamp API and fix lint

* Added documentation to Stamping API. Added chord with groups test

* Implemented stamping inside replace and added test for an implementation

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Added test additonal tests for chord, improved coverage

* Added test additonal tests for chord, improved coverage

* Added test additonal tests for chord, improved coverage

* Splitted into subtests

* Group stamping rollback

* group.id is None fixed

* Added integration test

* Added integration test

* apply_async fixed

* Integration test and test_chord fixed

* Lint fixed

* chord freeze fixed

* Minor fixes.

* Chain apply_async fixed and tests fixed

* lint fixed

* Added integration test for chord

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* type -> isinstance

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Redo header stamping (celery#7341)

* _freeze_gid dict merge fixed

* OptionsVisitor created

* Fixed canvas.py

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Added test for simple test for chord and fixed chord implementation

* Changed _IMMUTABLE_OPTIONS

* Fixed chord interface

* Fixed chord interface

* Fixed chord interface

* Fixed chord interface

* Fixed list order

* Fixed tests (stamp test and chord test), fixed order in groups

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Fixed lint and elements

* Changed implementation of stamp API and fix lint

* Added documentation to Stamping API. Added chord with groups test

* Implemented stamping inside replace and added test for an implementation

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Added test additonal tests for chord, improved coverage

* Added test additonal tests for chord, improved coverage

* Added test additonal tests for chord, improved coverage

* Splitted into subtests

* Group stamping rollback

* group.id is None fixed

* Added integration test

* Added integration test

* apply_async fixed

* Integration test and test_chord fixed

* Lint fixed

* chord freeze fixed

* Minor fixes.

* Chain apply_async fixed and tests fixed

* lint fixed

* Added integration test for chord

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* type -> isinstance

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Omer Katz <[email protected]>

* Added stamping mechanism

* Manual stamping improved

* flake8 fixed

* Added subtests

* Add comma.

* Moved groups to stamps

* Fixed chord and added test for that

* Strip down the header-stamping PR to the basics.

* Serialize groups.

* Add groups to result backend meta data.

* Fix spelling mistake.

* Revert changes to canvas.py

* Revert changes to app/base.py

* Add stamping implementation to canvas.py

* Send task to AMQP with groups.

* Successfully pass single group to result.

* _freeze_gid dict merge fixed

* First draft of the visitor API.

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* OptionsVisitor created

* Fixed canvas.py

* Added test for simple test for chord and fixed chord implementation

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Changed _IMMUTABLE_OPTIONS

* Fixed chord interface

* Fixed chord interface

* Fixed chord interface

* Fixed chord interface

* Fixed list order

* Fixed tests (stamp test and chord test), fixed order in groups

* Fixed lint and elements

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Changed implementation of stamp API and fix lint

* Added documentation to Stamping API. Added chord with groups test

* Implemented stamping inside replace and added test for an implementation

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Added test additonal tests for chord, improved coverage

* Added test additonal tests for chord, improved coverage

* Added test additonal tests for chord, improved coverage

* Splitted into subtests

* Group stamping rollback

* group.id is None fixed

* Added integration test

* Added integration test

* apply_async fixed

* Integration test and test_chord fixed

* Lint fixed

* chord freeze fixed

* Minor fixes.

* Chain apply_async fixed and tests fixed

* lint fixed

* Added integration test for chord

* type -> isinstance

* Added stamping mechanism

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Manual stamping improved

* fail_ci_if_error uncommented

* flake8 fixed

* Added subtests

* Changes

* Add comma.

* Fixed chord and added test for that

* canvas.py fixed

* Test chord.py fixed

* Fixed stamped_headers

* collections import fixed

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* collections import fixed

* Update celery/backends/base.py

Co-authored-by: Omer Katz <[email protected]>

* ampq.py fixed

* Refrain from using deprecated import path.

* Fix test_complex_chain regression.

Whenever we stamp a group we need to freeze it first if it wasn't already frozen.
Somewhere along the line, the group id changed because we were freezing twice.
This commit places the stamping operation after preparing the chain's steps which fixes the problem somehow.

We don't know why yet.

* Fixed integration tests

* Fixed integration tests

* Fixed integration tests

* Fixed integration tests

* Fixed issues with maybe_list. Add documentation

* Fixed potential issue with integration tests

* Fixed issues with _regen

* Fixed issues with _regen

* Fixed test_generator issues

* Fixed _regen stamping

* Fixed _regen stamping

* Fixed TimeOut issue

* Fixed TimeOut issue

* Fixed TimeOut issue

* Update docs/userguide/canvas.rst

Co-authored-by: Omer Katz <[email protected]>

* Fixed Couchbase

* Better stamping intro

* New GroupVisitor example

* Adjust documentation.

Co-authored-by: Naomi Elstein <[email protected]>
Co-authored-by: Omer Katz <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Asif Saif Uddin <[email protected]>
Co-authored-by: Omer Katz <[email protected]>
  • Loading branch information
6 people authored Jun 29, 2022
1 parent 34fc87c commit 1c4ff33
Show file tree
Hide file tree
Showing 15 changed files with 1,195 additions and 157 deletions.
1 change: 1 addition & 0 deletions .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ jobs:
- name: Install apt packages
run: |
sudo apt update && sudo apt-get install -f libcurl4-openssl-dev libssl-dev libgnutls28-dev httping expect libmemcached-dev
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
Expand Down
45 changes: 26 additions & 19 deletions celery/app/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,9 @@ def as_task_v2(self, task_id, name, args=None, kwargs=None,
time_limit=None, soft_time_limit=None,
create_sent_event=False, root_id=None, parent_id=None,
shadow=None, chain=None, now=None, timezone=None,
origin=None, ignore_result=False, argsrepr=None, kwargsrepr=None):
origin=None, ignore_result=False, argsrepr=None, kwargsrepr=None, stamped_headers=None,
**options):

args = args or ()
kwargs = kwargs or {}
if not isinstance(args, (list, tuple)):
Expand Down Expand Up @@ -319,25 +321,30 @@ def as_task_v2(self, task_id, name, args=None, kwargs=None,
if not root_id: # empty root_id defaults to task_id
root_id = task_id

stamps = {header: maybe_list(options[header]) for header in stamped_headers or []}
headers = {
'lang': 'py',
'task': name,
'id': task_id,
'shadow': shadow,
'eta': eta,
'expires': expires,
'group': group_id,
'group_index': group_index,
'retries': retries,
'timelimit': [time_limit, soft_time_limit],
'root_id': root_id,
'parent_id': parent_id,
'argsrepr': argsrepr,
'kwargsrepr': kwargsrepr,
'origin': origin or anon_nodename(),
'ignore_result': ignore_result,
'stamped_headers': stamped_headers,
'stamps': stamps,
}

return task_message(
headers={
'lang': 'py',
'task': name,
'id': task_id,
'shadow': shadow,
'eta': eta,
'expires': expires,
'group': group_id,
'group_index': group_index,
'retries': retries,
'timelimit': [time_limit, soft_time_limit],
'root_id': root_id,
'parent_id': parent_id,
'argsrepr': argsrepr,
'kwargsrepr': kwargsrepr,
'origin': origin or anon_nodename(),
'ignore_result': ignore_result,
},
headers=headers,
properties={
'correlation_id': task_id,
'reply_to': reply_to or '',
Expand Down
4 changes: 2 additions & 2 deletions celery/app/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,7 @@ def send_task(self, name, args=None, kwargs=None, countdown=None,
options.setdefault('priority',
parent.request.delivery_info.get('priority'))

# alias for 'task_as_v2'
message = amqp.create_task_message(
task_id, name, args, kwargs, countdown, eta, group_id, group_index,
expires, retries, chord,
Expand All @@ -774,8 +775,7 @@ def send_task(self, name, args=None, kwargs=None, countdown=None,
self.conf.task_send_sent_event,
root_id, parent_id, shadow, chain,
ignore_result=ignore_result,
argsrepr=options.get('argsrepr'),
kwargsrepr=options.get('kwargsrepr'),
**options
)

if connection:
Expand Down
18 changes: 16 additions & 2 deletions celery/app/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from celery import current_app, states
from celery._state import _task_stack
from celery.canvas import _chain, group, signature
from celery.canvas import GroupStampingVisitor, _chain, group, signature
from celery.exceptions import Ignore, ImproperlyConfigured, MaxRetriesExceededError, Reject, Retry
from celery.local import class_property
from celery.result import EagerResult, denied_join_result
Expand Down Expand Up @@ -93,6 +93,8 @@ class Context:
taskset = None # compat alias to group
timelimit = None
utc = None
stamped_headers = None
stamps = None

def __init__(self, *args, **kwargs):
self.update(*args, **kwargs)
Expand Down Expand Up @@ -794,8 +796,14 @@ def apply(self, args=None, kwargs=None,
'exchange': options.get('exchange'),
'routing_key': options.get('routing_key'),
'priority': options.get('priority'),
},
}
}
if 'stamped_headers' in options:
request['stamped_headers'] = maybe_list(options['stamped_headers'])
request['stamps'] = {
header: maybe_list(options.get(header, [])) for header in request['stamped_headers']
}

tb = None
tracer = build_tracer(
task.name, task, eager=True,
Expand Down Expand Up @@ -942,6 +950,12 @@ def replace(self, sig):
# retain their original task IDs as well
for t in reversed(self.request.chain or []):
sig |= signature(t, app=self.app)
# Stamping sig with parents groups
stamped_headers = self.request.stamped_headers
if self.request.stamps:
groups = self.request.stamps.get("groups")
sig.stamp(visitor=GroupStampingVisitor(groups=groups, stamped_headers=stamped_headers))

# Finally, either apply or delay the new signature!
if self.request.is_eager:
return sig.apply().get()
Expand Down
7 changes: 5 additions & 2 deletions celery/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ def _call_task_errbacks(self, request, exc, traceback):
hasattr(errback.type, '__header__') and

# workaround to support tasks with bind=True executed as
# link errors. Otherwise retries can't be used
# link errors. Otherwise, retries can't be used
not isinstance(errback.type.__header__, partial) and
arity_greater(errback.type.__header__, 1)
):
Expand Down Expand Up @@ -488,8 +488,11 @@ def _get_result_meta(self, result,
'retries': getattr(request, 'retries', None),
'queue': request.delivery_info.get('routing_key')
if hasattr(request, 'delivery_info') and
request.delivery_info else None
request.delivery_info else None,
}
if getattr(request, 'stamps'):
request_meta['stamped_headers'] = request.stamped_headers
request_meta.update(request.stamps)

if encode:
# args and kwargs need to be encoded properly before saving
Expand Down
Loading

0 comments on commit 1c4ff33

Please sign in to comment.