[Metrics generator] Initial implementation (grafana#1282)
* Add metrics-generator component

* Add some initial logic to make the generator module optional

* Rename tempo_distributor_metrics_generator_clients metric

* Periodic check-in: add remote-write setup

* jsonnet: rename generator -> metrics_generator

* Periodic check-in: add instance, copy servicegraphprocessor over, add scrape endpoint

* Fix make test

* Support spanmetrics via OTel collector

Generator is now able to run an embedded OTel collector.
This allows running any number of processors, including spanmetrics.

This collector uses a custom metrics exporter and a remote write client.
Metrics generated by the spanmetrics processor are converted to Prometheus
and appended to the remote write client's WAL, which will export the metric
data after commit.
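
A very rough sketch of that flow, with every type a stand-in (none of these names come from the PR): the embedded collector hands generated metrics to a custom exporter, which appends the converted samples to the remote-write WAL and commits, after which the remote-write client ships them.

```go
package generator

// sample is a stand-in for a converted Prometheus sample.
type sample struct {
	labels map[string]string
	tsMs   int64
	value  float64
}

// walAppender stands in for the remote-write WAL appender.
type walAppender interface {
	Append(sample) error
	Commit() error
}

// metricsExporter is the custom exporter plugged into the embedded collector.
type metricsExporter struct {
	wal walAppender
}

// consumeMetrics would be called by the collector pipeline with the (already
// converted) output of the spanmetrics processor.
func (e *metricsExporter) consumeMetrics(samples []sample) error {
	for _, s := range samples {
		if err := e.wal.Append(s); err != nil {
			return err
		}
	}
	// The remote-write client only exports the data after the commit.
	return e.wal.Commit()
}
```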

* Implement internal spanmetrics processor

* Add latency metrics

* Only loop once for latency metrics

* Improve how metrics are collected, some bug fixes

Changes:
- Remote write was constantly failing with “out of order samples”. Fix: apparently the timestamp has to be in milliseconds, not in seconds. There is not a lot of documentation on the Prometheus interfaces, but the cortex proto has a field timestampMs.
- After this, remote write was failing with label name "le" is not unique: invalid sample. Fix: there was a bug in the append logic of the labels array. Because the labels slice was reused across iterations of a for-loop, the le label was added multiple times.

Initially I thought pushing metrics on every push request was causing the out-of-order issues, so I refactored the processor a bit to have both a PushSpans and a CollectMetrics method. CollectMetrics is called every 15s with a storage.Appender. This way we can also use a single appender for multiple processors.
This wasn't necessary to get it working, but I think this is the logic we want to end up with (i.e. don't send samples too often to keep DPM low).
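
A minimal sketch of the two fixes (assumed shapes, not the actual Tempo code — Label and appendSample stand in for Prometheus' label and appender types): timestamps are passed in milliseconds, and the per-bucket label set is rebuilt instead of growing a shared slice.

```go
package spanmetrics

import (
	"fmt"
	"strconv"
	"time"
)

// Label and appendSample are stand-ins for the Prometheus label and appender types.
type Label struct{ Name, Value string }

func appendSample(ls []Label, tsMillis int64, value float64) {
	fmt.Println(ls, tsMillis, value)
}

func pushHistogramBuckets(base []Label, buckets []float64, counts []uint64, now time.Time) {
	tsMillis := now.UnixMilli() // milliseconds, not seconds, or remote write rejects samples as out of order

	for i, le := range buckets {
		// Buggy version: re-appending to the same shared labels slice across
		// iterations accumulated one "le" label per bucket. Fix: copy the base
		// labels and append "le" to the fresh copy.
		ls := make([]Label, len(base), len(base)+1)
		copy(ls, base)
		ls = append(ls, Label{Name: "le", Value: strconv.FormatFloat(le, 'f', -1, 64)})

		appendSample(ls, tsMillis, float64(counts[i]))
	}
}
```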

* make vendor-check

* Add spanmetrics_test.go

* jsonnet: add metrics-generator service

* Add distributor.enable_metrics_generator_ring; some bug fixes

* Remove unused Registerer and scrape endpoint code; sprinkle some more TODO's around

* Add crude mechanism to trim active series in span metrics processor

* span metrics: add delete_after_last_update

* Allow config parameters in spanmetrics processor

Histogram buckets and additional dimensions can be configured
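
A sketch of what the configurable processor could look like; the field and YAML key names are assumptions for illustration, not necessarily the exact ones added by this commit.

```go
package spanmetrics

// Config holds the user-tunable parameters of the span metrics processor.
type Config struct {
	// HistogramBuckets overrides the default latency histogram buckets (in seconds).
	HistogramBuckets []float64 `yaml:"histogram_buckets"`
	// Dimensions lists additional span attributes to attach as labels to the generated series.
	Dimensions []string `yaml:"dimensions"`
}
```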

* Initial implementation of service graphs processor

* Hook up service graph processor

Store
- make sure tests pass
- fix copy-paste error in shouldEvictHead

Service graphs
- ensure metric names align with agent
- server latencies were overwriting client latencies (see the sketch below)
- expose operational metrics
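
The sketch below illustrates the client/server latency fix: keeping the two sides in separate HistogramVecs (named like the agent's service graph metrics) so one can no longer overwrite the other. The wiring is illustrative, not the processor's actual code.

```go
package servicegraphs

import "github.com/prometheus/client_golang/prometheus"

type metrics struct {
	clientLatency *prometheus.HistogramVec
	serverLatency *prometheus.HistogramVec
}

func newMetrics(reg prometheus.Registerer) *metrics {
	m := &metrics{
		clientLatency: prometheus.NewHistogramVec(prometheus.HistogramOpts{
			Name: "traces_service_graph_request_client_seconds",
			Help: "Request latency between two nodes as seen from the client",
		}, []string{"client", "server"}),
		serverLatency: prometheus.NewHistogramVec(prometheus.HistogramOpts{
			Name: "traces_service_graph_request_server_seconds",
			Help: "Request latency between two nodes as seen from the server",
		}, []string{"client", "server"}),
	}
	reg.MustRegister(m.clientLatency, m.serverLatency)
	return m
}

// Both sides of an edge are recorded independently.
func (m *metrics) record(client, server string, clientSeconds, serverSeconds float64) {
	m.clientLatency.WithLabelValues(client, server).Observe(clientSeconds)
	m.serverLatency.WithLabelValues(client, server).Observe(serverSeconds)
}
```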

* Rename metrics from generator_processor_... to metrics_generator_processor_...

* Set up BasicLifecycler, load balance spans across metrics-generator

* Replace processor.CollectMetrics with a custom prometheus.Registerer

Changes:

Service graphs processor
- replaced code to manually build metrics with CounterVec and HistogramVec
- aligned metric names with the agent
- commented out code around collectCh, this was leaking memory

Span metrics processor
- replaced code to manually build metrics with CounterVec and HistogramVec

Introduced Registry to bridge prometheus.Registerer and storage.Appender (see the sketch below)

Add support for configuring external labels

Fix generator_test.go
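
A rough sketch of the Registry idea referenced above: processors register ordinary client_golang metrics, and a periodic collection pass gathers them, attaches the configured external labels and turns them into samples for remote write. The sampleAppender interface is a stand-in for Prometheus' storage.Appender, and only counters are handled to keep the sketch short.

```go
package registry

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// sampleAppender stands in for Prometheus' storage.Appender.
type sampleAppender interface {
	Append(labels map[string]string, timestampMs int64, value float64)
	Commit() error
}

// Registry implements prometheus.Registerer (via embedding) and can flush
// gathered metrics into an appender.
type Registry struct {
	*prometheus.Registry
	externalLabels map[string]string
}

func New(externalLabels map[string]string) *Registry {
	return &Registry{Registry: prometheus.NewRegistry(), externalLabels: externalLabels}
}

// collect gathers all registered metrics once and forwards counter samples.
func (r *Registry) collect(app sampleAppender) error {
	families, err := r.Gather()
	if err != nil {
		return err
	}
	now := time.Now().UnixMilli()
	for _, mf := range families {
		for _, m := range mf.GetMetric() {
			if m.GetCounter() == nil {
				continue
			}
			lbls := map[string]string{"__name__": mf.GetName()}
			for _, lp := range m.GetLabel() {
				lbls[lp.GetName()] = lp.GetValue()
			}
			for name, value := range r.externalLabels {
				lbls[name] = value
			}
			app.Append(lbls, now, m.GetCounter().GetValue())
		}
	}
	return app.Commit()
}
```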

* make vendor-check

* Refactor remote-write structures, add tests

* Add add_instance_id_label config

* Split write requests up to a max msg size

Builds write requests up to a max size and sends them sequentially.
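
A sketch of the batching logic under assumed names (the real code operates on prompb time series): pack series greedily until the next one would push the encoded request past the limit, then start a new request; the resulting batches are sent one after another.

```go
package remotewrite

// timeSeries is a placeholder for prompb.TimeSeries.
type timeSeries struct{}

// splitRequests groups series into batches whose (estimated) encoded size stays
// below maxSizeBytes. sizeOf stands in for the proto Size() of a single series.
func splitRequests(series []timeSeries, maxSizeBytes int, sizeOf func(timeSeries) int) [][]timeSeries {
	var (
		batches [][]timeSeries
		current []timeSeries
		size    int
	)
	for _, ts := range series {
		s := sizeOf(ts)
		if len(current) > 0 && size+s > maxSizeBytes {
			batches = append(batches, current)
			current, size = nil, 0
		}
		current = append(current, ts)
		size += s
	}
	if len(current) > 0 {
		batches = append(batches, current)
	}
	return batches
}
```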

* Service graphs: collect edges when completed

When an edge is completed, it's sent to be collected by a number of
goroutines that call collectEdge.

Edges can also be collected when expired.
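
A sketch of that pipeline with illustrative names: completed (or expired) edges go onto a channel and a fixed pool of goroutines drains it, calling collectEdge for each.

```go
package servicegraphs

import "sync"

type edge struct {
	clientService string
	serverService string
	expired       bool
}

type collector struct {
	completed chan *edge
	wg        sync.WaitGroup
}

func newCollector(workers int, collectEdge func(*edge)) *collector {
	c := &collector{completed: make(chan *edge, 1024)}
	for i := 0; i < workers; i++ {
		c.wg.Add(1)
		go func() {
			defer c.wg.Done()
			for e := range c.completed {
				collectEdge(e)
			}
		}()
	}
	return c
}

// onComplete hands an edge off for collection, whether it completed or expired.
func (c *collector) onComplete(e *edge) { c.completed <- e }

func (c *collector) stop() {
	close(c.completed)
	c.wg.Wait()
}
```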

* Store: interface: give parameters names

* Service graphs: Close closech

* Simplify evict method name

* Move remote_write to its own method

* Add metrics_generator_processors to overrides, dynamically create/remove processors

* Add tests for split requests in remote write appender

* Refactor distributor code a bit

* Make collection interval configurable

* Add concurrency test for instance

* Minor tweaks

* Add metrics to track failed active processors updates

* Tweak remote write metrics

* Add exemplars support

* Fix latency measurement in spanmetrics processor

* Fix typo

* Change metrics generator enabled config var

* Return ready when generator is registered in the ring

* Add read-only mode during generator shutdown

* Configure docker-compose distributed to work with metrics generator

* Enable pushing bytes to metrics-generator

* Remove cortex from distributed example

* Replace cortexpb -> prompb, move all remote-write stuff into remotewrite package

* Set context timeout when collecting metrics, remove hola

* Protect readOnly from concurrency

* Update vendor

* Rename metrics prefix span_metrics to spanmetrics

This matches the naming from OTel spanmetrics processor

* Always add le="+Inf" bucket when not set already
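
A one-function sketch of the guard (assumed helper name): if the configured buckets do not already end in +Inf, append it so every observation falls into some bucket.

```go
package spanmetrics

import "math"

// withInfBucket ensures the last histogram bucket boundary is +Inf.
func withInfBucket(buckets []float64) []float64 {
	if n := len(buckets); n == 0 || !math.IsInf(buckets[n-1], +1) {
		return append(buckets, math.Inf(1))
	}
	return buckets
}
```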

* Fix lint errors

* Fix compactor test

Test was writing tempodb.Trace to the WAL, instead of tempodb.TraceBytes

* Move expire goroutine to collector workers

* Move metrics_generator_enabled to the top-level config

* Update docker-compose distributor example

* Use snappy from klauspost/compress

* Fix distributor test after moving metrics_generator_enabled flag

* Reorganize how processors are updated, simplify use of locks

* Stub time in processor tests

* Fix how label slices are handled in registry

* jsonnet: disable metrics-generator by default

* Regenerate kube manifests

* Pin time.Now in test with registry

* Update CHANGELOG.md

Co-authored-by: Koenraad Verheyden <[email protected]>
mapno and Koenraad Verheyden authored Feb 21, 2022
1 parent 81f1898 commit f4175e5
Showing 200 changed files with 37,955 additions and 133 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@
distributors. Also, during this period, the ingesters will use considerably more resources and as such should be scaled up (or incoming traffic should be
heavily throttled). Once all distributors and ingesters have rolled performance will return to normal. Internally we have observed ~1.5x CPU load on the
ingesters during the rollout. [#1227](https://github.com/grafana/tempo/pull/1227) (@joe-elliott)
* [FEATURE] Added metrics-generator: an optional component to generate metrics from ingested traces [#1282](https://github.com/grafana/tempo/pull/1282) (@mapno, @kvrhdn)
* [ENHANCEMENT] Enterprise jsonnet: add config to create tokengen job explicitly [#1256](https://github.com/grafana/tempo/pull/1256) (@kvrhdn)
* [ENHANCEMENT] Add new scaling alerts to the tempo-mixin [#1292](https://github.com/grafana/tempo/pull/1292) (@mapno)
* [BUGFIX]: Remove unnecessary PersistentVolumeClaim [#1245](https://github.com/grafana/tempo/issues/1245)
84 changes: 54 additions & 30 deletions cmd/tempo/app/app.go
@@ -33,6 +33,8 @@ import (
"github.com/grafana/tempo/modules/distributor/receiver"
"github.com/grafana/tempo/modules/frontend"
frontend_v1 "github.com/grafana/tempo/modules/frontend/v1"
"github.com/grafana/tempo/modules/generator"
generator_client "github.com/grafana/tempo/modules/generator/client"
"github.com/grafana/tempo/modules/ingester"
ingester_client "github.com/grafana/tempo/modules/ingester/client"
"github.com/grafana/tempo/modules/overrides"
@@ -48,23 +50,26 @@ const apiDocs = "https://grafana.com/docs/tempo/latest/api_docs/"

// Config is the root config for App.
type Config struct {
Target string `yaml:"target,omitempty"`
AuthEnabled bool `yaml:"auth_enabled,omitempty"`
MultitenancyEnabled bool `yaml:"multitenancy_enabled,omitempty"`
SearchEnabled bool `yaml:"search_enabled,omitempty"`
HTTPAPIPrefix string `yaml:"http_api_prefix"`
UseOTelTracer bool `yaml:"use_otel_tracer,omitempty"`

Server server.Config `yaml:"server,omitempty"`
Distributor distributor.Config `yaml:"distributor,omitempty"`
IngesterClient ingester_client.Config `yaml:"ingester_client,omitempty"`
Querier querier.Config `yaml:"querier,omitempty"`
Frontend frontend.Config `yaml:"query_frontend,omitempty"`
Compactor compactor.Config `yaml:"compactor,omitempty"`
Ingester ingester.Config `yaml:"ingester,omitempty"`
StorageConfig storage.Config `yaml:"storage,omitempty"`
LimitsConfig overrides.Limits `yaml:"overrides,omitempty"`
MemberlistKV memberlist.KVConfig `yaml:"memberlist,omitempty"`
Target string `yaml:"target,omitempty"`
AuthEnabled bool `yaml:"auth_enabled,omitempty"`
MultitenancyEnabled bool `yaml:"multitenancy_enabled,omitempty"`
SearchEnabled bool `yaml:"search_enabled,omitempty"`
MetricsGeneratorEnabled bool `yaml:"metrics_generator_enabled"`
HTTPAPIPrefix string `yaml:"http_api_prefix"`
UseOTelTracer bool `yaml:"use_otel_tracer,omitempty"`

Server server.Config `yaml:"server,omitempty"`
Distributor distributor.Config `yaml:"distributor,omitempty"`
IngesterClient ingester_client.Config `yaml:"ingester_client,omitempty"`
GeneratorClient generator_client.Config `yaml:"metrics_generator_client,omitempty"`
Querier querier.Config `yaml:"querier,omitempty"`
Frontend frontend.Config `yaml:"query_frontend,omitempty"`
Compactor compactor.Config `yaml:"compactor,omitempty"`
Ingester ingester.Config `yaml:"ingester,omitempty"`
Generator generator.Config `yaml:"metrics_generator,omitempty"`
StorageConfig storage.Config `yaml:"storage,omitempty"`
LimitsConfig overrides.Limits `yaml:"overrides,omitempty"`
MemberlistKV memberlist.KVConfig `yaml:"memberlist,omitempty"`
}

// RegisterFlagsAndApplyDefaults registers flag.
@@ -108,10 +113,13 @@ func (c *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
// Everything else
flagext.DefaultValues(&c.IngesterClient)
c.IngesterClient.GRPCClientConfig.GRPCCompression = "snappy"
flagext.DefaultValues(&c.GeneratorClient)
c.GeneratorClient.GRPCClientConfig.GRPCCompression = "snappy"
flagext.DefaultValues(&c.LimitsConfig)

c.Distributor.RegisterFlagsAndApplyDefaults(util.PrefixConfig(prefix, "distributor"), f)
c.Ingester.RegisterFlagsAndApplyDefaults(util.PrefixConfig(prefix, "ingester"), f)
c.Generator.RegisterFlagsAndApplyDefaults(util.PrefixConfig(prefix, "generator"), f)
c.Querier.RegisterFlagsAndApplyDefaults(util.PrefixConfig(prefix, "querier"), f)
c.Frontend.RegisterFlagsAndApplyDefaults(util.PrefixConfig(prefix, "frontend"), f)
c.Compactor.RegisterFlagsAndApplyDefaults(util.PrefixConfig(prefix, "compactor"), f)
@@ -126,14 +134,19 @@ func (c *Config) MultitenancyIsEnabled() bool {

// CheckConfig checks if config values are suspect.
func (c *Config) CheckConfig() {
if c.Target == MetricsGenerator && !c.MetricsGeneratorEnabled {
level.Warn(log.Logger).Log("msg", "target == metrics-generator but metrics_generator_enabled != true",
"explain", "The metrics-generator will only receive data if metrics_generator_enabled is set to true globally")
}

if c.Ingester.CompleteBlockTimeout < c.StorageConfig.Trace.BlocklistPoll {
level.Warn(log.Logger).Log("msg", "ingester.complete_block_timeout < storage.trace.blocklist_poll",
"explan", "You may receive 404s between the time the ingesters have flushed a trace and the querier is aware of the new block")
"explain", "You may receive 404s between the time the ingesters have flushed a trace and the querier is aware of the new block")
}

if c.Compactor.Compactor.BlockRetention < c.StorageConfig.Trace.BlocklistPoll {
level.Warn(log.Logger).Log("msg", "compactor.compaction.compacted_block_timeout < storage.trace.blocklist_poll",
"explan", "Queriers and Compactors may attempt to read a block that no longer exists")
"explain", "Queriers and Compactors may attempt to read a block that no longer exists")
}

if c.Compactor.Compactor.RetentionConcurrency == 0 {
@@ -142,7 +155,7 @@ func (c *Config) CheckConfig() {

if c.StorageConfig.Trace.Backend == "s3" && c.Compactor.Compactor.FlushSizeBytes < 5242880 {
level.Warn(log.Logger).Log("msg", "c.Compactor.Compactor.FlushSizeBytes < 5242880",
"explan", "Compaction flush size should be 5MB or higher for S3 backend")
"explain", "Compaction flush size should be 5MB or higher for S3 backend")
}

if c.StorageConfig.Trace.BlocklistPollConcurrency == 0 {
@@ -161,16 +174,18 @@ func newDefaultConfig() *Config {
type App struct {
cfg Config

Server *server.Server
ring *ring.Ring
overrides *overrides.Overrides
distributor *distributor.Distributor
querier *querier.Querier
frontend *frontend_v1.Frontend
compactor *compactor.Compactor
ingester *ingester.Ingester
store storage.Store
MemberlistKV *memberlist.KVInitService
Server *server.Server
ring *ring.Ring
generatorRing *ring.Ring
overrides *overrides.Overrides
distributor *distributor.Distributor
querier *querier.Querier
frontend *frontend_v1.Frontend
compactor *compactor.Compactor
ingester *ingester.Ingester
generator *generator.Generator
store storage.Store
MemberlistKV *memberlist.KVInitService

HTTPAuthMiddleware middleware.Interface
TracesConsumerMiddleware receiver.Middleware
@@ -386,6 +401,15 @@ func (t *App) readyHandler(sm *services.Manager) http.HandlerFunc {
}
}

// Generator has a special check that makes sure that it was able to register into the ring,
// and that all other ring entries are OK too.
if t.generator != nil {
if err := t.generator.CheckReady(r.Context()); err != nil {
http.Error(w, "Generator not ready: "+err.Error(), http.StatusServiceUnavailable)
return
}
}

// Query Frontend has a special check that makes sure that a querier is attached before it signals
// itself as ready
if t.frontend != nil {
43 changes: 41 additions & 2 deletions cmd/tempo/app/modules.go
@@ -11,7 +11,6 @@ import (
"github.com/grafana/dskit/modules"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
frontend_v1pb "github.com/grafana/tempo/modules/frontend/v1/frontendv1pb"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/discovery/dns"
"github.com/weaveworks/common/middleware"
@@ -20,6 +19,8 @@ import (
"github.com/grafana/tempo/modules/compactor"
"github.com/grafana/tempo/modules/distributor"
"github.com/grafana/tempo/modules/frontend"
frontend_v1pb "github.com/grafana/tempo/modules/frontend/v1/frontendv1pb"
"github.com/grafana/tempo/modules/generator"
"github.com/grafana/tempo/modules/ingester"
"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/modules/querier"
@@ -33,10 +34,12 @@
// The various modules that make up tempo.
const (
Ring string = "ring"
MetricsGeneratorRing string = "metrics-generator-ring"
Overrides string = "overrides"
Server string = "server"
Distributor string = "distributor"
Ingester string = "ingester"
MetricsGenerator string = "metrics-generator"
Querier string = "querier"
QueryFrontend string = "query-frontend"
Compactor string = "compactor"
@@ -86,6 +89,18 @@ func (t *App) initRing() (services.Service, error) {
return t.ring, nil
}

func (t *App) initGeneratorRing() (services.Service, error) {
generatorRing, err := tempo_ring.New(t.cfg.Generator.Ring.ToRingConfig(), "metrics-generator", generator.RingKey, prometheus.DefaultRegisterer)
if err != nil {
return nil, fmt.Errorf("failed to create metrics-generator ring %w", err)
}
t.generatorRing = generatorRing

t.Server.HTTP.Handle("/metrics-generator/ring", t.generatorRing)

return t.generatorRing, nil
}

func (t *App) initOverrides() (services.Service, error) {
overrides, err := overrides.NewOverrides(t.cfg.LimitsConfig)
if err != nil {
@@ -104,7 +119,7 @@ func (t *App) initOverrides() (services.Service, error) {

func (t *App) initDistributor() (services.Service, error) {
// todo: make ingester client a module instead of passing the config everywhere
distributor, err := distributor.New(t.cfg.Distributor, t.cfg.IngesterClient, t.ring, t.overrides, t.TracesConsumerMiddleware, t.cfg.Server.LogLevel, t.cfg.SearchEnabled, prometheus.DefaultRegisterer)
distributor, err := distributor.New(t.cfg.Distributor, t.cfg.IngesterClient, t.ring, t.cfg.GeneratorClient, t.generatorRing, t.overrides, t.TracesConsumerMiddleware, t.cfg.Server.LogLevel, t.cfg.SearchEnabled, t.cfg.MetricsGeneratorEnabled, prometheus.DefaultRegisterer)
if err != nil {
return nil, fmt.Errorf("failed to create distributor %w", err)
}
@@ -132,6 +147,18 @@ func (t *App) initIngester() (services.Service, error) {
return t.ingester, nil
}

func (t *App) initGenerator() (services.Service, error) {
t.cfg.Generator.Ring.ListenPort = t.cfg.Server.GRPCListenPort
generator, err := generator.New(&t.cfg.Generator, t.overrides, prometheus.DefaultRegisterer)
if err != nil {
return nil, fmt.Errorf("failed to create metrics-generator %w", err)
}
t.generator = generator

tempopb.RegisterMetricsGeneratorServer(t.Server.GRPC, t.generator)
return t.generator, nil
}

func (t *App) initQuerier() (services.Service, error) {
// validate worker config
// if we're not in single binary mode and worker address is not specified - bail
@@ -268,6 +295,7 @@ func (t *App) initMemberlistKV() (services.Service, error) {
t.MemberlistKV = memberlist.NewKVInitService(&t.cfg.MemberlistKV, log.Logger, dnsProvider, reg)

t.cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.cfg.Generator.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.cfg.Distributor.DistributorRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.cfg.Compactor.ShardingRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV

@@ -282,12 +310,14 @@ func (t *App) setupModuleManager() error {
mm.RegisterModule(Server, t.initServer, modules.UserInvisibleModule)
mm.RegisterModule(MemberlistKV, t.initMemberlistKV, modules.UserInvisibleModule)
mm.RegisterModule(Ring, t.initRing, modules.UserInvisibleModule)
mm.RegisterModule(MetricsGeneratorRing, t.initGeneratorRing, modules.UserInvisibleModule)
mm.RegisterModule(Overrides, t.initOverrides, modules.UserInvisibleModule)
mm.RegisterModule(Distributor, t.initDistributor)
mm.RegisterModule(Ingester, t.initIngester)
mm.RegisterModule(Querier, t.initQuerier)
mm.RegisterModule(QueryFrontend, t.initQueryFrontend)
mm.RegisterModule(Compactor, t.initCompactor)
mm.RegisterModule(MetricsGenerator, t.initGenerator)
mm.RegisterModule(Store, t.initStore, modules.UserInvisibleModule)
mm.RegisterModule(SingleBinary, nil)
mm.RegisterModule(ScalableSingleBinary, nil)
@@ -299,14 +329,23 @@
MemberlistKV: {Server},
QueryFrontend: {Store, Server},
Ring: {Server, MemberlistKV},
MetricsGeneratorRing: {Server, MemberlistKV},
Distributor: {Ring, Server, Overrides},
Ingester: {Store, Server, Overrides, MemberlistKV},
MetricsGenerator: {Server, Overrides, MemberlistKV},
Querier: {Store, Ring, Overrides},
Compactor: {Store, Server, Overrides, MemberlistKV},
SingleBinary: {Compactor, QueryFrontend, Querier, Ingester, Distributor},
ScalableSingleBinary: {SingleBinary},
}

if t.cfg.MetricsGeneratorEnabled {
// If metrics-generator is enabled, the distributor needs the metrics-generator ring
deps[Distributor] = append(deps[Distributor], MetricsGeneratorRing)
// Add the metrics generator as dependency for when target is {,scalable-}single-binary
deps[SingleBinary] = append(deps[SingleBinary], MetricsGenerator)
}

for mod, targets := range deps {
if err := mm.AddDependency(mod, targets...); err != nil {
return err
4 changes: 4 additions & 0 deletions cmd/tempo/main.go
@@ -178,6 +178,10 @@ func loadConfig() (*app.Config, error) {
config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store = "inmemory"
config.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor = 1
config.Ingester.LifecyclerConfig.Addr = "127.0.0.1"

// Generator's ring
config.Generator.Ring.KVStore.Store = "inmemory"
config.Generator.Ring.InstanceAddr = "127.0.0.1"
}

return config, nil
16 changes: 14 additions & 2 deletions example/docker-compose/distributed/docker-compose.yaml
@@ -63,6 +63,15 @@ services:
ports:
- "3200" # tempo

metrics_generator:
image: tempo:latest
command: "-target=metrics-generator -config.file=/etc/tempo.yaml"
restart: always
volumes:
- ./tempo-distributed.yaml:/etc/tempo.yaml
ports:
- "3200" # tempo

minio:
image: minio/minio:latest
environment:
@@ -85,19 +94,22 @@

prometheus:
image: prom/prometheus:latest
command: [ "--config.file=/etc/prometheus.yaml" ]
command:
- --config.file=/etc/prometheus.yaml
- --web.enable-remote-write-receiver
volumes:
- ./prometheus.yaml:/etc/prometheus.yaml
ports:
- "9090:9090"

grafana:
image: grafana/grafana:8.1.6
image: grafana/grafana:8.3.6
volumes:
- ./grafana-datasources.yaml:/etc/grafana/provisioning/datasources/datasources.yaml
environment:
- GF_AUTH_ANONYMOUS_ENABLED=true
- GF_AUTH_ANONYMOUS_ORG_ROLE=Admin
- GF_AUTH_DISABLE_LOGIN_FORM=true
- GF_FEATURE_TOGGLES_ENABLE=tempoSearch tempoBackendSearch tempoServiceGraph
ports:
- "3000:3000"
4 changes: 4 additions & 0 deletions example/docker-compose/distributed/grafana-datasources.yaml
@@ -10,6 +10,7 @@ datasources:
isDefault: false
version: 1
editable: false
uid: prometheus
- name: Tempo
type: tempo
access: proxy
@@ -20,4 +21,7 @@
version: 1
editable: false
apiVersion: 1
jsonData:
serviceMap:
datasourceUid: prometheus
uid: tempo
1 change: 1 addition & 0 deletions example/docker-compose/distributed/prometheus.yaml
@@ -16,3 +16,4 @@ scrape_configs:
- 'ingester-2:3200'
- 'querier:3200'
- 'query-frontend:3200'
- 'metrics-generator:3200'
10 changes: 10 additions & 0 deletions example/docker-compose/distributed/tempo-distributed.yaml
@@ -1,4 +1,5 @@
search_enabled: true
metrics_generator_enabled: true

server:
http_listen_port: 3200
@@ -46,6 +47,12 @@ querier:
frontend_worker:
frontend_address: query-frontend:9095

metrics_generator:
remote_write:
enabled: true
client:
url: http://prometheus:9090/api/v1/write

storage:
trace:
backend: s3
@@ -63,3 +70,6 @@ storage:
pool:
max_workers: 100 # worker pool determines the number of parallel requests to the object store backend
queue_depth: 10000

overrides:
metrics_generator_processors: ['service-graphs', 'span-metrics']