Skip to content
This repository has been archived by the owner on May 22, 2022. It is now read-only.

Commit

Permalink
event-collector queue: Pre-calculate content size
Browse files Browse the repository at this point in the history
The max Content-Length the event-collector endpoint will accept
is static and known, so pre-calculate the size of the event batch
being sent and keep the total under the max size.
  • Loading branch information
kemitche committed May 5, 2015
1 parent 3718d7a commit 11ee44c
Showing 1 changed file with 27 additions and 11 deletions.
38 changes: 27 additions & 11 deletions r2/r2/lib/eventcollector.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,14 @@ def _split_list(some_list):

class EventPublisher(object):
def __init__(self, url, signature_key, secret, user_agent, stats,
timeout=None):
max_content_length=40 * 1024, timeout=None):
self.url = url
self.signature_key = signature_key
self.secret = secret
self.user_agent = user_agent
self.timeout = timeout
self.stats = stats
self.max_content_length = max_content_length

self.session = requests.Session()

Expand All @@ -178,6 +179,8 @@ def _make_signature(self, payload):
return "key={key}, mac={mac}".format(key=self.signature_key, mac=mac)

def _publish(self, events):
# Note: If how the JSON payload is created is changed,
# update the content-length estimations in `_chunk_events`
events_json = "[" + ", ".join(events) + "]"
headers = {
"Date": _make_http_date(),
Expand All @@ -191,18 +194,31 @@ def _publish(self, events):
headers=headers, timeout=self.timeout)
return resp

def _chunk_events(self, events):
    """Yield successive batches of events that fit the content limit.

    Each batch is a list of already-serialized event strings whose
    estimated JSON payload size stays below `self.max_content_length`.
    The estimate mirrors how `_publish` builds the payload:
    `"[" + ", ".join(events) + "]"`.  A separator is counted for every
    event (including the last one), so the estimate errs on the side of
    being slightly too large rather than too small.

    A batch is never empty: an event that is too large on its own is
    still yielded, alone, so the caller can send it and handle the
    response instead of publishing an empty `[]` payload.

    :param events: iterable of serialized (JSON string) events
    :returns: generator of non-empty lists of events
    """
    # NOTE(review): len() counts characters; if events may be unicode
    # strings with non-ASCII content the encoded byte length could be
    # larger than this estimate -- confirm against callers.

    # base content-length is 2 for the `[` and `]`
    base_size = len("[") + len("]")
    separator_size = len(", ")

    to_send = []
    send_size = base_size
    for event in events:
        # estimated cost of this event: the message itself plus the
        # `, ` used to join the events JSON
        event_size = len(event) + separator_size

        # If adding this event would put us at or over the batch limit,
        # yield the current set of events first.  Skip the flush when
        # nothing is pending (the first event alone is oversized) so an
        # empty batch is never produced.
        if to_send and send_size + event_size >= self.max_content_length:
            yield to_send
            to_send = []
            send_size = base_size

        to_send.append(event)
        send_size += event_size

    if to_send:
        yield to_send

def publish(self, events):
events_stack = [events]
while events_stack:
some_events = events_stack.pop()
for some_events in self._chunk_events(events):
resp = self._publish(some_events)
if resp.status_code == 413 and len(some_events) > 1:
# If event-collector said we sent too many events at once,
# split the events in 2 and re-send.
# If a single event was too large, there's nothing to be done.
events_stack.extend(_split_list(some_events))
else:
yield resp, some_events
yield resp, some_events


def process_events(g, timeout=5.0, max_event_size=5120, **kw):
Expand Down

0 comments on commit 11ee44c

Please sign in to comment.