feat(metrics_indexer): Wire up the writes limiter to postgres [INGEST-1380] (getsentry#37174)
untitaker authored Aug 3, 2022
1 parent d25f4fc commit ef0c0df
Showing 2 changed files with 94 additions and 13 deletions.
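The substance of the change: before inserting new indexer strings into Postgres, bulk_record now asks a writes limiter which of the pending (org_id, string) keys it may still write; the insert runs only for the accepted keys, dropped keys are reported back as rate-limited results, and quota is consumed only after the insert commits. Below is a minimal, self-contained sketch of that pattern; ToyWritesLimiter, its in-memory bookkeeping, and the tuple keys are illustrative stand-ins, not the actual API exposed by sentry.sentry_metrics.indexer.ratelimiters.

from contextlib import contextmanager
from dataclasses import dataclass
from typing import Dict, List, Tuple

Key = Tuple[int, str]  # (org_id, string)


@dataclass
class WritesLimiterState:
    accepted_keys: List[Key]    # keys we may still write to Postgres
    dropped_strings: List[Key]  # keys rejected because the quota is exhausted


class ToyWritesLimiter:
    def __init__(self, per_org_limit: int) -> None:
        self.per_org_limit = per_org_limit
        self._used: Dict[int, int] = {}

    @contextmanager
    def check_write_limits(self, use_case_id: str, keys: List[Key]):
        # use_case_id only mirrors the real signature; the toy ignores it.
        budget = dict(self._used)  # tentative per-org counters
        accepted, dropped = [], []
        for org_id, string in keys:
            if budget.get(org_id, 0) < self.per_org_limit:
                budget[org_id] = budget.get(org_id, 0) + 1
                accepted.append((org_id, string))
            else:
                dropped.append((org_id, string))
        yield WritesLimiterState(accepted_keys=accepted, dropped_strings=dropped)
        # Only reached if the caller's block (the DB write) did not raise:
        # quota is consumed after a successful commit, never on a crash.
        self._used = budget


limiter = ToyWritesLimiter(per_org_limit=1)
with limiter.check_write_limits("release-health", [(1, "a"), (1, "b"), (2, "c")]) as state:
    pass  # bulk_create(...) over state.accepted_keys would go here
assert len(state.dropped_strings) == 1  # org 1 went over its per-org budget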
40 changes: 28 additions & 12 deletions src/sentry/sentry_metrics/indexer/postgres_v2.py
@@ -9,6 +9,7 @@
from sentry.sentry_metrics.indexer.cache import indexer_cache
from sentry.sentry_metrics.indexer.models import BaseIndexer, PerfStringIndexer
from sentry.sentry_metrics.indexer.models import StringIndexer as StringIndexerTable
from sentry.sentry_metrics.indexer.ratelimiters import writes_limiter
from sentry.sentry_metrics.indexer.strings import REVERSE_SHARED_STRINGS, SHARED_STRINGS
from sentry.utils import metrics

@@ -136,24 +137,39 @@ def bulk_record(
indexer_cache.set_many(new_results_to_cache, use_case_id.value)
return cache_key_results.merge(db_read_key_results)

        new_records = []
        for write_pair in db_write_keys.as_tuples():
            organization_id, string = write_pair
            new_records.append(
                self._table(use_case_id)(organization_id=int(organization_id), string=string)
            )
        with writes_limiter.check_write_limits(use_case_id, db_write_keys) as writes_limiter_state:
            # After the DB has successfully committed writes, we exit this
            # context manager and consume quotas. If the DB crashes we
            # shouldn't consume quota.
            filtered_db_write_keys = writes_limiter_state.accepted_keys
            del db_write_keys
            rate_limited_write_results = writes_limiter_state.dropped_strings

            new_records = []
            for write_pair in filtered_db_write_keys.as_tuples():
                organization_id, string = write_pair
                new_records.append(
                    self._table(use_case_id)(organization_id=int(organization_id), string=string)
                )

        with metrics.timer("sentry_metrics.indexer.pg_bulk_create"):
            # We use `ignore_conflicts=True` here to avoid race conditions where metric indexer
            # records might have been created between when we queried in `bulk_record` and the
            # attempt to create the rows down below.
            self._table(use_case_id).objects.bulk_create(new_records, ignore_conflicts=True)
            with metrics.timer("sentry_metrics.indexer.pg_bulk_create"):
                # We use `ignore_conflicts=True` here to avoid race conditions where metric indexer
                # records might have been created between when we queried in `bulk_record` and the
                # attempt to create the rows down below.
                self._table(use_case_id).objects.bulk_create(new_records, ignore_conflicts=True)

        db_write_key_results = KeyResults()
        for dropped_string in rate_limited_write_results:
            db_write_key_results.add_key_result(
                dropped_string.key_result,
                fetch_type=dropped_string.fetch_type,
                fetch_type_ext=dropped_string.fetch_type_ext,
            )

        db_write_key_results.add_key_results(
            [
                KeyResult(org_id=db_obj.organization_id, string=db_obj.string, id=db_obj.id)
                for db_obj in self._get_db_records(use_case_id, db_write_keys)
                for db_obj in self._get_db_records(use_case_id, filtered_db_write_keys)
            ],
            fetch_type=FetchType.FIRST_SEEN,
        )
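A note on the design, based only on this hunk and the test below: the quota is consumed when the check_write_limits context manager exits normally, i.e. after bulk_create has committed, so a crashed write does not burn quota. Strings dropped by the limiter are not silently lost from the response; they are folded into the returned KeyResults via add_key_result with no id and with the limiter's fetch type (the test asserts FetchType.RATE_LIMITED with FetchTypeExt(is_global=False)), while strings that were actually written come back as FetchType.FIRST_SEEN.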
67 changes: 66 additions & 1 deletion tests/sentry/sentry_metrics/test_postgres_indexer.py
@@ -3,7 +3,7 @@
import pytest

from sentry.sentry_metrics.configuration import UseCaseKey
from sentry.sentry_metrics.indexer.base import FetchType, KeyCollection, Metadata
from sentry.sentry_metrics.indexer.base import FetchType, FetchTypeExt, KeyCollection, Metadata
from sentry.sentry_metrics.indexer.cache import indexer_cache
from sentry.sentry_metrics.indexer.models import MetricsKeyIndexer, StringIndexer
from sentry.sentry_metrics.indexer.postgres import PGStringIndexer
@@ -13,6 +13,7 @@
)
from sentry.sentry_metrics.indexer.strings import SHARED_STRINGS
from sentry.testutils.cases import TestCase
from sentry.testutils.helpers.options import override_options
from sentry.utils.cache import cache


@@ -268,3 +269,67 @@ def test_get_db_records(self):

        assert indexer_cache.get(string.id, self.cache_namespace) is None
        assert indexer_cache.get(key, self.cache_namespace) is None

    def test_rate_limited(self):
        """
        Assert that rate limits per-org and globally are applied at all.

        Since we don't have control over ordering in sets/dicts, we have no
        control over which string gets rate-limited. That makes assertions
        quite awkward and imprecise.
        """
        org_strings = {1: {"a", "b", "c"}, 2: {"e", "f"}, 3: {"g"}}

        with override_options(
            {
                "sentry-metrics.writes-limiter.limits.releasehealth.per-org": [
                    {"window_seconds": 10, "granularity_seconds": 10, "limit": 1}
                ],
            }
        ):
            results = self.indexer.bulk_record(
                use_case_id=self.use_case_id, org_strings=org_strings
            )

        assert len(results[1]) == 3
        assert len(results[2]) == 2
        assert len(results[3]) == 1
        assert results[3]["g"] is not None

        rate_limited_strings = set()

        for org_id in 1, 2, 3:
            for k, v in results[org_id].items():
                if v is None:
                    rate_limited_strings.add((org_id, k))

        assert len(rate_limited_strings) == 3
        assert (3, "g") not in rate_limited_strings

        for org_id, string in rate_limited_strings:
            assert results.get_fetch_metadata()[org_id][string] == Metadata(
                id=None,
                fetch_type=FetchType.RATE_LIMITED,
                fetch_type_ext=FetchTypeExt(is_global=False),
            )

        org_strings = {1: {string for _, string in rate_limited_strings}}

        with override_options(
            {
                "sentry-metrics.writes-limiter.limits.releasehealth.global": [
                    {"window_seconds": 10, "granularity_seconds": 10, "limit": 2}
                ],
            }
        ):
            results = self.indexer.bulk_record(
                use_case_id=self.use_case_id, org_strings=org_strings
            )

        rate_limited_strings2 = set()
        for k, v in results[1].items():
            if v is None:
                rate_limited_strings2.add(k)

        assert len(rate_limited_strings2) == 1
        assert len({string for _, string in rate_limited_strings} - rate_limited_strings2) == 2
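For readability, the expected counts behind these assertions, restated as a standalone snippet (this adds no behavior beyond what the test already encodes):

# Run 1: per-org limit 1, so each org keeps one string and drops the rest.
first_run = {1: 3, 2: 2, 3: 1}  # strings submitted per org
dropped_first = sum(max(n - 1, 0) for n in first_run.values())  # 2 + 1 + 0 == 3
# Run 2: the 3 previously dropped strings are resubmitted under org 1 with a
# global limit of 2, so exactly one of them is rate-limited again.
dropped_second = max(dropped_first - 2, 0)
assert (dropped_first, dropped_second) == (3, 1)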
