Merge branch '3.0'
Conflicts:
	celery/backends/mongodb.py
	celery/bin/amqp.py
	celery/bin/multi.py
	celery/schedules.py
	celery/tests/utils.py
	celery/worker/__init__.py
	requirements/default.txt
	setup.cfg
ask committed Dec 19, 2012
2 parents 535dd52 + 1a69d0e commit 19af443
Showing 44 changed files with 263 additions and 175 deletions.
1 change: 1 addition & 0 deletions CONTRIBUTORS.txt
@@ -123,3 +123,4 @@ Alexey Zatelepin, 2012/09/18
Sundar Raman, 2012/09/24
Henri Colas, 2012/11/16
Thomas Grainger, 2012/11/29
Marius Gedminas, 2012/11/29
16 changes: 15 additions & 1 deletion Changelog
@@ -20,6 +20,21 @@ If you're looking for versions prior to 3.0.x you should go to :ref:`history`.
======
:release-date: 2012-11-30 XX:XX:XX X.X UTC

- Now depends on Kombu 2.5

    - py-amqp has replaced amqplib as the default transport,
      gaining support for AMQP 0.9, and the RabbitMQ extensions
      including Consumer Cancel Notifications and heartbeats.

    - support for multiple connection URLs for failover.

    - Read more in the `Kombu 2.5 changelog`_.

.. _`Kombu 2.5 changelog`:
    http://kombu.readthedocs.org/en/latest/changelog.html#version-2-5-0

- Now depends on billiard 2.7.3.19

    - Fixed a deadlock issue that could occur when the producer pool
      inherited the connection pool instance of the parent process.

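For context on the Kombu 2.5 items above, here is a minimal sketch of how the failover URLs and heartbeats might be configured from Celery. The list form of BROKER_URL and the BROKER_HEARTBEAT setting are assumptions based on the 3.0-era settings API, not part of this diff:

    from celery import Celery

    app = Celery('proj')
    app.conf.update(
        # Assumed: multiple broker URLs, tried in order if the
        # current broker becomes unreachable (Kombu 2.5 failover).
        BROKER_URL=[
            'amqp://guest@broker1//',
            'amqp://guest@broker2//',
        ],
        # Assumed: heartbeat frames every 10 seconds, a RabbitMQ
        # extension supported by the new py-amqp transport.
        BROKER_HEARTBEAT=10,
    )
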
@@ -43,7 +58,6 @@ If you're looking for versions prior to 3.0.x you should go to :ref:`history`.
b12ead10-a622-4d44-86e9-3193a778f345,
26c7a420-11f3-4b33-8fac-66cd3b62abfd]>


- Chains can now chain other chains and use partial arguments (Issue #1057).

Example:
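
  (The original example is not shown in this truncated diff; the following is an illustrative sketch only, using hypothetical add(x, y) and mul(x, y) tasks.)

    >>> c1 = add.s(4) | mul.s(8)    # chain of partially applied tasks
    >>> c2 = add.s(16) | mul.s(32)
    >>> c3 = c1 | c2                # a chain chaining another chain
    >>> c3(2).get()                 # ((2 + 4) * 8 + 16) * 32
    2048
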
22 changes: 11 additions & 11 deletions celery/__init__.py
@@ -44,26 +44,26 @@ def debug_import(name, locals=None, globals=None, fromlist=None,
# This is never executed, but tricks static analyzers (PyDev, PyCharm,
# pylint, etc.) into knowing the types of these symbols, and what
# they contain.
from celery.app.base import Celery # noqa
from celery.app.utils import bugreport # noqa
from celery.app.task import Task # noqa
from celery._state import current_app, current_task # noqa
from celery.canvas import ( # noqa
from celery.app.base import Celery # noqa
from celery.app.utils import bugreport # noqa
from celery.app.task import Task # noqa
from celery._state import current_app, current_task # noqa
from celery.canvas import ( # noqa
chain, chord, chunks, group, subtask, xmap, xstarmap,
)
from celery.utils import uuid # noqa
from celery.utils import uuid # noqa

# Lazy loading
from .five import recreate_module

old_module, new_module = recreate_module(__name__, # pragma: no cover
by_module={
'celery.app': ['Celery', 'bugreport', 'shared_task'],
'celery.app': ['Celery', 'bugreport', 'shared_task'],
'celery.app.task': ['Task'],
'celery._state': ['current_app', 'current_task'],
'celery.canvas': ['chain', 'chord', 'chunks', 'group',
'subtask', 'xmap', 'xstarmap'],
'celery.utils': ['uuid'],
'celery._state': ['current_app', 'current_task'],
'celery.canvas': ['chain', 'chord', 'chunks', 'group',
'subtask', 'xmap', 'xstarmap'],
'celery.utils': ['uuid'],
},
direct={'task': 'celery.task'},
__package__='celery', __file__=__file__,
8 changes: 6 additions & 2 deletions celery/app/amqp.py
@@ -170,6 +170,7 @@ def __init__(self, channel=None, exchange=None, *args, **kwargs):
self.retry_policy or {})
exchange = exchange or self.exchange
self.queues = self.app.amqp.queues # shortcut
self.default_queue = self.app.amqp.default_queue
super(TaskProducer, self).__init__(channel, exchange, *args, **kwargs)

def publish_task(self, task_name, task_args=None, task_kwargs=None,
@@ -185,15 +186,17 @@ def publish_task(self, task_name, task_args=None, task_kwargs=None,
"""Send task message."""
retry = self.retry if retry is None else retry

declare = declare or []
qname = queue
if queue is None and exchange is None:
queue = self.default_queue
if queue is not None:
if isinstance(queue, basestring):
qname, queue = queue, self.queues[queue]
else:
qname = queue.name
exchange = exchange or queue.exchange.name
routing_key = routing_key or queue.routing_key
declare = declare or ([queue] if queue else [])

# merge default and custom policy
retry = self.retry if retry is None else retry
@@ -378,7 +381,8 @@ def router(self):
@property
def producer_pool(self):
if self._producer_pool is None:
self._producer_pool = ProducerPool(self.app.pool,
self._producer_pool = ProducerPool(
self.app.pool,
limit=self.app.pool.limit,
Producer=self.TaskProducer,
)
3 changes: 2 additions & 1 deletion celery/app/base.py
@@ -159,7 +159,8 @@ def task(self, *args, **opts):
# the task instance from the current app.
# Really need a better solution for this :(
from . import shared_task as proxies_to_curapp
return proxies_to_curapp(*args, _force_evaluate=True, **opts)
opts['_force_evaluate'] = True # XXX Py2.5
return proxies_to_curapp(*args, **opts)

def inner_create_task_cls(shared=True, filter=None, **opts):

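The "# XXX Py2.5" note above is presumably there because Python 2.5 rejects explicit keyword arguments placed after *args in a call expression (allowed from Python 2.6 on). A minimal sketch with a hypothetical function, not part of the diff:

    def proxy(*args, **opts):
        return args, opts

    # SyntaxError on Python 2.5, valid from Python 2.6 onwards:
    #     proxy(*args, _force_evaluate=True, **opts)

    # Portable spelling, as used in the change above:
    args, opts = (1, 2), {}
    opts['_force_evaluate'] = True
    proxy(*args, **opts)
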
3 changes: 2 additions & 1 deletion celery/app/builtins.py
@@ -79,7 +79,8 @@ def unlock_chord(group_id, callback, interval=None, propagate=False,
if result.ready():
subtask(callback).delay(j(propagate=propagate))
else:
return unlock_chord.retry(countdown=interval, max_retries=max_retries)
return unlock_chord.retry(countdown=interval,
max_retries=max_retries)
return unlock_chord


5 changes: 3 additions & 2 deletions celery/apps/beat.py
@@ -55,7 +55,8 @@ def __init__(self, max_interval=None, app=None,
self.max_interval = max_interval
self.socket_timeout = socket_timeout
self.no_color = no_color
self.colored = app.log.colored(self.logfile,
self.colored = app.log.colored(
self.logfile,
enabled=not no_color if no_color is not None else no_color,
)
self.pidfile = pidfile
@@ -120,7 +121,7 @@ def startup_info(self, beat):
scheduler_info=scheduler.info,
hmax_interval=humanize_seconds(beat.max_interval),
max_interval=beat.max_interval,
)
)

def set_process_title(self):
arg_start = 'manage' in sys.argv[0] and 2 or 1
3 changes: 2 additions & 1 deletion celery/apps/worker.py
@@ -98,7 +98,8 @@ def on_before_init(self, purge=False, no_color=None, **kwargs):
self.purge = purge
self.no_color = no_color
self._isatty = isatty(sys.stdout)
self.colored = self.app.log.colored(self.logfile,
self.colored = self.app.log.colored(
self.logfile,
enabled=not no_color if no_color is not None else no_color
)

8 changes: 5 additions & 3 deletions celery/backends/mongodb.py
@@ -182,11 +182,13 @@ def _forget(self, task_id):

def cleanup(self):
"""Delete expired metadata."""
self.collection.remove({
self.collection.remove(
{
'date_done': {
'$lt': self.app.now() - self.expires,
}
})
},
},
)

def __reduce__(self, args=(), kwargs={}):
kwargs.update(
8 changes: 5 additions & 3 deletions celery/backends/redis.py
@@ -22,6 +22,10 @@
redis = None # noqa
ConnectionError = None # noqa

REDIS_MISSING = """\
You need to install the redis library in order to use \
the Redis result store backend."""


class RedisBackend(KeyValueStoreBackend):
"""Redis task result store."""
@@ -52,9 +56,7 @@ def __init__(self, host=None, port=None, db=None, password=None,
super(RedisBackend, self).__init__(**kwargs)
conf = self.app.conf
if self.redis is None:
raise ImproperlyConfigured(
'You need to install the redis library in order to use '
+ 'the Redis result store backend.')
raise ImproperlyConfigured(REDIS_MISSING)

# For compatibility with the old REDIS_* configuration keys.
def _get(key):
17 changes: 9 additions & 8 deletions celery/bin/base.py
@@ -331,14 +331,15 @@ def parse_options(self, prog_name, arguments, command=None):
return self.parser.parse_args(arguments)

def create_parser(self, prog_name, command=None):
return self.prepare_parser(self.Parser(prog=prog_name,
usage=self.usage(command),
version=self.version,
epilog=self.epilog,
formatter=HelpFormatter(),
description=self.description,
option_list=(self.preload_options +
self.get_options())))
return self.prepare_parser(self.Parser(
prog=prog_name,
usage=self.usage(command),
version=self.version,
epilog=self.epilog,
formatter=HelpFormatter(),
description=self.description,
option_list=(self.preload_options + self.get_options())),
)

def prepare_parser(self, parser):
docs = [self.parse_doc(doc) for doc in (self.doc, __doc__) if doc]
12 changes: 6 additions & 6 deletions celery/bin/beat.py
@@ -77,13 +77,13 @@ def run(self, detach=False, logfile=None, pidfile=None, uid=None,
def get_options(self):
c = self.app.conf

return ((
Option('--detach', action='store_true'),
Option('-s', '--schedule',
return (
(Option('--detach', action='store_true'),
Option('-s', '--schedule',
default=c.CELERYBEAT_SCHEDULE_FILENAME),
Option('--max-interval', type='float'),
Option('-S', '--scheduler', dest='scheduler_cls'),
Option('-l', '--loglevel', default=c.CELERYBEAT_LOG_LEVEL))
Option('--max-interval', type='float'),
Option('-S', '--scheduler', dest='scheduler_cls'),
Option('-l', '--loglevel', default=c.CELERYBEAT_LOG_LEVEL))
+ daemon_options(default_pidfile='celerybeat.pid')
+ tuple(self.app.user_options['beat'])
)
14 changes: 7 additions & 7 deletions celery/bin/events.py
@@ -116,14 +116,14 @@ def set_process_status(self, prog, info=''):
return set_process_title(prog, info=info)

def get_options(self):
return ((
Option('-d', '--dump', action='store_true'),
Option('-c', '--camera'),
Option('--detach', action='store_true'),
Option('-F', '--frequency', '--freq',
return (
(Option('-d', '--dump', action='store_true'),
Option('-c', '--camera'),
Option('--detach', action='store_true'),
Option('-F', '--frequency', '--freq',
type='float', default=1.0),
Option('-r', '--maxrate'),
Option('-l', '--loglevel', default='INFO'))
Option('-r', '--maxrate'),
Option('-l', '--loglevel', default='INFO'))
+ daemon_options(default_pidfile='celeryev.pid')
+ tuple(self.app.user_options['events'])
)
7 changes: 4 additions & 3 deletions celery/bin/multi.py
@@ -276,8 +276,9 @@ def note_waiting():
left = len(P)
if left:
pids = ', '.join(str(pid) for _, _, pid in P)
self.note(self.colored.blue('> Waiting for {0} {1} -> {2}...'.format(
left, pluralize(left, 'node'), pids)), newline=False)
self.note(self.colored.blue(
'> Waiting for {0} {1} -> {2}...'.format(
left, pluralize(left, 'node'), pids)), newline=False)

if retry:
note_waiting()
@@ -457,7 +458,7 @@ def multi_args(p, cmd='celery worker', append='', prefix='', suffix=''):
name = nodename
else:
nodename = '%s%s' % (prefix, name)
this_name = options['-n'] = '%s@%s' % (nodename, this_suffix)
this_name = options['-n'] = '%s@%s' % (nodename, this_suffix)
expand = abbreviations({'%h': this_name,
'%n': name,
'%N': nodename,
4 changes: 2 additions & 2 deletions celery/bootsteps.py
@@ -222,8 +222,8 @@ def _finalize_steps(self, steps):
last = self._find_last()
self._firstpass(steps)
it = ((C, C.requires) for C in values(steps))
G = self.graph = DependencyGraph(it,
formatter=self.GraphFormatter(root=last),
G = self.graph = DependencyGraph(
it, formatter=self.GraphFormatter(root=last),
)
if last:
for obj in G:
10 changes: 6 additions & 4 deletions celery/datastructures.py
@@ -81,7 +81,8 @@ def attrs(self, d, scheme=None):
)

def head(self, **attrs):
return self.FMT(self._head, id=self.id, type=self.type,
return self.FMT(
self._head, id=self.id, type=self.type,
attrs=self.attrs(attrs, self.graph_scheme),
)

@@ -109,13 +110,14 @@ def FMT(self, fmt, *args, **kwargs):
))

def draw_edge(self, a, b, scheme=None, attrs=None):
return self.FMT(self._edge, self.label(a), self.label(b),
return self.FMT(
self._edge, self.label(a), self.label(b),
dir=self.direction, attrs=self.attrs(attrs, self.edge_scheme),
)

def draw_node(self, obj, scheme=None, attrs=None):
return self.FMT(self._node, self.label(obj),
attrs=self.attrs(attrs, scheme),
return self.FMT(
self._node, self.label(obj), attrs=self.attrs(attrs, scheme),
)


7 changes: 4 additions & 3 deletions celery/events/cursesmon.py
@@ -256,9 +256,10 @@ def alert_callback(mx, my, xs):
y = count(xs)
task = self.state.tasks[self.selected_task]
info = task.info(extra=['state'])
infoitems = [('args', info.pop('args', None)),
('kwargs', info.pop('kwargs', None))
] + list(info.items())
infoitems = [
('args', info.pop('args', None)),
('kwargs', info.pop('kwargs', None))
] + list(info.items())
for key, value in infoitems:
if key is None:
continue
16 changes: 8 additions & 8 deletions celery/loaders/base.py
@@ -30,19 +30,19 @@

BUILTIN_MODULES = frozenset()

ERROR_ENVVAR_NOT_SET = (
"""The environment variable {0!r} is not set,
ERROR_ENVVAR_NOT_SET = """\
The environment variable {0!r} is not set,
and as such the configuration could not be loaded.
Please set this variable and make it point to
a configuration module.""")
a configuration module."""

_RACE_PROTECTION = False
CONFIG_INVALID_NAME = """
CONFIG_INVALID_NAME = """\
Error: Module '{module}' doesn't exist, or it's not a valid \
Python module name.
"""

CONFIG_WITH_SUFFIX = CONFIG_INVALID_NAME + """
CONFIG_WITH_SUFFIX = CONFIG_INVALID_NAME + """\
Did you mean '{suggest}'?
"""

@@ -249,9 +249,9 @@ def read_configuration(self):
return {}

def autodiscover_tasks(self, packages, related_name='tasks'):
self.task_modules.update(mod.__name__
for mod in autodiscover_tasks(packages, related_name) if mod
)
self.task_modules.update(
mod.__name__ for mod in autodiscover_tasks(packages,
related_name) if mod)

@property
def conf(self):