Skip to content

Commit

Permalink
Merge branch '3.0'
Browse files Browse the repository at this point in the history
Conflicts:
	celery/backends/amqp.py
	celery/bin/base.py
	celery/tests/tasks/test_http.py
	celery/tests/utils.py
	docs/history/changelog-2.1.rst
	docs/userguide/monitoring.rst
  • Loading branch information
ask committed Jan 28, 2013
2 parents 2734c1e + 60a1097 commit 9359f8c
Show file tree
Hide file tree
Showing 11 changed files with 86 additions and 63 deletions.
15 changes: 9 additions & 6 deletions celery/backends/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,19 +161,22 @@ def get_task_meta(self, task_id, backlog_limit=1000):
with self.app.pool.acquire_channel(block=True) as (_, channel):
binding = self._create_binding(task_id)(channel)
binding.declare()
latest, acc = None, None
for i in range(backlog_limit):
latest, acc = acc, binding.get(no_ack=True)

prev = latest = acc = None
for i in range(backlog_limit): ## spool ffwd
prev, latest, acc = latest, acc, binding.get(no_ack=False)
if not acc: # no more messages
break
if prev:
# backends are not expected to keep history,
# so we delete everything except the most recent state.
prev.ack()
else:
raise self.BacklogLimitExceeded(task_id)

if latest:
# new state to report
self._republish(channel, task_id, latest.body,
latest.content_type, latest.content_encoding)
payload = self._cache[task_id] = latest.payload
latest.requeue()
return payload
else:
# no new state, use previous
Expand Down
2 changes: 1 addition & 1 deletion celery/platforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def _find_option_with_arg(argv, short_opts=None, long_opts=None):


