Skip to content

Commit

Permalink
Fixes celery amqp when using pyamqp://. Closes celery#2013
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed May 7, 2015
1 parent 5222f8b commit 4fc01fa
Showing 1 changed file with 11 additions and 0 deletions.
11 changes: 11 additions & 0 deletions celery/bin/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,16 @@ class AMQShell(cmd.Cmd):
'basic.ack': Spec(('delivery_tag', int)),
}

def _prepare_spec(self, conn):
# XXX Hack to fix Issue #2013
from amqp import Connection, Message
if isinstance(conn.connection, Connection):
self.amqp['basic.publish'] = Spec(('msg', Message),
('exchange', str),
('routing_key', str),
('mandatory', bool, 'no'),
('immediate', bool, 'no'))

def __init__(self, *args, **kwargs):
self.connect = kwargs.pop('connect')
self.silent = kwargs.pop('silent', False)
Expand Down Expand Up @@ -298,6 +308,7 @@ def respond(self, retval):
def _reconnect(self):
"""Re-establish connection to the AMQP server."""
self.conn = self.connect(self.conn)
self._prepare_spec(self.conn)
self.chan = self.conn.default_channel
self.needs_reconnect = False

Expand Down

0 comments on commit 4fc01fa

Please sign in to comment.