Skip to content

Commit

Permalink
platforms.signals: New interface for managing process signals.
Browse files Browse the repository at this point in the history
* install_signal_handler("USR1", handler) -> signals["USR1"] = handler
* install_signal_handler(USR1=handler)    -> signals.update(USR1=handler)
* ignore_signal("USR1", "USR2")           -> signals.ignore("USR1", "USR2")
* reset_signal("USR1", "USR2")            -> signals.reset("USR1", "USR2")
* get_signal("USR1")                      -> signals.signum("USR1")
  • Loading branch information
ask committed Apr 18, 2011
1 parent a40ab46 commit f0962a9
Show file tree
Hide file tree
Showing 10 changed files with 93 additions and 90 deletions.
2 changes: 1 addition & 1 deletion celery/apps/beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,4 @@ def _sync(signum, frame):
beat.sync()
raise SystemExit()

platforms.install_signal_handler(SIGTERM=_sync, SIGINT=_sync)
platforms.signals.update(SIGTERM=_sync, SIGINT=_sync)
14 changes: 7 additions & 7 deletions celery/apps/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ def _stop(signum, frame):
worker.stop(in_sighandler=True)
raise SystemExit()

platforms.install_signal_handler(SIGINT=_stop)
platforms.signals["SIGINT"] = _stop


def install_worker_int_again_handler(worker):
Expand All @@ -301,7 +301,7 @@ def _stop(signum, frame):
worker.terminate(in_sighandler=True)
raise SystemTerminate()

platforms.install_signal_handler(SIGINT=_stop)
platforms.signals["SIGINT"] =_stop


def install_worker_term_handler(worker):
Expand All @@ -316,7 +316,7 @@ def _stop(signum, frame):
worker.stop(in_sighandler=True)
raise SystemExit()

platforms.install_signal_handler(SIGTERM=_stop)
platforms.signals["SIGTERM"] = _stop


def install_worker_restart_handler(worker):
Expand All @@ -328,7 +328,7 @@ def restart_worker_sig_handler(signum, frame):
worker.stop(in_sighandler=True)
os.execv(sys.executable, [sys.executable] + sys.argv)

platforms.install_signal_handler(SIGHUP=restart_worker_sig_handler)
platforms.signals["SIGHUP"] = restart_worker_sig_handler


def install_cry_handler(logger):
Expand All @@ -341,7 +341,7 @@ def cry_handler(signum, frame):
"""Signal handler logging the stacktrace of all active threads."""
logger.error("\n" + cry())

platforms.install_signal_handler(SIGUSR1=cry_handler)
platforms.signals["SIGUSR1"] = cry_handler


def install_rdb_handler(): # pragma: no cover
Expand All @@ -352,7 +352,7 @@ def rdb_handler(signum, frame):
rdb.set_trace(frame)

if os.environ.get("CELERY_RDBSIG"):
platforms.install_signal_handler(SIGUSR2=rdb_handler)
platforms.signals["SIGUSR2"] = rdb_handler


def install_HUP_not_supported_handler(worker):
Expand All @@ -361,4 +361,4 @@ def warn_on_HUP_handler(signum, frame):
worker.logger.error("SIGHUP not supported: "
"Restarting with HUP is unstable on this platform!")

platforms.install_signal_handler(SIGHUP=warn_on_HUP_handler)
platforms.signals["SIGHUP"] = warn_on_HUP_handler
2 changes: 0 additions & 2 deletions celery/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ class BaseBackend(object):

TimeoutError = TimeoutError

can_get_many = False

def __init__(self, *args, **kwargs):
from celery.app import app_or_default
self.app = app_or_default(kwargs.get("app"))
Expand Down
2 changes: 1 addition & 1 deletion celery/beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ def __init__(self, *args, **kwargs):
self.name = "Beat"

def run(self):
platforms.reset_signal("SIGTERM")
platforms.signals.reset("SIGTERM")
self.service.start(embedded_process=True)

def stop(self):
Expand Down
93 changes: 52 additions & 41 deletions celery/platforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
import sys
import errno
import signal
import signal as _signal

from celery.local import try_import

