From 703ae4b0b7d3731a06178decf8b8cfa415653680 Mon Sep 17 00:00:00 2001
From: Ask Solem
Date: Fri, 20 Apr 2012 18:29:51 +0100
Subject: [PATCH] No longer depends on multiprocessing backport, only billiard

---
 celery/app/__init__.py                   |  4 +-
 celery/apps/worker.py                    | 19 +---------
 celery/beat.py                           |  9 ++---
 celery/bin/celeryd.py                    |  9 +----
 celery/concurrency/processes/__init__.py |  3 +-
 celery/contrib/rdb.py                    |  5 ++-
 celery/platforms.py                      |  6 +--
 celery/tests/test_app/test_beat.py       |  5 +--
 celery/tests/test_bin/test_celeryd.py    | 48 +-----------------------
 celery/utils/__init__.py                 |  4 +-
 celery/utils/log.py                      | 13 +------
 celery/utils/mp.py                       | 47 +++++++++++++++++++++++
 celery/utils/patch.py                    |  7 +---
 celery/worker/__init__.py                |  8 +---
 requirements/py25.txt                    |  3 +-
 setup.cfg                                |  3 +-
 setup.py                                 |  4 +-
 17 files changed, 70 insertions(+), 127 deletions(-)
 create mode 100644 celery/utils/mp.py

diff --git a/celery/app/__init__.py b/celery/app/__init__.py
index f2d5f1a98bb..fa0110f726d 100644
--- a/celery/app/__init__.py
+++ b/celery/app/__init__.py
@@ -54,13 +54,13 @@ def _app_or_default(app=None):

 def _app_or_default_trace(app=None):  # pragma: no cover
     from traceback import print_stack
-    from multiprocessing import current_process
+    from celery.utils.mp import get_process_name
     if app is None:
         if getattr(state._tls, "current_app", None):
             print("-- RETURNING TO CURRENT APP --")  # noqa+
             print_stack()
             return state._tls.current_app
-        if current_process()._name == "MainProcess":
+        if get_process_name() == "MainProcess":
             raise Exception("DEFAULT APP")
         print("-- RETURNING TO DEFAULT APP --")  # noqa+
         print_stack()
diff --git a/celery/apps/worker.py b/celery/apps/worker.py
index 0e91ff4de4b..b64b2496470 100644
--- a/celery/apps/worker.py
+++ b/celery/apps/worker.py
@@ -3,10 +3,6 @@

 import atexit
 import logging
-try:
-    import multiprocessing
-except ImportError:
-    multiprocessing = None  # noqa
 import os
 import socket
 import sys
@@ -19,6 +15,7 @@
 from celery.utils import cry, isatty
 from celery.utils.imports import qualname
 from celery.utils.log import LOG_LEVELS, get_logger, mlevel
+from celery.utils.mp import cpu_count, get_process_name
 from celery.utils.text import pluralize
 from celery.worker import WorkController

@@ -60,20 +57,6 @@
 """


-def cpu_count():
-    if multiprocessing is not None:
-        try:
-            return multiprocessing.cpu_count()
-        except NotImplementedError:
-            pass
-    return 2
-
-
-def get_process_name():
-    if multiprocessing is not None:
-        return multiprocessing.current_process().name
-
-
 class Worker(configurated):
     WorkController = WorkController
diff --git a/celery/beat.py b/celery/beat.py
index d88b90747c6..5c88ae31db0 100644
--- a/celery/beat.py
+++ b/celery/beat.py
@@ -18,10 +18,6 @@
 import sys
 import threading
 import traceback
-try:
-    import multiprocessing
-except ImportError:
-    multiprocessing = None  # noqa

 from kombu.utils import reprcall
 from kombu.utils.functional import maybe_promise
@@ -36,6 +32,7 @@
 from .utils.imports import instantiate
 from .utils.timeutils import humanize_seconds
 from .utils.log import get_logger
+from .utils.mp import Process

 logger = get_logger(__name__)
 debug, info, error = logger.debug, logger.info, logger.error
@@ -449,9 +446,9 @@ def stop(self):
         self.service.stop(wait=True)


-if multiprocessing is not None:
+if Process is not None:

-    class _Process(multiprocessing.Process):
+    class _Process(Process):
         """Embedded task scheduler using multiprocessing."""

         def __init__(self, *args, **kwargs):
diff --git a/celery/bin/celeryd.py b/celery/bin/celeryd.py
index 321401a2cab..317cd23b988 100644
--- a/celery/bin/celeryd.py
+++ b/celery/bin/celeryd.py
@@ -75,14 +75,7 @@

 import sys

-try:
-    import multiprocessing  # noqa
-except ImportError:  # pragma: no cover
-    freeze_support = lambda: True
-else:
-    # patch with freeze_support from billiard
-    from billiard import freeze_support  # noqa
-
+from celery.utils.mp import freeze_support
 from celery.bin.base import Command, Option

diff --git a/celery/concurrency/processes/__init__.py b/celery/concurrency/processes/__init__.py
index d11b3c07ffe..41aee053898 100644
--- a/celery/concurrency/processes/__init__.py
+++ b/celery/concurrency/processes/__init__.py
@@ -10,8 +10,7 @@
 from celery import signals
 from celery.app import app_or_default
 from celery.concurrency.base import BasePool
-
-from billiard.pool import Pool, RUN
+from celery.utils.mp import Pool, RUN

 if platform.system() == "Windows":  # pragma: no cover
     # On Windows os.kill calls TerminateProcess which cannot be
diff --git a/celery/contrib/rdb.py b/celery/contrib/rdb.py
index 5912c79a48c..252bd868bbe 100644
--- a/celery/contrib/rdb.py
+++ b/celery/contrib/rdb.py
@@ -67,8 +67,9 @@ def __init__(self, host=CELERY_RDB_HOST, port=CELERY_RDB_PORT,
         self.active = True
         try:
-            from multiprocessing import current_process
-            _, port_skew = current_process().name.split('-')
+            from celery.utils.mp import current_process
+            if current_process:
+                _, port_skew = current_process().name.split('-')
         except (ImportError, ValueError):
             pass
         port_skew = int(port_skew)
diff --git a/celery/platforms.py b/celery/platforms.py
index 78d3949da72..c52f3b17634 100644
--- a/celery/platforms.py
+++ b/celery/platforms.py
@@ -23,6 +23,7 @@
 from .local import try_import

 from kombu.utils.limits import TokenBucket
+from celery.utils.mp import current_process

 _setproctitle = try_import("setproctitle")
 resource = try_import("resource")
@@ -34,11 +35,6 @@
 EX_UNAVAILABLE = getattr(os, "EX_UNAVAILABLE", 69)
 EX_USAGE = getattr(os, "EX_USAGE", 64)

-try:
-    from multiprocessing.process import current_process
-except ImportError:
-    current_process = None  # noqa
-
 SYSTEM = _platform.system()
 IS_OSX = SYSTEM == "Darwin"
 IS_WINDOWS = SYSTEM == "Windows"
diff --git a/celery/tests/test_app/test_beat.py b/celery/tests/test_app/test_beat.py
index a60efd517a3..056a0866e12 100644
--- a/celery/tests/test_app/test_beat.py
+++ b/celery/tests/test_app/test_beat.py
@@ -305,9 +305,8 @@ def test_start_manages_one_tick_before_shutdown(self):
 class test_EmbeddedService(Case):

     def test_start_stop_process(self):
-        try:
-            from multiprocessing import Process
-        except ImportError:
+        from celery.utils.mp import Process
+        if not Process:
             raise SkipTest("multiprocessing not available")

         s = beat.EmbeddedService()
diff --git a/celery/tests/test_bin/test_celeryd.py b/celery/tests/test_bin/test_celeryd.py
index d4f2d095581..b6f9617eabe 100644
--- a/celery/tests/test_bin/test_celeryd.py
+++ b/celery/tests/test_bin/test_celeryd.py
@@ -6,10 +6,6 @@
 import sys

 from functools import wraps
-try:
-    from multiprocessing import current_process
-except ImportError:
-    current_process = None  # noqa

 from mock import patch
 from nose import SkipTest
@@ -24,6 +20,7 @@
 from celery.tests.utils import (AppCase, WhateverIO, mask_modules,
                                 reset_modules, skip_unless_module)
+from celery.utils.mp import current_process
 from celery.utils.log import ensure_process_aware_logger

@@ -57,49 +54,6 @@ class Worker(cd.Worker):
     WorkController = _WorkController


-class test_compilation(AppCase):
-
-    def test_no_multiprocessing(self):
-        with mask_modules("multiprocessing"):
-            with reset_modules("celery.apps.worker"):
-                from celery.apps.worker import multiprocessing
-                self.assertIsNone(multiprocessing)
-
-    def test_cpu_count_no_mp(self):
-        with mask_modules("multiprocessing"):
-            with reset_modules("celery.apps.worker"):
-                from celery.apps.worker import cpu_count
-                self.assertEqual(cpu_count(), 2)
-
-    @skip_unless_module("multiprocessing")
-    def test_no_cpu_count(self):
-
-        @patch("multiprocessing.cpu_count")
-        def _do_test(pcount):
-            pcount.side_effect = NotImplementedError("cpu_count")
-            from celery.apps.worker import cpu_count
-            self.assertEqual(cpu_count(), 2)
-            pcount.assert_called_with()
-
-        _do_test()
-
-    def test_process_name_wo_mp(self):
-        with mask_modules("multiprocessing"):
-            with reset_modules("celery.apps.worker"):
-                from celery.apps.worker import get_process_name
-                self.assertIsNone(get_process_name())
-
-    @skip_unless_module("multiprocessing")
-    def test_process_name_w_mp(self):
-
-        @patch("multiprocessing.current_process")
-        def _do_test(current_process):
-            from celery.apps.worker import get_process_name
-            self.assertTrue(get_process_name())
-
-        _do_test()
-
-
 class test_Worker(AppCase):
     Worker = Worker
diff --git a/celery/utils/__init__.py b/celery/utils/__init__.py
index 1265a059b1d..ddedb04c1a1 100644
--- a/celery/utils/__init__.py
+++ b/celery/utils/__init__.py
@@ -27,9 +27,7 @@
 from .imports import symbol_by_name, qualname
 from .functional import noop
-
-register_after_fork = symbol_by_name(
-    "multiprocessing.util.register_after_fork", default=noop)
+from .mp import register_after_fork

 PENDING_DEPRECATION_FMT = """
     %(description)s is scheduled for deprecation in \
diff --git a/celery/utils/log.py b/celery/utils/log.py
index a932fe0183e..208b700fa36 100644
--- a/celery/utils/log.py
+++ b/celery/utils/log.py
@@ -6,15 +6,10 @@
 import threading
 import traceback

-try:
-    from multiprocessing import current_process
-    from multiprocessing import util as mputil
-except ImportError:
-    current_process = mputil = None  # noqa
-
 from kombu.log import get_logger as _get_logger, LOG_LEVELS

 from .encoding import safe_str, str_t
+from .mp import current_process, util as mputil
 from .term import colored

 _process_aware = False
@@ -170,12 +165,6 @@ def fileno(self):

 def _patch_logger_class():
     """Make sure process name is recorded when loggers are used."""
-
-    try:
-        from multiprocessing.process import current_process
-    except ImportError:
-        current_process = None  # noqa
-
     logging._acquireLock()
     try:
         OldLoggerClass = logging.getLoggerClass()
diff --git a/celery/utils/mp.py b/celery/utils/mp.py
new file mode 100644
index 00000000000..06ccf320b63
--- /dev/null
+++ b/celery/utils/mp.py
@@ -0,0 +1,47 @@
+try:
+    import billiard
+    from billiard import util
+    from billiard import pool
+    current_process = billiard.current_process
+    register_after_fork = util.register_after_fork
+    freeze_support = billiard.freeze_support
+    Process = billiard.Process
+    cpu_count = billiard.cpu_count
+    Pool = pool.Pool
+    RUN = pool.RUN
+except ImportError:
+    try:
+        import multiprocessing
+        from multiprocessing import util
+        from multiprocessing import pool
+        current_process = multiprocessing.current_process
+        register_after_fork = util.register_after_fork
+        freeze_support = multiprocessing.freeze_support
+        Process = multiprocessing.Process
+        cpu_count = multiprocessing.cpu_count
+        Pool = pool.Pool
+        RUN = pool.RUN
+    except ImportError:
+        current_process = None
+        util = None
+        register_after_fork = lambda *a, **kw: None
+        freeze_support = lambda: True
+        Process = None
+        cpu_count = lambda: 2
+        Pool = None
+        RUN = 1
+
+
+def get_process_name():
+    if current_process is not None:
+        return current_process().name
+
+def forking_enable(enabled):
+    try:
+        from billiard import forking_enable
+    except ImportError:
+        try:
+            from multiprocessing import forking_enable
+        except ImportError:
+            return
+    forking_enable(enabled)
diff --git a/celery/utils/patch.py b/celery/utils/patch.py
index 226126ea3c2..78e780445e7 100644
--- a/celery/utils/patch.py
+++ b/celery/utils/patch.py
@@ -18,12 +18,7 @@

 def _patch_logger_class():
     """Make sure process name is recorded when loggers are used."""
-
-    try:
-        from multiprocessing.process import current_process
-    except ImportError:
-        current_process = None  # noqa
-
+    from .mp import current_process
     logging._acquireLock()
     try:
         OldLoggerClass = logging.getLoggerClass()
diff --git a/celery/worker/__init__.py b/celery/worker/__init__.py
index 47ea6bd1962..148a0c219b7 100644
--- a/celery/worker/__init__.py
+++ b/celery/worker/__init__.py
@@ -28,6 +28,7 @@
 from celery.app.abstract import configurated, from_config
 from celery.exceptions import SystemTerminate
 from celery.utils.functional import noop
+from celery.utils.mp import forking_enable
 from celery.utils.imports import qualname, reload_from_cwd
 from celery.utils.log import get_logger

@@ -86,12 +87,7 @@ def __init__(self, w, autoscale=None, **kwargs):
             w.max_concurrency, w.min_concurrency = w.autoscale

     def create(self, w):
-        try:
-            from billiard import forking_enable
-        except ImportError:
-            pass
-        else:
-            forking_enable(not w.force_execv)
+        forking_enable(not w.force_execv)
         pool = w.pool = self.instantiate(w.pool_cls, w.min_concurrency,
                             initargs=(w.app, w.hostname),
                             maxtasksperchild=w.max_tasks_per_child,
diff --git a/requirements/py25.txt b/requirements/py25.txt
index a5d6966381f..93d8d1c0fd9 100644
--- a/requirements/py25.txt
+++ b/requirements/py25.txt
@@ -1,4 +1,3 @@
-multiprocessing==2.6.2.1
 importlib
 ordereddict
-simplejson
\ No newline at end of file
+simplejson
diff --git a/setup.cfg b/setup.cfg
index 3a31b1a8939..dee9c51284e 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -42,8 +42,7 @@ upload-dir = docs/.build/html
 [bdist_rpm]
 requires = uuid
            importlib
-           multiprocessing == 2.6.2.1
-           billiard>=2.7.3.0
+           billiard>=2.7.3.2
            python-dateutil >= 1.5
            anyjson >= 0.3.1
            kombu >= 2.1.5
diff --git a/setup.py b/setup.py
index 80ea3fb3dd5..52f0c51278c 100644
--- a/setup.py
+++ b/setup.py
@@ -112,7 +112,7 @@ def run(self, *args, **kwargs):
 except ImportError:
     install_requires.append("importlib")
 install_requires.extend([
-    "billiard>=2.7.3.0",
+    "billiard>=2.7.3.2",
     "anyjson>=0.3.1",
     "kombu>=2.1.5,<3.0",
 ])
@@ -126,8 +126,6 @@ def run(self, *args, **kwargs):
 is_pypy = hasattr(sys, "pypy_version_info")
 if sys.version_info < (2, 7):
     install_requires.append("ordereddict")  # Replacement for the ordered dict
-if sys.version_info < (2, 6) and not (is_jython or is_pypy):
-    install_requires.append("multiprocessing")
 if is_jython:
     install_requires.append("threadpool")
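
Usage sketch (illustrative, not part of the patch): the new celery.utils.mp module is a
compatibility shim that prefers billiard, falls back to the stdlib multiprocessing, and
degrades to stubs (Process and Pool set to None, cpu_count returning 2) when neither is
importable. The snippet below shows how callers are expected to consume it, following the
guard-on-None pattern the patch itself uses in beat.py and the test suite; the helper
name describe_pool is hypothetical, only the imported attributes come from the module
added above.

    from celery.utils.mp import Pool, Process, cpu_count, get_process_name


    def describe_pool():
        # Process/Pool are None when neither billiard nor multiprocessing
        # could be imported, so check them before trying to fork anything
        # (the same guard beat.py and test_beat.py apply in this patch).
        if Process is None or Pool is None:
            return "no multiprocessing support available"
        # cpu_count comes from billiard (or multiprocessing as a fallback);
        # only the final no-multiprocessing stub is guaranteed to return 2.
        return "%s may start up to %d pool processes" % (
            get_process_name(), cpu_count())

The billiard-first ordering keeps the worker on the maintained fork of multiprocessing,
while the layered fallbacks let the rest of the code base drop its scattered
try/except ImportError blocks, as the diff above does.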