Skip to content

Commit

Permalink
Merge branch '3.0'
Browse files Browse the repository at this point in the history
Conflicts:
	README.rst
	celery/__init__.py
	celery/app/utils.py
	celery/worker/__init__.py
	docs/includes/introduction.txt
	requirements/default.txt
	setup.cfg
  • Loading branch information
ask committed Aug 17, 2012
2 parents c178568 + fb4dae1 commit 3b17a2e
Show file tree
Hide file tree
Showing 16 changed files with 106 additions and 36 deletions.
27 changes: 23 additions & 4 deletions Changelog
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,32 @@ If you're looking for versions prior to 3.x you should see :ref:`history`.

3.0.6
=====
:release-date: 2012-09-XX XX:XX X.M BST
:release-date: 2012-09-17 11:00 P.M BST

- Now depends on kombu 2.4.0

- Now depends on billiard 2.7.3.12

- Redis: Celery now tries to restore messages whenever there are no messages
in the queue.

- Crontab schedules now properly respects :setting:`CELERY_TIMEZONE` setting.

It's important to note that crontab schedules uses UTC time by default
unless this setting is set.

Issue #904 and django-celery #150.

- ``billiard.enable_forking`` is now only set by the processes pool.

- The transport is now properly shown by :program:`celery report`
(Issue #913).

- The `--app` argument now works if the last part is a module name
(Issue #921).

- Fixed problem with unpickleable exceptions (billiard #12).

- Adds ``task_name`` attribute to ``EagerResult`` which is always
:const:`None` (Issue #907).

Expand All @@ -54,9 +76,6 @@ If you're looking for versions prior to 3.x you should see :ref:`history`.
- The argument to :class:`~celery.exceptions.TaskRevokedError` is now one
of the reasons ``revoked``, ``expired`` or ``terminated``.

- Redis: Celery now tries to restore messages whenever there are no messages
in the queue.

- Old Task class does no longer use classmethods for push_request and
pop_request (Issue #912).

Expand Down
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ Celery is...

celery = Celery('hello', broker='amqp://guest@localhost//')

@celery.task()
@celery.task
def hello():
return 'hello world'

Expand Down
15 changes: 11 additions & 4 deletions celery/app/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ def BROKER_HOST(self):
return (os.environ.get('CELERY_BROKER_URL') or
self.first('BROKER_URL', 'BROKER_HOST'))

@property
def CELERY_TIMEZONE(self):
# this way we also support django's time zone.
return self.first('CELERY_TIMEZONE', 'TIME_ZONE')

def without_defaults(self):
"""Returns the current configuration, but without defaults."""
# the last stash is the default settings, so just skip that
Expand Down Expand Up @@ -131,10 +136,12 @@ def bugreport(app):
import kombu

try:
trans = app.connection().transport
driver_v = '{0}:{1}'.format(trans.driver_name, trans.driver_version())
conn = app.connection()
driver_v = '{0}:{1}'.format(conn.transport.driver_name,
conn.transport.driver_version())
transport = conn.transport_cls
except Exception:
driver_v = ''
transport = driver_v = ''

return BUGREPORT_INFO.format(
system=_platform.system(),
Expand All @@ -145,7 +152,7 @@ def bugreport(app):
billiard_v=billiard.__version__,
py_v=_platform.python_version(),
driver_v=driver_v,
transport=app.conf.BROKER_TRANSPORT or 'amqp',
transport=transport,
results=app.conf.CELERY_RESULT_BACKEND or 'disabled',
human_settings=app.conf.humanize(),
loader=qualname(app.loader.__class__),
Expand Down
4 changes: 3 additions & 1 deletion celery/concurrency/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,12 @@ class BasePool(object):
#: only used by multiprocessing pool
uses_semaphore = False

def __init__(self, limit=None, putlocks=True, **options):
def __init__(self, limit=None, putlocks=True, forking_enable=True,
**options):
self.limit = limit
self.putlocks = putlocks
self.options = options
self.forking_enable = forking_enable
self._does_debug = logger.isEnabledFor(logging.DEBUG)

def on_start(self):
Expand Down
5 changes: 4 additions & 1 deletion celery/concurrency/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@

import os

from billiard import forking_enable
from billiard.pool import Pool, RUN, CLOSE

from celery import platforms
from celery import signals
from celery._state import set_default_app
from celery.concurrency.base import BasePool
from celery.task import trace
from billiard.pool import Pool, RUN, CLOSE

#: List of signals to reset when a child process starts.
WORKER_SIGRESET = frozenset(['SIGTERM',
Expand Down Expand Up @@ -69,6 +71,7 @@ def on_start(self):
Will pre-fork all workers so they're ready to accept tasks.
"""
forking_enable(self.forking_enable)
P = self._pool = self.Pool(processes=self.limit,
initializer=process_initializer,
**self.options)
Expand Down
33 changes: 19 additions & 14 deletions celery/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

from . import current_app
from .utils import is_iterable
from .utils.timeutils import (timedelta_seconds, weekday, maybe_timedelta,
remaining, humanize_seconds, is_naive, to_utc,
timezone)
from .utils.timeutils import (
timedelta_seconds, weekday, maybe_timedelta, remaining,
humanize_seconds, timezone, maybe_make_aware
)
from .datastructures import AttributeDict

CRON_PATTERN_INVALID = """\
Expand Down Expand Up @@ -48,12 +49,8 @@ def now(self):
return (self.nowfun or current_app.now)()

def remaining_estimate(self, last_run_at):
"""Returns when the periodic task should run next as a timedelta."""
now = self.now()
if not is_naive(last_run_at):
now = to_utc(now)
return remaining(last_run_at, self.run_every,
relative=self.relative, now=now)
self.relative, maybe_make_aware(self.now()))

def is_due(self, last_run_at):
"""Returns tuple of two items `(is_due, next_time_to_run)`,
Expand Down Expand Up @@ -394,7 +391,10 @@ def __init__(self, minute='*', hour='*', day_of_week='*',
self.day_of_week = self._expand_cronspec(day_of_week, 7)
self.day_of_month = self._expand_cronspec(day_of_month, 31, 1)
self.month_of_year = self._expand_cronspec(month_of_year, 12, 1)
self.nowfun = nowfun or current_app.now
self.nowfun = nowfun

def now(self):
return (self.nowfun or current_app.now)()

def __repr__(self):
return ('<crontab: %s %s %s %s %s (m/h/d/dM/MY)>' %
Expand All @@ -411,10 +411,10 @@ def __reduce__(self):
self._orig_day_of_month,
self._orig_month_of_year), None)

def remaining_estimate(self, last_run_at):
def remaining_estimate(self, last_run_at, tz=None):
"""Returns when the periodic task should run next as a timedelta."""
if not is_naive(last_run_at):
last_run_at = last_run_at.astimezone(timezone.utc)
tz = tz or self.tz
last_run_at = maybe_make_aware(last_run_at)
dow_num = last_run_at.isoweekday() % 7 # Sunday is day 0, not day 7

execute_this_date = (last_run_at.month in self.month_of_year and
Expand Down Expand Up @@ -463,7 +463,8 @@ def remaining_estimate(self, last_run_at):
delta = self._delta_to_next(last_run_at,
next_hour, next_minute)

return remaining(last_run_at, delta, now=self.nowfun())
return remaining(timezone.to_local(last_run_at, tz),
delta, timezone.to_local(self.now(), tz))

def is_due(self, last_run_at):
"""Returns tuple of two items `(is_due, next_time_to_run)`,
Expand All @@ -476,7 +477,7 @@ def is_due(self, last_run_at):
rem = timedelta_seconds(rem_delta)
due = rem == 0
if due:
rem_delta = self.remaining_estimate(last_run_at=self.nowfun())
rem_delta = self.remaining_estimate(self.now())
rem = timedelta_seconds(rem_delta)
return due, rem

Expand All @@ -489,6 +490,10 @@ def __eq__(self, other):
other.minute == self.minute)
return other is self

@property
def tz(self):
return current_app.conf.CELERY_TIMEZONE


def maybe_schedule(s, relative=False):
if isinstance(s, int):
Expand Down
1 change: 1 addition & 0 deletions celery/tests/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
)

CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = 'UTC'

CELERYD_LOG_COLOR = False

Expand Down
6 changes: 6 additions & 0 deletions celery/utils/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ def from_exception(cls, exc):

def get_pickleable_exception(exc):
"""Make sure exception is pickleable."""
try:
pickle.loads(pickle.dumps(exc))
except Exception:
pass
else:
return exc
nearest = find_nearest_pickleable_exception(exc)
if nearest:
return nearest
Expand Down
26 changes: 21 additions & 5 deletions celery/utils/timeutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ def tz_or_local(self, tzinfo=None):

def to_local(self, dt, local=None, orig=None):
if is_naive(dt):
dt = set_tz(dt, orig or self.utc)
return dt.astimezone(self.tz_or_local(local))
dt = make_aware(dt, orig or self.utc)
return localize(dt, self.tz_or_local(local))

def get_timezone(self, zone):
if isinstance(zone, basestring):
Expand Down Expand Up @@ -140,7 +140,6 @@ def remaining(start, ends_in, now=None, relative=False):
"""
now = now or datetime.utcnow()

end_date = start + ends_in
if relative:
end_date = delta_resolution(end_date, ends_in)
Expand Down Expand Up @@ -202,7 +201,7 @@ def is_naive(dt):
return dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None


def set_tz(dt, tz):
def make_aware(dt, tz):
"""Sets the timezone for a datetime object."""
try:
localize = tz.localize
Expand All @@ -213,6 +212,23 @@ def set_tz(dt, tz):
return localize(dt, is_dst=None)


def localize(dt, tz):
"""Convert aware datetime to another timezone."""
dt = dt.astimezone(tz)
try:
normalize = tz.normalize
except AttributeError:
return dt
else:
return normalize(dt) # pytz


def to_utc(dt):
"""Converts naive datetime to UTC"""
return set_tz(dt, timezone.utc)
return make_aware(dt, timezone.utc)


def maybe_make_aware(dt, tz=None):
if is_naive(dt):
return to_utc(dt)
return localize(dt, timezone.utc if tz is None else tz)
1 change: 1 addition & 0 deletions celery/worker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from threading import Event

from billiard import cpu_count
from billiard.exceptions import WorkerLostError
from kombu.syn import detect_environment
from kombu.utils.finalize import Finalize

Expand Down
4 changes: 2 additions & 2 deletions celery/worker/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

from functools import partial

from billiard import forking_enable
from billiard.exceptions import WorkerLostError

from celery.utils.log import get_logger
Expand Down Expand Up @@ -107,8 +106,8 @@ def on_timeout_cancel(result):

def create(self, w, semaphore=None, max_restarts=None):
threaded = not w.use_eventloop
forking_enable(not threaded or (w.no_execv or not w.force_execv))
procs = w.min_concurrency
forking_enable = not threaded or (w.no_execv or not w.force_execv)
if not threaded:
semaphore = w.semaphore = BoundedSemaphore(procs)
w._quick_acquire = w.semaphore.acquire
Expand All @@ -125,6 +124,7 @@ def create(self, w, semaphore=None, max_restarts=None):
threads=threaded,
max_restarts=max_restarts,
allow_restart=allow_restart,
forking_enable=forking_enable,
semaphore=semaphore)
if w.hub:
w.hub.on_init.append(partial(self.on_poll_init, pool))
Expand Down
9 changes: 9 additions & 0 deletions docs/django/first-steps-with-django.rst
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ Our example task is pretty pointless, it just returns the sum of two
arguments, but it will do for demonstration, and it is referred to in many
parts of the Celery documentation.

.. admonition:: Relative Imports

You have to consistent in how you import the task module, e.g. if
you have ``project.app`` in ``INSTALLED_APPS`` then you also
need to import the tasks ``from project.app`` or else the names
of the tasks will be different.

See :ref:`task-naming-relative-imports`

Starting the worker process
===========================

Expand Down
1 change: 1 addition & 0 deletions extra/release/doc4allmods
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
PACKAGE="$1"
SKIP_PACKAGES="$PACKAGE tests management urls"
SKIP_FILES="celery.__compat__.rst
celery.__main__.rst
celery.task.sets.rst
celery.bin.rst
celery.bin.celeryd_detach.rst
Expand Down
2 changes: 1 addition & 1 deletion requirements/default-py3k.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
billiard>=2.7.3.12
python-dateutil>=2.1
pytz
kombu>=2.3
kombu>=2.4.0,<3.0
2 changes: 1 addition & 1 deletion requirements/default.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
billiard>=2.7.3.12
python-dateutil>=2.1
kombu>=2.3.1,<3.0
kombu>=2.4.0,<3.0
4 changes: 2 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ upload-dir = docs/.build/html
[bdist_rpm]
requires = uuid
importlib
billiard>=2.7.3.12
billiard >= 2.7.3.12
python-dateutil >= 2.1
kombu >= 2.3.1
kombu >= 2.4.0
ordereddict

0 comments on commit 3b17a2e

Please sign in to comment.