Skip to content

Commit

Permalink
feat: add external label to athena metrics (gravitational#34731)
Browse files Browse the repository at this point in the history
* feat: add external label to athena metrics

This commit adds an `external: true` or `external: false` label to all
Prometheus metrics published by the athena audit log, indicating whether
External Audit Storage is used.
This will make it easier to filter metrics by usage of the feature, and
directly measure usage of the feature.

* initialize struct all at once
  • Loading branch information
nklaassen authored Nov 20, 2023
1 parent dff6001 commit 30db4b2
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 128 deletions.
217 changes: 124 additions & 93 deletions lib/events/athena/athena.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ type Config struct {
// Tracer is used to create spans
Tracer oteltrace.Tracer

externalAuditStorage bool
metrics *athenaMetrics

// TODO(tobiaszheller): add FIPS config in later phase.
}

Expand Down Expand Up @@ -286,6 +289,16 @@ func (cfg *Config) CheckAndSetDefaults(ctx context.Context) error {
cfg.Tracer = tracing.NoopTracer(teleport.ComponentAthena)
}

if cfg.metrics == nil {
cfg.metrics, err = newAthenaMetrics(athenaMetricsConfig{
batchInterval: cfg.BatchMaxInterval,
externalAuditStorage: cfg.externalAuditStorage,
})
if err != nil {
return trace.Wrap(err)
}
}

return nil
}

Expand Down Expand Up @@ -386,6 +399,7 @@ func (cfg *Config) SetFromURL(url *url.URL) error {
}

func (cfg *Config) UpdateForExternalCloudAudit(ctx context.Context, spec *externalcloudaudit.ExternalCloudAuditSpec, credentialsProvider aws.CredentialsProvider) error {
cfg.externalAuditStorage = true
cfg.LocationS3 = spec.AuditEventsLongTermURI
cfg.Workgroup = spec.AthenaWorkgroup
cfg.QueryResultsS3 = spec.AthenaResultsURI
Expand Down Expand Up @@ -425,14 +439,6 @@ func New(ctx context.Context, cfg Config) (*Log, error) {
return nil, trace.Wrap(err)
}

// metricConsumerBatchProcessingDuration is defined after checking config, because
// its bucket depends on batchMaxInterval.
metricConsumerBatchProcessingDuration := metricConsumerBatchProcessingDuration(cfg.BatchMaxInterval)

if err := metrics.RegisterPrometheusCollectors(append(prometheusCollectors, metricConsumerBatchProcessingDuration)...); err != nil {
return nil, trace.Wrap(err)
}

querier, err := newQuerier(querierConfig{
tablename: cfg.TableName,
database: cfg.Database,
Expand All @@ -451,7 +457,7 @@ func New(ctx context.Context, cfg Config) (*Log, error) {

consumerCtx, consumerCancel := context.WithCancel(ctx)

consumer, err := newConsumer(cfg, consumerCancel, metricConsumerBatchProcessingDuration)
consumer, err := newConsumer(cfg, consumerCancel)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down Expand Up @@ -500,90 +506,115 @@ func isValidUrlWithScheme(s string) (string, bool) {
return u.Scheme, true
}

func metricConsumerBatchProcessingDuration(batchInterval time.Duration) prometheus.Histogram {
batchSeconds := batchInterval.Seconds()
return prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: teleport.MetricNamespace,
Name: teleport.MetricParquetlogConsumerBatchPorcessingDuration,
Help: "Duration of processing single batch of events in parquetlog",
// For 60s batch interval it will look like:
// 6.00, 12.00, 30.00, 45.00, 54.00, 59.01, 64.48, 70.47, 77.01, 84.15, 91.96, 100.49, 109.81, 120.00
// We want some visibility if batch takes very small amount of time, but we are mostly interested
// in range from 0.9*batch to 2*batch.
Buckets: append([]float64{0.1 * batchSeconds, 0.2 * batchSeconds, 0.5 * batchSeconds, 0.75 * batchSeconds}, prometheus.ExponentialBucketsRange(0.9*batchSeconds, 2*batchSeconds, 10)...),
},
)
// athenaMetricsConfig holds the parameters needed to construct athenaMetrics.
type athenaMetricsConfig struct {
// batchInterval is used to derive the bucket boundaries of the
// batch-processing-duration histogram (see newAthenaMetrics).
batchInterval time.Duration
// externalAuditStorage is published as the constant "external" label on
// every metric, indicating whether External Audit Storage is in use.
externalAuditStorage bool
}

var (
consumerS3parquetFlushDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: teleport.MetricNamespace,
Name: teleport.MetricParquetlogConsumerS3FlushDuration,
Help: "Duration of flush and close of s3 parquet files in parquetlog",
// lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
// highest bucket start of 0.001 sec * 2^15 == 32.768 sec
Buckets: prometheus.ExponentialBuckets(0.001, 2, 16),
},
)

consumerDeleteMessageDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: teleport.MetricNamespace,
Name: teleport.MetricParquetlogConsumerDeleteEventsDuration,
Help: "Duration of delation of events on SQS in parquetlog",
// lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
// highest bucket start of 0.001 sec * 2^15 == 32.768 sec
Buckets: prometheus.ExponentialBuckets(0.001, 2, 16),
},
)

