Skip to content

Commit

Permalink
Add remote read filter option
Browse files Browse the repository at this point in the history
For special remote read endpoints which have only data for specific
queries, it is desired to limit the number of queries sent to the
configured remote read endpoint to reduce latency and performance
overhead.
  • Loading branch information
grobie committed Nov 13, 2017
1 parent 434f037 commit 7098c56
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 7 deletions.
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1496,6 +1496,10 @@ type RemoteReadConfig struct {
// values arbitrarily into the overflow maps of further-down types.
HTTPClientConfig HTTPClientConfig `yaml:",inline"`

// RequiredMatchers is an optional list of equality matchers which have to
// be present in a selector to query the remote read endpoint.
RequiredMatchers model.LabelSet `yaml:"required_matchers,omitempty"`

// Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline"`
}
Expand Down
7 changes: 4 additions & 3 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ var expectedConf = &Config{
ReadRecent: true,
},
{
URL: mustParseURL("http://remote3/read"),
RemoteTimeout: model.Duration(1 * time.Minute),
ReadRecent: false,
URL: mustParseURL("http://remote3/read"),
RemoteTimeout: model.Duration(1 * time.Minute),
ReadRecent: false,
RequiredMatchers: model.LabelSet{"job": "special"},
},
},

Expand Down
2 changes: 2 additions & 0 deletions config/testdata/conf.good.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ remote_read:
read_recent: true
- url: http://remote3/read
read_recent: false
required_matchers:
job: special

scrape_configs:
- job_name: prometheus
Expand Down
5 changes: 5 additions & 0 deletions docs/configuration/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -1134,6 +1134,11 @@ likely in future releases.
# The URL of the endpoint to query from.
url: <string>
# An optional list of equality matchers which have to be
# present in a selector to query the remote read endpoint.
required_matchers:
[ <labelname>: <labelvalue> ... ]
# Timeout for requests to the remote read endpoint.
[ remote_timeout: <duration> | default = 30s ]
Expand Down
41 changes: 41 additions & 0 deletions storage/remote/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,47 @@ func PreferLocalStorageFilter(next storage.Queryable, cb startTimeCallback) stor
})
}

// RequiredMatchersFilter returns a storage.Queryable which creates a
// requiredMatchersQuerier.
func RequiredMatchersFilter(next storage.Queryable, required []*labels.Matcher) storage.Queryable {
return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
q, err := next.Querier(ctx, mint, maxt)
if err != nil {
return nil, err
}
return &requiredMatchersQuerier{Querier: q, requiredMatchers: required}, nil
})
}

// requiredMatchersQuerier wraps a storage.Querier and requires Select() calls
// to match the given labelSet.
type requiredMatchersQuerier struct {
storage.Querier

requiredMatchers []*labels.Matcher
}

// Select returns a NoopSeriesSet if the given matchers don't match the label
// set of the requiredMatchersQuerier. Otherwise it'll call the wrapped querier.
func (q requiredMatchersQuerier) Select(matchers ...*labels.Matcher) storage.SeriesSet {
ms := q.requiredMatchers
for _, m := range matchers {
for i, r := range ms {
if m.Type == labels.MatchEqual && m.Name == r.Name && m.Value == r.Value {
ms = append(ms[:i], ms[i+1:]...)
break
}
}
if len(ms) == 0 {
break
}
}
if len(ms) > 0 {
return storage.NoopSeriesSet()
}
return q.Querier.Select(matchers...)
}

// addExternalLabels adds matchers for each external label. External labels
// that already have a corresponding user-supplied matcher are skipped, as we
// assume that the user explicitly wants to select a different value for them.
Expand Down
129 changes: 125 additions & 4 deletions storage/remote/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,21 @@ func TestSeriesSetFilter(t *testing.T) {
}
}

type testQuerier struct {
type mockQuerier struct {
ctx context.Context
mint, maxt int64

storage.Querier
}

type mockSeriesSet struct {
storage.SeriesSet
}

func (mockQuerier) Select(...*labels.Matcher) storage.SeriesSet {
return mockSeriesSet{}
}

func TestPreferLocalStorageFilter(t *testing.T) {
ctx := context.Background()

Expand All @@ -163,13 +171,13 @@ func TestPreferLocalStorageFilter(t *testing.T) {
localStartTime: int64(100),
mint: int64(0),
maxt: int64(50),
querier: testQuerier{ctx: ctx, mint: 0, maxt: 50},
querier: mockQuerier{ctx: ctx, mint: 0, maxt: 50},
},
{
localStartTime: int64(20),
mint: int64(0),
maxt: int64(50),
querier: testQuerier{ctx: ctx, mint: 0, maxt: 20},
querier: mockQuerier{ctx: ctx, mint: 0, maxt: 20},
},
{
localStartTime: int64(20),
Expand All @@ -182,7 +190,7 @@ func TestPreferLocalStorageFilter(t *testing.T) {
for i, test := range tests {
f := PreferLocalStorageFilter(
storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return testQuerier{ctx: ctx, mint: mint, maxt: maxt}, nil
return mockQuerier{ctx: ctx, mint: mint, maxt: maxt}, nil
}),
func() (int64, error) { return test.localStartTime, nil },
)
Expand All @@ -197,3 +205,116 @@ func TestPreferLocalStorageFilter(t *testing.T) {
}
}
}

func TestRequiredMatchersFilter(t *testing.T) {
ctx := context.Background()

f := RequiredMatchersFilter(
storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return mockQuerier{ctx: ctx, mint: mint, maxt: maxt}, nil
}),
[]*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "special", "label")},
)

