Skip to content

Commit

Permalink
Introducing Task classes. (after() not implemented yet, but that's go…
Browse files Browse the repository at this point in the history
…ing to be really awesome)

The arguments to tasks.register is now changed:

Instead of

    task.register(task_name, task_func)

 it's now

    task.register(task_func, task_name)

or alternatively

    task.register(TaskClass)
  • Loading branch information
Ask Solem committed Apr 24, 2009
1 parent ae3eeed commit 462d470
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 9 deletions.
2 changes: 1 addition & 1 deletion MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
include AUTHORS
include README
include README.rst
include MANIFEST.in
include LICENSE
include Changelog
Expand Down
6 changes: 3 additions & 3 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,15 @@ Defining tasks
>>> def do_something(some_arg, **kwargs):
... logger = setup_logger(**kwargs)
... logger.info("Did something: %s" % some_arg)
>>> task.register("do_something", some_arg=do_something)
>>> task.register(do_something, "do_something")

*Note* Task functions only supports keyword arguments.

Tell the crunch daemon to run a task
-------------------------------------

>>> from crunchy.task import delay_task
>>> delay_task("do_something", "foo bar baz")
>>> delay_task("do_something", some_arg="foo bar baz")


Running the crunch daemon
Expand Down Expand Up @@ -111,7 +111,7 @@ Then you can add new tasks in your applications ``tasks.py`` module,
clicks_for_url.save()
logger.info("Incremented click count for %s (not at %d)" % (
for_url, clicks_for_url.clicks)
tasks.register("increment_click", increment_click)
tasks.register(increment_click, "increment_click")

License
=======
Expand Down
2 changes: 1 addition & 1 deletion crunchy/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Distributed Task Queue for Django"""
VERSION = (0, 1, 0)
VERSION = (0, 1, 1)
__version__ = ".".join(map(str, VERSION))
__author__ = "Ask Solem"
__contact__ = "[email protected]"
Expand Down
15 changes: 12 additions & 3 deletions crunchy/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,23 @@ def __init__(self):
def autodiscover(self):
discovery.autodiscover()

def register(self, task_name, task_func):
def register(self, task, task_name=None):
is_class = False
if hasattr(task, "run"):
is_class = True
task_name = task.name
if task_name in self.data:
raise self.AlreadyRegistered(
"Task with name %s is already registered." % task_name)

self.data[task_name] = task_func

if is_class:
self.data[task_name] = task() # instantiate Task class
else:
self.data[task_name] = task

def unregister(self, task_name):
if hasattr(task_name, "run"):
task_name = task_name.name
if task_name not in self.data:
raise self.NotRegistered(
"Task with name %s is not registered." % task_name)
Expand Down
53 changes: 52 additions & 1 deletion crunchy/task.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from carrot.connection import DjangoAMQPConnection
from crunchy.messaging import TaskPublisher, TaskConsumer
from crunchy.log import setup_logger
from crunchy.registry import tasks
from crunchy.messaging import TaskPublisher, TaskConsumer


def delay_task(task_name, **kwargs):
Expand All @@ -18,3 +19,53 @@ def discard_all():
discarded_count = consumer.discard_all()
consumer.close()
return discarded_count


class Task(object):
name = None

def __init__(self):
if not self.name:
raise NotImplementedError("Tasks must define a name attribute.")

def __call__(self, **kwargs):
return self.run(**kwargs)

def run(self, **kwargs):
raise NotImplementedError("Tasks must define a run method.")

def after(self, task_id):
"""This method is called when the task is sucessfully executed."""
pass

def get_logger(self, **kwargs):
"""Get a process-aware logger object."""
return setup_logger(**kwargs)

def get_publisher(self):
"""Get a crunchy task message publisher."""
return TaskPublisher(connection=DjangoAMQPConnection)

def get_consumer(self):
"""Get a crunchy task message consumer."""
return TaskConsumer(connection=DjangoAMQPConnection)

class TaskExecutedTask(Task):
name = "crunchy-task-executed"

def run(self, task_id, task_name, **kwargs):
logger = self.get_logger(**kwargs)
logger.info("Task %s[%s] executed successfully." % (task_id, task_name))
tasks.register(TaskExecutedTask)

class TestTask(Task):
name = "crunchy-test-task"

def run(self, some_arg, **kwargs):
logger = self.get_logger(**kwargs)
logger.info("TestTask got some_arg=%s" % some_arg)

def after(self, task_id):
logger = self.get_logger(**kwargs)
logger.info("TestTask with id %s was successfully executed." % task_id)
tasks.register(TestTask)

0 comments on commit 462d470

Please sign in to comment.