Skip to content

Commit

Permalink
Merge branch 'master' into 3.1
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed Dec 17, 2013
2 parents 7983b0b + bbdef99 commit 6878765
Show file tree
Hide file tree
Showing 59 changed files with 1,249 additions and 568 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTORS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,4 @@ Daniel M Taub, 2013/10/22
Matt Wise, 2013/11/06
Michael Robellard, 2013/11/07
Vsevolod Kulaga, 2013/11/16

Ionel Cristian Mărieș, 2013/12/09
171 changes: 171 additions & 0 deletions Changelog
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,177 @@ This document contains change notes for bugfix releases in the 3.1.x series
(Cipater), please see :ref:`whatsnew-3.1` for an overview of what's
new in Celery 3.1.

.. _version-3.1.7:

3.1.7
=====
:release-date: 2013-12-17 06:00 P.M UTC
:release-by: Ask Solem

.. _v317-important:

Important Notes
---------------

Init script security improvements
---------------------------------

Where the generic init scripts (for ``celeryd``, and ``celerybeat``) before
delegated the responsibility of dropping privileges to the target application,
it will now use ``su`` instead, so that the Python program is not trusted
with superuser privileges.

This is not in reaction to any known exploit, but it will
limit the possibility of a privilege escalation bug being abused in the
future.

You have to upgrade the init scripts manually from this directory:
https://github.com/celery/celery/tree/3.1/extra/generic-init.d

AMQP result backend
~~~~~~~~~~~~~~~~~~~

The 3.1 release accidentally left the amqp backend configured to be
non-persistent by default.

Upgrading from 3.0 would give a "not equivalent" error when attempting to
set or retrieve results for a task. That is unless you manually set the
persistence setting::

CELERY_RESULT_PERSISTENT = True

This version restores the previous value so if you already forced
the upgrade by removing the existing exchange you must either
keep the configuration by setting ``CELERY_RESULT_PERSISTENT = False``
or delete the ``celeryresults`` exchange again.

Synchronous subtasks
~~~~~~~~~~~~~~~~~~~~

Tasks waiting for the result of a subtask will now emit
a :exc:`RuntimeWarning` warning when using the prefork pool,
and in 3.2 this will result in an exception being raised.

It's not legal for tasks to block by waiting for subtasks
as this is likely to lead to resource starvation and eventually
deadlock when using the prefork pool (see also :ref:`task-synchronous-subtasks`).

If you really know what you are doing you can avoid the warning (and
the future exception being raised) by moving the operation in a whitelist
block:

.. code-block:: python

from celery.result import allow_join_result

@app.task
def misbehaving():
result = other_task.delay()
with allow_join_result():
result.get()

Note also that if you wait for the result of a subtask in any form
when using the prefork pool you must also disable the pool prefetching
behavior with the worker :ref:`-Ofair option <prefork-pool-prefetch>`.

.. _v317-fixes:

Fixes
-----

- Now depends on :ref:`Kombu 3.0.8 <kombu:version-3.0.8>`.

- Now depends on :mod:`billiard` 3.3.0.13

