Skip to content

Commit

Permalink
Refactor metrics.TrendSink to have (mostly) private properties
Browse files Browse the repository at this point in the history
This is in preparation for HDR histograms, when the TrendSink's internals won't be the same and won't be directly accessible.
  • Loading branch information
na-- committed Jul 13, 2023
1 parent 4a39f17 commit feb4ac9
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 74 deletions.
2 changes: 1 addition & 1 deletion js/summary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func createTestMetrics(t *testing.T) (map[string]*metrics.Metric, *lib.Group) {
require.NoError(t, err)
checksMetric.Tainted = null.BoolFrom(false)
checksMetric.Thresholds = metrics.Thresholds{Thresholds: []*metrics.Threshold{{Source: "rate>70", LastFailed: false}}}
sink := &metrics.TrendSink{}
sink := metrics.NewTrendSink()

samples := []float64{10.0, 15.0, 20.0}
for _, s := range samples {
Expand Down
2 changes: 1 addition & 1 deletion metrics/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func TestNewMetric(t *testing.T) {
}{
"Counter": {Counter, &CounterSink{}},
"Gauge": {Gauge, &GaugeSink{}},
"Trend": {Trend, &TrendSink{}},
"Trend": {Trend, NewTrendSink()},
"Rate": {Rate, &RateSink{}},
}

Expand Down
8 changes: 4 additions & 4 deletions metrics/sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,11 @@ func PushIfNotDone(ctx context.Context, output chan<- SampleContainer, sample Sa
// the summary output and then returns a map of the corresponding resolvers.
func GetResolversForTrendColumns(trendColumns []string) (map[string]func(s *TrendSink) float64, error) {
staticResolvers := map[string]func(s *TrendSink) float64{
"avg": func(s *TrendSink) float64 { return s.Avg },
"min": func(s *TrendSink) float64 { return s.Min },
"avg": func(s *TrendSink) float64 { return s.Avg() },
"min": func(s *TrendSink) float64 { return s.Min() },
"med": func(s *TrendSink) float64 { return s.P(0.5) },
"max": func(s *TrendSink) float64 { return s.Max },
"count": func(s *TrendSink) float64 { return float64(s.Count) },
"max": func(s *TrendSink) float64 { return s.Max() },
"count": func(s *TrendSink) float64 { return float64(s.Count()) },
}
dynamicResolver := func(percentile float64) func(s *TrendSink) float64 {
return func(s *TrendSink) float64 {
Expand Down
4 changes: 2 additions & 2 deletions metrics/sample_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ func TestGetResolversForTrendColumnsCalculation(t *testing.T) {
}

func createTestTrendSink(count int) *TrendSink {
sink := TrendSink{}
sink := NewTrendSink()

for i := 0; i < count; i++ {
sink.Add(Sample{Value: float64(i)})
}

return &sink
return sink
}
84 changes: 59 additions & 25 deletions metrics/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
var (
_ Sink = &CounterSink{}
_ Sink = &GaugeSink{}
_ Sink = &TrendSink{}
_ Sink = NewTrendSink()
_ Sink = &RateSink{}
)

Expand All @@ -30,7 +30,7 @@ func NewSink(mt MetricType) Sink {
case Gauge:
sink = &GaugeSink{}
case Trend:
sink = &TrendSink{}
sink = NewTrendSink()
case Rate:
sink = &RateSink{}
default:
Expand Down Expand Up @@ -88,67 +88,101 @@ func (g *GaugeSink) Format(t time.Duration) map[string]float64 {
return map[string]float64{"value": g.Value}
}

// NewTrendSink makes a Trend sink with the OpenHistogram circllhist histogram.
func NewTrendSink() *TrendSink {
return &TrendSink{}
}

type TrendSink struct {
Values []float64
values []float64
sorted bool

Count uint64
Min, Max float64
Sum, Avg float64
count uint64
min, max float64
// TODO: unexport after this dependency is removed:
// https://github.com/grafana/xk6-output-prometheus-remote/blob/v0.2.1/pkg/remotewrite/remotewrite.go#L173
Sum float64
}

// IsEmpty indicates whether the TrendSink is empty.
func (t *TrendSink) IsEmpty() bool { return t.Count == 0 }
func (t *TrendSink) IsEmpty() bool { return t.count == 0 }

func (t *TrendSink) Add(s Sample) {
if t.Count == 0 {
t.Max, t.Min = s.Value, s.Value
if t.count == 0 {
t.max, t.min = s.Value, s.Value
} else {
if s.Value > t.Max {
t.Max = s.Value
if s.Value > t.max {
t.max = s.Value
}
if s.Value < t.Min {
t.Min = s.Value
if s.Value < t.min {
t.min = s.Value
}
}

t.Values = append(t.Values, s.Value)
t.values = append(t.values, s.Value)
t.sorted = false
t.Count += 1
t.count++
t.Sum += s.Value
t.Avg = t.Sum / float64(t.Count)
}

// P calculates the given percentile from sink values.
func (t *TrendSink) P(pct float64) float64 {
switch t.Count {
switch t.count {
case 0:
return 0
case 1:
return t.Values[0]
return t.values[0]
default:
if !t.sorted {
sort.Float64s(t.Values)
sort.Float64s(t.values)
t.sorted = true
}

// If percentile falls on a value in Values slice, we return that value.
// If percentile does not fall on a value in Values slice, we calculate (linear interpolation)
// the value that would fall at percentile, given the values above and below that percentile.
i := pct * (float64(t.Count) - 1.0)
j := t.Values[int(math.Floor(i))]
k := t.Values[int(math.Ceil(i))]
i := pct * (float64(t.count) - 1.0)
j := t.values[int(math.Floor(i))]
k := t.values[int(math.Ceil(i))]
f := i - math.Floor(i)
return j + (k-j)*f
}
}

// Min returns the minimum value.
func (t *TrendSink) Min() float64 {
return t.min
}

// Max returns the maximum value.
func (t *TrendSink) Max() float64 {
return t.max
}

// Count returns the number of recorded values.
func (t *TrendSink) Count() uint64 {
return t.count
}

// Avg returns the average (i.e. mean) value.
func (t *TrendSink) Avg() float64 {
if t.count > 0 {
return t.Sum / float64(t.count)
}
return 0
}

// Total returns the total (i.e. "sum") value for all measurements.
func (t *TrendSink) Total() float64 {
return t.Sum
}

func (t *TrendSink) Format(tt time.Duration) map[string]float64 {
// TODO: respect the summaryTrendStats for REST API
return map[string]float64{
"min": t.Min,
"max": t.Max,
"avg": t.Avg,
"min": t.Min(),
"max": t.Max(),
"avg": t.Avg(),
"med": t.P(0.5),
"p(90)": t.P(0.90),
"p(95)": t.P(0.95),
Expand Down
68 changes: 34 additions & 34 deletions metrics/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestNewSink(t *testing.T) {
{mt: Counter, sink: &CounterSink{}},
{mt: Gauge, sink: &GaugeSink{}},
{mt: Rate, sink: &RateSink{}},
{mt: Trend, sink: &TrendSink{}},
{mt: Trend, sink: NewTrendSink()},
}
for _, tc := range tests {
assert.Equal(t, tc.sink, NewSink(tc.mt))
Expand Down Expand Up @@ -101,56 +101,56 @@ func TestTrendSink(t *testing.T) {
t.Run("one value", func(t *testing.T) {
t.Parallel()

sink := TrendSink{}
sink := NewTrendSink()
sink.Add(Sample{TimeSeries: TimeSeries{Metric: &Metric{}}, Value: 7.0})
assert.Equal(t, uint64(1), sink.Count)
assert.Equal(t, 7.0, sink.Min)
assert.Equal(t, 7.0, sink.Max)
assert.Equal(t, 7.0, sink.Avg)
assert.Equal(t, 7.0, sink.Sum)
assert.Equal(t, []float64{7.0}, sink.Values)
assert.Equal(t, uint64(1), sink.Count())
assert.Equal(t, 7.0, sink.Min())
assert.Equal(t, 7.0, sink.Max())
assert.Equal(t, 7.0, sink.Avg())
assert.Equal(t, 7.0, sink.Total())
assert.Equal(t, []float64{7.0}, sink.values)
})
t.Run("values", func(t *testing.T) {
t.Parallel()

sink := TrendSink{}
sink := NewTrendSink()
for _, s := range unsortedSamples10 {
sink.Add(Sample{TimeSeries: TimeSeries{Metric: &Metric{}}, Value: s})
}
assert.Equal(t, uint64(len(unsortedSamples10)), sink.Count)
assert.Equal(t, 0.0, sink.Min)
assert.Equal(t, 100.0, sink.Max)
assert.Equal(t, 54.0, sink.Avg)
assert.Equal(t, 540.0, sink.Sum)
assert.Equal(t, unsortedSamples10, sink.Values)
assert.Equal(t, uint64(len(unsortedSamples10)), sink.Count())
assert.Equal(t, 0.0, sink.Min())
assert.Equal(t, 100.0, sink.Max())
assert.Equal(t, 54.0, sink.Avg())
assert.Equal(t, 540.0, sink.Total())
assert.Equal(t, unsortedSamples10, sink.values)
})
t.Run("negative", func(t *testing.T) {
t.Parallel()

sink := TrendSink{}
sink := NewTrendSink()
for _, s := range []float64{-10, -20} {
sink.Add(Sample{TimeSeries: TimeSeries{Metric: &Metric{}}, Value: s})
}
assert.Equal(t, uint64(2), sink.Count)
assert.Equal(t, -20.0, sink.Min)
assert.Equal(t, -10.0, sink.Max)
assert.Equal(t, -15.0, sink.Avg)
assert.Equal(t, -30.0, sink.Sum)
assert.Equal(t, []float64{-10, -20}, sink.Values)
assert.Equal(t, uint64(2), sink.Count())
assert.Equal(t, -20.0, sink.Min())
assert.Equal(t, -10.0, sink.Max())
assert.Equal(t, -15.0, sink.Avg())
assert.Equal(t, -30.0, sink.Total())
assert.Equal(t, []float64{-10, -20}, sink.values)
})
t.Run("mixed", func(t *testing.T) {
t.Parallel()

sink := TrendSink{}
sink := NewTrendSink()
for _, s := range []float64{1.4, 0, -1.2} {
sink.Add(Sample{TimeSeries: TimeSeries{Metric: &Metric{}}, Value: s})
}
assert.Equal(t, uint64(3), sink.Count)
assert.Equal(t, -1.2, sink.Min)
assert.Equal(t, 1.4, sink.Max)
assert.Equal(t, 0.067, math.Round(sink.Avg*1000)/1000)
assert.Equal(t, 0.199, math.Floor(sink.Sum*1000)/1000)
assert.Equal(t, []float64{1.4, 0, -1.2}, sink.Values)
assert.Equal(t, uint64(3), sink.Count())
assert.Equal(t, -1.2, sink.Min())
assert.Equal(t, 1.4, sink.Max())
assert.Equal(t, 0.067, math.Round(sink.Avg()*1000)/1000)
assert.Equal(t, 0.199, math.Floor(sink.Total()*1000)/1000)
assert.Equal(t, []float64{1.4, 0, -1.2}, sink.values)
})
})

Expand All @@ -159,15 +159,15 @@ func TestTrendSink(t *testing.T) {
t.Run("no values", func(t *testing.T) {
t.Parallel()

sink := TrendSink{}
sink := NewTrendSink()
for i := 1; i <= 100; i++ {
assert.Equal(t, 0.0, sink.P(float64(i)/100.0))
}
})
t.Run("one value", func(t *testing.T) {
t.Parallel()

sink := TrendSink{}
sink := NewTrendSink()
sink.Add(Sample{TimeSeries: TimeSeries{Metric: &Metric{}}, Value: 10.0})
for i := 1; i <= 100; i++ {
assert.Equal(t, 10.0, sink.P(float64(i)/100.0))
Expand All @@ -176,7 +176,7 @@ func TestTrendSink(t *testing.T) {
t.Run("two values", func(t *testing.T) {
t.Parallel()

sink := TrendSink{}
sink := NewTrendSink()
sink.Add(Sample{TimeSeries: TimeSeries{Metric: &Metric{}}, Value: 5.0})
sink.Add(Sample{TimeSeries: TimeSeries{Metric: &Metric{}}, Value: 10.0})
assert.Equal(t, false, sink.sorted)
Expand All @@ -190,7 +190,7 @@ func TestTrendSink(t *testing.T) {
t.Run("more than 2", func(t *testing.T) {
t.Parallel()

sink := TrendSink{}
sink := NewTrendSink()
for _, s := range unsortedSamples10 {
sink.Add(Sample{TimeSeries: TimeSeries{Metric: &Metric{}}, Value: s})
}
Expand All @@ -205,7 +205,7 @@ func TestTrendSink(t *testing.T) {
t.Run("format", func(t *testing.T) {
t.Parallel()

sink := TrendSink{}
sink := NewTrendSink()
for _, s := range unsortedSamples10 {
sink.Add(Sample{TimeSeries: TimeSeries{Metric: &Metric{}}, Value: s})
}
Expand Down
6 changes: 3 additions & 3 deletions metrics/thresholds.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,9 @@ func (ts *Thresholds) Run(sink Sink, duration time.Duration) (bool, error) {
case *GaugeSink:
ts.sinked["value"] = sinkImpl.Value
case *TrendSink:
ts.sinked["min"] = sinkImpl.Min
ts.sinked["max"] = sinkImpl.Max
ts.sinked["avg"] = sinkImpl.Avg
ts.sinked["min"] = sinkImpl.Min()
ts.sinked["max"] = sinkImpl.Max()
ts.sinked["avg"] = sinkImpl.Avg()
ts.sinked["med"] = sinkImpl.P(0.5)

// Parse the percentile thresholds and insert them in
Expand Down
16 changes: 12 additions & 4 deletions metrics/thresholds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,14 @@ func TestThresholdsRunAll(t *testing.T) {
}
}

func getTrendSink(values ...float64) *TrendSink {
sink := NewTrendSink()
for _, v := range values {
sink.Add(Sample{Value: v})
}
return sink
}

func TestThresholdsRun(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -686,7 +694,7 @@ func TestThresholdsRun(t *testing.T) {
{
name: "Running threshold on trend sink with no values and passing med statement succeeds",
args: args{
sink: &TrendSink{Values: []float64{}},
sink: getTrendSink(),
thresholdExpressions: []string{"med<39"},
duration: 0,
},
Expand All @@ -696,7 +704,7 @@ func TestThresholdsRun(t *testing.T) {
{
name: "Running threshold on trend sink with no values and non passing med statement fails",
args: args{
sink: &TrendSink{Values: []float64{}},
sink: getTrendSink(),
thresholdExpressions: []string{"med>39"},
duration: 0,
},
Expand All @@ -706,7 +714,7 @@ func TestThresholdsRun(t *testing.T) {
{
name: "Running threshold on trend sink with values and passing med statement succeeds",
args: args{
sink: &TrendSink{Values: []float64{70, 80, 90}, Count: 3},
sink: getTrendSink(70, 80, 90),
thresholdExpressions: []string{"med>39"},
duration: 0,
},
Expand All @@ -716,7 +724,7 @@ func TestThresholdsRun(t *testing.T) {
{
name: "Running threshold on trend sink with values and failing med statement fails",
args: args{
sink: &TrendSink{Values: []float64{70, 80, 90}, Count: 3},
sink: getTrendSink(70, 80, 90),
thresholdExpressions: []string{"med<39"},
duration: 0,
},
Expand Down

0 comments on commit feb4ac9

Please sign in to comment.