Skip to content

Commit

Permalink
And here comes the code
Browse files Browse the repository at this point in the history
  • Loading branch information
Ask Solem committed Apr 24, 2009
1 parent dd5db41 commit 8dd7ac7
Show file tree
Hide file tree
Showing 18 changed files with 747 additions and 0 deletions.
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Ask Solem <[email protected]>
8 changes: 8 additions & 0 deletions Changelog
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
==============
Change history
==============

0.1.0 :date:`2009-04-24 11:28 A.M CET` :author:[email protected]
--------------------------------------------------------------

* Initial release
28 changes: 28 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
Copyright (c) 2009, Ask Solem
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

* Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.

Neither the name of Ask Solem nor the names of its contributors may be used
to endorse or promote products derived from this software without specific
prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS
BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.

6 changes: 6 additions & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
include AUTHORS
include README
include MANIFEST.in
include LICENSE
include Changelog
recursive-include crunchy *
121 changes: 121 additions & 0 deletions README
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
============================================
crunchy - Distributed Task Queue for Django.
============================================

:Authors:
Ask Solem ([email protected])
:Version: 0.1.0

Introduction
------------

``crunchy`` is a distributed task queue framework for Django.
More information will follow.

Installation
=============

You can install ``crunchy`` either via the Python Package Index (PyPI)
or from source.

To install using ``pip``,::

$ pip install crunchy

To install using ``easy_install``,::

$ easy_install crunchy

If you have downloaded a source tarball you can install it
by doing the following,::

$ python setup.py build
# python setup.py install # as root

Usage
=====

Have to write a cool tutorial, but here is some simple usage info.

*Note* You need to have a AMQP message broker running, like `RabbitMQ`_,
and you need to have the amqp server setup in your settings file, as described
in the `carrot distribution README`_.

*Note* If you're running ``SQLite`` as the database backend, ``crunchd`` will
only be able to process one message at a time, this because ``SQLite`` doesn't
allow concurrent writes.

.. _`RabbitMQ`: http://www.rabbitmq.com
.. _`carrot distribution README`: http://pypi.python.org/pypi/carrot/0.3.3


Defining tasks
--------------

>>> from crunchy.task import tasks
>>> from crunchy.log import setup_logger
>>> def do_something(some_arg, **kwargs):
... logger = setup_logger(**kwargs)
... logger.info("Did something: %s" % some_arg)
>>> task.register("do_something", some_arg=do_something)

*Note* Task functions only supports keyword arguments.

Tell the crunch daemon to run a task
-------------------------------------

>>> from crunchy.task import delay_task
>>> delay_task("do_something", "foo bar baz")


Running the crunch daemon
--------------------------

::

$ cd mydjangoproject
$ env DJANGO_SETTINGS_MODULE=settings crunchd
[....]
[2009-04-23 17:44:05,115: INFO/Process-1] Did something: foo bar baz
[2009-04-23 17:44:05,118: INFO/MainProcess] Waiting for queue.




Autodiscovery of tasks
-----------------------

``crunchy`` has an autodiscovery feature like the Django Admin, that
automatically loads any ``tasks.py`` module in the applications listed
in ``settings.INSTALLED_APPS``.

A good place to add this command could be in your ``urls.py``,
::

from crunchy.task import tasks
tasks.autodiscover()



Then you can add new tasks in your applications ``tasks.py`` module,
::

from crunchy.task import tasks
from crunchy.log import setup_logger
from clickcounter.models import ClickCount