- Events: Fixed compatibility with non-standard json libraries
that sends float as :class:`decimal.Decimal` (Issue #1731)

- Events: State worker objects now always defines attributes:
``active``, ``processed``, ``loadavg``, ``sw_ident``, ``sw_ver``
and ``sw_sys``.

- Worker: Now keeps count of the total number of tasks processed,
not just by type (``all_active_count``).

- Init scripts: Fixed problem with reading configuration file
when the init script is symlinked to a runlevel (e.g. ``S02celeryd``).
(Issue #1740).

This also removed a rarely used feature where you can symlink the script
to provide alternative configurations. You instead copy the script
and give it a new name, but perhaps a better solution is to provide
arguments to ``CELERYD_OPTS`` to separate them:

.. code-block:: bash

CELERYD_NODES="X1 X2 Y1 Y2"
CELERYD_OPTS="-A:X1 x -A:X2 x -A:Y1 y -A:Y2 y"

- Fallback chord unlock task is now always called after the chord header
(Issue #1700).

This means that the unlock task will not be started if there's
an error sending the header.

- Celery command: Fixed problem with arguments for some control commands.

Fix contributed by Konstantin Podshumok.

- Fixed bug in ``utcoffset`` where the offset when in DST would be
completely wrong (Issue #1743).

- Worker: Errors occurring while attempting to serialize the result of a
task will now cause the task to be marked with failure and a
:class:`kombu.exceptions.EncodingError` error.

Fix contributed by Ionel Cristian Mărieș.

- Worker with ``-B`` argument did not properly shut down the beat instance.

- Worker: The ``%n`` and ``%h`` formats are now also supported by the
:option:`--logfile`, :option:`--pidfile` and :option:`--statedb` arguments.

Example:

.. code-block:: bash

$ celery -A proj worker -n foo@%h --logfile=%n.log --statedb=%n.db

- Redis/Cache result backends: Will now timeout if keys evicted while trying
to join a chord.

- The fallbock unlock chord task now raises :exc:`Retry` so that the
retry even is properly logged by the worker.

- Multi: Will no longer apply Eventlet/gevent monkey patches (Issue #1717).

- Redis result backend: Now supports UNIX sockets.

Like the Redis broker transport the result backend now also supports
using ``redis+socket:///tmp/redis.sock`` URLs.

Contributed by Alcides Viamontes Esquivel.

- Events: Events sent by clients was mistaken for worker related events
(Issue #1714).

For ``events.State`` the tasks now have a ``Task.client`` attribute
that is set when a ``task-sent`` event is being received.

Also, a clients logical clock is not in sync with the cluster so
they live in a "time bubble". So for this reason monitors will no
longer attempt to merge with the clock of an event sent by a client,
instead it will fake the value by using the current clock with
a skew of -1.

- Prefork pool: The method used to find terminated processes was flawed
in that it did not also take into account missing popen objects.

- Canvas: ``group`` and ``chord`` now works with anon signatures as long
as the group/chord object is associated with an app instance (Issue #1744).

You can pass the app by using ``group(..., app=app)``.

.. _version-3.1.6:

3.1.6
Expand Down
20 changes: 11 additions & 9 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

.. image:: http://cloud.github.com/downloads/celery/celery/celery_128.png

:Version: 3.1.6 (Cipater)
:Version: 3.1.7 (Cipater)
:Web: http://celeryproject.org/
:Download: http://pypi.python.org/pypi/celery/
:Source: http://github.com/celery/celery/
Expand Down Expand Up @@ -274,10 +274,11 @@ Transports and Backends
for using Redis as a message transport or as a result backend.

:celery[mongodb]:
for using MongoDB as a message transport, or as a result backend.
for using MongoDB as a message transport (*experimental*),
or as a result backend (*supported*).

:celery[sqs]:
for using Amazon SQS as a message transport.
for using Amazon SQS as a message transport (*experimental*).

:celery[memcache]:
for using memcached as a result backend.
Expand All @@ -286,28 +287,29 @@ Transports and Backends
for using Apache Cassandra as a result backend.

:celery[couchdb]:
for using CouchDB as a message transport.
for using CouchDB as a message transport (*experimental*).

:celery[couchbase]:
for using CouchBase as a result backend.

:celery[beanstalk]:
for using Beanstalk as a message transport.
for using Beanstalk as a message transport (*experimental*).

:celery[zookeeper]:
for using Zookeeper as a message transport.

:celery[zeromq]:
for using ZeroMQ as a message transport.
for using ZeroMQ as a message transport (*experimental*).

:celery[sqlalchemy]:
for using SQLAlchemy as a message transport, or as a result backend.
for using SQLAlchemy as a message transport (*experimental*),
or as a result backend (*supported*).

:celery[pyro]:
for using the Pyro4 message transport.
for using the Pyro4 message transport (*experimental*).

:celery[slmq]:
for using the SoftLayer Message Queue transport.
for using the SoftLayer Message Queue transport (*experimental*).

.. _celery-installing-from-source:

Expand Down
2 changes: 1 addition & 1 deletion celery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
)

SERIES = 'Cipater'
VERSION = version_info_t(3, 1, 6, '', '')
VERSION = version_info_t(3, 1, 7, '', '')
__version__ = '{0.major}.{0.minor}.{0.micro}{0.releaselevel}'.format(VERSION)
__author__ = 'Ask Solem'
__contact__ = '[email protected]'
Expand Down
4 changes: 2 additions & 2 deletions celery/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ def _warn_deprecated(new):


def main():
maybe_patch_concurrency()
if 'multi' not in sys.argv:
maybe_patch_concurrency()
from celery.bin.celery import main
main()

Expand All @@ -37,7 +38,6 @@ def _compat_worker():


def _compat_multi():
maybe_patch_concurrency()
_warn_deprecated('celery multi')
from celery.bin.multi import main
main()
Expand Down
2 changes: 1 addition & 1 deletion celery/_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def __iter__(self):

def _set_task_join_will_block(blocks):
global _task_join_will_block
_task_join_will_block = True
_task_join_will_block = blocks


def task_join_will_block():
Expand Down
7 changes: 4 additions & 3 deletions celery/app/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,8 @@ def send_task(self, name, args=None, kwargs=None, countdown=None,
conf = self.conf
if conf.CELERY_ALWAYS_EAGER: # pragma: no cover
warnings.warn(AlwaysEagerIgnored(
'CELERY_ALWAYS_EAGER has no effect on send_task'))
'CELERY_ALWAYS_EAGER has no effect on send_task',
), stacklevel=2)
options = router.route(options, name, args, kwargs)
if connection:
producer = self.amqp.TaskProducer(connection)
Expand Down Expand Up @@ -445,8 +446,8 @@ def _maybe_close_pool(self):
if self._pool:
self._pool.force_close_all()
self._pool = None
amqp = self.amqp
if amqp._producer_pool:
amqp = self.__dict__.get('amqp')
if amqp is not None and amqp._producer_pool is not None:
amqp._producer_pool.force_close_all()
amqp._producer_pool = None

Expand Down
27 changes: 10 additions & 17 deletions celery/app/builtins.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def add_unlock_chord_task(app):
"""
from celery.canvas import signature
from celery.exceptions import ChordError
from celery.result import result_from_tuple
from celery.result import allow_join_result, result_from_tuple

default_propagate = app.conf.CELERY_CHORD_PROPAGATES

Expand Down Expand Up @@ -95,7 +95,8 @@ def unlock_chord(group_id, callback, interval=None, propagate=None,
if deps.ready():
callback = signature(callback, app=app)
try:
ret = j(propagate=propagate)
with allow_join_result():
ret = j(timeout=3.0, propagate=propagate)
except Exception as exc:
try:
culprit = next(deps._failed_join_report())
Expand All @@ -117,8 +118,8 @@ def unlock_chord(group_id, callback, interval=None, propagate=None,
exc=ChordError('Callback error: {0!r}'.format(exc)),
)
else:
return unlock_chord.retry(countdown=interval,
max_retries=max_retries)
raise unlock_chord.retry(countdown=interval,
max_retries=max_retries)
return unlock_chord


Expand Down Expand Up @@ -277,8 +278,6 @@ def prepare_steps(self, args, tasks):
tasks.append(task)
prev_task, prev_res = task, res

print(tasks)

return tasks, results

def apply_async(self, args=(), kwargs={}, group_id=None, chord=None,
Expand Down Expand Up @@ -356,17 +355,11 @@ def run(self, header, body, partial_args=(), interval=None,
results = [AsyncResult(prepare_member(task, body, group_id))
for task in header.tasks]

# - fallback implementations schedules the chord_unlock task here
app.backend.on_chord_apply(group_id, body,
interval=interval,
countdown=countdown,
max_retries=max_retries,
propagate=propagate,
result=results)
# - call the header group, returning the GroupResult.
final_res = header(*partial_args, task_id=group_id)

return final_res
return self.backend.apply_chord(
header, partial_args, group_id,
body, interval=interval, countdown=countdown,
max_retries=max_retries, propagate=propagate, result=results,
)

def _prepare_member(self, task, body, group_id):
opts = task.options
Expand Down
Loading

0 comments on commit 6878765

Please sign in to comment.