Skip to content

Commit

Permalink
[optimization] Revoke now accepts list argument (and used by ResultSe…
Browse files Browse the repository at this point in the history
…t.revoke)
  • Loading branch information
ask committed Sep 9, 2013
1 parent 8bbfa19 commit b9ca48d
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 16 deletions.
8 changes: 3 additions & 5 deletions celery/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,11 +413,9 @@ def revoke(self, connection=None, terminate=False, signal=None):
Default is TERM.
"""
with self.app.connection_or_acquire(connection) as conn:
for result in self.results:
result.revoke(
connection=conn, terminate=terminate, signal=signal,
)
self.app.control.revoke([r.id for r in self.results],
connection=connection,
terminate=terminate, signal=signal)

def __iter__(self):
return iter(self.results)
Expand Down
51 changes: 40 additions & 11 deletions celery/worker/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from celery.five import UserDict, items, StringIO
from celery.platforms import signals as _signals
from celery.utils import timeutils
from celery.utils.functional import maybe_list
from celery.utils.log import get_logger
from celery.utils import jsonify

Expand All @@ -34,23 +35,51 @@ def register(cls, method, name=None):
return method


def _find_requests_by_id(ids, requests):
found, total = 0, len(ids)
for request in worker_state.reserved_requests:
if request.id in ids:
yield request
found += 1
if found >= total:
break


@Panel.register
def revoke(state, task_id, terminate=False, signal=None, **kwargs):
"""Revoke task by task id."""
revoked.add(task_id)
if terminate:
# supports list argument since 3.1
task_ids, task_id = maybe_list(task_id) or [], None
to_terminate = set()
terminated = set()
for task_id in task_ids:
revoked.add(task_id)
if terminate:
to_terminate.add(task_id)

if to_terminate:
signum = _signals.signum(signal or 'TERM')
for request in worker_state.reserved_requests:
if request.id == task_id:
logger.info('Terminating %s (%s)', task_id, signum)
request.terminate(state.consumer.pool, signal=signum)
_to_terminate = set()
# reserved_requests changes size during iteration
# so need to consume the items first, then terminate after.
requests = set(_find_requests_by_id(
to_terminate,
worker_state.reserved_requests,
))
for request in requests:
logger.info('Terminating %s (%s)', task_id, signum)
request.terminate(state.consumer.pool, signal=signum)
terminated.add(request.id)
if len(terminated) >= len(_to_terminate):
break
else:
return {'ok': 'terminate: task {0} not found'.format(task_id)}
return {'ok': 'terminating {0} ({1})'.format(task_id, signal)}

logger.info('Revoking task %s', task_id)
return {'ok': 'revoking task {0}'.format(task_id)}
if not terminated:
return {'ok': 'terminate: tasks unknown'}
return {'ok': 'terminate: {0}'.format(', '.join(terminated))}

idstr = ', '.join(task_ids)
logger.info('Tasks flagged as revoked: %s', idstr)
return {'ok': 'tasks {0} flagged as revoked'.format(idstr)}


@Panel.register
Expand Down
30 changes: 30 additions & 0 deletions docs/userguide/workers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,10 @@ Terminating a task also revokes it.

::

>>> result.revoke()

>>> AsyncResult(id).revoke()

>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed')

>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed',
Expand All @@ -257,6 +261,32 @@ Terminating a task also revokes it.
>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed',
... terminate=True, signal='SIGKILL')




Revoking multiple tasks
-----------------------

.. versionadded:: 3.1


The revoke method also accepts a list argument, where it will revoke
several tasks at once.

**Example**

::

>>> app.control.revoke([
... '7993b0aa-1f0b-4780-9af0-c47c0858b3f2',
... 'f565793e-b041-4b2b-9ca4-dca22762a55d',
... 'd9d35e03-2997-42d0-a13e-64a66b88a618',
])


The ``GroupResult.revoke`` method takes advantage of this since
version 3.1.

.. _worker-persistent-revokes:

Persistent revokes
Expand Down

0 comments on commit b9ca48d

Please sign in to comment.