Commit b6baedb

Merge remote-tracking branch 'upstream/master'

treeder committed Feb 13, 2013
2 parents 83068f5 + 58fc407
Showing 54 changed files with 602 additions and 307 deletions.
2 changes: 2 additions & 0 deletions CONTRIBUTORS.txt
@@ -125,3 +125,5 @@ Henri Colas, 2012/11/16
Thomas Grainger, 2012/11/29
Marius Gedminas, 2012/11/29
Christoph Krybus, 2013/01/07
Jun Sakai, 2013/01/16
Vlad Frolov, 2013/01/23
45 changes: 31 additions & 14 deletions celery/app/amqp.py
@@ -18,7 +18,7 @@
from kombu.utils.encoding import safe_repr

from celery import signals
from celery.five import items
from celery.five import items, string_t
from celery.utils.text import indent as textindent

from . import app_or_default
@@ -163,11 +163,16 @@ class TaskProducer(Producer):
    auto_declare = False
    retry = False
    retry_policy = None
    utc = True
    event_dispatcher = None
    send_sent_event = False

    def __init__(self, channel=None, exchange=None, *args, **kwargs):
        self.retry = kwargs.pop('retry', self.retry)
        self.retry_policy = kwargs.pop('retry_policy',
                                       self.retry_policy or {})
        self.send_sent_event = kwargs.pop('send_sent_event',
                                          self.send_sent_event)
        exchange = exchange or self.exchange
        self.queues = self.app.amqp.queues  # shortcut
        self.default_queue = self.app.amqp.default_queue
@@ -190,7 +195,7 @@ def publish_task(self, task_name, task_args=None, task_kwargs=None,
        if queue is None and exchange is None:
            queue = self.default_queue
        if queue is not None:
            if isinstance(queue, basestring):
            if isinstance(queue, string_t):
                qname, queue = queue, self.queues[queue]
            else:
                qname = queue.name
@@ -246,25 +251,36 @@ def publish_task(self, task_name, task_args=None, task_kwargs=None,
        )

        signals.task_sent.send(sender=task_name, **body)
        if event_dispatcher:
        if self.send_sent_event:
            evd = event_dispatcher or self.event_dispatcher
            exname = exchange or self.exchange
            if isinstance(exname, Exchange):
                exname = exname.name
            event_dispatcher.send(
                'task-sent', uuid=task_id,
                name=task_name,
                args=safe_repr(task_args),
                kwargs=safe_repr(task_kwargs),
                retries=retries,
                eta=eta,
                expires=expires,
                queue=qname,
                exchange=exname,
                routing_key=routing_key,
            evd.publish(
                'task-sent',
                {
                    'uuid': task_id,
                    'name': task_name,
                    'args': safe_repr(task_args),
                    'kwargs': safe_repr(task_kwargs),
                    'retries': retries,
                    'eta': eta,
                    'expires': expires,
                    'queue': qname,
                    'exchange': exname,
                    'routing_key': routing_key,
                },
                self, retry=retry, retry_policy=retry_policy,
            )
        return task_id
    delay_task = publish_task  # XXX Compat

    @cached_property
    def event_dispatcher(self):
        # We call Dispatcher.publish with a custom producer
        # so don't need the dispatcher to be "enabled".
        return self.app.events.Dispatcher(enabled=False)


class TaskPublisher(TaskProducer):
    """Deprecated version of :class:`TaskProducer`."""
@@ -358,6 +374,7 @@ def TaskProducer(self):
            compression=conf.CELERY_MESSAGE_COMPRESSION,
            retry=conf.CELERY_TASK_PUBLISH_RETRY,
            retry_policy=conf.CELERY_TASK_PUBLISH_RETRY_POLICY,
            send_sent_event=conf.CELERY_SEND_TASK_SENT_EVENT,
            utc=conf.CELERY_ENABLE_UTC,
        )
    TaskPublisher = TaskProducer  # compat
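Note: the net effect of the amqp.py changes is that 'task-sent' events are now published by the producer itself, gated by the new send_sent_event flag, which AMQP.TaskProducer seeds from the CELERY_SEND_TASK_SENT_EVENT setting. A minimal usage sketch (the broker URL and task are hypothetical):

    from celery import Celery

    app = Celery('demo', broker='amqp://localhost')  # hypothetical broker URL
    # Setting confirmed by this diff: every producer will publish a
    # 'task-sent' event that monitors such as `celery events` can consume.
    app.conf.CELERY_SEND_TASK_SENT_EVENT = True

    @app.task
    def add(x, y):
        return x + y

    add.delay(2, 2)  # the producer now emits 'task-sent' itself
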
5 changes: 2 additions & 3 deletions celery/app/base.py
@@ -18,7 +18,6 @@
from functools import wraps
from operator import attrgetter

from billiard import forking as _forking
from billiard.util import register_after_fork
from kombu.clocks import LamportClock
from kombu.utils import cached_property
@@ -78,8 +77,8 @@ class Celery(object):
    def __init__(self, main=None, loader=None, backend=None,
                 amqp=None, events=None, log=None, control=None,
                 set_as_current=True, accept_magic_kwargs=False,
                 tasks=None, broker=None, include=None, fixups=None,
                 changes=None, **kwargs):
                 tasks=None, broker=None, include=None, changes=None,
                 fixups=None, **kwargs):
        self.clock = LamportClock()
        self.main = main
        self.amqp_cls = amqp or self.amqp_cls
34 changes: 26 additions & 8 deletions celery/app/builtins.py
@@ -66,18 +66,36 @@ def add_unlock_chord_task(app):
"""
from celery.canvas import subtask
from celery import result as _res
from celery.exceptions import ChordError
from celery.result import from_serializable

@app.task(name='celery.chord_unlock', max_retries=None,
default_retry_delay=1, ignore_result=True, _force_evaluate=True)
def unlock_chord(group_id, callback, interval=None, propagate=False,
max_retries=None, result=None, Result=_res.AsyncResult):
def unlock_chord(group_id, callback, interval=None, propagate=True,
max_retries=None, result=None,
Result=app.AsyncResult, GroupResult=app.GroupResult,
from_serializable=from_serializable):
if interval is None:
interval = unlock_chord.default_retry_delay
result = _res.GroupResult(group_id, [Result(r) for r in result])
j = result.join_native if result.supports_native_join else result.join
if result.ready():
subtask(callback).delay(j(propagate=propagate))
deps = GroupResult(
group_id,
[from_serializable(r, Result=Result) for r in result],
)
j = deps.join_native if deps.supports_native_join else deps.join

if deps.ready():
callback = subtask(callback)
try:
ret = j(propagate=propagate)
except Exception as exc:
culprit = next(deps._failed_join_report())

app._tasks[callback.task].backend.fail_from_current_stack(
callback.id, exc=ChordError('Dependency %s raised %r' % (
culprit.id, exc)),
)
else:
callback.delay(ret)
else:
return unlock_chord.retry(countdown=interval,
max_retries=max_retries)
@@ -281,7 +299,7 @@ class Chord(app.Task):
        ignore_result = False

        def run(self, header, body, partial_args=(), interval=1, countdown=1,
                max_retries=None, propagate=False, eager=False, **kwargs):
                max_retries=None, propagate=True, eager=False, **kwargs):
            group_id = uuid()
            AsyncResult = self.app.AsyncResult
            prepare_member = self._prepare_member
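With propagate now defaulting to True, a chord whose header contains a failing task no longer passes partial results to the body; the body is instead marked failed with a ChordError naming the first failed dependency. A hedged sketch of the visible behavior (reusing a hypothetical app instance as above):

    from celery import chord
    from celery.exceptions import ChordError

    @app.task
    def ok(i):
        return i

    @app.task
    def boom(i):
        raise KeyError(i)

    @app.task
    def total(results):
        return sum(results)

    res = chord([ok.s(1), boom.s(2)])(total.s())
    try:
        res.get()
    except ChordError as exc:
        print(exc)  # e.g. "Dependency <task-id> raised KeyError(2)"
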
5 changes: 3 additions & 2 deletions celery/app/defaults.py
@@ -70,6 +70,7 @@ def __repr__(self):
        'CONNECTION_RETRY': Option(True, type='bool'),
        'CONNECTION_MAX_RETRIES': Option(100, type='int'),
        'HEARTBEAT': Option(10, type='int'),
        'HEARTBEAT_CHECKRATE': Option(2.0, type='int'),
        'POOL_LIMIT': Option(10, type='int'),
        'USE_SSL': Option(False, type='bool'),
        'TRANSPORT': Option(type='string'),
@@ -160,7 +161,7 @@ def __repr__(self):
        'CONCURRENCY': Option(0, type='int'),
        'TIMER': Option(type='string'),
        'TIMER_PRECISION': Option(1.0, type='float'),
        'FORCE_EXECV': Option(True, type='bool'),
        'FORCE_EXECV': Option(False, type='bool'),
        'HIJACK_ROOT_LOGGER': Option(True, type='bool'),
        'CONSUMER': Option('celery.worker.consumer:Consumer', type='string'),
        'LOG_FORMAT': Option(DEFAULT_PROCESS_LOG_FMT),
@@ -231,7 +232,7 @@ def find_deprecated_settings(source):
            warn_deprecated(description='The {0!r} setting'.format(name),
                            deprecation=opt.deprecate_by,
                            removal=opt.remove_by,
                            alternative=opt.alt)
                            alternative='Use %s instead' % (opt.alt, ))
    return source



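A configuration sketch for the new BROKER_HEARTBEAT_CHECKRATE option; per Celery's documentation of this setting, heartbeats are verified at roughly BROKER_HEARTBEAT / BROKER_HEARTBEAT_CHECKRATE second intervals (the values below are illustrative):

    app.conf.BROKER_HEARTBEAT = 10             # heartbeat every 10 seconds
    app.conf.BROKER_HEARTBEAT_CHECKRATE = 2.0  # check twice per heartbeat
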
40 changes: 29 additions & 11 deletions celery/app/log.py
@@ -34,6 +34,8 @@

PY3 = sys.version_info[0] == 3

MP_LOG = os.environ.get('MP_LOG', False)


class TaskFormatter(ColorFormatter):

@@ -99,23 +101,31 @@ def setup_logging_subsystem(self, loglevel=None, logfile=None,
            sender=None, loglevel=loglevel, logfile=logfile,
            format=format, colorize=colorize,
        )

        if not receivers:
            root = logging.getLogger()

            if self.app.conf.CELERYD_HIJACK_ROOT_LOGGER:
                root.handlers = []

            for logger in root, get_multiprocessing_logger():
                if logger is not None:
                    self.setup_handlers(logger, logfile, format,
                                        colorize, **kwargs)
                    if loglevel:
                        logger.setLevel(loglevel)
                    signals.after_setup_logger.send(
                        sender=None, logger=logger,
                        loglevel=loglevel, logfile=logfile,
                        format=format, colorize=colorize,
                    )
            # Configure root logger
            self._configure_logger(
                root, logfile, loglevel, format, colorize, **kwargs
            )

            # Configure the multiprocessing logger
            self._configure_logger(
                get_multiprocessing_logger(),
                logfile, loglevel if MP_LOG else logging.ERROR,
                format, colorize, **kwargs
            )

            signals.after_setup_logger.send(
                sender=None, logger=root,
                loglevel=loglevel, logfile=logfile,
                format=format, colorize=colorize,
            )

            # then setup the root task logger.
            self.setup_task_loggers(loglevel, logfile, colorize=colorize)

@@ -127,6 +137,14 @@ def setup_logging_subsystem(self, loglevel=None, logfile=None,
            _MP_FORK_LOGFORMAT_=format)
        return receivers

    def _configure_logger(self, logger, logfile, loglevel,
                          format, colorize, **kwargs):
        if logger is not None:
            self.setup_handlers(logger, logfile, format,
                                colorize, **kwargs)
            if loglevel:
                logger.setLevel(loglevel)

    def setup_task_loggers(self, loglevel=None, logfile=None, format=None,
                           colorize=None, propagate=False, **kwargs):
        """Setup the task logger.
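Per the os.environ.get guard added above, the multiprocessing logger is now capped at ERROR unless the MP_LOG environment variable is set to any non-empty value. A sketch of enabling it:

    import os

    # Must be set before the worker configures logging, e.g. from the shell:
    #   MP_LOG=1 celery worker --loglevel=DEBUG
    os.environ.setdefault('MP_LOG', '1')
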
6 changes: 0 additions & 6 deletions celery/app/task.py
@@ -473,15 +473,9 @@ def apply_async(self, args=None, kwargs=None,
        if connection:
            producer = app.amqp.TaskProducer(connection)
        with app.producer_or_acquire(producer) as P:
            evd = None
            if conf.CELERY_SEND_TASK_SENT_EVENT:
                evd = app.events.Dispatcher(channel=P.channel,
                                            buffer_while_offline=False)

            extra_properties = self.backend.on_task_call(P, task_id)
            task_id = P.publish_task(self.name, args, kwargs,
                                     task_id=task_id,
                                     event_dispatcher=evd,
                                     callbacks=maybe_list(link),
                                     errbacks=maybe_list(link_error),
                                     **dict(options, **extra_properties))
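apply_async no longer builds its own event dispatcher; sent-event handling lives in the producer. Since the new TaskProducer.__init__ pops send_sent_event from kwargs, the flag can also be set per producer. A sketch (connection handling and the task name are illustrative):

    with app.connection() as connection:
        # send_sent_event is consumed by TaskProducer.__init__ per this diff
        producer = app.amqp.TaskProducer(connection, send_sent_event=True)
        producer.publish_task('tasks.add', (2, 2), {})  # hypothetical task
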
15 changes: 9 additions & 6 deletions celery/backends/amqp.py
@@ -161,19 +161,22 @@ def get_task_meta(self, task_id, backlog_limit=1000):
        with self.app.pool.acquire_channel(block=True) as (_, channel):
            binding = self._create_binding(task_id)(channel)
            binding.declare()
            latest, acc = None, None
            for i in range(backlog_limit):
                latest, acc = acc, binding.get(no_ack=True)

            prev = latest = acc = None
            for i in range(backlog_limit):  # spool ffwd
                prev, latest, acc = latest, acc, binding.get(no_ack=False)
                if not acc:  # no more messages
                    break
                if prev:
                    # backends are not expected to keep history,
                    # so we delete everything except the most recent state.
                    prev.ack()
            else:
                raise self.BacklogLimitExceeded(task_id)

            if latest:
                # new state to report
                self._republish(channel, task_id, latest.body,
                                latest.content_type, latest.content_encoding)
                payload = self._cache[task_id] = latest.payload
                latest.requeue()
                return payload
            else:
                # no new state, use previous
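The spool-forward loop now fetches with no_ack=False and acks every superseded state, so the result queue keeps only the most recent message; it still raises BacklogLimitExceeded when more than backlog_limit messages accumulate. A hedged sketch of handling that from the caller's side (the task id is hypothetical, and BacklogLimitExceeded is assumed importable from celery.backends.amqp):

    from celery.backends.amqp import BacklogLimitExceeded

    result = app.AsyncResult('9c1a6e42-hypothetical-id')
    try:
        state = result.state  # polls get_task_meta() on the AMQP backend
    except BacklogLimitExceeded:
        state = 'UNKNOWN'     # over backlog_limit (1000) states were queued
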
36 changes: 31 additions & 5 deletions celery/backends/base.py
@@ -18,13 +18,14 @@

from datetime import timedelta

from billiard.einfo import ExceptionInfo
from kombu import serialization
from kombu.utils.encoding import bytes_to_str, ensure_bytes, from_utf8

from celery import states
from celery.app import current_task
from celery.datastructures import LRUCache
from celery.exceptions import TimeoutError, TaskRevokedError
from celery.exceptions import ChordError, TimeoutError, TaskRevokedError
from celery.five import items
from celery.result import from_serializable, GroupResult
from celery.utils import timeutils
@@ -84,6 +85,16 @@ def mark_as_failure(self, task_id, exc, traceback=None):
        return self.store_result(task_id, exc, status=states.FAILURE,
                                 traceback=traceback)

    def fail_from_current_stack(self, task_id, exc=None):
        type_, real_exc, tb = sys.exc_info()
        try:
            exc = real_exc if exc is None else exc
            ei = ExceptionInfo((type_, exc, tb))
            self.mark_as_failure(task_id, exc, ei.traceback)
            return ei
        finally:
            del(tb)

    def mark_as_retry(self, task_id, exc, traceback=None):
        """Mark task as being retries.  Stores the current
        exception (if any)."""
@@ -167,6 +178,9 @@ def encode_result(self, result, status):
        else:
            return self.prepare_value(result)

    def is_cached(self, task_id):
        return task_id in self._cache

    def store_result(self, task_id, result, status, traceback=None, **kwargs):
        """Update task state and result."""
        result = self.encode_result(result, status)
@@ -409,7 +423,7 @@ def on_chord_apply(self, group_id, body, result=None, **kwargs):
        else:
            self.fallback_chord_unlock(group_id, body, result, **kwargs)

    def on_chord_part_return(self, task, propagate=False):
    def on_chord_part_return(self, task, propagate=True):
        if not self.implements_incr:
            return
        from celery import subtask
@@ -421,9 +435,21 @@ def on_chord_part_return(self, task, propagate=False):
        deps = GroupResult.restore(gid, backend=task.backend)
        val = self.incr(key)
        if val >= len(deps):
            subtask(task.request.chord).delay(deps.join(propagate=propagate))
            deps.delete()
            self.client.delete(key)
            j = deps.join_native if deps.supports_native_join else deps.join
            callback = subtask(task.request.chord)
            try:
                ret = j(propagate=propagate)
            except Exception as exc:
                culprit = next(deps._failed_join_report())
                self.app._tasks[callback.task].backend.fail_from_current_stack(
                    callback.id, exc=ChordError('Dependency %s raised %r' % (
                        culprit.id, exc))
                )
            else:
                callback.delay(ret)
            finally:
                deps.delete()
                self.client.delete(key)
        else:
            self.expire(key, 86400)

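fail_from_current_stack is the shared helper both chord paths above use: it captures the exception currently being handled, records a FAILURE state for the given task id, and returns the ExceptionInfo. A minimal sketch (backend, task_id, and risky are hypothetical):

    def risky():
        raise RuntimeError('boom')

    try:
        risky()
    except Exception as exc:
        # Stores FAILURE plus the formatted traceback for task_id.
        einfo = backend.fail_from_current_stack(task_id, exc=exc)
        print(einfo.traceback)
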