def maybe_patch_concurrency(argv, short_opts=None, long_opts=None):
"""With short and long opt alternatives that specify the command-line
"""With short and long opt alternatives that specify the command line
option to set the pool, this makes sure that anything that needs
to be patched is completed as early as possible.
(e.g. eventlet/gevent monkey patches)."""
Expand Down
16 changes: 10 additions & 6 deletions celery/tests/backends/test_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,11 @@ def mock_memcache(self):
memcache.Client = MemcachedClient
memcache.Client.__module__ = memcache.__name__
prev, sys.modules['memcache'] = sys.modules.get('memcache'), memcache
yield True
if prev is not None:
sys.modules['memcache'] = prev
try:
yield True
finally:
if prev is not None:
sys.modules['memcache'] = prev

@contextmanager
def mock_pylibmc(self):
Expand All @@ -146,9 +148,11 @@ def mock_pylibmc(self):
pylibmc.Client.__module__ = pylibmc.__name__
prev = sys.modules.get('pylibmc')
sys.modules['pylibmc'] = pylibmc
yield True
if prev is not None:
sys.modules['pylibmc'] = prev
try:
yield True
finally:
if prev is not None:
sys.modules['pylibmc'] = prev


class test_get_best_memcache(Case, MockCacheMixin):
Expand Down
6 changes: 4 additions & 2 deletions celery/tests/tasks/test_chord.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ def patch_unlock_retry():
unlock = current_app.tasks['celery.chord_unlock']
retry = Mock()
prev, unlock.retry = unlock.retry, retry
yield unlock, retry
unlock.retry = prev
try:
yield unlock, retry
finally:
unlock.retry = prev


class test_unlock_chord_task(AppCase):
Expand Down
7 changes: 4 additions & 3 deletions celery/tests/tasks/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ def _mocked(url, *args, **kwargs):

http.urlopen = _mocked

yield True

http.urlopen = urlopen
try:
yield True
finally:
http.urlopen = urlopen


def _response(res):
Expand Down
89 changes: 52 additions & 37 deletions celery/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,10 @@ def wrap_logger(logger, loglevel=logging.ERROR):
siohandler = logging.StreamHandler(sio)
logger.handlers = [siohandler]

yield sio

logger.handlers = old_handlers
try:
yield sio
finally:
logger.handlers = old_handlers


@contextmanager
Expand All @@ -233,10 +234,10 @@ def eager_tasks():

prev = app.conf.CELERY_ALWAYS_EAGER
app.conf.CELERY_ALWAYS_EAGER = True

yield True

app.conf.CELERY_ALWAYS_EAGER = prev
try:
yield True
finally:
app.conf.CELERY_ALWAYS_EAGER = prev


def with_eager_tasks(fun):
Expand Down Expand Up @@ -374,8 +375,10 @@ def myimp(name, *args, **kwargs):
return realimport(name, *args, **kwargs)

builtins.__import__ = myimp
yield True
builtins.__import__ = realimport
try:
yield True
finally:
builtins.__import__ = realimport


@contextmanager
Expand All @@ -386,10 +389,11 @@ def override_stdouts():
sys.stdout = sys.__stdout__ = mystdout
sys.stderr = sys.__stderr__ = mystderr

yield mystdout, mystderr

sys.stdout = sys.__stdout__ = prev_out
sys.stderr = sys.__stderr__ = prev_err
try:
yield mystdout, mystderr
finally:
sys.stdout = sys.__stdout__ = prev_out
sys.stderr = sys.__stderr__ = prev_err


def patch(module, name, mocked):
Expand Down Expand Up @@ -420,14 +424,16 @@ def replace_module_value(module, name, value=None):
delattr(module, name)
except AttributeError:
pass
yield
if prev is not None:
setattr(sys, name, prev)
if not has_prev:
try:
delattr(module, name)
except AttributeError:
pass
try:
yield
finally:
if prev is not None:
setattr(sys, name, prev)
if not has_prev:
try:
delattr(module, name)
except AttributeError:
pass
pypy_version = partial(
replace_module_value, sys, 'pypy_version_info',
)
Expand All @@ -439,28 +445,34 @@ def replace_module_value(module, name, value=None):
@contextmanager
def sys_platform(value):
prev, sys.platform = sys.platform, value
yield
sys.platform = prev
try:
yield
finally:
sys.platform = prev


@contextmanager
def reset_modules(*modules):
prev = dict((k, sys.modules.pop(k)) for k in modules if k in sys.modules)
yield
sys.modules.update(prev)
try:
yield
finally:
sys.modules.update(prev)


@contextmanager
def patch_modules(*modules):
prev = {}
for mod in modules:
prev[mod], sys.modules[mod] = sys.modules[mod], ModuleType(mod)
yield
for name, mod in items(prev):
if mod is None:
sys.modules.pop(name, None)
else:
sys.modules[name] = mod
try:
yield
finally:
for name, mod in items(prev):
if mod is None:
sys.modules.pop(name, None)
else:
sys.modules[name] = mod


@contextmanager
Expand Down Expand Up @@ -505,8 +517,10 @@ def on_exit(*x):
reraise(x[0], x[1], x[2])
context.__exit__.side_effect = on_exit
context.__enter__.return_value = context
yield context
context.reset()
try:
yield context
finally:
context.reset()


@contextmanager
Expand Down Expand Up @@ -537,10 +551,11 @@ def patch_settings(app=None, **config):
pass
setattr(app.conf, key, value)

yield app.conf

for key, value in items(prev):
setattr(app.conf, key, value)
try:
yield app.conf
finally:
for key, value in items(prev):
setattr(app.conf, key, value)


@contextmanager
Expand Down
4 changes: 2 additions & 2 deletions docs/history/changelog-2.1.rst
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ News
* :func:`~celery.task.control.broadcast`: Added callback argument, this can be
used to process replies immediately as they arrive.

* celeryctl: New command-line utility to manage and inspect worker nodes,
* celeryctl: New command line utility to manage and inspect worker nodes,
apply tasks and inspect the results of tasks.

.. seealso::
Expand Down Expand Up @@ -472,7 +472,7 @@ News
stdouts = logging.getLogger("mystdoutslogger")
log.redirect_stdouts_to_logger(stdouts, loglevel=logging.WARNING)
* worker: Added command-line option :option:`-I`/:option:`--include`:
* worker Added command line option :option:`-I`/:option:`--include`:

A comma separated list of (task) modules to be imported.

Expand Down
2 changes: 1 addition & 1 deletion docs/history/changelog-2.3.rst
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ Fixes
* ``CELERY_TASK_ERROR_WHITE_LIST`` is now properly initialized
in all loaders.

* celeryd_detach now passes through command-line configuration.
* celeryd_detach now passes through command line configuration.

* Remote control command ``add_consumer`` now does nothing if the
queue is already being consumed from.
Expand Down
2 changes: 0 additions & 2 deletions docs/userguide/monitoring.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@ Workers

.. _monitoring-control:


Management Command-line Utilities (``inspect``/``control``)
-----------------------------------------------------------

.. versionadded:: 2.1

:program:`celery` can also be used to inspect
and manage worker nodes (and to some degree tasks).
Expand Down
2 changes: 1 addition & 1 deletion docs/userguide/routing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ Hands-on with the API
---------------------

Celery comes with a tool called :program:`celery amqp`
that is used for command-line access to the AMQP API, enabling access to
that is used for command line access to the AMQP API, enabling access to
administration tasks like creating/deleting queues and exchanges, purging
queues or sending messages. It can also be used for non-AMQP brokers,
but different implementation may not implement all commands.
Expand Down
4 changes: 2 additions & 2 deletions docs/whatsnew-3.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,7 @@ In Other News
>>> import celery
>>> print(celery.bugreport())

- Using the ``celery`` command-line program:
- Using the ``celery`` command line program:

.. code-block:: bash
Expand Down Expand Up @@ -910,7 +910,7 @@ Internals

- Renamed module ``celery.abstract`` -> :mod:`celery.worker.bootsteps`.

- Command-line docs are now parsed from the module docstrings.
- Command line docs are now parsed from the module docstrings.

- Test suite directory has been reorganized.

Expand Down

0 comments on commit 9359f8c

Please sign in to comment.