want := &requiredMatchersQuerier{
Querier: mockQuerier{ctx: ctx, mint: 0, maxt: 50},
requiredMatchers: []*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "special", "label")},
}
have, err := f.Querier(ctx, 0, 50)
if err != nil {
t.Fatal(err)
}

if !reflect.DeepEqual(want, have) {
t.Errorf("expected quierer %+v, got %+v", want, have)
}
}

func TestRequiredLabelsQuerierSelect(t *testing.T) {
tests := []struct {
requiredMatchers []*labels.Matcher
matchers []*labels.Matcher
seriesSet storage.SeriesSet
}{
{
requiredMatchers: []*labels.Matcher{},
matchers: []*labels.Matcher{
mustNewLabelMatcher(labels.MatchEqual, "special", "label"),
},
seriesSet: mockSeriesSet{},
},
{
requiredMatchers: []*labels.Matcher{
mustNewLabelMatcher(labels.MatchEqual, "special", "label"),
},
matchers: []*labels.Matcher{
mustNewLabelMatcher(labels.MatchEqual, "special", "label"),
},
seriesSet: mockSeriesSet{},
},
{
requiredMatchers: []*labels.Matcher{
mustNewLabelMatcher(labels.MatchEqual, "special", "label"),
},
matchers: []*labels.Matcher{
mustNewLabelMatcher(labels.MatchRegexp, "special", "label"),
},
seriesSet: storage.NoopSeriesSet(),
},
{
requiredMatchers: []*labels.Matcher{
mustNewLabelMatcher(labels.MatchEqual, "special", "label"),
},
matchers: []*labels.Matcher{
mustNewLabelMatcher(labels.MatchEqual, "special", "different"),
},
seriesSet: storage.NoopSeriesSet(),
},
{
requiredMatchers: []*labels.Matcher{
mustNewLabelMatcher(labels.MatchEqual, "special", "label"),
},
matchers: []*labels.Matcher{
mustNewLabelMatcher(labels.MatchEqual, "special", "label"),
mustNewLabelMatcher(labels.MatchEqual, "foo", "bar"),
},
seriesSet: mockSeriesSet{},
},
{
requiredMatchers: []*labels.Matcher{
mustNewLabelMatcher(labels.MatchEqual, "special", "label"),
mustNewLabelMatcher(labels.MatchEqual, "foo", "bar"),
},
matchers: []*labels.Matcher{
mustNewLabelMatcher(labels.MatchEqual, "special", "label"),
mustNewLabelMatcher(labels.MatchEqual, "foo", "baz"),
},
seriesSet: storage.NoopSeriesSet(),
},
{
requiredMatchers: []*labels.Matcher{
mustNewLabelMatcher(labels.MatchEqual, "special", "label"),
mustNewLabelMatcher(labels.MatchEqual, "foo", "bar"),
},
matchers: []*labels.Matcher{
mustNewLabelMatcher(labels.MatchEqual, "special", "label"),
mustNewLabelMatcher(labels.MatchEqual, "foo", "bar"),
},
seriesSet: mockSeriesSet{},
},
}

for i, test := range tests {
q := &requiredMatchersQuerier{
Querier: mockQuerier{},
requiredMatchers: test.requiredMatchers,
}

if want, have := test.seriesSet, q.Select(test.matchers...); want != have {
t.Errorf("%d. expected series set %+v, got %+v", i, want, have)
}
if want, have := test.requiredMatchers, q.requiredMatchers; !reflect.DeepEqual(want, have) {
t.Errorf("%d. requiredMatchersQuerier.Select() has modified the matchers", i)
}
}
}
16 changes: 16 additions & 0 deletions storage/remote/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
)

Expand Down Expand Up @@ -101,6 +102,9 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
var q storage.Queryable
q = QueryableClient(c)
q = ExternablLabelsHandler(q, conf.GlobalConfig.ExternalLabels)
if len(rrConf.RequiredMatchers) > 0 {
q = RequiredMatchersFilter(q, labelsToEqualityMatchers(rrConf.RequiredMatchers))
}
if !rrConf.ReadRecent {
q = PreferLocalStorageFilter(q, s.localStartTimeCallback)
}
Expand Down Expand Up @@ -144,3 +148,15 @@ func (s *Storage) Close() error {

return nil
}

func labelsToEqualityMatchers(ls model.LabelSet) []*labels.Matcher {
ms := make([]*labels.Matcher, 0, len(ls))
for k, v := range ls {
ms = append(ms, &labels.Matcher{
Type: labels.MatchEqual,
Name: string(k),
Value: string(v),
})
}
return ms
}

0 comments on commit 7098c56

Please sign in to comment.