diff --git a/CONTRIBUTORS.txt b/CONTRIBUTORS.txt index 5c6bf1c815e..a351b58630e 100644 --- a/CONTRIBUTORS.txt +++ b/CONTRIBUTORS.txt @@ -262,3 +262,4 @@ Chris Mitchell, 2018/02/27 Josue Balandrano Coronel, 2018/05/24 Federico Bond, 2018/06/20 Tom Booth, 2018/07/06 +Axel haustant, 2018/08/14 diff --git a/celery/canvas.py b/celery/canvas.py index d5b2b755eb1..c85e8d7c045 100644 --- a/celery/canvas.py +++ b/celery/canvas.py @@ -1226,8 +1226,9 @@ def apply_async(self, args=(), kwargs={}, task_id=None, tasks = (self.tasks.clone() if isinstance(self.tasks, group) else group(self.tasks, app=app)) if app.conf.task_always_eager: - return self.apply(args, kwargs, - body=body, task_id=task_id, **options) + with allow_join_result(): + return self.apply(args, kwargs, + body=body, task_id=task_id, **options) # chord([A, B, ...], C) return self.run(tasks, body, args, task_id=task_id, **options) diff --git a/t/integration/tasks.py b/t/integration/tasks.py index f857cadb958..0e3d13cbb40 100644 --- a/t/integration/tasks.py +++ b/t/integration/tasks.py @@ -3,7 +3,7 @@ from time import sleep -from celery import chain, group, shared_task +from celery import chain, chord, group, shared_task from celery.exceptions import SoftTimeLimitExceeded from celery.utils.log import get_task_logger @@ -42,6 +42,11 @@ def chain_add(x, y): ).apply_async() +@shared_task +def chord_add(x, y): + chord(add.s(x, x), add.s(y)).apply_async() + + @shared_task def delayed_sum(numbers, pause_time=1): """Sum the iterable of numbers.""" diff --git a/t/integration/test_canvas.py b/t/integration/test_canvas.py index 344425bb6fb..32eb6f329b8 100644 --- a/t/integration/test_canvas.py +++ b/t/integration/test_canvas.py @@ -366,6 +366,17 @@ def test_add_chord_to_chord(self, manager): res = c() assert res.get() == [0, 5 + 6 + 7] + @flaky + def test_eager_chord_inside_task(self, manager): + from .tasks import chord_add + + prev = chord_add.app.conf.task_always_eager + chord_add.app.conf.task_always_eager = True + + chord_add.apply_async(args=(4, 8), throw=True).get() + + chord_add.app.conf.task_always_eager = prev + @flaky def test_group_chain(self, manager): if not manager.app.conf.result_backend.startswith('redis'): diff --git a/t/unit/tasks/test_canvas.py b/t/unit/tasks/test_canvas.py index 3dbe6fb0ab3..5d9a25816f2 100644 --- a/t/unit/tasks/test_canvas.py +++ b/t/unit/tasks/test_canvas.py @@ -747,6 +747,29 @@ def test_freeze_tasks_is_not_group(self): x.tasks = [self.add.s(2, 2)] x.freeze() + def test_chain_always_eager(self): + self.app.conf.task_always_eager = True + from celery import _state + from celery import result + + fixture_task_join_will_block = _state.task_join_will_block + try: + _state.task_join_will_block = _state.orig_task_join_will_block + result.task_join_will_block = _state.orig_task_join_will_block + + @self.app.task(shared=False) + def finalize(*args): + pass + + @self.app.task(shared=False) + def chord_add(): + return chord([self.add.s(4, 4)], finalize.s()).apply_async() + + chord_add.apply_async(throw=True).get() + finally: + _state.task_join_will_block = fixture_task_join_will_block + result.task_join_will_block = fixture_task_join_will_block + class test_maybe_signature(CanvasCase):