Skip to content

Commit

Permalink
Merge pull request celery#2226 from aneilbaboo/master
Browse files Browse the repository at this point in the history
Fix issue celery#2225
  • Loading branch information
Omer Katz committed Jul 14, 2015
2 parents 893aa2c + 939978e commit 9b2c3bf
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 7 deletions.
1 change: 1 addition & 0 deletions celery/app/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ def as_task_v1(self, task_id, name, args=None, kwargs=None,
'id': task_id,
'args': args,
'kwargs': kwargs,
'group': group_id,
'retries': retries,
'eta': eta,
'expires': expires,
Expand Down
10 changes: 7 additions & 3 deletions celery/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,8 +357,8 @@ def fallback_chord_unlock(self, group_id, body, result=None,

def apply_chord(self, header, partial_args, group_id, body,
options={}, **kwargs):
options['task_id'] = group_id
result = header(*partial_args, **options or {})
fixed_options = dict((k,v) for k,v in options.items() if k!='task_id')
result = header(*partial_args, task_id=group_id, **fixed_options or {})
self.fallback_chord_unlock(group_id, body, **kwargs)
return result

Expand Down Expand Up @@ -534,7 +534,11 @@ def _restore_group(self, group_id):
def _apply_chord_incr(self, header, partial_args, group_id, body,
result=None, options={}, **kwargs):
self.save_group(group_id, self.app.GroupResult(group_id, result))
return header(*partial_args, task_id=group_id, **options or {})

fixed_options = dict((k,v) for k,v in options.items() if k != 'task_id')

return header(*partial_args, task_id=group_id, **fixed_options or {})


def on_chord_part_return(self, task, state, result, propagate=None):
if not self.implements_incr:
Expand Down
8 changes: 4 additions & 4 deletions celery/canvas.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,9 +474,9 @@ def prepare_steps(self, args, tasks,
if link_error:
task.set(link_error=link_error)

if not isinstance(prev_task, chord):
results.append(res)
tasks.append(task)
tasks.append(task)
results.append(res)

prev_task, prev_res = task, res

return tasks, results
Expand Down Expand Up @@ -603,7 +603,7 @@ def _maybe_group(tasks):
elif isinstance(tasks, Signature):
tasks = [tasks]
else:
tasks = regen(tasks)
tasks = map(signature, regen(tasks))
return tasks


Expand Down

0 comments on commit 9b2c3bf

Please sign in to comment.