consumerBatchSize = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: teleport.MetricNamespace,
Name: teleport.MetricParquetlogConsumerBatchSize,
Help: "Size of single batch of events in parquetlog",
Buckets: prometheus.ExponentialBucketsRange(200, 100*1024*1024 /* 100 MB*/, 10),
},
)

consumerBatchCount = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: teleport.MetricNamespace,
Name: teleport.MetricParquetlogConsumerBatchCount,
Help: "Number of events in single batch in parquetlog",
},
)

consumerLastProcessedTimestamp = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: teleport.MetricNamespace,
Name: teleport.MetricParquetlogConsumerLastProcessedTimestamp,
Help: "Timestamp of last finished consumer execution",
},
)

consumerAgeOfOldestProcessedMessage = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: teleport.MetricNamespace,
Name: teleport.MetricParquetlogConsumerOldestProcessedMessage,
Help: "Age of oldest processed message in seconds",
},
)

consumerNumberOfErrorsFromSQSCollect = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: teleport.MetricNamespace,
Name: teleport.MetricParquetlogConsumerCollectFailed,
Help: "Number of errors received from sqs collect",
},
)
// athenaMetrics bundles all Prometheus collectors published by the athena
// audit log consumer. Instances are built and registered via newAthenaMetrics.
type athenaMetrics struct {
consumerBatchProcessingDuration prometheus.Histogram
consumerS3parquetFlushDuration prometheus.Histogram
consumerDeleteMessageDuration prometheus.Histogram
consumerBatchSize prometheus.Histogram
consumerBatchCount prometheus.Counter
consumerLastProcessedTimestamp prometheus.Gauge
consumerAgeOfOldestProcessedMessage prometheus.Gauge
consumerNumberOfErrorsFromSQSCollect prometheus.Counter
}

