Skip to content

Commit

Permalink
Group is now lazy until .apply_async, but having regen support lazy _…
Browse files Browse the repository at this point in the history
…_getitem__ for iterators
  • Loading branch information
ask committed Nov 20, 2015
1 parent 89fa04c commit 9982773
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 28 deletions.
48 changes: 25 additions & 23 deletions celery/canvas.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from celery.result import GroupResult
from celery.utils import abstract
from celery.utils.functional import (
maybe_list, is_list, regen, chunks as _chunks,
maybe_list, is_list, _regen, regen, chunks as _chunks,
)
from celery.utils.text import truncate

Expand Down Expand Up @@ -661,7 +661,7 @@ def _maybe_group(tasks, app):
elif isinstance(tasks, abstract.CallableSignature):
tasks = [tasks]
else:
tasks = [signature(t, app=app) for t in regen(tasks)]
tasks = [signature(t, app=app) for t in tasks]
return tasks


Expand All @@ -670,9 +670,12 @@ class group(Signature):
tasks = _getitem_property('kwargs.tasks')

def __init__(self, *tasks, **options):
app = options.get('app')
if len(tasks) == 1:
tasks = _maybe_group(tasks[0], app)
tasks = tasks[0]
if isinstance(tasks, group):
tasks = tasks.tasks
if not isinstance(tasks, _regen):
tasks = regen(tasks)
Signature.__init__(
self, 'celery.group', (), {'tasks': tasks}, **options
)
Expand All @@ -691,25 +694,24 @@ def _prepared(self, tasks, partial_args, group_id, root_id, app, dict=dict,
CallableSignature=abstract.CallableSignature,
from_dict=Signature.from_dict):
for task in tasks:
if isinstance(task, dict):
if isinstance(task, CallableSignature):
# local sigs are always of type Signature, and we
# clone them to make sure we do not modify the originals.
task = task.clone()
else:
# serialized sigs must be converted to Signature.
task = from_dict(task, app=app)
if isinstance(task, group):
# needs yield_from :(
unroll = task._prepared(
task.tasks, partial_args, group_id, root_id, app,
)
for taskN, resN in unroll:
yield taskN, resN
else:
if partial_args and not task.immutable:
task.args = tuple(partial_args) + tuple(task.args)
yield task, task.freeze(group_id=group_id, root_id=root_id)
if isinstance(task, CallableSignature):
# local sigs are always of type Signature, and we
# clone them to make sure we do not modify the originals.
task = task.clone()
else:
# serialized sigs must be converted to Signature.
task = from_dict(task, app=app)
if isinstance(task, group):
# needs yield_from :(
unroll = task._prepared(
task.tasks, partial_args, group_id, root_id, app,
)
for taskN, resN in unroll:
yield taskN, resN
else:
if partial_args and not task.immutable:
task.args = tuple(partial_args) + tuple(task.args)
yield task, task.freeze(group_id=group_id, root_id=root_id)

def _apply_tasks(self, tasks, producer=None, app=None,
add_to_parent=None, **options):
Expand Down
32 changes: 27 additions & 5 deletions celery/utils/functional.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@
from collections import OrderedDict
from functools import partial, wraps
from inspect import getargspec, isfunction
from itertools import islice
from itertools import chain, islice

from amqp import promise
from kombu.utils import cached_property
from kombu.utils.functional import lazy, maybe_evaluate, is_list, maybe_list

from celery.five import UserDict, UserList, items, keys, range
Expand Down Expand Up @@ -320,16 +319,39 @@ class _regen(UserList, list):
# must be subclass of list so that json can encode.
def __init__(self, it):
self.__it = it
self.__index = 0
self.__consumed = []

def __reduce__(self):
return list, (self.data,)

def __length_hint__(self):
return self.__it.__length_hint__()

@cached_property
def __iter__(self):
return chain(self.__consumed, self.__it)

def __getitem__(self, index):
if index < 0:
return self.data[index]
try:
return self.__consumed[index]
except IndexError:
try:
for i in range(self.__index, index + 1):
self.__consumed.append(next(self.__it))
except StopIteration:
raise IndexError(index)
else:
return self.__consumed[index]

@property
def data(self):
return list(self.__it)
try:
self.__consumed.extend(list(self.__it))
except StopIteration:
pass
return self.__consumed


def dictfilter(d=None, **kw):
Expand Down Expand Up @@ -365,7 +387,7 @@ def head_from_fun(fun, bound=False, debug=False):
fun_args=_argsfromspec(getargspec(fun)),
fun_value=1,
)
if debug:
if debug: # pragma: no cover
print(definition, file=sys.stderr)
namespace = {'__name__': 'headof_{0}'.format(name)}
exec(definition, namespace)
Expand Down

0 comments on commit 9982773

Please sign in to comment.