Skip to content

Commit

Permalink
Support chords with CELERY_TASK_ALWAYS_EAGER (fix celery#4873) (celer…
Browse files Browse the repository at this point in the history
  • Loading branch information
noirbizarre authored and auvipy committed Aug 15, 2018
1 parent 0e86862 commit 62dc3f0
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 3 deletions.
1 change: 1 addition & 0 deletions CONTRIBUTORS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -262,3 +262,4 @@ Chris Mitchell, 2018/02/27
Josue Balandrano Coronel, 2018/05/24
Federico Bond, 2018/06/20
Tom Booth, 2018/07/06
Axel haustant, 2018/08/14
5 changes: 3 additions & 2 deletions celery/canvas.py
Original file line number Diff line number Diff line change
Expand Up @@ -1226,8 +1226,9 @@ def apply_async(self, args=(), kwargs={}, task_id=None,
tasks = (self.tasks.clone() if isinstance(self.tasks, group)
else group(self.tasks, app=app))
if app.conf.task_always_eager:
return self.apply(args, kwargs,
body=body, task_id=task_id, **options)
with allow_join_result():
return self.apply(args, kwargs,
body=body, task_id=task_id, **options)
# chord([A, B, ...], C)
return self.run(tasks, body, args, task_id=task_id, **options)

Expand Down
7 changes: 6 additions & 1 deletion t/integration/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from time import sleep

from celery import chain, group, shared_task
from celery import chain, chord, group, shared_task
from celery.exceptions import SoftTimeLimitExceeded
from celery.utils.log import get_task_logger

Expand Down Expand Up @@ -42,6 +42,11 @@ def chain_add(x, y):
).apply_async()


@shared_task
def chord_add(x, y):
chord(add.s(x, x), add.s(y)).apply_async()


@shared_task
def delayed_sum(numbers, pause_time=1):
"""Sum the iterable of numbers."""
Expand Down
11 changes: 11 additions & 0 deletions t/integration/test_canvas.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,17 @@ def test_add_chord_to_chord(self, manager):
res = c()
assert res.get() == [0, 5 + 6 + 7]

@flaky
def test_eager_chord_inside_task(self, manager):
from .tasks import chord_add

prev = chord_add.app.conf.task_always_eager
chord_add.app.conf.task_always_eager = True

chord_add.apply_async(args=(4, 8), throw=True).get()

chord_add.app.conf.task_always_eager = prev

@flaky
def test_group_chain(self, manager):
if not manager.app.conf.result_backend.startswith('redis'):
Expand Down
23 changes: 23 additions & 0 deletions t/unit/tasks/test_canvas.py
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,29 @@ def test_freeze_tasks_is_not_group(self):
x.tasks = [self.add.s(2, 2)]
x.freeze()

def test_chain_always_eager(self):
self.app.conf.task_always_eager = True
from celery import _state
from celery import result

fixture_task_join_will_block = _state.task_join_will_block
try:
_state.task_join_will_block = _state.orig_task_join_will_block
result.task_join_will_block = _state.orig_task_join_will_block

@self.app.task(shared=False)
def finalize(*args):
pass

@self.app.task(shared=False)
def chord_add():
return chord([self.add.s(4, 4)], finalize.s()).apply_async()

chord_add.apply_async(throw=True).get()
finally:
_state.task_join_will_block = fixture_task_join_will_block
result.task_join_will_block = fixture_task_join_will_block


class test_maybe_signature(CanvasCase):

Expand Down

0 comments on commit 62dc3f0

Please sign in to comment.