Skip to content

Commit

Permalink
Tests passing
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed Nov 30, 2012
1 parent 9bbb5c1 commit 5338f86
Show file tree
Hide file tree
Showing 14 changed files with 64 additions and 62 deletions.
51 changes: 26 additions & 25 deletions celery/app/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,17 @@ def publish_task(self, task_name, task_args=None, task_kwargs=None,
expires=None, exchange=None, exchange_type=None,
event_dispatcher=None, retry=None, retry_policy=None,
queue=None, now=None, retries=0, chord=None, callbacks=None,
errbacks=None, mandatory=None, priority=None, immediate=None,
routing_key=None, serializer=None, delivery_mode=None,
compression=None, declare=None, **kwargs):
errbacks=None, routing_key=None, serializer=None,
delivery_mode=None, compression=None, declare=None, **kwargs):
"""Send task message."""

declare = declare or []
qname = queue
if queue is not None:
if isinstance(queue, basestring):
queue = self.queues[queue]
qname, queue = queue, self.queues[queue]
else:
qname = queue.name
exchange = exchange or queue.exchange.name
routing_key = routing_key or queue.routing_key

Expand All @@ -212,28 +214,27 @@ def publish_task(self, task_name, task_args=None, task_kwargs=None,
eta = eta and eta.isoformat()
expires = expires and expires.isoformat()

body = {'task': task_name,
'id': task_id,
'args': task_args,
'kwargs': task_kwargs,
'retries': retries or 0,
'eta': eta,
'expires': expires,
'utc': self.utc,
'callbacks': callbacks,
'errbacks': errbacks}
group_id = group_id or taskset_id
if group_id:
body['taskset'] = group_id
if chord:
body['chord'] = chord

self.publish(body, exchange=exchange, mandatory=mandatory,
immediate=immediate, routing_key=routing_key,
body = {
'task': task_name,
'id': task_id,
'args': task_args,
'kwargs': task_kwargs,
'retries': retries or 0,
'eta': eta,
'expires': expires,
'utc': self.utc,
'callbacks': callbacks,
'errbacks': errbacks,
'taskset': group_id or taskset_id,
'chord': chord,
}

self.publish(body,
exchange=exchange, routing_key=routing_key,
serializer=serializer or self.serializer,
compression=compression or self.compression,
retry=retry, retry_policy=_rp, delivery_mode=delivery_mode,
priority=priority, declare=declare,
retry=retry, retry_policy=_rp,
delivery_mode=delivery_mode, declare=declare,
**kwargs)

signals.task_sent.send(sender=task_name, **body)
Expand All @@ -248,7 +249,7 @@ def publish_task(self, task_name, task_args=None, task_kwargs=None,
retries=retries,
eta=eta,
expires=expires,
queue=queue,
queue=qname,
exchange=exname,
routing_key=routing_key)
return task_id
Expand Down
2 changes: 1 addition & 1 deletion celery/app/builtins.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def unlock_chord(group_id, callback, interval=None, propagate=False,
if result.ready():
subtask(callback).delay(j(propagate=propagate))
else:
raise unlock_chord.retry(countdown=interval, max_retries=max_retries)
return unlock_chord.retry(countdown=interval, max_retries=max_retries)
return unlock_chord


Expand Down
7 changes: 5 additions & 2 deletions celery/bin/camqadm.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@

from itertools import count

from amqplib import client_0_8 as amqp
try:
import amqp
except ImportError:
from amqplib import client_0_8 as amqp

from celery.app import app_or_default
from celery.utils.functional import padlist
Expand Down Expand Up @@ -216,7 +219,7 @@ def get_amqp_api_command(self, cmd, arglist):
>>> get_amqp_api_command('queue.delete', ['pobox', 'yes', 'no'])
(<bound method Channel.queue_delete of
<amqplib.client_0_8.channel.Channel object at 0x...>>,
<amqp.channel.Channel object at 0x...>>,
('testfoo', True, False))
"""
Expand Down
4 changes: 2 additions & 2 deletions celery/task/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ class Task(BaseTask):
exchange = None
exchange_type = None
delivery_mode = None
mandatory = False
immediate = False
mandatory = False # XXX deprecated
immediate = False # XXX deprecated
priority = None
type = 'regular'
error_whitelist = ()
Expand Down
6 changes: 3 additions & 3 deletions celery/tests/app/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,13 +384,13 @@ def test_amqp_get_broker_info(self):
'userid': 'guest',
'password': 'guest',
'virtual_host': '/'},
self.app.connection('amqplib://').info())
self.app.connection('amqp://').info())
self.app.conf.BROKER_PORT = 1978
self.app.conf.BROKER_VHOST = 'foo'
self.assertDictContainsSubset({'port': 1978,
'virtual_host': 'foo'},
self.app.connection('amqplib://:1978/foo').info())
conn = self.app.connection('amqplib:////value')
self.app.connection('amqp://:1978/foo').info())
conn = self.app.connection('amqp:////value')
self.assertDictContainsSubset({'virtual_host': '/value'},
conn.info())

Expand Down
14 changes: 12 additions & 2 deletions celery/tests/backends/test_amqp.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import absolute_import
from __future__ import with_statement

import pickle
import socket

from datetime import timedelta
Expand All @@ -14,7 +15,7 @@
from celery.exceptions import TimeoutError
from celery.utils import uuid

from celery.tests.utils import AppCase, sleepdeprived
from celery.tests.utils import AppCase, sleepdeprived, Mock


class SomeClass(object):
Expand Down Expand Up @@ -140,11 +141,14 @@ class Message(object):
def __init__(self, **merge):
self.payload = dict({'status': states.STARTED,
'result': None}, **merge)
self.body = pickle.dumps(self.payload)
self.content_type = 'application/x-python-serialize'
self.content_encoding = 'binary'

class MockBinding(object):

def __init__(self, *args, **kwargs):
pass
self.channel = Mock()

def __call__(self, *args, **kwargs):
return self
Expand All @@ -158,10 +162,14 @@ def get(self, no_ack=False):
except Empty:
pass

def is_bound(self):
return True

class MockBackend(AMQPBackend):
Queue = MockBinding

backend = MockBackend()
backend._republish = Mock()

# FFWD's to the latest state.
results.put(Message(status=states.RECEIVED, seq=1))
Expand All @@ -178,6 +186,8 @@ class MockBackend(AMQPBackend):
backend.get_task_meta(tid)
self.assertIn(tid, backend._cache, 'Caches last known state')

self.assertTrue(backend._republish.called)

# Returns cache if no new states.
results.queue.clear()
assert not results.qsize()
Expand Down
2 changes: 1 addition & 1 deletion celery/tests/functional/case.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class WorkerCase(Case):

@classmethod
def setUpClass(cls):
logging.getLogger('amqplib').setLevel(logging.ERROR)
logging.getLogger('amqp').setLevel(logging.ERROR)
cls.worker = Worker.managed(cls.hostname, caller=cls)

@classmethod
Expand Down
4 changes: 2 additions & 2 deletions celery/worker/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@
* When a task with an ETA is received the QoS prefetch count is also
incremented, so another message can be reserved. When the ETA is met
the prefetch count is decremented again, though this cannot happen
immediately because amqplib doesn't support doing broker requests
across threads. Instead the current prefetch count is kept as a
immediately because most broker clients don't support doing broker
requests across threads. Instead the current prefetch count is kept as a
shared counter, so as soon as :meth:`~Consumer.consume_messages`
detects that the value has changed it will send out the actual
QoS event to the broker.
Expand Down
7 changes: 4 additions & 3 deletions docs/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -765,9 +765,10 @@ Only the scheme part (``transport://``) is required, the rest
is optional, and defaults to the specific transports default values.

The transport part is the broker implementation to use, and the
default is ``amqp``, but there are many other choices including
``librabbitmq``, ``amqplib``, ``redis``, ``beanstalk``,
``sqlalchemy``, ``django``, ``mongodb``, ``couchdb`` and ``pika``.
default is ``amqp``, which uses ``librabbitmq`` by default or falls back to
``pyamqp`` if that is not installed. Also there are many other choices including
``redis``, ``beanstalk``, ``sqlalchemy``, ``django``, ``mongodb``,
``couchdb``.
It can also be a fully qualified path to your own transport implementation.

See the Kombu documentation for more information about broker URLs.
Expand Down
6 changes: 3 additions & 3 deletions docs/faq.rst
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,12 @@ kombu

Kombu depends on the following packages:

- `amqplib`_
- `amqp`_

The underlying pure-Python amqp client implementation. AMQP being the default
broker it is a natural dependency.
broker this is a natural dependency.

.. _`amqplib`: http://pypi.python.org/pypi/amqplib
.. _`amqp`: http://pypi.python.org/pypi/amqp

- `anyjson`_

Expand Down
14 changes: 0 additions & 14 deletions docs/userguide/calling.rst
Original file line number Diff line number Diff line change
Expand Up @@ -491,20 +491,6 @@ AMQP's full routing capabilities. Interested parties may read the
Routing key used to determine.
- mandatory
This sets the delivery to be mandatory. An exception will be raised
if there are no running workers able to take on the task.
Not supported by :mod:`amqplib`.
- immediate
Request immediate delivery. Will raise an exception
if the task cannot be routed to a worker immediately.
Not supported by :mod:`amqplib`.
- priority
A number between `0` and `9`, where `0` is the highest priority.
Expand Down
2 changes: 1 addition & 1 deletion docs/userguide/optimizing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ If you're using RabbitMQ (AMQP) as the broker then you can install the
The 'amqp' transport will automatically use the librabbitmq module if it's
installed, or you can also specify the transport you want directly by using
the ``amqplib://`` or ``librabbitmq://`` prefixes.
the ``pyamqp://`` or ``librabbitmq://`` prefixes.

.. _optimizing-connection-pools:

Expand Down
5 changes: 3 additions & 2 deletions docs/whatsnew-3.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ Highlights

Celery will automatically use the :mod:`librabbitmq` module
if installed, which is a very fast and memory-optimized
replacement for the amqplib module.
replacement for the py-amqp module.

- Redis support is more reliable with improved ack emulation.

Expand Down Expand Up @@ -112,7 +112,8 @@ or Redis as a broker, resulting in:
- Sub-millisecond timer precision.
- Faster shutdown times.

The transports supported are: ``amqplib``, ``librabbitmq``, and ``redis``
The transports supported are: ``py-amqp`` ``librabbitmq``, ``redis``,
and ``amqplib``.
Hopefully this can be extended to include additional broker transports
in the future.

Expand Down
2 changes: 1 addition & 1 deletion funtests/benchmarks/bench_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

BROKER_TRANSPORT = os.environ.get('BROKER', 'librabbitmq')
if hasattr(sys, 'pypy_version_info'):
BROKER_TRANSPORT = 'amqplib'
BROKER_TRANSPORT = 'pyamqp'

celery = Celery(__name__)
celery.conf.update(BROKER_TRANSPORT=BROKER_TRANSPORT,
Expand Down

0 comments on commit 5338f86

Please sign in to comment.