def increment_click(for_url, **kwargs):
logger = setup_logger(**kwargs)
clicks_for_url, cr = ClickCount.objects.get_or_create(url=for_url)
clicks_for_url.clicks = clicks_for_url.clicks + 1
clicks_for_url.save()
logger.info("Incremented click count for %s (not at %d)" % (
for_url, clicks_for_url.clicks)

License
=======

This software is licensed under the ``New BSD License``. See the ``LICENSE``
file in the top distribution directory for the full license text.

.. # vim: syntax=rst expandtab tabstop=4 shiftwidth=4 shiftround
7 changes: 7 additions & 0 deletions crunchy/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""Distributed Task Queue for Django"""
VERSION = (0, 1, 0)
__version__ = ".".join(map(str, VERSION))
__author__ = "Ask Solem"
__contact__ = "[email protected]"
__homepage__ = "http://github.com/ask/crunchy/"
__docformat__ = "restructuredtext"
Empty file added crunchy/bin/__init__.py
Empty file.
83 changes: 83 additions & 0 deletions crunchy/bin/crunchd
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#!/usr/bin/env python
import os
import sys
sys.path.append(os.getcwd())
from django.conf import settings
from crunchy.platform import PIDFile, daemonize, remove_pidfile
from crunchy.log import setup_logger
from crunchy.conf import LOG_LEVELS, DAEMON_LOG_FILE, DAEMON_LOG_LEVEL
from crunchy.conf import DAEMON_CONCURRENCY, DAEMON_PID_FILE
from crunchy.conf import QUEUE_WAKEUP_AFTER
from crunchy import discovery
from crunchy.worker import TaskDaemon
import traceback
import optparse
import atexit


def main(concurrency=DAEMON_CONCURRENCY, daemon=False,
loglevel=DAEMON_LOG_LEVEL, logfile=DAEMON_LOG_FILE,
pidfile=DAEMON_PID_FILE, queue_wakeup_after=QUEUE_WAKEUP_AFTER):
if settings.DATABASE_ENGINE == "sqlite3" and concurrency > 1:
import warnings
warnings.warn("The sqlite3 database engine doesn't support "
"concurrency. We'll be using a single process only.",
UserWarning)
concurrency = 1
if daemon:
sys.stderr.write("Launching crunchd in the background...\n")
pidfile_handler = PIDFile(pidfile)
pidfile_handler.check()
daemonize(pidfile=pidfile_handler)
atexit.register(remove_pidfile, pidfile)
else:
logfile = None # log to stderr when not running as daemon.

discovery.autodiscover()
crunchd = TaskDaemon(concurrency=concurrency,
loglevel=loglevel,
logfile=logfile,
queue_wakeup_after=queue_wakeup_after)
try:
crunchd.run()
except Exception, e:
raise
emergency_error(logfile, "crunchd raised exception %s: %s\n%s" % (
e.__class__, e, traceback.format_exc()))


def parse_options(arguments):
parser = optparse.OptionParser()
parser.add_option('-c', '--concurrency', default=DAEMON_CONCURRENCY,
action="store", dest="concurrency", type="int",
help="Number of child processes processing the queue.")
parser.add_option('-f', '--logfile', default=DAEMON_LOG_FILE,
action="store", dest="logfile",
help="Path to log file.")
parser.add_option('-l', '--loglevel', default=DAEMON_LOG_LEVEL,
action="store", dest="loglevel",
help="Choose between DEBUG/INFO/WARNING/ERROR/CRITICAL/FATAL.")
parser.add_option('-p', '--pidfile', default=DAEMON_PID_FILE,
action="store", dest="pidfile",
help="Path to PID file.")
parser.add_option('-w', '--wakeup-after', default=QUEUE_WAKEUP_AFTER,
action="store", dest="queue_wakeup_after",
help="If the queue is empty, this is the time *in seconds* the "
"daemon sleeps until it wakes up to check if there's any "
"new messages on the queue.")
parser.add_option('-d', '--daemon', default=False,
action="store_true", dest="daemon",
help="Run in background as a daemon.")
options, values = parser.parse_args(arguments)
if not isinstance(options.loglevel, int):
options.loglevel = LOG_LEVELS[options.loglevel.upper()]
return options

if __name__ == "__main__":
options = parse_options(sys.argv[1:])
main(concurrency=options.concurrency,
daemon=options.daemon,
logfile=options.logfile,
loglevel=options.loglevel,
pidfile=options.pidfile,
queue_wakeup_after=options.queue_wakeup_after)
51 changes: 51 additions & 0 deletions crunchy/conf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from django.conf import settings
import logging

# The number of processes to work simultaneously at processing the queue.
DEFAULT_DAEMON_CONCURRENCY = 10

# If the queue is empty, this is the time *in seconds* the daemon sleeps
# until it wakes up to check if there's any new messages on the queue.
DEFAULT_QUEUE_WAKEUP_AFTER = 0.3

# As long as the queue is empty, the daemon logs a "Queue is empty" message
# every ``EMPTY_MSG_EMIT_EVERY`` *seconds*.
DEFAULT_EMPTY_MSG_EMIT_EVERY = 5

DEFAULT_DAEMON_PID_FILE = "crunchd.pid"

# The format we log messages in.
DEFAULT_LOG_FMT = '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s'

# Default log level [DEBUG|INFO|WARNING|ERROR|CRITICAL|FATAL]
DEFAULT_DAEMON_LOG_LEVEL = "INFO"

# Default log file
DEFAULT_DAEMON_LOG_FILE = "refreshd.log"

# Table of loglevels to constants for use in settings.py.
LOG_LEVELS = {
"DEBUG": logging.DEBUG,
"INFO": logging.INFO,
"WARNING": logging.WARNING,
"WARN": logging.WARNING,
"ERROR": logging.ERROR,
"CRITICAL": logging.CRITICAL,
"FATAL": logging.FATAL,
}

LOG_FORMAT = getattr(settings, "DJANGOFEEDS_DAEMON_LOG_FORMAT",
DEFAULT_LOG_FMT)
DAEMON_LOG_FILE = getattr(settings, "DJANGOFEEDS_LOG_FILE",
DEFAULT_DAEMON_LOG_FILE)
DAEMON_LOG_LEVEL = LOG_LEVELS[getattr(settings, "DJANGOFEEDS_DAEMON_LOG_LEVEL",
DEFAULT_DAEMON_LOG_LEVEL).upper()]

QUEUE_WAKEUP_AFTER = getattr(settings, "CRUNCHD_QUEUE_WAKEUP_AFTER",
DEFAULT_QUEUE_WAKEUP_AFTER)
EMPTY_MSG_EMIT_EVERY = getattr(settings, "CRUNCHD_EMPTY_MSG_EMIT_EVERY",
DEFAULT_EMPTY_MSG_EMIT_EVERY)
DAEMON_PID_FILE = getattr("settings", "CRUNCHD_PID_FILE",
DEFAULT_DAEMON_PID_FILE)
DAEMON_CONCURRENCY = getattr("settings", "CRUNCHD_CONCURRENCY",
DEFAULT_DAEMON_CONCURRENCY)
38 changes: 38 additions & 0 deletions crunchy/discovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import imp
from django.conf import settings
from django.core import exceptions


def autodiscover():
"""Include tasks for all applications in settings.INSTALLED_APPS."""
return filter(None, [tasks_for_app(app)
for app in settings.INSTALLED_APPS])


def tasks_for_app(app):
"""Given an application name, imports any tasks.py file for that app."""

def found_tasks_module_handler(app_path, app_basename):
return __import__("%s.tasks" % app)

return find_related_module(app, "tasks", found_tasks_module_handler)


def find_related_module(app, related_name, handler):
"""Given an application name and a module name, tries to find that
module in the application, and running handler' if it finds it.
"""

# See django.contrib.admin.autodiscover for an explanation of this code.
try:
app_basename = app.split('.')[-1]
app_path = __import__(app, {}, {}, app_basename).__path__
except AttributeError:
return None

try:
imp.find_module(related_name, app_path)
except ImportError:
return None

return handler(app_path, app_basename)
32 changes: 32 additions & 0 deletions crunchy/log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import multiprocessing
import os
import time
import logging
from crunchy.conf import LOG_FORMAT, DAEMON_LOG_LEVEL


def setup_logger(loglevel=DAEMON_LOG_LEVEL, logfile=None, format=LOG_FORMAT):
"""Setup the ``multiprocessing`` logger. If ``logfile`` is not specified,
``stderr`` is used.
Returns logger object.
"""
logger = multiprocessing.get_logger()
if logfile:
log_file_handler = logging.FileHandler(logfile)
formatter = logging.Formatter(format)
log_file_handler.setFormatter(formatter)
logger.addHandler(log_file_handler)
else:
multiprocessing.log_to_stderr()
logger.setLevel(loglevel)
return logger


def emergency_error(logfile, message):
logfh = open(logfile, "a")
logfh.write("[%(asctime)s: FATAL/%(pid)d]: %(message)s\n" % {
"asctime": time.asctime(),
"pid": os.getpid(),
"message": message})
logfh.close()
Loading

0 comments on commit 8dd7ac7

Please sign in to comment.