From feb4ac9d42527638c1df17a3e6b36216ea8ffb31 Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Wed, 12 Jul 2023 18:40:55 +0300 Subject: [PATCH] Refactor metrics.TrendSink to have (mostly) private properties This is in preparation for HDR histograms, when the TrendSink's internals won't be the same and won't be directly accessible. --- js/summary_test.go | 2 +- metrics/metric_test.go | 2 +- metrics/sample.go | 8 ++-- metrics/sample_test.go | 4 +- metrics/sink.go | 84 ++++++++++++++++++++++++++------------ metrics/sink_test.go | 68 +++++++++++++++--------------- metrics/thresholds.go | 6 +-- metrics/thresholds_test.go | 16 ++++++-- 8 files changed, 116 insertions(+), 74 deletions(-) diff --git a/js/summary_test.go b/js/summary_test.go index 2f30669a0d4..a43f68aa4b0 100644 --- a/js/summary_test.go +++ b/js/summary_test.go @@ -150,7 +150,7 @@ func createTestMetrics(t *testing.T) (map[string]*metrics.Metric, *lib.Group) { require.NoError(t, err) checksMetric.Tainted = null.BoolFrom(false) checksMetric.Thresholds = metrics.Thresholds{Thresholds: []*metrics.Threshold{{Source: "rate>70", LastFailed: false}}} - sink := &metrics.TrendSink{} + sink := metrics.NewTrendSink() samples := []float64{10.0, 15.0, 20.0} for _, s := range samples { diff --git a/metrics/metric_test.go b/metrics/metric_test.go index 4b29ebe54e7..cfd2d751022 100644 --- a/metrics/metric_test.go +++ b/metrics/metric_test.go @@ -15,7 +15,7 @@ func TestNewMetric(t *testing.T) { }{ "Counter": {Counter, &CounterSink{}}, "Gauge": {Gauge, &GaugeSink{}}, - "Trend": {Trend, &TrendSink{}}, + "Trend": {Trend, NewTrendSink()}, "Rate": {Rate, &RateSink{}}, } diff --git a/metrics/sample.go b/metrics/sample.go index 81c1a942afd..3d1e873821b 100644 --- a/metrics/sample.go +++ b/metrics/sample.go @@ -140,11 +140,11 @@ func PushIfNotDone(ctx context.Context, output chan<- SampleContainer, sample Sa // the summary output and then returns a map of the corresponding resolvers. func GetResolversForTrendColumns(trendColumns []string) (map[string]func(s *TrendSink) float64, error) { staticResolvers := map[string]func(s *TrendSink) float64{ - "avg": func(s *TrendSink) float64 { return s.Avg }, - "min": func(s *TrendSink) float64 { return s.Min }, + "avg": func(s *TrendSink) float64 { return s.Avg() }, + "min": func(s *TrendSink) float64 { return s.Min() }, "med": func(s *TrendSink) float64 { return s.P(0.5) }, - "max": func(s *TrendSink) float64 { return s.Max }, - "count": func(s *TrendSink) float64 { return float64(s.Count) }, + "max": func(s *TrendSink) float64 { return s.Max() }, + "count": func(s *TrendSink) float64 { return float64(s.Count()) }, } dynamicResolver := func(percentile float64) func(s *TrendSink) float64 { return func(s *TrendSink) float64 { diff --git a/metrics/sample_test.go b/metrics/sample_test.go index 2c2042f7f32..f5654e7ea7a 100644 --- a/metrics/sample_test.go +++ b/metrics/sample_test.go @@ -99,11 +99,11 @@ func TestGetResolversForTrendColumnsCalculation(t *testing.T) { } func createTestTrendSink(count int) *TrendSink { - sink := TrendSink{} + sink := NewTrendSink() for i := 0; i < count; i++ { sink.Add(Sample{Value: float64(i)}) } - return &sink + return sink } diff --git a/metrics/sink.go b/metrics/sink.go index ae4126c700f..046a6bfee6a 100644 --- a/metrics/sink.go +++ b/metrics/sink.go @@ -10,7 +10,7 @@ import ( var ( _ Sink = &CounterSink{} _ Sink = &GaugeSink{} - _ Sink = &TrendSink{} + _ Sink = NewTrendSink() _ Sink = &RateSink{} ) @@ -30,7 +30,7 @@ func NewSink(mt MetricType) Sink { case Gauge: sink = &GaugeSink{} case Trend: - sink = &TrendSink{} + sink = NewTrendSink() case Rate: sink = &RateSink{} default: @@ -88,67 +88,101 @@ func (g *GaugeSink) Format(t time.Duration) map[string]float64 { return map[string]float64{"value": g.Value} } +// NewTrendSink makes a Trend sink with the OpenHistogram circllhist histogram. +func NewTrendSink() *TrendSink { + return &TrendSink{} +} + type TrendSink struct { - Values []float64 + values []float64 sorted bool - Count uint64 - Min, Max float64 - Sum, Avg float64 + count uint64 + min, max float64 + // TODO: unexport after this dependency is removed: + // https://github.com/grafana/xk6-output-prometheus-remote/blob/v0.2.1/pkg/remotewrite/remotewrite.go#L173 + Sum float64 } // IsEmpty indicates whether the TrendSink is empty. -func (t *TrendSink) IsEmpty() bool { return t.Count == 0 } +func (t *TrendSink) IsEmpty() bool { return t.count == 0 } func (t *TrendSink) Add(s Sample) { - if t.Count == 0 { - t.Max, t.Min = s.Value, s.Value + if t.count == 0 { + t.max, t.min = s.Value, s.Value } else { - if s.Value > t.Max { - t.Max = s.Value + if s.Value > t.max { + t.max = s.Value } - if s.Value < t.Min { - t.Min = s.Value + if s.Value < t.min { + t.min = s.Value } } - t.Values = append(t.Values, s.Value) + t.values = append(t.values, s.Value) t.sorted = false - t.Count += 1 + t.count++ t.Sum += s.Value - t.Avg = t.Sum / float64(t.Count) } // P calculates the given percentile from sink values. func (t *TrendSink) P(pct float64) float64 { - switch t.Count { + switch t.count { case 0: return 0 case 1: - return t.Values[0] + return t.values[0] default: if !t.sorted { - sort.Float64s(t.Values) + sort.Float64s(t.values) t.sorted = true } // If percentile falls on a value in Values slice, we return that value. // If percentile does not fall on a value in Values slice, we calculate (linear interpolation) // the value that would fall at percentile, given the values above and below that percentile. - i := pct * (float64(t.Count) - 1.0) - j := t.Values[int(math.Floor(i))] - k := t.Values[int(math.Ceil(i))] + i := pct * (float64(t.count) - 1.0) + j := t.values[int(math.Floor(i))] + k := t.values[int(math.Ceil(i))] f := i - math.Floor(i) return j + (k-j)*f } } +// Min returns the minimum value. +func (t *TrendSink) Min() float64 { + return t.min +} + +// Max returns the maximum value. +func (t *TrendSink) Max() float64 { + return t.max +} + +// Count returns the number of recorded values. +func (t *TrendSink) Count() uint64 { + return t.count +} + +// Avg returns the average (i.e. mean) value. +func (t *TrendSink) Avg() float64 { + if t.count > 0 { + return t.Sum / float64(t.count) + } + return 0 +} + +// Total returns the total (i.e. "sum") value for all measurements. +func (t *TrendSink) Total() float64 { + return t.Sum +} + func (t *TrendSink) Format(tt time.Duration) map[string]float64 { // TODO: respect the summaryTrendStats for REST API return map[string]float64{ - "min": t.Min, - "max": t.Max, - "avg": t.Avg, + "min": t.Min(), + "max": t.Max(), + "avg": t.Avg(), "med": t.P(0.5), "p(90)": t.P(0.90), "p(95)": t.P(0.95), diff --git a/metrics/sink_test.go b/metrics/sink_test.go index d17c656738f..5f872e70e9a 100644 --- a/metrics/sink_test.go +++ b/metrics/sink_test.go @@ -19,7 +19,7 @@ func TestNewSink(t *testing.T) { {mt: Counter, sink: &CounterSink{}}, {mt: Gauge, sink: &GaugeSink{}}, {mt: Rate, sink: &RateSink{}}, - {mt: Trend, sink: &TrendSink{}}, + {mt: Trend, sink: NewTrendSink()}, } for _, tc := range tests { assert.Equal(t, tc.sink, NewSink(tc.mt)) @@ -101,56 +101,56 @@ func TestTrendSink(t *testing.T) { t.Run("one value", func(t *testing.T) { t.Parallel() - sink := TrendSink{} + sink := NewTrendSink() sink.Add(Sample{TimeSeries: TimeSeries{Metric: &Metric{}}, Value: 7.0}) - assert.Equal(t, uint64(1), sink.Count) - assert.Equal(t, 7.0, sink.Min) - assert.Equal(t, 7.0, sink.Max) - assert.Equal(t, 7.0, sink.Avg) - assert.Equal(t, 7.0, sink.Sum) - assert.Equal(t, []float64{7.0}, sink.Values) + assert.Equal(t, uint64(1), sink.Count()) + assert.Equal(t, 7.0, sink.Min()) + assert.Equal(t, 7.0, sink.Max()) + assert.Equal(t, 7.0, sink.Avg()) + assert.Equal(t, 7.0, sink.Total()) + assert.Equal(t, []float64{7.0}, sink.values) }) t.Run("values", func(t *testing.T) { t.Parallel() - sink := TrendSink{} + sink := NewTrendSink() for _, s := range unsortedSamples10 { sink.Add(Sample{TimeSeries: TimeSeries{Metric: &Metric{}}, Value: s}) } - assert.Equal(t, uint64(len(unsortedSamples10)), sink.Count) - assert.Equal(t, 0.0, sink.Min) - assert.Equal(t, 100.0, sink.Max) - assert.Equal(t, 54.0, sink.Avg) - assert.Equal(t, 540.0, sink.Sum) - assert.Equal(t, unsortedSamples10, sink.Values) + assert.Equal(t, uint64(len(unsortedSamples10)), sink.Count()) + assert.Equal(t, 0.0, sink.Min()) + assert.Equal(t, 100.0, sink.Max()) + assert.Equal(t, 54.0, sink.Avg()) + assert.Equal(t, 540.0, sink.Total()) + assert.Equal(t, unsortedSamples10, sink.values) }) t.Run("negative", func(t *testing.T) { t.Parallel() - sink := TrendSink{} + sink := NewTrendSink() for _, s := range []float64{-10, -20} { sink.Add(Sample{TimeSeries: TimeSeries{Metric: &Metric{}}, Value: s}) } - assert.Equal(t, uint64(2), sink.Count) - assert.Equal(t, -20.0, sink.Min) - assert.Equal(t, -10.0, sink.Max) - assert.Equal(t, -15.0, sink.Avg) - assert.Equal(t, -30.0, sink.Sum) - assert.Equal(t, []float64{-10, -20}, sink.Values) + assert.Equal(t, uint64(2), sink.Count()) + assert.Equal(t, -20.0, sink.Min()) + assert.Equal(t, -10.0, sink.Max()) + assert.Equal(t, -15.0, sink.Avg()) + assert.Equal(t, -30.0, sink.Total()) + assert.Equal(t, []float64{-10, -20}, sink.values) }) t.Run("mixed", func(t *testing.T) { t.Parallel() - sink := TrendSink{} + sink := NewTrendSink() for _, s := range []float64{1.4, 0, -1.2} { sink.Add(Sample{TimeSeries: TimeSeries{Metric: &Metric{}}, Value: s}) } - assert.Equal(t, uint64(3), sink.Count) - assert.Equal(t, -1.2, sink.Min) - assert.Equal(t, 1.4, sink.Max) - assert.Equal(t, 0.067, math.Round(sink.Avg*1000)/1000) - assert.Equal(t, 0.199, math.Floor(sink.Sum*1000)/1000) - assert.Equal(t, []float64{1.4, 0, -1.2}, sink.Values) + assert.Equal(t, uint64(3), sink.Count()) + assert.Equal(t, -1.2, sink.Min()) + assert.Equal(t, 1.4, sink.Max()) + assert.Equal(t, 0.067, math.Round(sink.Avg()*1000)/1000) + assert.Equal(t, 0.199, math.Floor(sink.Total()*1000)/1000) + assert.Equal(t, []float64{1.4, 0, -1.2}, sink.values) }) }) @@ -159,7 +159,7 @@ func TestTrendSink(t *testing.T) { t.Run("no values", func(t *testing.T) { t.Parallel() - sink := TrendSink{} + sink := NewTrendSink() for i := 1; i <= 100; i++ { assert.Equal(t, 0.0, sink.P(float64(i)/100.0)) } @@ -167,7 +167,7 @@ func TestTrendSink(t *testing.T) { t.Run("one value", func(t *testing.T) { t.Parallel() - sink := TrendSink{} + sink := NewTrendSink() sink.Add(Sample{TimeSeries: TimeSeries{Metric: &Metric{}}, Value: 10.0}) for i := 1; i <= 100; i++ { assert.Equal(t, 10.0, sink.P(float64(i)/100.0)) @@ -176,7 +176,7 @@ func TestTrendSink(t *testing.T) { t.Run("two values", func(t *testing.T) { t.Parallel() - sink := TrendSink{} + sink := NewTrendSink() sink.Add(Sample{TimeSeries: TimeSeries{Metric: &Metric{}}, Value: 5.0}) sink.Add(Sample{TimeSeries: TimeSeries{Metric: &Metric{}}, Value: 10.0}) assert.Equal(t, false, sink.sorted) @@ -190,7 +190,7 @@ func TestTrendSink(t *testing.T) { t.Run("more than 2", func(t *testing.T) { t.Parallel() - sink := TrendSink{} + sink := NewTrendSink() for _, s := range unsortedSamples10 { sink.Add(Sample{TimeSeries: TimeSeries{Metric: &Metric{}}, Value: s}) } @@ -205,7 +205,7 @@ func TestTrendSink(t *testing.T) { t.Run("format", func(t *testing.T) { t.Parallel() - sink := TrendSink{} + sink := NewTrendSink() for _, s := range unsortedSamples10 { sink.Add(Sample{TimeSeries: TimeSeries{Metric: &Metric{}}, Value: s}) } diff --git a/metrics/thresholds.go b/metrics/thresholds.go index c96d9bb713c..44badb4a10d 100644 --- a/metrics/thresholds.go +++ b/metrics/thresholds.go @@ -185,9 +185,9 @@ func (ts *Thresholds) Run(sink Sink, duration time.Duration) (bool, error) { case *GaugeSink: ts.sinked["value"] = sinkImpl.Value case *TrendSink: - ts.sinked["min"] = sinkImpl.Min - ts.sinked["max"] = sinkImpl.Max - ts.sinked["avg"] = sinkImpl.Avg + ts.sinked["min"] = sinkImpl.Min() + ts.sinked["max"] = sinkImpl.Max() + ts.sinked["avg"] = sinkImpl.Avg() ts.sinked["med"] = sinkImpl.P(0.5) // Parse the percentile thresholds and insert them in diff --git a/metrics/thresholds_test.go b/metrics/thresholds_test.go index c119d4404e8..21faa24a607 100644 --- a/metrics/thresholds_test.go +++ b/metrics/thresholds_test.go @@ -639,6 +639,14 @@ func TestThresholdsRunAll(t *testing.T) { } } +func getTrendSink(values ...float64) *TrendSink { + sink := NewTrendSink() + for _, v := range values { + sink.Add(Sample{Value: v}) + } + return sink +} + func TestThresholdsRun(t *testing.T) { t.Parallel() @@ -686,7 +694,7 @@ func TestThresholdsRun(t *testing.T) { { name: "Running threshold on trend sink with no values and passing med statement succeeds", args: args{ - sink: &TrendSink{Values: []float64{}}, + sink: getTrendSink(), thresholdExpressions: []string{"med<39"}, duration: 0, }, @@ -696,7 +704,7 @@ func TestThresholdsRun(t *testing.T) { { name: "Running threshold on trend sink with no values and non passing med statement fails", args: args{ - sink: &TrendSink{Values: []float64{}}, + sink: getTrendSink(), thresholdExpressions: []string{"med>39"}, duration: 0, }, @@ -706,7 +714,7 @@ func TestThresholdsRun(t *testing.T) { { name: "Running threshold on trend sink with values and passing med statement succeeds", args: args{ - sink: &TrendSink{Values: []float64{70, 80, 90}, Count: 3}, + sink: getTrendSink(70, 80, 90), thresholdExpressions: []string{"med>39"}, duration: 0, }, @@ -716,7 +724,7 @@ func TestThresholdsRun(t *testing.T) { { name: "Running threshold on trend sink with values and failing med statement fails", args: args{ - sink: &TrendSink{Values: []float64{70, 80, 90}, Count: 3}, + sink: getTrendSink(70, 80, 90), thresholdExpressions: []string{"med<39"}, duration: 0, },