Skip to content

Commit

Permalink
flakes
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed Sep 9, 2013
1 parent 66fcd90 commit e50f52e
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 35 deletions.
1 change: 0 additions & 1 deletion celery/app/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from billiard.util import register_after_fork
from kombu.clocks import LamportClock
from kombu.common import oid_from
from kombu.serialization import enable_insecure_serializers
from kombu.utils import cached_property

from celery import platforms
Expand Down
5 changes: 3 additions & 2 deletions celery/concurrency/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ def on_partial_read(self, job, proc):
assert len(self._queues) == before

def destroy_queues(self, queues):
"""Destroyes queues that can no longer be used and that will
"""Destroy queues that can no longer be used, so that they
be replaced by new sockets."""
removed = 1
try:
Expand Down Expand Up @@ -623,7 +623,8 @@ def per(v, total):
return {
'total': total,
'avg': per(total / len(self.write_stats) if total else 0, total),
'all': ', '.join(per(v, total) for v in vals)
'all': ', '.join(per(v, total) for v in vals),
'raw': ', '.join(map(str, vals)),
}

def on_poll_init(self, w, hub):
Expand Down
66 changes: 34 additions & 32 deletions celery/worker/hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ class BoundedSemaphore(object):
"""Asynchronous Bounded Semaphore.
Bounded means that the value will stay within the specified
range even if it is released more times than it was acquired.
This type is *not thread safe*.
range even if released more times than it was acquired.
Example:
Expand Down Expand Up @@ -70,8 +68,8 @@ def __init__(self, value):
self._waiting = []

def acquire(self, callback, *partial_args):
"""Acquire semaphore, applying ``callback`` when
the semaphore is ready.
"""Acquire semaphore, applying ``callback`` if
the resource is available.
:param callback: The callback to apply.
:param \*partial_args: partial arguments to callback.
Expand All @@ -88,8 +86,8 @@ def acquire(self, callback, *partial_args):
def release(self):
"""Release semaphore.
This will apply any waiting callbacks from previous
calls to :meth:`acquire` done when the semaphore was busy.
If there are any waiters this will apply the first waiter
that is waiting for the resource (FIFO order).
"""
self.value = min(self.value + 1, self.initial_value)
Expand All @@ -98,18 +96,18 @@ def release(self):
waiter(*args)

def grow(self, n=1):
"""Change the size of the semaphore to hold more values."""
"""Change the size of the semaphore to accept more users."""
self.initial_value += n
self.value += n
[self.release() for _ in range(n)]

def shrink(self, n=1):
"""Change the size of the semaphore to hold less values."""
"""Change the size of the semaphore to accept less users."""
self.initial_value = max(self.initial_value - n, 0)
self.value = max(self.value - n, 0)

def clear(self):
"""Reset the sempahore, including wiping out any waiting callbacks."""
"""Reset the sempahore, which also wipes out any waiting callbacks."""
self._waiting[:] = []
self.value = self.initial_value

Expand Down Expand Up @@ -149,6 +147,14 @@ def __init__(self, timer=None):
self.on_init = []
self.on_close = []
self.on_task = []

# The eventloop (in celery.worker.loops)
# will merge fds in this set and then instead of calling
# the callback for each ready fd it will call the
# :attr:`consolidate_callback` with the list of ready_fds
# as an argument. This API is internal and is only
# used by the multiprocessing pool to find inqueues
# that are ready to write.
self.consolidate = set()
self.consolidate_callback = None

Expand Down Expand Up @@ -180,16 +186,6 @@ def fire_timers(self, min_delay=1, max_delay=10, max_timers=10,
logger.error('Error in timer: %r', exc, exc_info=1)
return min(max(delay or 0, min_delay), max_delay)

def _add(self, fd, cb, flags, consolidate=False):
#if flags & WRITE:
# ex = self.writers.get(fd)
# if ex and ex.__name__ == '_write_job':
# assert not ex.gi_frame or ex.gi_frame == -1
self.poller.register(fd, flags)
(self.readers if flags & READ else self.writers)[fileno(fd)] = cb
if consolidate:
self.consolidate.add(fd)

def add(self, fds, callback, flags, consolidate=False):
for fd in maybe_list(fds, None):
try:
Expand Down Expand Up @@ -220,12 +216,6 @@ def _unregister(self, fd):
except (KeyError, OSError):
pass

def _discard(self, fd):
fd = fileno(fd)
self.readers.pop(fd, None)
self.writers.pop(fd, None)
self.consolidate.discard(fd)

def close(self, *args):
[self._unregister(fd) for fd in self.readers]
self.readers.clear()
Expand All @@ -234,13 +224,17 @@ def close(self, *args):
for callback in self.on_close:
callback(self)

def _repr_readers(self):
return ['({0}){1}->{2}'.format(fd, _rcb(cb), repr_flag(READ | ERR))
for fd, cb in items(self.readers)]
def _add(self, fd, cb, flags, consolidate=False):
self.poller.register(fd, flags)
(self.readers if flags & READ else self.writers)[fileno(fd)] = cb
if consolidate:
self.consolidate.add(fd)

def _repr_writers(self):
return ['({0}){1}->{2}'.format(fd, _rcb(cb), repr_flag(WRITE))
for fd, cb in items(self.writers)]
def _discard(self, fd):
fd = fileno(fd)
self.readers.pop(fd, None)
self.writers.pop(fd, None)
self.consolidate.discard(fd)

def repr_active(self):
return ', '.join(self._repr_readers() + self._repr_writers())
Expand All @@ -254,6 +248,14 @@ def repr_events(self, events):
for fd, fl in events
)

def _repr_readers(self):
return ['({0}){1}->{2}'.format(fd, _rcb(cb), repr_flag(READ | ERR))
for fd, cb in items(self.readers)]

def _repr_writers(self):
return ['({0}){1}->{2}'.format(fd, _rcb(cb), repr_flag(WRITE))
for fd, cb in items(self.writers)]

def _callback_for(self, fd, flag, *default):
try:
if flag & READ:
Expand Down

0 comments on commit e50f52e

Please sign in to comment.