Skip to content

Commit

Permalink
AMQP backend must specify accept content
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed Sep 9, 2013
1 parent e50f52e commit 8bbfa19
Showing 1 changed file with 6 additions and 4 deletions.
10 changes: 6 additions & 4 deletions celery/backends/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class AMQPBackend(BaseBackend):

def __init__(self, app, connection=None, exchange=None, exchange_type=None,
persistent=None, serializer=None, auto_delete=True,
**kwargs):
accept=None, **kwargs):
super(AMQPBackend, self).__init__(app, **kwargs)
conf = self.app.conf
self._connection = connection
Expand All @@ -74,6 +74,7 @@ def __init__(self, app, connection=None, exchange=None, exchange_type=None,
self.exchange = self._create_exchange(exchange, exchange_type,
self.persistent)
self.serializer = serializer or conf.CELERY_RESULT_SERIALIZER
self.accept = conf.CELERY_ACCEPT_CONTENT if accept is None else accept
self.auto_delete = auto_delete

self.expires = None
Expand Down Expand Up @@ -201,7 +202,8 @@ def consume(self, task_id, timeout=None):
wait = self.drain_events
with self.app.pool.acquire_channel(block=True) as (conn, channel):
binding = self._create_binding(task_id)
with self.Consumer(channel, binding, no_ack=True) as consumer:
with self.Consumer(channel, binding,
no_ack=True, accept=self.accept) as consumer:
while 1:
try:
return wait(conn, consumer, timeout)[task_id]
Expand Down Expand Up @@ -240,8 +242,8 @@ def on_message(message):
if uid in task_ids else push_cache(uid, body)

bindings = self._many_bindings(task_ids)
with self.Consumer(channel, bindings,
on_message=on_message, no_ack=True):
with self.Consumer(channel, bindings, on_message=on_message,
accept=self.accept, no_ack=True):
wait = conn.drain_events
popleft = results.popleft
while ids:
Expand Down

0 comments on commit 8bbfa19

Please sign in to comment.