Skip to content

Commit

Permalink
99% coverage (excluding celery.concurrency.asynpool and experimental …
Browse files Browse the repository at this point in the history
…backends)
  • Loading branch information
ask committed Nov 21, 2015
1 parent 3812290 commit a28d300
Show file tree
Hide file tree
Showing 49 changed files with 1,668 additions and 191 deletions.
3 changes: 3 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,6 @@ omit =
*celery/backends/couchdb.py
*celery/backends/couchbase.py
*celery/backends/cassandra.py
*celery/backends/riak.py
*celery/concurrency/asynpool.py
*celery/utils/debug.py
11 changes: 6 additions & 5 deletions celery/apps/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,18 +147,19 @@ def on_init_blueprint(self):
trace.setup_worker_optimizations(self.app, self.hostname)

def on_start(self):
app = self.app
if not self._custom_logging and self.redirect_stdouts:
self.app.log.redirect_stdouts(self.redirect_stdouts_level)
app.log.redirect_stdouts(self.redirect_stdouts_level)

WorkController.on_start(self)

# this signal can be used to e.g. change queues after
# the -Q option has been applied.
signals.celeryd_after_setup.send(
sender=self.hostname, instance=self, conf=self.app.conf,
sender=self.hostname, instance=self, conf=app.conf,
)

if not self.app.conf.value_set_for('accept_content'):
if not app.conf.value_set_for('accept_content'): # pragma: no cover
warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))

if self.purge:
Expand Down Expand Up @@ -187,7 +188,7 @@ def setup_logging(self, colorize=None):

def purge_messages(self):
count = self.app.control.purge()
if count:
if count: # pragma: no cover
print('purge: Erased {0} {1} from the queue.\n'.format(
count, pluralize(count, 'message')))

Expand All @@ -209,7 +210,7 @@ def startup_info(self):
appr = '{0}:{1:#x}'.format(app.main or '__main__', id(app))
if not isinstance(app.loader, AppLoader):
loader = qualname(app.loader)
if loader.startswith('celery.loaders'):
if loader.startswith('celery.loaders'): # pragma: no cover
loader = loader[14:]
appr += ' ({0})'.format(loader)
if self.autoscale:
Expand Down
4 changes: 2 additions & 2 deletions celery/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ class KeyValueStoreBackend(BaseBackend):
implements_incr = False

