Skip to content

Commit

Permalink
event hub now supports writers
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed May 21, 2012
1 parent 7242818 commit 2d93e31
Showing 1 changed file with 13 additions and 3 deletions.
16 changes: 13 additions & 3 deletions celery/worker/hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from collections import deque

from kombu.utils import cached_property
from kombu.utils.eventio import poll, POLL_READ, POLL_ERR
from kombu.utils.eventio import poll, POLL_READ, POLL_ERR, POLL_WRITE

from celery.utils.timer2 import Schedule

Expand Down Expand Up @@ -72,8 +72,17 @@ def add(self, fd, callback, flags=None):
fileno = fd
self.fdmap[fileno] = callback

def update(self, *maps):
[self.add(*x) for row in maps for x in row.iteritems()]
def add_reader(self, fd, callback):
return self.add(fd, callback, POLL_READ|POLL_ERR)

def add_writer(self, fd, callback):
return self.add(fd, callback, POLL_WRITE)

def update_readers(self, *maps):
[self.add_reader(*x) for row in maps for x in row.iteritems()]

def update_writers(self, *maps):
[self.add_writer(*x) for row in maps for x in row.iteritems()]

def remove(self, fd):
try:
Expand All @@ -83,6 +92,7 @@ def remove(self, fd):

def close(self):
[self.remove(fd) for fd in self.fdmap.keys()]
self.poller.close()

@cached_property
def scheduler(self):
Expand Down

0 comments on commit 2d93e31

Please sign in to comment.