Skip to content

Commit

Permalink
89% coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
Ask Solem committed Dec 23, 2009
1 parent 8377105 commit 87960ce
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 25 deletions.
6 changes: 4 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ readme: clean_readme
bump:
contrib/bump -c celery

coverage:
cover:
(cd testproj; python manage.py test --coverage)

quickcoverage:
coverage: cover

quickcover:
(cd testproj; env QUICKTEST=1 SKIP_RLIMITS=1 python manage.py test --coverage)

test:
Expand Down
4 changes: 0 additions & 4 deletions celery/task/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,6 @@ class Task(object):

MaxRetriesExceededError = MaxRetriesExceededError

def __init__(self):
if not self.__class__.name:
self.__class__.name = get_full_cls_name(self.__class__)

def __call__(self, *args, **kwargs):
return self.run(*args, **kwargs)

Expand Down
109 changes: 109 additions & 0 deletions celery/tests/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from celery.backends import default_backend
from celery.decorators import task as task_dec
from celery.worker.listener import parse_iso8601
from celery.exceptions import RetryTaskError

def return_True(*args, **kwargs):
# Task run functions can't be closures/lambdas, as they're pickled.
Expand All @@ -21,6 +22,16 @@ def raise_exception(self, **kwargs):
raise Exception("%s error" % self.__class__)


class MockApplyTask(task.Task):

def run(self, x, y):
return x * y

@classmethod
def apply_async(self, *args, **kwargs):
pass


class IncrementCounterTask(task.Task):
name = "c.unittest.increment_counter_task"
count = 0
Expand Down Expand Up @@ -53,6 +64,27 @@ def run(self, arg1, arg2, kwarg=1, **kwargs):
return self.retry(args=[arg1, arg2], kwargs=kwargs, countdown=0)


class RetryTaskMockApply(task.Task):
max_retries = 3
iterations = 0
applied = 0

def run(self, arg1, arg2, kwarg=1, **kwargs):
self.__class__.iterations += 1

retries = kwargs["task_retries"]
if retries >= 3:
return arg1
else:
kwargs.update({"kwarg": kwarg})
return self.retry(args=[arg1, arg2], kwargs=kwargs, countdown=0)


@classmethod
def apply_async(self, *args, **kwargs):
self.applied = 1


class MyCustomException(Exception):
"""Random custom exception."""

Expand Down Expand Up @@ -85,6 +117,23 @@ def test_retry(self):
self.assertEquals(result.get(), 0xFF)
self.assertEquals(RetryTask.iterations, 4)

def test_retry_not_eager(self):
exc = Exception("baz")
try:
RetryTaskMockApply.retry(args=[4, 4], kwargs={},
exc=exc, throw=False)
self.assertTrue(RetryTaskMockApply.applied)
finally:
RetryTaskMockApply.applied = 0

try:
self.assertRaises(RetryTaskError, RetryTaskMockApply.retry,
args=[4, 4], kwargs={}, exc=exc, throw=True)
self.assertTrue(RetryTaskMockApply.applied)
finally:
RetryTaskMockApply.applied = 0


def test_retry_with_kwargs(self):
RetryTaskCustomExc.max_retries = 3
RetryTaskCustomExc.iterations = 0
Expand Down Expand Up @@ -116,6 +165,12 @@ def test_max_retries_exceeded(self):
self.assertEquals(RetryTask.iterations, 2)


class MockPublisher(object):

def __init__(self, *args, **kwargs):
self.kwargs = kwargs


class TestCeleryTasks(unittest.TestCase):

def createTaskCls(self, cls_name, task_name=None):
Expand Down Expand Up @@ -232,6 +287,18 @@ def test_regular_task(self):
publisher = t1.get_publisher()
self.assertTrue(isinstance(publisher, messaging.TaskPublisher))

def test_get_publisher(self):
from celery.task import base
old_pub = base.TaskPublisher
base.TaskPublisher = MockPublisher
try:
p = IncrementCounterTask.get_publisher(exchange="foo",
connection="bar")
self.assertEquals(p.kwargs["exchange"], "foo")
finally:
base.TaskPublisher = old_pub


def test_get_logger(self):
T1 = self.createTaskCls("T1", "c.unittest.t.t1")
t1 = T1()
Expand Down Expand Up @@ -308,3 +375,45 @@ def test_apply(self):
self.assertFalse(f.successful())
self.assertTrue(f.traceback)
self.assertRaises(KeyError, f.get)


class MyPeriodic(task.PeriodicTask):
run_every = timedelta(hours=1)



class TestPeriodicTask(unittest.TestCase):

def test_must_have_run_every(self):
self.assertRaises(NotImplementedError, type, "Foo",
(task.PeriodicTask, ), {"__module__": __name__})

def test_remaining_estimate(self):
self.assertTrue(isinstance(
MyPeriodic().remaining_estimate(datetime.now()),
timedelta))

def test_timedelta_seconds_returns_0_on_negative_time(self):
delta = timedelta(days=-2)
self.assertEquals(MyPeriodic().timedelta_seconds(delta), 0)

def test_timedelta_seconds(self):
deltamap = ((timedelta(seconds=1), 1),
(timedelta(seconds=27), 27),
(timedelta(minutes=3), 3 * 60),
(timedelta(hours=4), 4 * 60 * 60),
(timedelta(days=3), 3 * 86400))
for delta, seconds in deltamap:
self.assertEquals(MyPeriodic().timedelta_seconds(delta), seconds)

