Skip to content

Commit

Permalink
Run chord_unlock on same queue as chord body - fixes celery#4337 (cel…
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexHill authored and Omer Katz committed Dec 14, 2017
1 parent 3af6a63 commit fde58ad
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 2 deletions.
5 changes: 4 additions & 1 deletion celery/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,8 +415,11 @@ def on_chord_part_return(self, request, state, result, **kwargs):
def fallback_chord_unlock(self, header_result, body, countdown=1,
**kwargs):
kwargs['result'] = [r.as_tuple() for r in header_result]
queue = body.options.get('queue', getattr(body.type, 'queue', None))
self.app.tasks['celery.chord_unlock'].apply_async(
(header_result.id, body,), kwargs, countdown=countdown,
(header_result.id, body,), kwargs,
countdown=countdown,
queue=queue,
)

def ensure_chords_allowed(self):
Expand Down
32 changes: 31 additions & 1 deletion t/unit/backends/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ class test_BaseBackend_interface:
def setup(self):
self.b = BaseBackend(self.app)

@self.app.task(shared=False)
def callback(result):
pass

self.callback = callback

def test__forget(self):
with pytest.raises(NotImplementedError):
self.b._forget('SOMExx-N0Nex1stant-IDxx-')
Expand All @@ -80,9 +86,33 @@ def test_apply_chord(self, unlock='celery.chord_unlock'):
uuid(),
[self.app.AsyncResult(x) for x in range(3)],
)
self.b.apply_chord(header_result, None)
self.b.apply_chord(header_result, self.callback.s())
assert self.app.tasks[unlock].apply_async.call_count

def test_chord_unlock_queue(self, unlock='celery.chord_unlock'):
self.app.tasks[unlock] = Mock()
header_result = self.app.GroupResult(
uuid(),
[self.app.AsyncResult(x) for x in range(3)],
)
body = self.callback.s()

self.b.apply_chord(header_result, body)
called_kwargs = self.app.tasks[unlock].apply_async.call_args[1]
assert called_kwargs['queue'] is None

self.b.apply_chord(header_result, body.set(queue='test_queue'))
called_kwargs = self.app.tasks[unlock].apply_async.call_args[1]
assert called_kwargs['queue'] == 'test_queue'

@self.app.task(shared=False, queue='test_queue_two')
def callback_queue(result):
pass

self.b.apply_chord(header_result, callback_queue.s())
called_kwargs = self.app.tasks[unlock].apply_async.call_args[1]
assert called_kwargs['queue'] == 'test_queue_two'


class test_exception_pickle:

Expand Down

0 comments on commit fde58ad

Please sign in to comment.