Expand Down Expand Up @@ -172,7 +172,7 @@ def detached(logfile=None, pidfile=None, uid=None, gid=None, umask=0,
raise RuntimeError("This platform does not support detach.")
workdir = os.getcwd() if workdir is None else workdir

reset_signal("SIGCLD") # Make sure SIGCLD is using the default handler.
signals.reset("SIGCLD") # Make sure SIGCLD is using the default handler.
set_effective_user(uid=uid, gid=gid)

# Since without stderr any errors will be silently suppressed,
Expand Down Expand Up @@ -258,61 +258,72 @@ def set_effective_user(uid=None, gid=None):
gid and setegid(gid)


def get_signal(signal_name):
"""Get signal number from signal name."""
if not isinstance(signal_name, basestring) or not signal_name.isupper():
raise TypeError("signal name must be uppercase string.")
if not signal_name.startswith("SIG"):
signal_name = "SIG" + signal_name
return getattr(signal, signal_name)
class Signals(object):
ignored = _signal.SIG_IGN
default = _signal.SIG_DFL

def supported(self, signal_name):
"""Returns true value if ``signal_name`` exists on this platform."""
try:
return self.signum(signal_name)
except AttributeError:
pass

def reset_signal(*signal_names):
"""Reset signal to the default signal handler.
def signum(self, signal_name):
"""Get signal number from signal name."""
if isinstance(signal_name, int):
return signal_name
if not isinstance(signal_name, basestring) or not signal_name.isupper():
raise TypeError("signal name must be uppercase string.")
if not signal_name.startswith("SIG"):
signal_name = "SIG" + signal_name
return getattr(_signal, signal_name)

Does nothing if the platform doesn't support signals,
or the specified signal in particular.
def reset(self, *signal_names):
"""Reset signals to the default signal handler.
"""
for signal_name in signal_names:
try:
signum = getattr(signal, signal_name)
signal.signal(signum, signal.SIG_DFL)
except (AttributeError, ValueError):
pass
Does nothing if the platform doesn't support signals,
or the specified signal in particular.
"""
self.update((sig, self.default) for sig in signal_names)

def ignore_signal(*signal_names):
"""Ignore signal using :const:`SIG_IGN`.
def ignore(self, *signal_names):
"""Ignore signal using :const:`SIG_IGN`.
Does nothing if the platform doesn't support signals,
or the specified signal in particular.
Does nothing if the platform doesn't support signals,
or the specified signal in particular.
"""
for signal_name in signal_names:
try:
signum = getattr(signal, signal_name)
signal.signal(signum, signal.SIG_IGN)
except (AttributeError, ValueError):
pass
"""
self.update((sig, self.ignored) for sig in signal_names)

def __getitem__(self, signal_name):
return _signal.getsignal(self.signum(signal_name))

def install_signal_handler(signal_name=None, handler=None, **sigmap):
"""Install signal handlers.
def __setitem__(self, signal_name, handler):
"""Install signal handler.
Does nothing if the current platform doesn't support signals,
or the specified signal in particular.
Does nothing if the current platform doesn't support signals,
or the specified signal in particular.
"""
if signal_name:
sigmap[signal_name] = handler
for signal_name, handler in sigmap.iteritems():
"""
try:
signum = getattr(signal, signal_name)
signal.signal(signum, handler)
_signal.signal(self.signum(signal_name), handler)
except (AttributeError, ValueError):
pass

def update(self, _d_=None, **sigmap):
"""Set signal handlers from a mapping."""
for signal_name, handler in dict(_d_ or {}, **sigmap).iteritems():
self[signal_name] = handler


signals = Signals()
get_signal = signals.signum # compat
install_signal_handler = signals.__setitem__ # compat
reset_signal = signals.reset # compat
ignore_signal = signals.ignore # compat


def strargv(argv):
arg_start = 2 if "manage" in argv[0] else 1
Expand Down
13 changes: 6 additions & 7 deletions celery/tests/test_bin/test_celerybeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,17 @@ def test_run(self):
def psig(self, fun, *args, **kwargs):
handlers = {}

def i(sig=None, handler=None, **sigmap):
if sig:
sigmap[sig] = handler
handlers.update(sigmap)
class Signals(platforms.Signals):

p, platforms.install_signal_handler = \
platforms.install_signal_handler, i
def __setitem__(self, sig, handler):
handlers[sig] = handler

p, platforms.signals = platforms.signals, Signals()
try:
fun(*args, **kwargs)
return handlers
finally:
platforms.install_signal_handler = p
platforms.signals = p

def test_install_sync_handler(self):
b = beatapp.Beat()
Expand Down
39 changes: 18 additions & 21 deletions celery/tests/test_bin/test_celeryd.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ def test_loglevel_string(self):
def test_run_worker(self):
handlers = {}

def i(sig=None, handler=None, **sigmap):
if sig:
sigmap[sig] = handler
handlers.update(sigmap)
class Signals(platforms.Signals):

p = platforms.install_signal_handler
platforms.install_signal_handler = i
def __setitem__(self, sig, handler):
handlers[sig] = handler

p = platforms.signals
platforms.signals = Signals()
try:
w = self.Worker()
w._isatty = False
Expand All @@ -101,7 +101,7 @@ def i(sig=None, handler=None, **sigmap):
self.assertIn(sig, handlers)
self.assertNotIn("SIGHUP", handlers)
finally:
platforms.install_signal_handler = p
platforms.signals = p

@disable_stdouts
def test_startup_info(self):
Expand Down Expand Up @@ -380,38 +380,35 @@ def terminate(self, in_sighandler=False):
def psig(self, fun, *args, **kwargs):
handlers = {}

def i(sig=None, handler=None, **sigmap):
if sig:
sigmap[sig] = handler
handlers.update(sigmap)
class Signals(platforms.Signals):
def __setitem__(self, sig, handler):
handlers[sig] = handler

p, platforms.install_signal_handler = \
platforms.install_signal_handler, i
p, platforms.signals = platforms.signals, Signals()
try:
fun(*args, **kwargs)
return handlers
finally:
platforms.install_signal_handler = p
platforms.signals = p

@disable_stdouts
def test_worker_int_handler(self):
worker = self._Worker()
handlers = self.psig(cd.install_worker_int_handler, worker)
next_handlers = {}

def i(sig=None, handler=None, **sigmap):
if sig:
sigmap[sig] = handler
next_handlers.update(sigmap)
class Signals(platforms.Signals):

def __setitem__(self, sig, handler):
next_handlers[sig] = handler

p = platforms.install_signal_handler
platforms.install_signal_handler = i
p, platforms.signals = platforms.signals, Signals()
try:
self.assertRaises(SystemExit, handlers["SIGINT"],
"SIGINT", object())
self.assertTrue(worker.stopped)
finally:
platforms.install_signal_handler = p
platforms.signals = p

self.assertRaises(SystemExit, next_handlers["SIGINT"],
"SIGINT", object())
Expand Down
10 changes: 4 additions & 6 deletions celery/tests/test_worker/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,11 +525,9 @@ def create_worker(self, **kw):
worker.logger = Mock()
return worker

@patch("celery.platforms.reset_signal")
@patch("celery.platforms.ignore_signal")
@patch("celery.platforms.signals")
@patch("celery.platforms.set_mp_process_title")
def test_process_initializer(self, set_mp_process_title, ignore_signal,
reset_signal):
def test_process_initializer(self, set_mp_process_title, _signals):
from celery import Celery
from celery import signals
from celery.app import _tls
Expand All @@ -544,9 +542,9 @@ def on_worker_process_init(**kwargs):
app = Celery(loader=Mock(), set_as_current=False)
process_initializer(app, "awesome.worker.com")
self.assertIn((tuple(WORKER_SIGIGNORE), {}),
ignore_signal.call_args_list)
_signals.ignore.call_args_list)
self.assertIn((tuple(WORKER_SIGRESET), {}),
reset_signal.call_args_list)
_signals.reset.call_args_list)
self.assertTrue(app.loader.init_worker.call_count)
self.assertTrue(on_worker_process_init.called)
self.assertIs(_tls.current_app, app)
Expand Down
4 changes: 2 additions & 2 deletions celery/worker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ def process_initializer(app, hostname):
"""
app = app_or_default(app)
app.set_current()
platforms.reset_signal(*WORKER_SIGRESET)
platforms.ignore_signal(*WORKER_SIGIGNORE)
platforms.signals.reset(*WORKER_SIGRESET)
platforms.signals.ignore(*WORKER_SIGIGNORE)
platforms.set_mp_process_title("celeryd", hostname=hostname)

# This is for Windows and other platforms not supporting
Expand Down
4 changes: 2 additions & 2 deletions celery/worker/control/builtins.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from datetime import datetime

from celery.platforms import get_signal
from celery.platforms import signals as _signals
from celery.registry import tasks
from celery.utils import timeutils
from celery.worker import state
Expand All @@ -19,7 +19,7 @@ def revoke(panel, task_id, terminate=False, signal=None, **kwargs):
revoked.add(task_id)
action = "revoked"
if terminate:
signum = get_signal(signal)
signum = _signals.signum(signal)
for request in state.active_requests:
if request.task_id == task_id:
action = "terminated (%s)" % (signum, )
Expand Down

0 comments on commit f0962a9

Please sign in to comment.