Skip to content

Commit

Permalink
Tests passing on Python 3
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed Sep 15, 2011
1 parent 3e0c447 commit cf6868e
Show file tree
Hide file tree
Showing 40 changed files with 329 additions and 235 deletions.
9 changes: 6 additions & 3 deletions celery/app/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
reset_multiprocessing_logger)
from ..utils.term import colored

is_py3k = sys.version_info >= (3, 0)


class Logging(object):
#: The logging subsystem is only configured once per process.
Expand Down Expand Up @@ -51,7 +53,8 @@ def setup_logging_subsystem(self, loglevel=None, logfile=None,
if colorize is None:
colorize = self.supports_color(logfile)
reset_multiprocessing_logger()
ensure_process_aware_logger()
if not is_py3k:
ensure_process_aware_logger()
receivers = signals.setup_logging.send(sender=None,
loglevel=loglevel, logfile=logfile,
format=format, colorize=colorize)
Expand Down Expand Up @@ -147,9 +150,9 @@ def redirect_stdouts_to_logger(self, logger, loglevel=None,
"""
proxy = LoggingProxy(logger, loglevel)
if stdout:
sys.stdout = proxy
sys.stdout = sys.__stdout__ = proxy
if stderr:
sys.stderr = proxy
sys.stderr = sys.__stderr__ = proxy
return proxy

def _setup_logger(self, logger, logfile, format, colorize,
Expand Down
7 changes: 5 additions & 2 deletions celery/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
from ..datastructures import LRUCache
from ..exceptions import TimeoutError, TaskRevokedError
from ..utils import timeutils
from ..utils.encoding import from_utf8
from ..utils.serialization import (get_pickled_exception,
get_pickleable_exception,
create_exception_cls)

EXCEPTION_ABLE_CODECS = frozenset(["pickle", "yaml"])
is_py3k = sys.version_info >= (3, 0)


def unpickle_backend(cls, args, kwargs):
Expand Down Expand Up @@ -51,7 +53,8 @@ def encode(self, data):
return payload

def decode(self, payload):
return serialization.decode(str(payload),
payload = is_py3k and payload or str(payload)
return serialization.decode(payload,
content_type=self.content_type,
content_encoding=self.content_encoding)

Expand Down Expand Up @@ -108,7 +111,7 @@ def exception_to_python(self, exc):
"""Convert serialized exception to Python exception."""
if self.serializer in EXCEPTION_ABLE_CODECS:
return get_pickled_exception(exc)
return create_exception_cls(exc["exc_type"].encode("utf-8"),
return create_exception_cls(from_utf8(exc["exc_type"]),
sys.modules[__name__])

def prepare_value(self, result):
Expand Down
18 changes: 9 additions & 9 deletions celery/beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ def __init__(self, name=None, task=None, last_run_at=None,
def _default_now(self):
return datetime.utcnow()

def next(self, last_run_at=None):
def _next_instance(self, last_run_at=None):
"""Returns a new instance of the same class, but with
its date and count fields updated."""
return self.__class__(**dict(self,
last_run_at=last_run_at or datetime.utcnow(),
total_run_count=self.total_run_count + 1))
__next__ = next # for 2to3
__next__ = next = _next_instance # for 2to3

def update(self, other):
"""Update values from another entry.
Expand Down Expand Up @@ -367,8 +367,8 @@ def __init__(self, logger=None, max_interval=None, schedule_filename=None,
self.schedule_filename = schedule_filename or \
app.conf.CELERYBEAT_SCHEDULE_FILENAME

self._shutdown = threading.Event()
self._stopped = threading.Event()
self._is_shutdown = threading.Event()
self._is_stopped = threading.Event()

def start(self, embedded_process=False):
self.logger.info("Celerybeat: Starting...")
Expand All @@ -381,24 +381,24 @@ def start(self, embedded_process=False):
platforms.set_process_title("celerybeat")

try:
while not self._shutdown.isSet():
while not self._is_shutdown.isSet():
interval = self.scheduler.tick()
self.logger.debug("Celerybeat: Waking up %s.",
humanize_seconds(interval, prefix="in "))
time.sleep(interval)
except (KeyboardInterrupt, SystemExit):
self._shutdown.set()
self._is_shutdown.set()
finally:
self.sync()

def sync(self):
self.scheduler.close()
self._stopped.set()
self._is_stopped.set()

def stop(self, wait=False):
self.logger.info("Celerybeat: Shutting down...")
self._shutdown.set()
wait and self._stopped.wait() # block until shutdown done.
self._is_shutdown.set()
wait and self._is_stopped.wait() # block until shutdown done.

def get_scheduler(self, lazy=False):
filename = self.schedule_filename
Expand Down
3 changes: 2 additions & 1 deletion celery/bin/celeryd_multi.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@

from .. import __version__
from ..utils import term
from ..utils.encoding import from_utf8

SIGNAMES = set(sig for sig in dir(signal)
if sig.startswith("SIG") and "_" not in sig)
Expand Down Expand Up @@ -368,7 +369,7 @@ def splash(self):

def waitexec(self, argv, path=sys.executable):
args = " ".join([path] + list(argv))
argstr = shlex.split(args.encode("utf-8"))
argstr = shlex.split(from_utf8(args))
pipe = Popen(argstr, env=self.env)
self.info(" %s" % " ".join(argstr))
retcode = pipe.wait()
Expand Down
33 changes: 23 additions & 10 deletions celery/concurrency/processes/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,29 @@ def __init__(self, value=1, verbose=None):
_Semaphore.__init__(self, value, verbose)
self._initial_value = value

def release(self):
if self._Semaphore__value < self._initial_value:
_Semaphore.release(self)
if __debug__:
self._note("%s.release: success, value=%s (unchanged)" % (
self, self._Semaphore__value))

def clear(self):
while self._Semaphore__value < self._initial_value:
_Semaphore.release(self)
if sys.version_info >= (3, 0):

def release(self):
if self._value < self._initial_value:
_Semaphore.release(self)
if __debug__:
self._note("%s.release: success, value=%s (unchanged)" % (
self, self._value))

def clear(self):
while self._value < self._initial_value:
_Semaphore.release(self)
else:
def release(self):
if self._Semaphore__value < self._initial_value:
_Semaphore.release(self)
if __debug__:
self._note("%s.release: success, value=%s (unchanged)" % (
self, self._Semaphore__value))

def clear(self):
while self._Semaphore__value < self._initial_value:
_Semaphore.release(self)

#
# Exceptions
Expand Down
35 changes: 28 additions & 7 deletions celery/datastructures.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from __future__ import absolute_import
from __future__ import with_statement

import sys
import time
import traceback

Expand Down Expand Up @@ -85,8 +86,15 @@ def __setitem__(self, key, value):
def __contains__(self, key):
return hasattr(self.obj, key)

def iteritems(self):
def _iterate_items(self):
return vars(self.obj).iteritems()
iteritems = _iterate_items

if sys.version_info >= (3, 0):
items = _iterate_items
else:
def items(self):
return list(self._iterate_items())


class ConfigurationView(AttributeDictMixin):
Expand Down Expand Up @@ -151,23 +159,26 @@ def _iter(self, op):
# changes takes precedence.
return chain(*[op(d) for d in reversed(self._order)])

def iterkeys(self):
def _iterate_keys(self):
return self._iter(lambda d: d.iterkeys())
iterkeys = _iterate_keys

def iteritems(self):
def _iterate_items(self):
return self._iter(lambda d: d.iteritems())
iteritems = _iterate_items

def itervalues(self):
def _iterate_values(self):
return self._iter(lambda d: d.itervalues())
itervalues = _iterate_values

def keys(self):
return list(self.iterkeys())
return list(self._iterate_keys())

def items(self):
return list(self.iteritems())
return list(self._iterate_items())

def values(self):
return list(self.itervalues())
return list(self._iterate_values())


class ExceptionInfo(object):
Expand Down Expand Up @@ -315,6 +326,16 @@ def __getitem__(self, key):
value = self[key] = self.data.pop(key)
return value

def keys(self):
# userdict.keys in py3k calls __getitem__
return self.data.keys()

def values(self):
return self.data.values()

def items(self):
return self.data.items()

def __setitem__(self, key, value):
# remove least recently used key.
with self.mutex:
Expand Down
5 changes: 3 additions & 2 deletions celery/execute/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,14 @@ def _trace(self):
handler = self._trace_handlers[trace.status]
r = handler(trace.retval, trace.exc_type, trace.tb, trace.strtb)
self.handle_after_return(trace.status, trace.retval,
trace.exc_type, trace.tb, trace.strtb)
trace.exc_type, trace.tb, trace.strtb,
einfo=trace.exc_info)
return r

def handle_after_return(self, status, retval, type_, tb, strtb,
einfo=None):
if status in states.EXCEPTION_STATES:
einfo = ExceptionInfo((retval, type_, tb))
einfo = ExceptionInfo(einfo)
self.task.after_return(status, retval, self.task_id,
self.args, self.kwargs, einfo)

Expand Down
17 changes: 12 additions & 5 deletions celery/task/http.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import absolute_import

import sys
import urllib2

from urllib import urlencode
Expand Down Expand Up @@ -36,11 +37,17 @@ def maybe_utf8(value):
return value


def utf8dict(tup):
"""With a dict's items() tuple return a new dict with any utf-8
keys/values encoded."""
return dict((key.encode("utf-8"), maybe_utf8(value))
for key, value in tup)
if sys.version_info >= (3, 0):
def utf8dict(tup):
if not isinstance(tup, dict):
return dict(tup)
return tup
else:
def utf8dict(tup):
"""With a dict's items() tuple return a new dict with any utf-8
keys/values encoded."""
return dict((key.encode("utf-8"), maybe_utf8(value))
for key, value in tup)


def extract_response(raw_response):
Expand Down
1 change: 1 addition & 0 deletions celery/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
os.environ["CELERY_LOADER"] = "default"
os.environ["EVENTLET_NOPATCH"] = "yes"
os.environ["GEVENT_NOPATCH"] = "yes"
os.environ["KOMBU_DISABLE_LIMIT_PROTECTION"] = "yes"

try:
WindowsError = WindowsError # noqa
Expand Down
2 changes: 1 addition & 1 deletion celery/tests/test_app/test_app_amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def test_setup_nolimit(self):
pools.set_limit(None)
try:
pool = self.app.amqp.producers[self.app.broker_connection()]
self.assertIsNone(pool.limit)
self.assertEqual(pool.limit, 0)
self.assertFalse(pool._resource.queue)

r1 = pool.acquire()
Expand Down
18 changes: 9 additions & 9 deletions celery/tests/test_app/test_beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ def test_due_tick_SchedulingError(self):
self.assertTrue(scheduler.logger.logged[0])
level, msg, args, kwargs = scheduler.logger.logged[0]
self.assertEqual(level, logging.ERROR)
self.assertIn("Couldn't apply scheduled task", args[0].message)
self.assertIn("Couldn't apply scheduled task", args[0].args[0])

def test_due_tick_RuntimeError(self):
scheduler = mSchedulerRuntimeError()
Expand Down Expand Up @@ -261,7 +261,7 @@ def tick(self):
if self.tick_raises_exit:
raise SystemExit()
if self.shutdown_service:
self.shutdown_service._shutdown.set()
self.shutdown_service._is_shutdown.set()
return 0.0

return beat.Service(scheduler_cls=PersistentScheduler), sh
Expand All @@ -278,12 +278,12 @@ def test_start(self):
s.sync()
self.assertTrue(sh.closed)
self.assertTrue(sh.synced)
self.assertTrue(s._stopped.isSet())
self.assertTrue(s._is_stopped.isSet())
s.sync()
s.stop(wait=False)
self.assertTrue(s._shutdown.isSet())
self.assertTrue(s._is_shutdown.isSet())
s.stop(wait=True)
self.assertTrue(s._shutdown.isSet())
self.assertTrue(s._is_shutdown.isSet())

p = s.scheduler._store
s.scheduler._store = None
Expand All @@ -294,25 +294,25 @@ def test_start(self):

def test_start_embedded_process(self):
s, sh = self.get_service()
s._shutdown.set()
s._is_shutdown.set()
s.start(embedded_process=True)

def test_start_thread(self):
s, sh = self.get_service()
s._shutdown.set()
s._is_shutdown.set()
s.start(embedded_process=False)

def test_start_tick_raises_exit_error(self):
s, sh = self.get_service()
s.scheduler.tick_raises_exit = True
s.start()
self.assertTrue(s._shutdown.isSet())
self.assertTrue(s._is_shutdown.isSet())

def test_start_manages_one_tick_before_shutdown(self):
s, sh = self.get_service()
s.scheduler.shutdown_service = s
s.start()
self.assertTrue(s._shutdown.isSet())
self.assertTrue(s._is_shutdown.isSet())


class test_EmbeddedService(unittest.TestCase):
Expand Down
6 changes: 4 additions & 2 deletions celery/tests/test_app/test_loaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,10 @@ def test_conf_property(self):
self.assertEqual(self.loader.conf["foo"], "bar")

def test_import_default_modules(self):
self.assertEqual(sorted(self.loader.import_default_modules()),
sorted([os, sys, task]))
modnames = lambda l: [m.__name__ for m in l]
self.assertEqual(sorted(modnames(
self.loader.import_default_modules())),
sorted(modnames([os, sys, task])))

def test_import_from_cwd_custom_imp(self):

Expand Down
Loading

0 comments on commit cf6868e

Please sign in to comment.