Skip to content

Commit

Permalink
Implement queue to forward traces to metrics-generator (grafana#1331)
Browse files Browse the repository at this point in the history
* Implement queue to forward traces to metrics-generator

Use eviciting queue to buffer and send push requests from the distributor
to the generator.

This queue will drop the oldest item in the queue if it's full and a new
request is pushed.

* Format

* Evicting queue improvements and fixes

* Change implementation away from subscriptions

* Implement circular queue

* Lint

* Add forwarder

* Changelog entry

* ShutdownCh unused

* Add TODOs

* Add overwrite function to queue

* Add overwrites metric

* Switch to channels approach

* Refactor to address comments

* minor fix

* lint

* Address last comments

* Add metric for queue length

* Address comments

* Use RWMutex

* Record metrics synchronously

* Add queueManager default config

* Rename forwarder metric

* Remove unused method

* Fix metric name

* Address last comments
  • Loading branch information
mapno authored Apr 22, 2022
1 parent b1fd169 commit f2406df
Show file tree
Hide file tree
Showing 7 changed files with 607 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
* [ENHANCEMENT] Partially persist traces that exceed `max_bytes_per_trace` during compaction [#1317](https://github.com/grafana/tempo/pull/1317) (@joe-elliott)
* [ENHANCEMENT] Make search respect per tenant `max_bytes_per_trace` and added `skippedTraces` to returned search metrics. [#1318](https://github.com/grafana/tempo/pull/1318) (@joe-elliott)
* [ENHANCEMENT] Improve serverless consistency by forcing a GC before returning. [#1324](https://github.com/grafana/tempo/pull/1324) (@joe-elliott)
* [ENHANCEMENT] Add forwarding queue from distributor to metrics-generator. [#1331](https://github.com/grafana/tempo/pull/1331) (@mapno)
* [ENHANCEMENT] Add hedging to queries to external endpoints. [#1350](https://github.com/grafana/tempo/pull/1350) (@joe-elliott)
New config options and defaults:
```
Expand Down
50 changes: 23 additions & 27 deletions modules/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ type Distributor struct {
generatorClientCfg generator_client.Config
generatorsRing ring.ReadRing
generatorsPool *ring_client.Pool
generatorForwarder *forwarder

// Per-user rate limiter.
ingestionRateLimiter *limiter.RateLimiter
Expand Down Expand Up @@ -181,22 +182,6 @@ func New(cfg Config, clientCfg ingester_client.Config, ingestersRing ring.ReadRi

subservices = append(subservices, pool)

var generatorsPool *ring_client.Pool
if metricsGeneratorEnabled {
generatorsPool = ring_client.NewPool(
"distributor_metrics_generator_pool",
generatorClientCfg.PoolConfig,
ring_client.NewRingServiceDiscovery(generatorsRing),
func(addr string) (ring_client.PoolClient, error) {
return generator_client.New(addr, generatorClientCfg)
},
metricGeneratorClients,
log.Logger,
)

subservices = append(subservices, generatorsPool)
}

// turn list into map for efficient checking
tagsToDrop := map[string]struct{}{}
for _, tag := range cfg.SearchTagsDenyList {
Expand All @@ -214,12 +199,29 @@ func New(cfg Config, clientCfg ingester_client.Config, ingestersRing ring.ReadRi
metricsGeneratorEnabled: metricsGeneratorEnabled,
generatorClientCfg: generatorClientCfg,
generatorsRing: generatorsRing,
generatorsPool: generatorsPool,
globalTagsToDrop: tagsToDrop,
overrides: o,
traceEncoder: model.MustNewSegmentDecoder(model.CurrentEncoding),
}

if metricsGeneratorEnabled {
d.generatorsPool = ring_client.NewPool(
"distributor_metrics_generator_pool",
generatorClientCfg.PoolConfig,
ring_client.NewRingServiceDiscovery(generatorsRing),
func(addr string) (ring_client.PoolClient, error) {
return generator_client.New(addr, generatorClientCfg)
},
metricGeneratorClients,
log.Logger,
)

subservices = append(subservices, d.generatorsPool)

d.generatorForwarder = newForwarder(d.sendToGenerators, o)
subservices = append(subservices, d.generatorForwarder)
}

cfgReceivers := cfg.Receivers
if len(cfgReceivers) == 0 {
cfgReceivers = defaultReceivers
Expand Down Expand Up @@ -333,20 +335,14 @@ func (d *Distributor) PushBatches(ctx context.Context, batches []*v1.ResourceSpa
err = d.sendToIngestersViaBytes(ctx, userID, rebatchedTraces, searchData, keys)
if err != nil {
recordDiscaredSpans(err, userID, spanCount)
return nil, err
}

if d.metricsGeneratorEnabled && len(d.overrides.MetricsGeneratorProcessors(userID)) > 0 && err == nil {
// Handle requests sent to the metrics-generator in a separate goroutine, this way we don't
// influence the overall write
go func() {
genErr := d.sendToGenerators(context.Background(), userID, keys, rebatchedTraces)
if genErr != nil {
level.Error(log.Logger).Log("msg", "pushing to metrics-generators failed", "err", genErr)
}
}()
if d.metricsGeneratorEnabled && len(d.overrides.MetricsGeneratorProcessors(userID)) > 0 {
d.generatorForwarder.SendTraces(ctx, userID, keys, rebatchedTraces)
}

return nil, err // PushRequest is ignored, so no reason to create one
return nil, nil // PushRequest is ignored, so no reason to create one
}

func (d *Distributor) sendToIngestersViaBytes(ctx context.Context, userID string, traces []*rebatchedTrace, searchData [][]byte, keys []uint32) error {
Expand Down
Loading

0 comments on commit f2406df

Please sign in to comment.