Skip to content

Commit

Permalink
Generate compat modules dynamically on use
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed Mar 31, 2012
1 parent cf397f5 commit 939bf9c
Show file tree
Hide file tree
Showing 12 changed files with 161 additions and 99 deletions.
65 changes: 65 additions & 0 deletions celery/__compat__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from __future__ import absolute_import

import sys

from types import ModuleType

from .local import Proxy

compat_modules = {
"messaging": {
"TaskPublisher": "amqp.TaskPublisher",
"ConsumerSet": "amqp.ConsumerSet",
"TaskConsumer": "amqp.TaskConsumer",
"establish_connection": "broker_connection",
"with_connection": "with_default_connection",
"get_consumer_set": "amqp.get_task_consumer"
},
"log": {
"get_default_logger": "log.get_default_logger",
"setup_logger": "log.setup_logger",
"setup_task_logger": "log.setup_task_logger",
"get_task_logger": "log.get_task_logger",
"setup_loggig_subsystem": "log.setup_logging_subsystem",
"redirect_stdouts_to_logger": "log.redirect_stdouts_to_logger",
},
}

def rgetattr(obj, path):
for part in path:
obj = getattr(obj, part)
return obj

def _module(g, name, attrs):
attrs = dict((name, Proxy(rgetattr, (g.current_app, attr.split('.'))))
for name, attr in attrs.iteritems())
return type(name, (ModuleType, ), attrs)('.'.join(["celery", name]))


def install_compat_modules(g):
from types import ModuleType
mods = sys.modules

current_app = g.current_app

for name, attrs in compat_modules.iteritems():
print("CREATE MODULE: %r %r" % (name, attrs))
module = _module(g, name, attrs)
setattr(g, name, module)
sys.modules[module.__name__] = module

class registry(ModuleType):
tasks = Proxy(lambda: current_app.tasks)
g.registry = mods["celery.registry"] = registry("celery.registry")

class decorators(ModuleType):
def task(*args, **kwargs):
kwargs.setdefault("accept_magic_kwargs", True)
return current_app.task(*args, **kwargs)

def periodic_task(*args, **kwargs):
from celery.task import periodic_task
kwargs.setdefault("accept_magic_kwargs", True)
return periodic_task(*args, **kwargs)
g.decorators = mods["celery.decorators"] \
= decorators("celery.decorators")
54 changes: 52 additions & 2 deletions celery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,69 @@
"Python 2.4 is not supported by this version. "
"Please use Celery versions 2.1.x or earlier.")

# Lazy loading
from types import ModuleType
from .local import Proxy


compat_modules = ("messaging", "log", "registry", "decorators")


class module(ModuleType):
__all__ = ("Celery", "current_app", "bugreport")
__compat_installed__ = False

def __getattr__(self, name):
if name in compat_modules:
if not self.__compat_installed__:
self.__compat_installed__ = True
from .__compat__ import install_compat_modules
install_compat_modules(self)
return ModuleType.__getattribute__(self, name)

def __dir__(self):
result = list(new_module.__all__)
result.extend(("__file__", "__path__", "__doc__", "__all__",
"__docformat__", "__name__", "__path__", "VERSION",
"__package__", "__version__", "__author__",
"__contact__", "__homepage__", "__docformat__"))
return result

# 2.5 does not define __package__
try:
package = __package__
except NameError:
package = "kombu"

# keep a reference to this module so that it's not garbage collected
old_module = sys.modules[__name__]

new_module = sys.modules[__name__] = module(__name__)
new_module.__dict__.update({
"__file__": __file__,
"__path__": __path__,
"__doc__": __doc__,
"__version__": __version__,
"__author__": __author__,
"__contact__": __contact__,
"__homepage__": __homepage__,
"__docformat__": __docformat__,
"__package__": package,
"VERSION": VERSION})

def Celery(*args, **kwargs):
from .app import App
return App(*args, **kwargs)


def _get_current_app():
from .app import current_app
return current_app()
current_app = Proxy(_get_current_app)


def bugreport():
return current_app.bugreport()

new_module.Celery = Celery
new_module.current_app = current_app
new_module.bugreport = bugreport

5 changes: 5 additions & 0 deletions celery/app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,11 @@ def disable_trace():
app_or_default = _app_or_default



def bugreport():
return current_app().bugreport()


app_or_default = _app_or_default
if os.environ.get("CELERY_TRACE_APP"): # pragma: no cover
enable_trace()
4 changes: 3 additions & 1 deletion celery/app/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,9 @@ def _inner(*args, **kwargs):

def prepare_config(self, c):
"""Prepare configuration before it is merged with the defaults."""
c.update(self._preconf)
if self._preconf:
for key, value in self._preconf.iteritems():
setattr(c, key, value)
return find_deprecated_settings(c)