prometheusCollectors = []prometheus.Collector{
consumerS3parquetFlushDuration, consumerDeleteMessageDuration,
consumerBatchSize, consumerBatchCount,
consumerLastProcessedTimestamp, consumerAgeOfOldestProcessedMessage,
consumerNumberOfErrorsFromSQSCollect,
}
)
// newAthenaMetrics builds and registers all Prometheus collectors used by the
// athena audit log consumer. Every collector carries a constant "external"
// label (derived from cfg.externalAuditStorage) so metrics can be filtered by
// whether External Audit Storage is in use. The batch-processing-duration
// histogram buckets are derived from cfg.batchInterval. Returns an error when
// registration fails (e.g. a collector with the same name and labels is
// already registered).
func newAthenaMetrics(cfg athenaMetricsConfig) (*athenaMetrics, error) {
	// Attached to every metric below; lets dashboards distinguish External
	// Audit Storage deployments from regular ones.
	constLabels := prometheus.Labels{
		"external": strconv.FormatBool(cfg.externalAuditStorage),
	}
	batchSeconds := cfg.batchInterval.Seconds()

	m := &athenaMetrics{
		consumerBatchProcessingDuration: prometheus.NewHistogram(
			prometheus.HistogramOpts{
				Namespace: teleport.MetricNamespace,
				Name:      teleport.MetricParquetlogConsumerBatchPorcessingDuration,
				Help:      "Duration of processing single batch of events in parquetlog",
				// For 60s batch interval it will look like:
				// 6.00, 12.00, 30.00, 45.00, 54.00, 59.01, 64.48, 70.47, 77.01, 84.15, 91.96, 100.49, 109.81, 120.00
				// We want some visibility if batch takes very small amount of time, but we are mostly interested
				// in range from 0.9*batch to 2*batch.
				Buckets:     append([]float64{0.1 * batchSeconds, 0.2 * batchSeconds, 0.5 * batchSeconds, 0.75 * batchSeconds}, prometheus.ExponentialBucketsRange(0.9*batchSeconds, 2*batchSeconds, 10)...),
				ConstLabels: constLabels,
			},
		),
		consumerS3parquetFlushDuration: prometheus.NewHistogram(
			prometheus.HistogramOpts{
				Namespace: teleport.MetricNamespace,
				Name:      teleport.MetricParquetlogConsumerS3FlushDuration,
				Help:      "Duration of flush and close of s3 parquet files in parquetlog",
				// lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
				// highest bucket start of 0.001 sec * 2^15 == 32.768 sec
				Buckets:     prometheus.ExponentialBuckets(0.001, 2, 16),
				ConstLabels: constLabels,
			},
		),
		consumerDeleteMessageDuration: prometheus.NewHistogram(
			prometheus.HistogramOpts{
				Namespace: teleport.MetricNamespace,
				Name:      teleport.MetricParquetlogConsumerDeleteEventsDuration,
				// NOTE: fixed typo "delation" -> "deletion" in the help text.
				Help: "Duration of deletion of events on SQS in parquetlog",
				// lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
				// highest bucket start of 0.001 sec * 2^15 == 32.768 sec
				Buckets:     prometheus.ExponentialBuckets(0.001, 2, 16),
				ConstLabels: constLabels,
			},
		),
		consumerBatchSize: prometheus.NewHistogram(
			prometheus.HistogramOpts{
				Namespace:   teleport.MetricNamespace,
				Name:        teleport.MetricParquetlogConsumerBatchSize,
				Help:        "Size of single batch of events in parquetlog",
				Buckets:     prometheus.ExponentialBucketsRange(200, 100*1024*1024 /* 100 MB*/, 10),
				ConstLabels: constLabels,
			},
		),
		consumerBatchCount: prometheus.NewCounter(
			prometheus.CounterOpts{
				Namespace:   teleport.MetricNamespace,
				Name:        teleport.MetricParquetlogConsumerBatchCount,
				Help:        "Number of events in single batch in parquetlog",
				ConstLabels: constLabels,
			},
		),
		consumerLastProcessedTimestamp: prometheus.NewGauge(
			prometheus.GaugeOpts{
				Namespace:   teleport.MetricNamespace,
				Name:        teleport.MetricParquetlogConsumerLastProcessedTimestamp,
				Help:        "Timestamp of last finished consumer execution",
				ConstLabels: constLabels,
			},
		),
		consumerAgeOfOldestProcessedMessage: prometheus.NewGauge(
			prometheus.GaugeOpts{
				Namespace:   teleport.MetricNamespace,
				Name:        teleport.MetricParquetlogConsumerOldestProcessedMessage,
				Help:        "Age of oldest processed message in seconds",
				ConstLabels: constLabels,
			},
		),
		consumerNumberOfErrorsFromSQSCollect: prometheus.NewCounter(
			prometheus.CounterOpts{
				Namespace:   teleport.MetricNamespace,
				Name:        teleport.MetricParquetlogConsumerCollectFailed,
				Help:        "Number of errors received from sqs collect",
				ConstLabels: constLabels,
			},
		),
	}

	// Register everything in one call; trace.Wrap preserves the error chain
	// for callers (CheckAndSetDefaults) to return.
	return m, trace.Wrap(metrics.RegisterPrometheusCollectors(
		m.consumerBatchProcessingDuration,
		m.consumerS3parquetFlushDuration,
		m.consumerDeleteMessageDuration,
		m.consumerBatchSize,
		m.consumerBatchCount,
		m.consumerLastProcessedTimestamp,
		m.consumerAgeOfOldestProcessedMessage,
		m.consumerNumberOfErrorsFromSQSCollect,
	))
}
3 changes: 2 additions & 1 deletion lib/events/athena/athena_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) {
QueueURL: "https://queue-url",
PublisherConsumerAWSConfig: dummyAWSCfg,
Backend: mockBackend{},
metrics: &athenaMetrics{},
}
tests := []struct {
name string
Expand Down Expand Up @@ -305,7 +306,7 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) {
err := cfg.CheckAndSetDefaults(context.Background())
if tt.wantErr == "" {
require.NoError(t, err, "CheckAndSetDefaults return unexpected err")
require.Empty(t, cmp.Diff(tt.want, cfg, cmpopts.EquateApprox(0, 0.0001), cmpopts.IgnoreFields(Config{}, "Clock", "UIDGenerator", "LogEntry", "Tracer"), cmp.AllowUnexported(Config{})))
require.Empty(t, cmp.Diff(tt.want, cfg, cmpopts.EquateApprox(0, 0.0001), cmpopts.IgnoreFields(Config{}, "Clock", "UIDGenerator", "LogEntry", "Tracer", "metrics"), cmp.AllowUnexported(Config{})))
} else {
require.ErrorContains(t, err, tt.wantErr)
}
Expand Down
Loading

0 comments on commit 30db4b2

Please sign in to comment.