Skip to content

Commit

Permalink
rename EventDispatcher.copy_buffer -> extend_buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed Jul 9, 2013
1 parent e6e016a commit 1f35d6f
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 9 deletions.
16 changes: 8 additions & 8 deletions celery/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,16 @@ def get_exchange(conn):
return ex


def Event(type, _fields=None, **fields):
def Event(type, _fields=None, __dict__=dict, __now__=time.time, **fields):
"""Create an event.
An event is a dictionary, the only required field is ``type``.
A ``timestamp`` field will be set to the current time if not provided.
"""
event = dict(_fields or {}, type=type, **fields)
event = __dict__(_fields or {}, type=type, **fields)
if 'timestamp' not in event:
event['timestamp'] = time.time()
event['timestamp'] = __now__()
return event


Expand Down Expand Up @@ -225,9 +225,9 @@ def flush(self):
return
self.send(type, **fields)

def copy_buffer(self, other):
def extend_buffer(self, other):
"""Copies the outbound buffer of another instance."""
self._outbound_buffer = other._outbound_buffer
self._outbound_buffer.extend(other._outbound_buffer)

def close(self):
"""Close the event dispatcher."""
Expand Down Expand Up @@ -347,7 +347,7 @@ def State(self):
@contextmanager
def default_dispatcher(self, hostname=None, enabled=True,
buffer_while_offline=False):
with self.app.amqp.producer_pool.acquire(block=True) as pub:
with self.Dispatcher(pub.connection, hostname, enabled,
pub.channel, buffer_while_offline) as d:
with self.app.amqp.producer_pool.acquire(block=True) as prod:
with self.Dispatcher(prod.connection, hostname, enabled,
prod.channel, buffer_while_offline) as d:
yield d
2 changes: 1 addition & 1 deletion celery/worker/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ def start(self, c):
enabled=self.send_events, groups=self.groups,
)
if prev:
dis.copy_buffer(prev)
dis.extend_buffer(prev)
dis.flush()

def stop(self, c):
Expand Down

0 comments on commit 1f35d6f

Please sign in to comment.