Skip to content
This repository has been archived by the owner on May 22, 2022. It is now read-only.

Commit

Permalink
event-collector: Graceful back off on 413 response
Browse files Browse the repository at this point in the history
In the case of a 413 Too Large response, break the events into smaller
lists and send them independently.
  • Loading branch information
kemitche committed May 5, 2015
1 parent 56c06e9 commit 3718d7a
Showing 1 changed file with 41 additions and 21 deletions.
62 changes: 41 additions & 21 deletions r2/r2/lib/eventcollector.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def vote_event(self, vote, old_vote=None, event_base=None, request=None,
"""
if event_base is None:
event_base = Event.base_from_request(request, context)

event_base["event_topic"] = "vote"
event_base["event_name"] = "vote_server"
event_base["event_ts"] = str(epoch_timestamp(vote._date))
Expand Down Expand Up @@ -156,6 +157,10 @@ def missing_fields(self):
return (f for f in self.REQUIRED_FIELDS if f not in self)


def _split_list(some_list):
return some_list[:len(some_list)/2], some_list[len(some_list)/2:]


class EventPublisher(object):
def __init__(self, url, signature_key, secret, user_agent, stats,
timeout=None):
Expand All @@ -172,8 +177,8 @@ def _make_signature(self, payload):
mac = hmac.new(self.secret, payload, hashlib.sha256).hexdigest()
return "key={key}, mac={mac}".format(key=self.signature_key, mac=mac)

def publish(self, events):
events_json = json.dumps(events)
def _publish(self, events):
events_json = "[" + ", ".join(events) + "]"
headers = {
"Date": _make_http_date(),
"User-Agent": self.user_agent,
Expand All @@ -184,11 +189,23 @@ def publish(self, events):
with self.stats.get_timer("providers.event_collector"):
resp = self.session.post(self.url, data=events_json,
headers=headers, timeout=self.timeout)
return resp

return resp


def process_events(g, timeout=5.0, **kw):
def publish(self, events):
events_stack = [events]
while events_stack:
some_events = events_stack.pop()
resp = self._publish(some_events)
if resp.status_code == 413 and len(some_events) > 1:
# If event-collector said we sent too many events at once,
# split the events in 2 and re-send.
# If a single event was too large, there's nothing to be done.
events_stack.extend(_split_list(some_events))
else:
yield resp, some_events


def process_events(g, timeout=5.0, max_event_size=5120, **kw):
publisher = EventPublisher(
g.events_collector_url,
g.secrets["events_collector_key"],
Expand All @@ -200,20 +217,23 @@ def process_events(g, timeout=5.0, **kw):

@g.stats.amqp_processor("event_collector")
def processor(msgs, chan):
events = [json.loads(msg.body) for msg in msgs]
response = publisher.publish(events)
if response.ok:
g.log.info("%s - Published %s events",
response.status_code, len(events))
else:
g.log.warning(
"Event send failed %s - %s",
response.status_code,
response.raw.reason,
)
g.log.warning("%r", response.content)
g.log.warning("%r", response.request.headers)
g.log.warning("%r", response.request.data)
response.raise_for_status()
events = []
for msg in msgs:
if len(msg.body) <= max_event_size:
events.append(msg.body)
else:
g.log.warning("Event too large (%s); dropping", len(msg.body))
g.log.warning("%r", msg.body)
for response, sent in publisher.publish(events):
if response.ok:
g.log.info("Published %s events", len(sent))
else:
g.log.warning(
"Event send failed %s - %s",
response.status_code,
response.reason,
)
g.log.warning("Response headers: %r", response.headers)
response.raise_for_status()

r2.lib.amqp.handle_items("event_collector", processor, **kw)

0 comments on commit 3718d7a

Please sign in to comment.