Skip to content

Commit

Permalink
amqp backend: polling now uses message.requeue instead of republishing
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed Jan 28, 2013
1 parent c9f238b commit 247d8e7
Showing 1 changed file with 8 additions and 6 deletions.
14 changes: 8 additions & 6 deletions celery/backends/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,19 +157,21 @@ def get_task_meta(self, task_id, backlog_limit=1000):
with self.app.pool.acquire_channel(block=True) as (_, channel):
binding = self._create_binding(task_id)(channel)
binding.declare()
latest, acc = None, None
for i in xrange(backlog_limit):
latest, acc = acc, binding.get(no_ack=True)
prev = latest = acc = None
for i in xrange(backlog_limit): ## spool ffwd
prev, latest, acc = latest, acc, binding.get(no_ack=False)
if not acc: # no more messages
break
if prev:
# backends are not expected to keep history,
# so we delete everything except the most recent state.
prev.ack()
else:
raise self.BacklogLimitExceeded(task_id)

if latest:
# new state to report
self._republish(channel, task_id, latest.body,
latest.content_type, latest.content_encoding)
payload = self._cache[task_id] = latest.payload
latest.requeue()
return payload
else:
# no new state, use previous
Expand Down

0 comments on commit 247d8e7

Please sign in to comment.