Skip to content

Commit

Permalink
[query] Add graphite function aggregateWithWildcards (m3db#2582)
Browse files Browse the repository at this point in the history
Co-authored-by: Rob Skillington <[email protected]>
  • Loading branch information
yyin-sc and robskillington authored Sep 4, 2020
1 parent f3f2864 commit 98039ce
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 0 deletions.
19 changes: 19 additions & 0 deletions src/query/graphite/native/aggregation_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,25 @@ func sumSeriesWithWildcards(
return combineSeriesWithWildcards(ctx, series, positions, sumSpecificationFunc, ts.Sum)
}

// aggregateWithWildcards splits the given set of series into sub-groupings
// based on wildcard matches in the hierarchy, then aggregate the values in
// each grouping based on the given function.
func aggregateWithWildcards(
ctx *common.Context,
series singlePathSpec,
fname string,
positions ...int,
) (ts.SeriesList, error) {
f, fexists := summarizeFuncs[fname]
if !fexists {
err := errors.NewInvalidParamsError(fmt.Errorf(
"invalid func %s", fname))
return ts.NewSeriesList(), err
}

return combineSeriesWithWildcards(ctx, series, positions, f.specificationFunc, f.consolidationFunc)
}

// combineSeriesWithWildcards splits the given set of series into sub-groupings
// based on wildcard matches in the hierarchy, then combines the values in each
// sub-grouping according to the provided consolidation function
Expand Down
50 changes: 50 additions & 0 deletions src/query/graphite/native/aggregation_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,56 @@ func TestSumSeriesWithWildcards(t *testing.T) {
}
}

func TestAggregateWithWildcards(t *testing.T) {
var (
start, _ = time.Parse(time.RFC1123, "Mon, 27 Jul 2015 19:41:19 GMT")
end, _ = time.Parse(time.RFC1123, "Mon, 27 Jul 2015 19:43:19 GMT")
ctx = common.NewContext(common.ContextOptions{Start: start, End: end})
inputs = []*ts.Series{
ts.NewSeries(ctx, "servers.foo-1.pod1.status.500", start,
ts.NewConstantValues(ctx, 2, 12, 10000)),
ts.NewSeries(ctx, "servers.foo-2.pod1.status.500", start,
ts.NewConstantValues(ctx, 4, 12, 10000)),
ts.NewSeries(ctx, "servers.foo-3.pod1.status.500", start,
ts.NewConstantValues(ctx, 6, 12, 10000)),
ts.NewSeries(ctx, "servers.foo-1.pod2.status.500", start,
ts.NewConstantValues(ctx, 8, 12, 10000)),
ts.NewSeries(ctx, "servers.foo-2.pod2.status.500", start,
ts.NewConstantValues(ctx, 10, 12, 10000)),

ts.NewSeries(ctx, "servers.foo-1.pod1.status.400", start,
ts.NewConstantValues(ctx, 20, 12, 10000)),
ts.NewSeries(ctx, "servers.foo-2.pod1.status.400", start,
ts.NewConstantValues(ctx, 30, 12, 10000)),
ts.NewSeries(ctx, "servers.foo-3.pod2.status.400", start,
ts.NewConstantValues(ctx, 40, 12, 10000)),
}
)
defer ctx.Close()

outSeries, err := aggregateWithWildcards(ctx, singlePathSpec{
Values: inputs,
}, "sum", 1, 2)
require.NoError(t, err)
require.Equal(t, 2, len(outSeries.Values))

outSeries, _ = sortByName(ctx, singlePathSpec(outSeries))

expectedOutputs := []struct {
name string
sumOfVals float64
}{
{"servers.status.400", 90 * 12},
{"servers.status.500", 30 * 12},
}

for i, expected := range expectedOutputs {
series := outSeries.Values[i]
assert.Equal(t, expected.name, series.Name())
assert.Equal(t, expected.sumOfVals, series.SafeSum())
}
}

func TestGroupByNode(t *testing.T) {
var (
start, _ = time.Parse(time.RFC1123, "Mon, 27 Jul 2015 19:41:19 GMT")
Expand Down
1 change: 1 addition & 0 deletions src/query/graphite/native/builtin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -1961,6 +1961,7 @@ func init() {
})
MustRegisterFunction(sumSeries)
MustRegisterFunction(sumSeriesWithWildcards)
MustRegisterFunction(aggregateWithWildcards)
MustRegisterFunction(sustainedAbove)
MustRegisterFunction(sustainedBelow)
MustRegisterFunction(threshold).WithDefaultParams(map[uint8]interface{}{
Expand Down

0 comments on commit 98039ce

Please sign in to comment.