def test_is_due_not_due(self):
due, remaining = MyPeriodic().is_due(datetime.now())
self.assertFalse(due)
self.assertTrue(remaining > 60)

def test_is_due(self):
p = MyPeriodic()
due, remaining = p.is_due(datetime.now() - p.run_every)
self.assertTrue(due)
self.assertEquals(remaining, p.timedelta_seconds(p.run_every))

98 changes: 94 additions & 4 deletions celery/tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,112 @@
import sys
import socket
import unittest
from celery.utils import chunks

from billiard.utils.functional import wraps

from celery import utils


class TestChunks(unittest.TestCase):

def test_chunks(self):

# n == 2
x = chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 2)
x = utils.chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 2)
self.assertEquals(list(x),
[[0, 1], [2, 3], [4, 5], [6, 7], [8, 9], [10]])

# n == 3
x = chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 3)
x = utils.chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 3)
self.assertEquals(list(x),
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10]])

# n == 2 (exact)
x = chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), 2)
x = utils.chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), 2)
self.assertEquals(list(x),
[[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]])


class TestGenUniqueId(unittest.TestCase):

def test_gen_unique_id_without_ctypes(self):
from celery.tests.utils import mask_modules
old_utils = sys.modules.pop("celery.utils")
try:
with mask_modules("ctypes"):
from celery.utils import ctypes, gen_unique_id
self.assertTrue(ctypes is None)
uuid = gen_unique_id()
self.assertTrue(uuid)
self.assertTrue(isinstance(uuid, basestring))
finally:
sys.modules["celery.utils"] = old_utils


class TestDivUtils(unittest.TestCase):

def test_repeatlast(self):
items = range(6)
it = utils.repeatlast(items)
for i in items:
self.assertEquals(it.next(), i)
for j in items:
self.assertEquals(it.next(), i)


def sleepdeprived(fun):

@wraps(fun)
def _sleepdeprived(*args, **kwargs):
import time
old_sleep = time.sleep
time.sleep = utils.noop
try:
return fun(*args, **kwargs)
finally:
time.sleep = old_sleep

return _sleepdeprived


class TestRetryOverTime(unittest.TestCase):

def test_returns_retval_on_success(self):

def _fun(x, y):
return x * y

ret = utils.retry_over_time(_fun, (socket.error, ), args=[16, 16],
max_retries=3)

self.assertEquals(ret, 256)

@sleepdeprived
def test_raises_on_unlisted_exception(self):

def _fun(x, y):
raise KeyError("bar")

self.assertRaises(KeyError, utils.retry_over_time, _fun,
(socket.error, ), args=[32, 32], max_retries=3)


@sleepdeprived
def test_retries_on_failure(self):

iterations = [0]

def _fun(x, y):
iterations[0] += 1
if iterations[0] == 3:
return x * y
raise socket.error("foozbaz")

ret = utils.retry_over_time(_fun, (socket.error, ), args=[32, 32],
max_retries=None)

self.assertEquals(iterations[0], 3)
self.assertEquals(ret, 1024)

self.assertRaises(socket.error, utils.retry_over_time,
_fun, (socket.error, ), args=[32, 32], max_retries=1)
8 changes: 2 additions & 6 deletions celery/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,7 @@
import ctypes
except ImportError:
ctypes = None
from uuid import UUID, uuid4
try:
from uuid import _uuid_generate_random
except ImportError:
_uuid_generate_random = None
from uuid import UUID, uuid4, _uuid_generate_random
from inspect import getargspec
from itertools import repeat

Expand Down Expand Up @@ -87,7 +83,7 @@ def repeatlast(it):
yield the last value infinitely."""
for item in it:
yield item
for item in repeat(item):
for item in repeat(item): # pragma: no cover
yield item


Expand Down
16 changes: 9 additions & 7 deletions celery/worker/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,17 +150,19 @@ def close_connection(self):
return
self._state = CLOSE

self.logger.debug("Heart: Going into cardiac arrest...")
self.heart = self.heart and self.heart.stop()
if self.heart:
self.logger.debug("Heart: Going into cardiac arrest...")
self.heart = self.heart.stop()

self.logger.debug("TaskConsumer: Shutting down...")
self.task_consumer = self.task_consumer and self.task_consumer.close()

self.logger.debug("EventDispatcher: Shutting down...")
self.event_dispatcher = self.event_dispatcher and \
self.event_dispatcher.close()
self.logger.debug(
"CarrotListener: Closing connection to broker...")
if self.event_dispatcher:
self.logger.debug("EventDispatcher: Shutting down...")
self.event_dispatcher = self.event_dispatcher.close()

self.logger.debug("CarrotListener: "
"Closing connection to broker...")
self.connection = self.connection and self.connection.close()

def reset_connection(self):
Expand Down
6 changes: 4 additions & 2 deletions examples/pythonproject/demoapp/celeryconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
BROKER_HOST = "localhost"
BROKER_USER = "guest"
BROKER_PASSWORD = "guest"
BROKER_VHOST = "/"
CELERY_BACKEND = "amqp"
BROKER_VHOST = "celery"
CELERY_DEFAULT_EXCHANGE = "celery"
CARROT_BACKEND = "ghettoq.taproot.Redis"
CELERY_BACKEND = "database"
CELERY_IMPORTS = ("tasks", )

0 comments on commit 87960ce

Please sign in to comment.