Skip to content

Commit

Permalink
MaxDataPoints consolidation improvements: support nudging for consist…
Browse files Browse the repository at this point in the history
…ent bucketing (#146)

Co-authored-by: Justin Lei <[email protected]>
  • Loading branch information
npazosmendez and leizor committed Sep 6, 2024
1 parent ace9091 commit 1b81533
Show file tree
Hide file tree
Showing 3 changed files with 296 additions and 2 deletions.
16 changes: 16 additions & 0 deletions expr/types/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package config

// ConfigType groups the switches that change how series are aggregated to
// honor MaxDataPoints. It is declared as an alias of an anonymous struct type.
type ConfigType = struct {
// NudgeStartTimeOnAggregation enables nudging the start time of metrics
// when aggregated. The start time is nudged in such a way that timestamps
// always fall in the same bucket. This is done by GraphiteWeb, and is
// useful to avoid jitter in graphs when refreshing the page.
NudgeStartTimeOnAggregation bool

// UseBucketsHighestTimestampOnAggregation enables using the highest timestamp of the
// buckets when aggregating to honor MaxDataPoints, instead of the lowest timestamp.
// This prevents results from appearing to predict the future.
UseBucketsHighestTimestampOnAggregation bool
}

// Config is the package-level configuration value. It is initialized to the
// zero value, so both options are disabled until something sets them.
var Config = ConfigType{}
60 changes: 58 additions & 2 deletions expr/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/go-graphite/carbonapi/expr/consolidations"
"github.com/go-graphite/carbonapi/expr/tags"
"github.com/go-graphite/carbonapi/expr/types/config"
pbv2 "github.com/go-graphite/protocol/carbonapi_v2_pb"
pb "github.com/go-graphite/protocol/carbonapi_v3_pb"
pickle "github.com/lomik/og-rek"
Expand Down Expand Up @@ -141,7 +142,7 @@ func MarshalJSON(results []*MetricData, timestampMultiplier int64, noNullPoints
b = append(b, `,"datapoints":[`...)

var innerComma bool
t := r.StartTime * timestampMultiplier
t := r.AggregatedStartTime() * timestampMultiplier
for _, v := range r.AggregatedValues() {
if noNullPoints && math.IsNaN(v) {
t += r.AggregatedTimeStep() * timestampMultiplier
Expand Down Expand Up @@ -330,6 +331,60 @@ func (r *MetricData) AggregatedTimeStep() int64 {
return r.StepTime * int64(r.ValuesPerPoint)
}

// AggregatedStartTime reports the timestamp of the first point of the
// aggregated series.
//
// The original StartTime is first moved forward by one StepTime for every
// point that nudgePointsCount says is dropped. When
// UseBucketsHighestTimestampOnAggregation is enabled, the result is moved
// further to the last original timestamp of the first bucket, i.e. by
// AggregatedTimeStep() - StepTime.
func (r *MetricData) AggregatedStartTime() int64 {
	nudgeShift := r.nudgePointsCount() * r.StepTime
	bucketStart := r.StartTime + nudgeShift

	if !config.Config.UseBucketsHighestTimestampOnAggregation {
		// Buckets are labelled with their lowest timestamp.
		return bucketStart
	}

	// Label the bucket with the timestamp of its last original point.
	lastPointOffset := r.AggregatedTimeStep() - r.StepTime
	return bucketStart + lastPointOffset
}

// nudgePointsCount returns the number of points to discard at the beginning of
// the series when aggregating. This is done if NudgeStartTimeOnAggregation is
// enabled, and has the purpose of assigning timestamps of a series to buckets
// consistently across different time ranges. To simplify the aggregation
// logic, we discard points at the beginning of the series so that a bucket
// starts right at the beginning. This function calculates how many points to
// discard.
//
// It returns 0 when nudging is disabled, when the series is too short to
// afford losing points, or when the step sizes are not positive.
func (r *MetricData) nudgePointsCount() int64 {
	if !config.Config.NudgeStartTimeOnAggregation {
		return 0
	}

	if len(r.Values) <= 2*r.ValuesPerPoint {
		// At most 2 points would remain after aggregation, removing one would be too impactful.
		return 0
	}

	aggTimeStep := r.AggregatedTimeStep()
	if r.StepTime <= 0 || aggTimeStep <= 0 {
		// A zero step would make the remainder and division below panic
		// with an integer divide by zero, and a negative one has no
		// meaningful bucket layout. Leave such series untouched.
		return 0
	}

	// Suppose r.StartTime=4, r.StepTime=3 and aggTimeStep=6.
	//   ts:                 00 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 ...
	//   original buckets:   -- -- -- |        |        |        |        |     ...
	//   aggregated buckets: -- -- -- |                 |                 |     ...

	// We start counting our aggTimeStep buckets at absolute time r.StepTime.
	// Notice the following:
	//   ts:                       00 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 ...
	//   bucket #:                 -  -  -  1  1  1  1  1  1  2  2  2  2  2  2  3  3  ...
	//   (ts-step) % aggTimeStep:  -  -  -  0  1  2  3  4  5  0  1  2  3  4  5  0  1  ...

	// Given a timestamp 'ts', we can calculate how far it is from the beginning
	// of the nearest bucket to the right by doing:
	//   aggTimeStep - ((ts-r.StepTime) % aggTimeStep)
	// Using this, we calculate the 'distance' from r.StartTime to the
	// nearest bucket to the right. If this distance is less than aggTimeStep,
	// then r.StartTime is not the beginning of a bucket. We need to discard
	// dist / r.StepTime points (which could be zero if dist < r.StepTime).
	//
	// When r.StartTime < r.StepTime the remainder is negative, so dist is
	// larger than aggTimeStep and nothing is discarded.
	dist := aggTimeStep - ((r.StartTime - r.StepTime) % aggTimeStep)
	if dist < aggTimeStep {
		return dist / r.StepTime
	}
	return 0
}

// GetAggregateFunction returns MetricData.AggregateFunction, setting it first if it is not yet set
func (r *MetricData) GetAggregateFunction() func([]float64) float64 {
if r.AggregateFunction == nil {
Expand Down Expand Up @@ -363,7 +418,8 @@ func (r *MetricData) AggregateValues() {
n := len(r.Values)/r.ValuesPerPoint + 1
aggV := make([]float64, 0, n)

v := r.Values
nudgeCount := r.nudgePointsCount()
v := r.Values[nudgeCount:]

for len(v) >= r.ValuesPerPoint {
val := aggFunc(v[:r.ValuesPerPoint])
Expand Down
222 changes: 222 additions & 0 deletions expr/types/types_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
package types

import (
"testing"

"github.com/go-graphite/carbonapi/expr/types/config"
"github.com/stretchr/testify/assert"
)

// TestAggregatedValuesNudgedAndHighestTimestamp checks the aggregated values,
// step and start time produced by ConsolidateJSON with the "sum" consolidation
// function when both NudgeStartTimeOnAggregation and
// UseBucketsHighestTimestampOnAggregation are enabled.
func TestAggregatedValuesNudgedAndHighestTimestamp(t *testing.T) {
	// The options live in package-level state. Put the previous values back
	// on exit so they do not leak into other tests of this package.
	prev := config.Config
	defer func() { config.Config = prev }()

	config.Config.NudgeStartTimeOnAggregation = true
	config.Config.UseBucketsHighestTimestampOnAggregation = true

	tests := []struct {
		name      string
		values    []float64
		step      int64
		start     int64
		mdp       int64
		want      []float64
		wantStep  int64
		wantStart int64
	}{
		{
			name:      "empty",
			values:    []float64{},
			step:      60,
			mdp:       100,
			want:      []float64{},
			wantStep:  60,
			wantStart: 0,
		},
		{
			name:      "one point",
			values:    []float64{1, 2, 3, 4},
			start:     10,
			step:      10,
			mdp:       1,
			want:      []float64{10},
			wantStep:  40,
			wantStart: 40,
		},
		{
			name:      "no nudge if few points",
			values:    []float64{1, 2, 3, 4},
			step:      10,
			start:     20,
			mdp:       1,
			want:      []float64{10},
			wantStep:  40,
			wantStart: 50,
		},

		{
			name:      "should nudge the first point",
			values:    []float64{1, 2, 3, 4, 5, 6},
			start:     20,
			step:      10,
			mdp:       3,
			want:      []float64{5, 9, 6},
			wantStep:  20,
			wantStart: 40,
		},
		{
			name:      "should be stable with previous",
			values:    []float64{2, 3, 4, 5, 6, 7},
			start:     30,
			step:      10,
			mdp:       3,
			want:      []float64{5, 9, 13},
			wantStep:  20,
			wantStart: 40,
		},
		{
			name:      "more data",
			values:    []float64{2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14},
			start:     20,
			step:      10,
			mdp:       3,
			want:      []float64{40, 50},
			wantStep:  50,
			wantStart: 100,
		},
		{
			name:      "even more data",
			values:    []float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10.0, 11, 12, 13, 14},
			start:     10,
			step:      10,
			mdp:       3,
			want:      []float64{15, 40, 50},
			wantStep:  50,
			wantStart: 50,
		},
		{
			name:      "skewed start time",
			values:    []float64{2, 3, 4, 5, 6, 7, 8, 9, 10},
			start:     21,
			step:      10,
			mdp:       5,
			want:      []float64{2 + 3, 4 + 5, 6 + 7, 8 + 9, 10}, // no points discarded, bucket starts at 20
			wantStep:  20,
			wantStart: 31,
		},
		{
			name:      "skewed start time 2",
			values:    []float64{2, 3, 4, 5, 6, 7, 8, 9, 10},
			start:     29,
			step:      10,
			mdp:       5,
			want:      []float64{2 + 3, 4 + 5, 6 + 7, 8 + 9, 10}, // no points discarded, bucket starts at 20
			wantStep:  20,
			wantStart: 39,
		},
		{
			name:      "skewed start time 3",
			values:    []float64{2, 3, 4, 5, 6, 7, 8, 9, 10},
			start:     31,
			step:      10,
			mdp:       5,
			want:      []float64{3 + 4, 5 + 6, 7 + 8, 9 + 10}, // 1st point discarded, it belongs to the incomplete bucket (20,40)
			wantStep:  20,
			wantStart: 51,
		},
		{
			name:      "skewed start time no aggregation",
			values:    []float64{1, 2, 3, 4},
			start:     31,
			step:      10,
			mdp:       4,
			want:      []float64{1, 2, 3, 4},
			wantStep:  10,
			wantStart: 31,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			input := MakeMetricData("test", tt.values, tt.step, tt.start)
			input.ConsolidationFunc = "sum"
			ConsolidateJSON(tt.mdp, []*MetricData{input})

			got := input.AggregatedValues()
			gotStep := input.AggregatedTimeStep()
			gotStart := input.AggregatedStartTime()

			assert.Equal(t, tt.want, got, "bad values")
			assert.Equal(t, tt.wantStep, gotStep, "bad step")
			assert.Equal(t, tt.wantStart, gotStart, "bad start")
		})
	}
}

// TestAggregatedValuesConfigVariants runs the same series through
// ConsolidateJSON with the "sum" consolidation function under every
// combination of NudgeStartTimeOnAggregation and
// UseBucketsHighestTimestampOnAggregation, checking the aggregated values,
// step and start time of each.
func TestAggregatedValuesConfigVariants(t *testing.T) {
	// Each subtest overwrites the package-level options. Put the previous
	// values back on exit so they do not leak into other tests of this package.
	prev := config.Config
	defer func() { config.Config = prev }()

	const start = 20
	const step = 10
	const mdp = 3
	values := []float64{2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14}
	const expectedStep = int64(50)
	// ts:    20  30  40  50  60  70  80  90 100 110 120 130 140
	// vals:   2   3   4   5   6   7   8   9  10  11  12  13  14
	//
	// Unaligned buckets (no nudging) start at the first point:
	//   [20..60] [70..110] [120..140] -> sums 20, 45, 39
	// Aligned buckets (nudging) drop the points at 20..50 first:
	//   [60..100] [110..140]          -> sums 40, 50

	tests := []struct {
		name             string
		nudge            bool
		highestTimestamp bool
		want             []float64
		wantStart        int64
	}{
		{
			name:             "nudge start and highest timestamp",
			nudge:            true,
			highestTimestamp: true,
			want:             []float64{40, 50},
			wantStart:        100,
		},
		{
			name:             "nudge start and not highest timestamp",
			nudge:            true,
			highestTimestamp: false,
			want:             []float64{40, 50},
			wantStart:        60,
		},
		{
			name:             "not nudge start and highest timestamp",
			nudge:            false,
			highestTimestamp: true,
			want:             []float64{20, 45, 39},
			wantStart:        60,
		},
		{
			name:             "not nudge start and not highest timestamp",
			nudge:            false,
			highestTimestamp: false,
			want:             []float64{20, 45, 39},
			wantStart:        20,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			config.Config.NudgeStartTimeOnAggregation = tt.nudge
			config.Config.UseBucketsHighestTimestampOnAggregation = tt.highestTimestamp

			input := MakeMetricData("test", values, step, start)
			input.ConsolidationFunc = "sum"
			ConsolidateJSON(mdp, []*MetricData{input})

			got := input.AggregatedValues()
			gotStep := input.AggregatedTimeStep()
			gotStart := input.AggregatedStartTime()

			assert.Equal(t, tt.want, got, "bad values")
			assert.Equal(t, expectedStep, gotStep, "bad step")
			assert.Equal(t, tt.wantStart, gotStart, "bad start")
		})
	}
}

0 comments on commit 1b81533

Please sign in to comment.