def __init__(self, *args, **kwargs):
if hasattr(self.key_t, '__func__'):
if hasattr(self.key_t, '__func__'): # pragma: no cover
self.key_t = self.key_t.__func__ # remove binding
self._encode_prefixes()
super(KeyValueStoreBackend, self).__init__(*args, **kwargs)
Expand Down Expand Up @@ -583,7 +583,7 @@ def on_chord_part_return(self, request, state, result, **kwargs):
)
val = self.incr(key)
size = len(deps)
if val > size:
if val > size: # pragma: no cover
logger.warning('Chord counter incremented too many times for %r',
gid)
elif val == size:
Expand Down
7 changes: 5 additions & 2 deletions celery/backends/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
You need to install the redis library in order to use \
the Redis result store backend."""

E_LOST = """\
Connection to Redis lost: Retry (%s/%s) %s.\
"""

logger = get_logger(__name__)
error = logger.error

Expand Down Expand Up @@ -137,8 +141,7 @@ def ensure(self, fun, args, **policy):

def on_connection_error(self, max_retries, exc, intervals, retries):
tts = next(intervals)
error('Connection to Redis lost: Retry (%s/%s) %s.',
retries, max_retries or 'Inf',
error(E_LOST, retries, max_retries or 'Inf',
humanize_seconds(tts, 'in '))
return tts

Expand Down
2 changes: 1 addition & 1 deletion celery/bin/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@

try:
input = raw_input
except NameError:
except NameError: # pragma: no cover
pass

# always enable DeprecationWarnings, so our users can see them.
Expand Down
4 changes: 2 additions & 2 deletions celery/bin/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -740,13 +740,13 @@ def _relocate_args_from_start(self, argv, index=0):
# is (maybe) a value for this option
rest.extend([value, nxt])
index += 1
except IndexError:
except IndexError: # pragma: no cover
rest.append(value)
break
else:
break
index += 1
if argv[index:]:
if argv[index:]: # pragma: no cover
# if there are more arguments left then divide and swap
# we assume the first argument in argv[i:] is the command
# name.
Expand Down
5 changes: 3 additions & 2 deletions celery/canvas.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,8 @@ def set_parent_id(self, parent_id):
def apply_async(self, args=(), kwargs={}, route_name=None, **options):
try:
_apply = self._apply_async
except IndexError: # no tasks for chain, etc to find type
except IndexError: # pragma: no cover
# no tasks for chain, etc to find type
return
# For callbacks: extra args are prepended to the stored args.
if args or kwargs or options:
Expand Down Expand Up @@ -337,7 +338,7 @@ def election(self):
def __repr__(self):
return self.reprcall()

if JSON_NEEDS_UNICODE_KEYS:
if JSON_NEEDS_UNICODE_KEYS: # pragma: no cover
def items(self):
for k, v in dict.items(self):
yield k.decode() if isinstance(k, bytes) else k, v
Expand Down
4 changes: 2 additions & 2 deletions celery/events/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def event(type_, timestamp=None,
if drift > max_drift:
_warn_drift(self.hostname, drift,
local_received, timestamp)
if local_received:
if local_received: # pragma: no cover
hearts = len(heartbeats)
if hearts > hbmax - 1:
hb_pop(0)
Expand Down Expand Up @@ -218,7 +218,7 @@ class Task(object):
'timestamp', 'runtime', 'traceback', 'exchange', 'routing_key',
'clock', 'client', 'root_id', 'parent_id',
)
if not PYPY:
if not PYPY: # pragma: no cover
__slots__ = ('__dict__', '__weakref__')

#: How to merge out of order events.
Expand Down
40 changes: 24 additions & 16 deletions celery/fixups/django.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

if sys.version_info[0] < 3 and not hasattr(sys, 'pypy_version_info'):
from StringIO import StringIO
else:
else: # pragma: no cover
from io import StringIO


Expand Down Expand Up @@ -66,12 +66,16 @@ def install(self):
signals.worker_init.connect(self.on_worker_init)
return self

@cached_property
@property
def worker_fixup(self):
if self._worker_fixup is None:
self._worker_fixup = DjangoWorkerFixup(self.app)
return self._worker_fixup

@worker_fixup.setter
def worker_fixup(self, value):
self._worker_fixup = value

def on_import_modules(self, **kwargs):
# call django.setup() before task modules are imported
self.worker_fixup.validate_models()
Expand Down Expand Up @@ -160,36 +164,40 @@ def __init__(self, app):
_oracle_database_errors
)

def validate_models(self):
def django_setup(self):
import django
try:
django_setup = django.setup
except AttributeError:
except AttributeError: # pragma: no cover
pass
else:
django_setup()
s = StringIO()

def validate_models(self):
self.django_setup()
try:
from django.core.management.validation import get_validation_errors
except ImportError:
from django.core.management.base import BaseCommand
cmd = BaseCommand()
try:
# since django 1.5
from django.core.management.base import OutputWrapper
cmd.stdout = OutputWrapper(sys.stdout)
cmd.stderr = OutputWrapper(sys.stderr)
except ImportError:
cmd.stdout, cmd.stderr = sys.stdout, sys.stderr

cmd.check()
self._validate_models_django17()
else:
s = StringIO()
num_errors = get_validation_errors(s, None)
if num_errors:
raise RuntimeError(
'One or more Django models did not validate:\n{0}'.format(
s.getvalue()))

def _validate_models_django17(self):
from django.core.management import base
print(base)
cmd = base.BaseCommand()
try:
cmd.stdout = base.OutputWrapper(sys.stdout)
cmd.stderr = base.OutputWrapper(sys.stderr)
except ImportError: # before django 1.5
cmd.stdout, cmd.stderr = sys.stdout, sys.stderr
cmd.check()

def install(self):
signals.beat_embedded_init.connect(self.close_database)
signals.worker_ready.connect(self.on_worker_ready)
Expand Down
9 changes: 5 additions & 4 deletions celery/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,10 @@ def _get_current_object(self):
loc = object.__getattribute__(self, '_Proxy__local')
if not hasattr(loc, '__release_local__'):
return loc(*self.__args, **self.__kwargs)
try:
try: # pragma: no cover
# not sure what this is about
return getattr(loc, self.__name__)
except AttributeError:
except AttributeError: # pragma: no cover
raise RuntimeError('no object bound to {0.__name__}'.format(self))

@property
Expand Down Expand Up @@ -286,7 +287,7 @@ def __exit__(self, *a, **kw):
def __reduce__(self):
return self._get_current_object().__reduce__()

if not PY3:
if not PY3: # pragma: no cover
def __cmp__(self, other):
return cmp(self._get_current_object(), other) # noqa

Expand Down Expand Up @@ -361,7 +362,7 @@ def __evaluate__(self,
finally:
try:
object.__delattr__(self, '__pending__')
except AttributeError:
except AttributeError: # pragma: no cover
pass
return thing

Expand Down
12 changes: 7 additions & 5 deletions celery/platforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@

from collections import namedtuple

try:
from billiard.process import current_process
except ImportError:
current_process = None
from billiard.compat import get_fdmax, close_open_fds
# fileno used to be in this module
from kombu.utils import maybe_fileno
Expand All @@ -34,6 +30,11 @@
from .local import try_import
from .five import items, reraise, string_t

try:
from billiard.process import current_process
except ImportError: # pragma: no cover
current_process = None

_setproctitle = try_import('setproctitle')
resource = try_import('resource')
pwd = try_import('pwd')
Expand Down Expand Up @@ -340,7 +341,8 @@ def close(self, *args):
def _detach(self):
if os.fork() == 0: # first child
os.setsid() # create new session
if os.fork() > 0: # second child
if os.fork() > 0: # pragma: no cover
# second child
os._exit(0)
else:
os._exit(0)
Expand Down
1 change: 0 additions & 1 deletion celery/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from __future__ import absolute_import

import time
import warnings

from collections import OrderedDict, deque
from contextlib import contextmanager
Expand Down
37 changes: 37 additions & 0 deletions celery/tests/app/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,43 @@ def test_pending_configuration__compat_settings(self):
self.assertEqual(app.conf.broker_url, 'foo://bar')
self.assertEqual(app.conf.result_backend, 'foo')

def test_pending_configuration__compat_settings_mixing(self):
with self.Celery(broker='foo://bar', backend='foo') as app:
app.conf.update(
CELERY_ALWAYS_EAGER=4,
CELERY_DEFAULT_DELIVERY_MODE=63,
CELERYD_AGENT='foo:Barz',
worker_consumer='foo:Fooz',
)
with self.assertRaises(ImproperlyConfigured):
self.assertEqual(app.conf.task_always_eager, 4)

def test_pending_configuration__compat_settings_mixing_new(self):
with self.Celery(broker='foo://bar', backend='foo') as app:
app.conf.update(
task_always_eager=4,
task_default_delivery_mode=63,
worker_agent='foo:Barz',
CELERYD_CONSUMER='foo:Fooz',
CELERYD_AUTOSCALER='foo:Xuzzy',
)
with self.assertRaises(ImproperlyConfigured):
self.assertEqual(app.conf.worker_consumer, 'foo:Fooz')

def test_pending_configuration__compat_settings_mixing_alt(self):
with self.Celery(broker='foo://bar', backend='foo') as app:
app.conf.update(
task_always_eager=4,
task_default_delivery_mode=63,
worker_agent='foo:Barz',
CELERYD_CONSUMER='foo:Fooz',
worker_consumer='foo:Fooz',
CELERYD_AUTOSCALER='foo:Xuzzy',
worker_autoscaler='foo:Xuzzy'
)
self.assertEqual(app.conf.task_always_eager, 4)
self.assertEqual(app.conf.worker_autoscaler, 'foo:Xuzzy')

def test_pending_configuration__setdefault(self):
with self.Celery(broker='foo://bar') as app:
app.conf.setdefault('worker_agent', 'foo:Bar')
Expand Down
12 changes: 12 additions & 0 deletions celery/tests/app/test_beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,18 @@ def test_repr(self):
entry = self.create_entry()
self.assertIn('<Entry:', repr(entry))

def test_reduce(self):
entry = self.create_entry(schedule=timedelta(seconds=10))
fun, args = entry.__reduce__()
res = fun(*args)
self.assertEqual(res.schedule, entry.schedule)

def test_lt(self):
e1 = self.create_entry(schedule=timedelta(seconds=10))
e2 = self.create_entry(schedule=timedelta(seconds=2))
self.assertLess(e2, e1)
self.assertTrue(e1 < object())

def test_update(self):
entry = self.create_entry()
self.assertEqual(entry.schedule, timedelta(seconds=10))
Expand Down
Loading

0 comments on commit a28d300

Please sign in to comment.