Skip to content

Commit

Permalink
Backports py3k support from 3.0-devel branch into 2.4
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed Sep 16, 2011
1 parent 257aaa5 commit 2846d88
Show file tree
Hide file tree
Showing 39 changed files with 492 additions and 260 deletions.
7 changes: 5 additions & 2 deletions celery/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
from ..datastructures import LRUCache
from ..exceptions import TimeoutError, TaskRevokedError
from ..utils import timeutils
from ..utils.encoding import from_utf8
from ..utils.serialization import (get_pickled_exception,
get_pickleable_exception,
create_exception_cls)

EXCEPTION_ABLE_CODECS = frozenset(["pickle", "yaml"])
is_py3k = sys.version_info >= (3, 0)


def unpickle_backend(cls, args, kwargs):
Expand Down Expand Up @@ -51,7 +53,8 @@ def encode(self, data):
return payload

def decode(self, payload):
return serialization.decode(str(payload),
payload = is_py3k and payload or str(payload)
return serialization.decode(payload,
content_type=self.content_type,
content_encoding=self.content_encoding)

Expand Down Expand Up @@ -108,7 +111,7 @@ def exception_to_python(self, exc):
"""Convert serialized exception to Python exception."""
if self.serializer in EXCEPTION_ABLE_CODECS:
return get_pickled_exception(exc)
return create_exception_cls(exc["exc_type"].encode("utf-8"),
return create_exception_cls(from_utf8(exc["exc_type"]),
sys.modules[__name__])

def prepare_value(self, result):
Expand Down
18 changes: 9 additions & 9 deletions celery/beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ def __init__(self, name=None, task=None, last_run_at=None,
def _default_now(self):
return datetime.now()

def next(self, last_run_at=None):
def _next_instance(self, last_run_at=None):
"""Returns a new instance of the same class, but with
its date and count fields updated."""
return self.__class__(**dict(self,
last_run_at=last_run_at or datetime.now(),
total_run_count=self.total_run_count + 1))
__next__ = next # for 2to3
__next__ = next = _next_instance # for 2to3

def update(self, other):
"""Update values from another entry.
Expand Down Expand Up @@ -372,8 +372,8 @@ def __init__(self, logger=None, max_interval=None, schedule_filename=None,
self.schedule_filename = schedule_filename or \
app.conf.CELERYBEAT_SCHEDULE_FILENAME

self._shutdown = threading.Event()
self._stopped = threading.Event()
self._is_shutdown = threading.Event()
self._is_stopped = threading.Event()
self.debug = SilenceRepeated(self.logger.debug,
10 if self.max_interval < 60 else 1)

Expand All @@ -388,24 +388,24 @@ def start(self, embedded_process=False):
platforms.set_process_title("celerybeat")

try:
while not self._shutdown.isSet():
while not self._is_shutdown.isSet():
interval = self.scheduler.tick()
self.debug("Celerybeat: Waking up %s." % (
humanize_seconds(interval, prefix="in ")))
time.sleep(interval)
except (KeyboardInterrupt, SystemExit):
self._shutdown.set()
self._is_shutdown.set()
finally:
self.sync()

def sync(self):
self.scheduler.close()
self._stopped.set()
self._is_stopped.set()

def stop(self, wait=False):
self.logger.info("Celerybeat: Shutting down...")
self._shutdown.set()
wait and self._stopped.wait() # block until shutdown done.
self._is_shutdown.set()
wait and self._is_stopped.wait() # block until shutdown done.

def get_scheduler(self, lazy=False):
filename = self.schedule_filename
Expand Down
3 changes: 2 additions & 1 deletion celery/bin/celeryd_multi.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@

from .. import __version__
from ..utils import term
from ..utils.encoding import from_utf8

SIGNAMES = set(sig for sig in dir(signal)
if sig.startswith("SIG") and "_" not in sig)
Expand Down Expand Up @@ -368,7 +369,7 @@ def splash(self):

def waitexec(self, argv, path=sys.executable):
args = " ".join([path] + list(argv))
argstr = shlex.split(args.encode("utf-8"))
argstr = shlex.split(from_utf8(args))
pipe = Popen(argstr, env=self.env)
self.info(" %s" % " ".join(argstr))
retcode = pipe.wait()
Expand Down
4 changes: 2 additions & 2 deletions celery/concurrency/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ def on_ready(self, callback, errback, ret_value):
else:
self.safe_apply_callback(callback, ret_value)

def on_worker_error(self, errback, exc):
errback(ExceptionInfo((exc.__class__, exc, None)))
def on_worker_error(self, errback, exc_info):
errback(exc_info)

def safe_apply_callback(self, fun, *args):
if fun:
Expand Down
52 changes: 38 additions & 14 deletions celery/concurrency/processes/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from multiprocessing import util
from multiprocessing.util import Finalize, debug

from celery.datastructures import ExceptionInfo
from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
from celery.exceptions import WorkerLostError

Expand Down Expand Up @@ -74,16 +75,30 @@ def __init__(self, value=1, verbose=None):
_Semaphore.__init__(self, value, verbose)
self._initial_value = value

def release(self):
if self._Semaphore__value < self._initial_value:
_Semaphore.release(self)
if __debug__:
self._note("%s.release: success, value=%s (unchanged)" % (
self, self._Semaphore__value))
if sys.version_info >= (3, 0):

def clear(self):
while self._Semaphore__value < self._initial_value:
_Semaphore.release(self)
def release(self):
if self._value < self._initial_value:
_Semaphore.release(self)
if __debug__:
self._note("%s.release: success, value=%s (unchanged)" % (
self, self._value))

def clear(self):
while self._value < self._initial_value:
_Semaphore.release(self)
else:

def release(self): # noqa
if self._Semaphore__value < self._initial_value:
_Semaphore.release(self)
if __debug__:
self._note("%s.release: success, value=%s (unchanged)" % (
self, self._Semaphore__value))

def clear(self): # noqa
while self._Semaphore__value < self._initial_value:
_Semaphore.release(self)

#
# Exceptions
Expand Down Expand Up @@ -166,8 +181,8 @@ def poll(timeout): # noqa
put((ACK, (job, i, time.time(), pid)))
try:
result = (True, func(*args, **kwds))
except Exception, e:
result = (False, e)
except Exception:
result = (False, ExceptionInfo(sys.exc_info()))
try:
put((READY, (job, i, result)))
except Exception, exc:
Expand Down Expand Up @@ -320,7 +335,12 @@ def _on_hard_timeout(job, i, hard_timeout):
return
debug('hard time limit exceeded for %i', i)
# Remove from cache and set return value to an exception
job._set(i, (False, TimeLimitExceeded(hard_timeout)))
exc_info = None
try:
raise TimeLimitExceeded(hard_timeout)
except TimeLimitExceeded:
exc_info = sys.exc_info()
job._set(i, (False, ExceptionInfo(exc_info)))

# Remove from _pool
process, _index = _process_by_pid(job._worker_pid)
Expand Down Expand Up @@ -571,8 +591,12 @@ def _join_exited_workers(self, shutdown=False, lost_worker_timeout=10.0):
if not job.ready() and job._worker_lost]:
now = now or time.time()
if now - job._worker_lost > lost_worker_timeout:
err = WorkerLostError("Worker exited prematurely.")
job._set(None, (False, err))
exc_info = None
try:
raise WorkerLostError("Worker exited prematurely.")
except WorkerLostError:
exc_info = sys.exc_info()
job._set(None, (False, exc_info))

if shutdown and not len(self._pool):
raise WorkersJoined()
Expand Down
81 changes: 70 additions & 11 deletions celery/datastructures.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from __future__ import absolute_import
from __future__ import with_statement

import sys
import time
import traceback

Expand Down Expand Up @@ -81,8 +82,16 @@ def __setitem__(self, key, value):
def __contains__(self, key):
return hasattr(self.obj, key)

def iteritems(self):
def _iterate_items(self):
return vars(self.obj).iteritems()
iteritems = _iterate_items

if sys.version_info >= (3, 0):
items = _iterate_items
else:

def items(self):
return list(self._iterate_items())


class ConfigurationView(AttributeDictMixin):
Expand Down Expand Up @@ -147,23 +156,53 @@ def _iter(self, op):
# changes takes precedence.
return chain(*[op(d) for d in reversed(self._order)])

def iterkeys(self):
def _iterate_keys(self):
return self._iter(lambda d: d.iterkeys())
iterkeys = _iterate_keys

def iteritems(self):
def _iterate_items(self):
return self._iter(lambda d: d.iteritems())
iteritems = _iterate_items

def itervalues(self):
def _iterate_values(self):
return self._iter(lambda d: d.itervalues())
itervalues = _iterate_values

def keys(self):
return list(self.iterkeys())
return list(self._iterate_keys())

def items(self):
return list(self.iteritems())
return list(self._iterate_items())

def values(self):
return list(self.itervalues())
return list(self._iterate_values())


class _Code(object):

def __init__(self, code):
self.co_filename = code.co_filename
self.co_name = code.co_name


class _Frame(object):
Code = _Code

def __init__(self, frame):
self.f_globals = {"__file__": frame.f_globals["__file__"]}
self.f_code = self.Code(frame.f_code)


class _Traceback(object):
Frame = _Frame

def __init__(self, tb):
self.tb_frame = self.Frame(tb.tb_frame)
self.tb_lineno = tb.tb_lineno
if tb.tb_next is None:
self.tb_next = None
else:
self.tb_next = _Traceback(tb.tb_next)


class ExceptionInfo(object):
Expand All @@ -174,15 +213,21 @@ class ExceptionInfo(object):
"""

#: The original exception.
#: Exception type.
type = None

#: Exception instance.
exception = None

#: A traceback form the point when :attr:`exception` was raised.
#: Pickleable traceback instance for use with :mod:`traceback`
tb = None

#: String representation of the traceback.
traceback = None

def __init__(self, exc_info):
_, exception, _ = exc_info
self.exception = exception
self.type, self.exception, tb = exc_info
self.tb = _Traceback(tb)
self.traceback = ''.join(traceback.format_exception(*exc_info))

def __str__(self):
Expand All @@ -191,6 +236,10 @@ def __str__(self):
def __repr__(self):
return "<ExceptionInfo: %r>" % (self.exception, )

@property
def exc_info(self):
return self.type, self.exception, self.tb


def consume_queue(queue):
"""Iterator yielding all immediately available items in a
Expand Down Expand Up @@ -311,6 +360,16 @@ def __getitem__(self, key):
value = self[key] = self.data.pop(key)
return value

def keys(self):
# userdict.keys in py3k calls __getitem__
return self.data.keys()

def values(self):
return self.data.values()

def items(self):
return self.data.items()

def __setitem__(self, key, value):
# remove least recently used key.
with self.mutex:
Expand Down
5 changes: 3 additions & 2 deletions celery/execute/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,14 @@ def _trace(self):
handler = self._trace_handlers[trace.status]
r = handler(trace.retval, trace.exc_type, trace.tb, trace.strtb)
self.handle_after_return(trace.status, trace.retval,
trace.exc_type, trace.tb, trace.strtb)
trace.exc_type, trace.tb, trace.strtb,
einfo=trace.exc_info)
return r

def handle_after_return(self, status, retval, type_, tb, strtb,
einfo=None):
if status in states.EXCEPTION_STATES:
einfo = ExceptionInfo((retval, type_, tb))
einfo = ExceptionInfo(einfo)
self.task.after_return(status, retval, self.task_id,
self.args, self.kwargs, einfo)

Expand Down
Loading

0 comments on commit 2846d88

Please sign in to comment.