def now(self):
Expand Down
5 changes: 2 additions & 3 deletions celery/bin/celeryd_detach.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ def detach(path, argv, logfile=None, pidfile=None, uid=None,
try:
os.execv(path, [path] + argv)
except Exception:
import logging
from ..log import setup_logger
logger = setup_logger(logfile=logfile, loglevel=logging.ERROR)
from .. import current_app
logger = current_app.log.setup_logger("ERROR", logfile)
logger.critical("Can't exec %r", " ".join([path] + argv),
exc_info=True)

Expand Down
44 changes: 0 additions & 44 deletions celery/decorators.py

This file was deleted.

15 changes: 0 additions & 15 deletions celery/log.py

This file was deleted.

13 changes: 0 additions & 13 deletions celery/messaging.py

This file was deleted.

6 changes: 0 additions & 6 deletions celery/registry.py

This file was deleted.

24 changes: 12 additions & 12 deletions celery/tests/test_app/test_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,27 @@
import logging
from tempfile import mktemp

from celery import current_app
from celery import log
from celery.log import (setup_logger, setup_task_logger,
get_default_logger, get_task_logger,
redirect_stdouts_to_logger, setup_logging_subsystem)
from celery.utils.log import LoggingProxy
from celery.utils import uuid
from celery.utils.compat import _CompatLoggerAdapter
from celery.tests.utils import (Case, override_stdouts, wrap_logger,
get_handlers, set_handlers)

log = current_app.log


class test_default_logger(Case):

def setUp(self):
self.setup_logger = setup_logger
self.get_logger = get_default_logger
self.setup_logger = log.setup_logger
self.get_logger = log.get_default_logger
log._setup = False

def test_setup_logging_subsystem_colorize(self):
setup_logging_subsystem(colorize=None)
setup_logging_subsystem(colorize=True)
log.setup_logging_subsystem(colorize=None)
log.setup_logging_subsystem(colorize=True)

def test_setup_logging_subsystem_no_mputil(self):
from celery.utils import log as logtools
Expand Down Expand Up @@ -85,7 +85,7 @@ def test_redirect_stdouts(self):
root=False)
try:
with wrap_logger(logger) as sio:
redirect_stdouts_to_logger(logger, loglevel=logging.ERROR)
log.redirect_stdouts_to_logger(logger, loglevel=logging.ERROR)
logger.error("foo")
self.assertIn("foo", sio.getvalue())
finally:
Expand Down Expand Up @@ -116,17 +116,17 @@ def test_logging_proxy(self):
class test_task_logger(test_default_logger):

def setUp(self):
logger = get_task_logger()
logger = log.get_task_logger()
logger.handlers = []
logging.root.manager.loggerDict.pop(logger.name, None)
self.uid = uuid()

def setup_logger(self, *args, **kwargs):
return setup_task_logger(*args, **dict(kwargs, task_name=self.uid,
task_id=self.uid))
return log.setup_task_logger(*args, **dict(kwargs, task_name=self.uid,
task_id=self.uid))

def get_logger(self, *args, **kwargs):
return get_task_logger(*args, **dict(kwargs, name=self.uid))
return log.get_task_logger(*args, **dict(kwargs, name=self.uid))


class MockLogger(logging.Logger):
Expand Down
4 changes: 1 addition & 3 deletions celery/tests/test_worker/test_worker_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from celery.exceptions import (RetryTaskError,
WorkerLostError, InvalidTaskError)
from celery.execute.trace import eager_trace_task, TraceInfo
from celery.log import setup_logger
from celery.result import AsyncResult
from celery.task import task as task_dec
from celery.task.base import Task
Expand Down Expand Up @@ -685,8 +684,7 @@ def _test_on_failure(self, exception):

logfh = WhateverIO()
tw.logger.handlers = []
tw.logger = setup_logger(logfile=logfh, loglevel=logging.INFO,
root=False)
tw.logger = app.log.setup_logger("INFO", logfh, root=False)

app.conf.CELERY_SEND_TASK_ERROR_EMAILS = True

Expand Down
21 changes: 21 additions & 0 deletions celery/utils/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,3 +278,24 @@ def emit(self, record):
stat = os.stat(self.baseFilename)
self.dev, self.ino = stat[ST_DEV], stat[ST_INO]
logging.FileHandler.emit(self, record)



def add_compat_modules():

from celery import current_app
from celery.local import Proxy

from types import ModuleType

class messaging(ModuleType):
TaskPublisher = Proxy(lambda: current_app.amqp.TaskPublisher)
ConsumerSet = Proxy(lambda: current_app.amqp.ConsumerSet)
TaskConsumer = Proxy(lambda: current_app.amqp.TaskConsumer)
establish_connection = Proxy(lambda: current_app.broker_connection)
with_connection = Proxy(lambda: current_app.with_default_connection)
get_consumer_set = Proxy(lambda: current_app.amqp.get_task_consumer)

sys.modules["celery.messaging"] = messaging("celery.messaging")


0 comments on commit 939bf9c

Please sign in to comment.