Skip to content

Commit

Permalink
Keep rates in the ratestore for 10 minutes (grafana#7599)
Browse files Browse the repository at this point in the history
This PR accounts for intermittent streams whose rates might otherwise be
dropped from the rate store when a single per-second update does not include them.

Co-authored-by: Dylan Guedes <[email protected]>
  • Loading branch information
MasslessParticle and DylanGuedes authored Nov 4, 2022
1 parent de8ed49 commit af68dbc
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 14 deletions.
40 changes: 30 additions & 10 deletions pkg/distributor/ratestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,19 @@ type ingesterClient struct {
client logproto.StreamDataClient
}

// expiringRate pairs a stream's rate with the time at which it was stored, so
// that entries older than the store's keep-alive can be detected and removed.
type expiringRate struct {
// createdAt is the time this entry was written to the store.
createdAt time.Time
// rate is the rate value recorded for the stream.
rate int64
}

type rateStore struct {
services.Service

ring ring.ReadRing
clientPool poolClientFactory
rates map[string]int64
rates map[string]expiringRate
rateLock sync.RWMutex
rateKeepAlive time.Duration
ingesterTimeout time.Duration
maxParallelism int
limits Limits
Expand All @@ -67,8 +73,10 @@ func NewRateStore(cfg RateStoreConfig, r ring.ReadRing, cf poolClientFactory, l
clientPool: cf,
maxParallelism: cfg.MaxParallelism,
ingesterTimeout: cfg.IngesterReqTimeout,
rateKeepAlive: 10 * time.Minute,
limits: l,
metrics: newRateStoreMetrics(registerer),
rates: make(map[string]expiringRate),
}

rateCollectionInterval := util.DurationWithJitter(cfg.StreamRateUpdateInterval, 0.2)
Expand Down Expand Up @@ -98,11 +106,28 @@ func (s *rateStore) updateAllRates(ctx context.Context) error {
streamRates := s.getRates(ctx, clients)
rates := s.aggregateByShard(streamRates)

s.metrics.streamCount.Set(float64(len(rates)))

s.rateLock.Lock()
defer s.rateLock.Unlock()
s.rates = rates

for stream, rate := range rates {
s.rates[stream] = expiringRate{
rate: rate,
createdAt: time.Now(),
}
}

var maxRate int64
for stream, rate := range s.rates {
if time.Since(rate.createdAt) > s.rateKeepAlive {
delete(s.rates, stream)
continue
}

maxRate = max(maxRate, rate.rate)
}

s.metrics.maxStreamRate.Set(float64(maxRate))
s.metrics.streamCount.Set(float64(len(s.rates)))

return nil
}
Expand All @@ -124,7 +149,6 @@ func (s *rateStore) anyShardingEnabled() bool {
}

func (s *rateStore) aggregateByShard(streamRates map[string]*logproto.StreamRate) map[string]int64 {
var maxRate int64
shardCount := make(map[string]int)
rates := make(map[string]int64)

Expand All @@ -134,21 +158,17 @@ func (s *rateStore) aggregateByShard(streamRates map[string]*logproto.StreamRate

if _, ok := rates[key]; ok {
rates[key] += sr.Rate
maxRate = max(rates[key], maxRate)

continue
}

rates[key] = sr.Rate
maxRate = max(rates[key], maxRate)
}

var maxShards int64
for _, v := range shardCount {
maxShards = max(maxShards, int64(v))
}

s.metrics.maxStreamRate.Set(float64(maxRate))
s.metrics.maxStreamShardCount.Set(float64(maxShards))
return rates
}
Expand Down Expand Up @@ -244,7 +264,7 @@ func (s *rateStore) RateFor(tenant string, streamHash uint64) int64 {
s.rateLock.RLock()
defer s.rateLock.RUnlock()

return s.rates[key(tenant, streamHash)]
return s.rates[key(tenant, streamHash)].rate
}

func key(tenant string, hash uint64) string {
Expand Down
46 changes: 42 additions & 4 deletions pkg/distributor/ratestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,33 @@ func TestRateStore(t *testing.T) {
time.Sleep(time.Second)
require.Equal(t, int64(0), tc.rateStore.RateFor("tenant 1", 0))
})

t.Run("it clears the rate after an interval", func(t *testing.T) {
tc := setup(true)
tc.ring.replicationSet = ring.ReplicationSet{
Instances: []ring.InstanceDesc{
{Addr: "ingester0"},
},
}

tc.clientPool.clients = map[string]client.PoolClient{
"ingester0": newRateClient([]*logproto.StreamRate{
{Tenant: "tenant 1", StreamHash: 1, StreamHashNoShard: 0, Rate: 25},
}, 1),
}

tc.rateStore.rateKeepAlive = 2 * time.Second
_ = tc.rateStore.StartAsync(context.Background())
defer tc.rateStore.StopAsync()

require.Eventually(t, func() bool {
return 25 == tc.rateStore.RateFor("tenant 1", 0)
}, time.Second, 100*time.Millisecond)

require.Eventually(t, func() bool {
return 0 == tc.rateStore.RateFor("tenant 1", 0)
}, 3*time.Second, 250*time.Millisecond)
})
}

func newFakeRing() *fakeRing {
Expand Down Expand Up @@ -188,18 +215,29 @@ func (p *fakeClientPool) GetClientFor(addr string) (client.PoolClient, error) {
return p.clients[addr], p.err
}

func newRateClient(rates []*logproto.StreamRate) client.PoolClient {
func newRateClient(rates []*logproto.StreamRate, maxResponses ...int) client.PoolClient {
var maxResp int
if len(maxResponses) > 0 {
maxResp = maxResponses[0]
}

return client2.ClosableHealthAndIngesterClient{
StreamDataClient: &fakeStreamDataClient{resp: &logproto.StreamRatesResponse{StreamRates: rates}},
StreamDataClient: &fakeStreamDataClient{resp: &logproto.StreamRatesResponse{StreamRates: rates}, maxResponses: maxResp},
}
}

type fakeStreamDataClient struct {
resp *logproto.StreamRatesResponse
err error
resp *logproto.StreamRatesResponse
err error
maxResponses int
callCount int
}

// GetStreamRates is a test fake that returns the canned response and error
// configured on the client.
//
// When maxResponses is greater than zero, the canned response is served for at
// most maxResponses calls; every later call returns a nil response together
// with c.err. A maxResponses of zero disables the limit, so the canned response
// is returned on every call. The ctx, in and opts arguments are ignored.
func (c *fakeStreamDataClient) GetStreamRates(ctx context.Context, in *logproto.StreamRatesRequest, opts ...grpc.CallOption) (*logproto.StreamRatesResponse, error) {
	// Compare with >= so that exactly maxResponses responses are served; a
	// strict > would hand out one extra response before the fake goes quiet.
	if c.maxResponses > 0 && c.callCount >= c.maxResponses {
		return nil, c.err
	}
	c.callCount++
	return c.resp, c.err
}

Expand Down

0 comments on commit af68dbc

Please sign in to comment.