Skip to content

Commit

Permalink
worker: Split out worker sampling rate, and add Sentry transactions.
Browse files Browse the repository at this point in the history
  • Loading branch information
alexmv authored and timabbott committed Mar 21, 2024
1 parent f64b947 commit 9451d08
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 62 deletions.
122 changes: 63 additions & 59 deletions zerver/worker/queue_processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
from django.utils.translation import override as override_language
from psycopg2.sql import SQL, Literal
from returns.curry import partial
from sentry_sdk import add_breadcrumb, configure_scope
from typing_extensions import override
from zulip_bots.lib import extract_query_without_mention

Expand Down Expand Up @@ -248,6 +247,7 @@ def initialize_statistics(self) -> None:

self.update_statistics()

@sentry_sdk.trace
def update_statistics(self) -> None:
total_seconds = sum(seconds for _, seconds in self.recent_consume_times)
total_events = sum(events_number for events_number, _ in self.recent_consume_times)
Expand Down Expand Up @@ -292,70 +292,74 @@ def do_consume(
self, consume_func: Callable[[List[Dict[str, Any]]], None], events: List[Dict[str, Any]]
) -> None:
consume_time_seconds: Optional[float] = None
with configure_scope() as scope:
scope.clear_breadcrumbs()
add_breadcrumb(
with sentry_sdk.start_transaction(
op="task",
name=f"consume {self.queue_name}",
custom_sampling_context={"queue": self.queue_name},
):
sentry_sdk.add_breadcrumb(
type="debug",
category="queue_processor",
message=f"Consuming {self.queue_name}",
data={"events": events, "local_queue_size": self.get_remaining_local_queue_size()},
)
try:
if self.idle:
# We're reactivating after having gone idle due to emptying the queue.
# We should update the stats file to keep it fresh and to make it clear
# that the queue started processing, in case the event we're about to process
# makes us freeze.
self.idle = False
self.update_statistics()

time_start = time.time()
if self.MAX_CONSUME_SECONDS and not self.threaded and not self.disable_timeout:
try:
signal.signal(
signal.SIGALRM,
partial(self.timer_expired, self.MAX_CONSUME_SECONDS, events),
)
try:
if self.idle:
# We're reactivating after having gone idle due to emptying the queue.
# We should update the stats file to keep it fresh and to make it clear
# that the queue started processing, in case the event we're about to process
# makes us freeze.
self.idle = False
self.update_statistics()

time_start = time.time()
if self.MAX_CONSUME_SECONDS and not self.threaded and not self.disable_timeout:
try:
signal.alarm(self.MAX_CONSUME_SECONDS * len(events))
consume_func(events)
signal.signal(
signal.SIGALRM,
partial(self.timer_expired, self.MAX_CONSUME_SECONDS, events),
)
try:
signal.alarm(self.MAX_CONSUME_SECONDS * len(events))
consume_func(events)
finally:
signal.alarm(0)
finally:
signal.alarm(0)
finally:
signal.signal(signal.SIGALRM, signal.SIG_DFL)
else:
consume_func(events)
consume_time_seconds = time.time() - time_start
self.consumed_since_last_emptied += len(events)
except Exception as e:
self._handle_consume_exception(events, e)
finally:
flush_per_request_caches()
reset_queries()

if consume_time_seconds is not None:
self.recent_consume_times.append((len(events), consume_time_seconds))

remaining_local_queue_size = self.get_remaining_local_queue_size()
if remaining_local_queue_size == 0:
self.queue_last_emptied_timestamp = time.time()
self.consumed_since_last_emptied = 0
# We've cleared all the events from the queue, so we don't
# need to worry about the small overhead of doing a disk write.
# We take advantage of this to update the stats file to keep it fresh,
# especially since the queue might go idle until new events come in.
self.update_statistics()
self.idle = True
else:
self.consume_iteration_counter += 1
if (
self.consume_iteration_counter
>= self.CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM
or time.time() - self.last_statistics_update_time
>= self.MAX_SECONDS_BEFORE_UPDATE_STATS
):
self.consume_iteration_counter = 0
self.update_statistics()
signal.signal(signal.SIGALRM, signal.SIG_DFL)
else:
consume_func(events)
consume_time_seconds = time.time() - time_start
self.consumed_since_last_emptied += len(events)
except Exception as e:
self._handle_consume_exception(events, e)
finally:
flush_per_request_caches()
reset_queries()

with sentry_sdk.start_span(description="statistics"):
if consume_time_seconds is not None:
self.recent_consume_times.append((len(events), consume_time_seconds))

remaining_local_queue_size = self.get_remaining_local_queue_size()
if remaining_local_queue_size == 0:
self.queue_last_emptied_timestamp = time.time()
self.consumed_since_last_emptied = 0
# We've cleared all the events from the queue, so we don't
# need to worry about the small overhead of doing a disk write.
# We take advantage of this to update the stats file to keep it fresh,
# especially since the queue might go idle until new events come in.
self.update_statistics()
self.idle = True
else:
self.consume_iteration_counter += 1
if (
self.consume_iteration_counter
>= self.CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM
or time.time() - self.last_statistics_update_time
>= self.MAX_SECONDS_BEFORE_UPDATE_STATS
):
self.consume_iteration_counter = 0
self.update_statistics()

def consume_single_event(self, event: Dict[str, Any]) -> None:
consume_func = lambda events: self.consume(events[0])
Expand All @@ -372,7 +376,7 @@ def _handle_consume_exception(self, events: List[Dict[str, Any]], exception: Exc
# is needed and the worker can proceed.
return

with configure_scope() as scope:
with sentry_sdk.configure_scope() as scope:
scope.set_context(
"events",
{
Expand Down
3 changes: 2 additions & 1 deletion zproject/default_settings.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os
from email.headerregistry import Address
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Literal, Optional, Tuple
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Literal, Optional, Tuple, Union

from django_auth_ldap.config import GroupOfUniqueNamesType, LDAPGroupType

Expand Down Expand Up @@ -138,6 +138,7 @@

# Sentry.io error reporting defaults to off
SENTRY_DSN: Optional[str] = None
SENTRY_TRACE_WORKER_RATE: Union[float, Dict[str, float]] = 0.0
SENTRY_TRACE_RATE: float = 0.0
SENTRY_PROFILE_RATE: float = 0.1
SENTRY_FRONTEND_DSN: Optional[str] = None
Expand Down
17 changes: 15 additions & 2 deletions zproject/sentry.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import os
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING, Any, Dict, Optional, Union

import sentry_sdk
from django.utils.translation import override as override_language
Expand Down Expand Up @@ -50,6 +50,19 @@ def add_context(event: "Event", hint: "Hint") -> Optional["Event"]:
return event


def traces_sampler(sampling_context: Dict[str, Any]) -> Union[float, bool]:
    """Return the Sentry trace sampling rate for a single transaction.

    Transactions whose sampling context carries a string ``"queue"`` entry
    are sampled according to ``settings.SENTRY_TRACE_WORKER_RATE``, which may
    be either one rate applied to every queue or a dict mapping queue names
    to rates (queues missing from the dict are sampled at ``0.0``).  All
    other transactions use ``settings.SENTRY_TRACE_RATE``.

    Args:
        sampling_context: The sampling context dict passed in by Sentry.

    Returns:
        The sample rate to apply to this transaction.
    """
    # Imported lazily, matching setup_sentry, so that importing this module
    # does not itself require settings to be loaded.
    from django.conf import settings

    queue = sampling_context.get("queue")
    if not isinstance(queue, str):
        # No (usable) queue name: fall back to the general trace rate.
        return settings.SENTRY_TRACE_RATE

    worker_rate = settings.SENTRY_TRACE_WORKER_RATE
    # Check for the dict form rather than for `float`: a rate configured as
    # an int (e.g. `1` or `0`) is not a float instance, and would otherwise
    # be treated as a dict and crash on `.get`.
    if isinstance(worker_rate, dict):
        return worker_rate.get(queue, 0.0)
    return worker_rate


def setup_sentry(dsn: Optional[str], environment: str) -> None:
from django.conf import settings

Expand Down Expand Up @@ -82,7 +95,7 @@ def setup_sentry(dsn: Optional[str], environment: str) -> None:
# PII while having the identifiers needed to determine that an
# exception only affects a small subset of users or realms.
send_default_pii=True,
traces_sample_rate=settings.SENTRY_TRACE_RATE,
traces_sampler=traces_sampler,
profiles_sample_rate=settings.SENTRY_PROFILE_RATE,
)

Expand Down

0 comments on commit 9451d08

Please sign in to comment.