Skip to content

Commit

Permalink
Adds CELERYD_WORKER_LOST_WAIT to control WorkerLostError timeout.
Browse files Browse the repository at this point in the history
Closes celery#595
  • Loading branch information
Brendon Crawford authored and ask committed Apr 16, 2012
1 parent b3495bb commit a923f20
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 8 deletions.
1 change: 1 addition & 0 deletions celery/app/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ def to_python(self, value):
"TASK_LOG_FORMAT": Option(DEFAULT_TASK_LOG_FMT),
"TASK_SOFT_TIME_LIMIT": Option(type="int"),
"TASK_TIME_LIMIT": Option(type="int"),
"WORKER_LOST_WAIT": Option(10.0, type="float")
},
"CELERYBEAT": {
"SCHEDULE": Option({}, type="dict"),
Expand Down
21 changes: 14 additions & 7 deletions celery/concurrency/processes/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
# Miscellaneous
#

LOST_WORKER_TIMEOUT = 10.0

job_counter = itertools.count()


Expand Down Expand Up @@ -533,7 +535,7 @@ class Pool(object):

def __init__(self, processes=None, initializer=None, initargs=(),
maxtasksperchild=None, timeout=None, soft_timeout=None,
force_execv=False):
force_execv=False, lost_worker_timeout=LOST_WORKER_TIMEOUT):
self._setup_queues()
self._taskqueue = Queue.Queue()
self._cache = {}
Expand All @@ -544,6 +546,7 @@ def __init__(self, processes=None, initializer=None, initargs=(),
self._initializer = initializer
self._initargs = initargs
self._force_execv = force_execv
self.lost_worker_timeout = lost_worker_timeout or LOST_WORKER_TIMEOUT

if soft_timeout and SIG_SOFT_TIMEOUT is None:
warnings.warn(UserWarning("Soft timeouts are not supported: "
Expand Down Expand Up @@ -755,12 +758,13 @@ def map(self, func, iterable, chunksize=None):
assert self._state == RUN
return self.map_async(func, iterable, chunksize).get()

def imap(self, func, iterable, chunksize=1, lost_worker_timeout=10.0):
def imap(self, func, iterable, chunksize=1, lost_worker_timeout=None):
'''
Equivalent of `itertools.imap()` -- can be MUCH slower
than `Pool.map()`
'''
assert self._state == RUN
lost_worker_timeout = lost_worker_timeout or self.lost_worker_timeout
if chunksize == 1:
result = IMapIterator(self._cache,
lost_worker_timeout=lost_worker_timeout)
Expand All @@ -777,11 +781,12 @@ def imap(self, func, iterable, chunksize=1, lost_worker_timeout=10.0):
return (item for chunk in result for item in chunk)

def imap_unordered(self, func, iterable, chunksize=1,
lost_worker_timeout=10.0):
lost_worker_timeout=None):
'''
Like `imap()` method but ordering of results is arbitrary
'''
assert self._state == RUN
lost_worker_timeout = lost_worker_timeout or self.lost_worker_timeout
if chunksize == 1:
result = IMapUnorderedIterator(self._cache,
lost_worker_timeout=lost_worker_timeout)
Expand All @@ -800,7 +805,7 @@ def imap_unordered(self, func, iterable, chunksize=1,
def apply_async(self, func, args=(), kwds={},
callback=None, accept_callback=None, timeout_callback=None,
waitforslot=False, error_callback=None,
soft_timeout=None, timeout=None):
soft_timeout=None, timeout=None, lost_worker_timeout=None):
'''
Asynchronous equivalent of `apply()` builtin.
Expand All @@ -817,6 +822,7 @@ def apply_async(self, func, args=(), kwds={},
'''
assert self._state == RUN
lost_worker_timeout = lost_worker_timeout or self.lost_worker_timeout
if soft_timeout and SIG_SOFT_TIMEOUT is None:
warnings.warn(UserWarning("Soft timeouts are not supported: "
"on this platform: It does not have the SIGUSR1 signal."))
Expand All @@ -826,7 +832,8 @@ def apply_async(self, func, args=(), kwds={},
if self._state == RUN:
result = ApplyResult(self._cache, callback,
accept_callback, timeout_callback,
error_callback, soft_timeout, timeout)
error_callback, soft_timeout, timeout,
lost_worker_timeout)
if timeout or soft_timeout:
# start the timeout handler thread when required.
self._start_timeout_handler()
Expand Down Expand Up @@ -970,7 +977,7 @@ class ApplyResult(object):

def __init__(self, cache, callback, accept_callback=None,
timeout_callback=None, error_callback=None, soft_timeout=None,
timeout=None, lost_worker_timeout=10.0):
timeout=None, lost_worker_timeout=LOST_WORKER_TIMEOUT):
self._mutex = threading.Lock()
self._cond = threading.Condition(threading.Lock())
self._job = job_counter.next()
Expand Down Expand Up @@ -1131,7 +1138,7 @@ def worker_pids(self):
class IMapIterator(object):
_worker_lost = None

def __init__(self, cache, lost_worker_timeout=10.0):
def __init__(self, cache, lost_worker_timeout=LOST_WORKER_TIMEOUT):
self._cond = threading.Condition(threading.Lock())
self._job = job_counter.next()
self._cache = cache
Expand Down
4 changes: 3 additions & 1 deletion celery/worker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ def create(self, w):
timeout=w.task_time_limit,
soft_timeout=w.task_soft_time_limit,
putlocks=w.pool_putlocks,
force_execv=w.force_execv)
force_execv=w.force_execv,
lost_worker_timeout=w.worker_lost_wait)
return pool


Expand Down Expand Up @@ -191,6 +192,7 @@ class WorkController(configurated):
prefetch_multiplier = from_config()
state_db = from_config()
disable_rate_limits = from_config()
worker_lost_wait = from_config()

_state = None
_running = 0
Expand Down
12 changes: 12 additions & 0 deletions docs/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1066,6 +1066,18 @@ This option will be enabled by default in a later version.

This is not a problem on Windows, as it does not have `fork()`.

.. setting:: CELERYD_WORKER_LOST_WAIT

CELERYD_WORKER_LOST_WAIT
~~~~~~~~~~~~~~~~~~~~~~~~

In some cases a worker may be killed without proper cleanup,
and the worker may have published a result before terminating.
This value specifies how long we wait for any missing results before
raising a :exc:`@WorkerLostError` exception.

Default is 10.0

.. setting:: CELERYD_MAX_TASKS_PER_CHILD

CELERYD_MAX_TASKS_PER_CHILD
Expand Down

0 comments on commit a923f20

Please sign in to comment.