Unify Iterator interfaces. All point to storage now.
This is the part of prometheus#5882 that can be done already to simplify things.
All TODOs I added will be fixed in follow-up PRs.

* The querier.Querier, querier.Appender, querier.SeriesSet, and querier.Series interfaces are merged
into storage/interface.go, and all code now imports that package instead, as sketched below.
* querier.SeriesIterator replaced by chunkenc.Iterator
* Added the chunkenc.Iterator.Seek method, with tests for the XOR chunk implementation.
* Since we now properly handle SelectParams for Select methods, I adjusted the min/max timestamps
based on them. This should help performance for queries that use offset.
* Added Seek to deletedIterator, along with a test.
* storage/tsdb was removed, as it was only unnecessary glue with incompatible structs.

No logic was changed, only the source of the abstractions, so no benchmarks are needed.

Signed-off-by: Bartlomiej Plotka <[email protected]>
bwplotka committed Feb 17, 2020
1 parent 489a9aa commit 3442676
Showing 40 changed files with 938 additions and 1,055 deletions.
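
For orientation, the following is a minimal sketch of the interface shapes this commit converges on, pieced together from the diffs below. It is an illustrative simplification under the package name sketch, not a verbatim copy of the storage or chunkenc sources.

package sketch

import "github.com/prometheus/prometheus/pkg/labels"

// Appendable hands out Appenders. After this change Appender() no longer
// returns an error, so callers drop their error handling (see the diffs below).
type Appendable interface {
	Appender() Appender
}

// Appender ingests samples. AddFast now takes only the series reference;
// the labels argument the old signature carried is gone.
type Appender interface {
	Add(l labels.Labels, t int64, v float64) (uint64, error)
	AddFast(ref uint64, t int64, v float64) error
	Commit() error
	Rollback() error
}

// Iterator is the single sample-iterator abstraction (chunkenc.Iterator in
// the real code), now including Seek.
type Iterator interface {
	Seek(t int64) bool    // advance to the first sample at or after t
	At() (int64, float64) // timestamp/value of the current sample
	Next() bool           // advance to the next sample
	Err() error           // error encountered during iteration, if any
}
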
2 changes: 1 addition & 1 deletion cmd/prometheus/main.go
@@ -56,7 +56,7 @@ import (
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/storage/tsdb"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/util/strutil"
"github.com/prometheus/prometheus/web"
)
10 changes: 2 additions & 8 deletions promql/test.go
@@ -428,10 +428,7 @@ func (t *Test) exec(tc testCommand) error {
t.clear()

case *loadCmd:
app, err := t.storage.Appender()
if err != nil {
return err
}
app := t.storage.Appender()
if err := cmd.append(app); err != nil {
app.Rollback()
return err
@@ -641,10 +638,7 @@ func (ll *LazyLoader) clear() {

// appendTill appends the defined time series to the storage till the given timestamp (in milliseconds).
func (ll *LazyLoader) appendTill(ts int64) error {
app, err := ll.storage.Appender()
if err != nil {
return err
}
app := ll.storage.Appender()
for h, smpls := range ll.loadCmd.defs {
m := ll.loadCmd.metrics[h]
for i, s := range smpls {
3 changes: 2 additions & 1 deletion promql/value.go
@@ -20,6 +20,7 @@ import (
"strings"

"github.com/pkg/errors"
"github.com/prometheus/prometheus/tsdb/chunkenc"

"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
@@ -275,7 +276,7 @@ func (ss *StorageSeries) Labels() labels.Labels {
}

// Iterator returns a new iterator of the data of the series.
func (ss *StorageSeries) Iterator() storage.SeriesIterator {
func (ss *StorageSeries) Iterator() chunkenc.Iterator {
return newStorageSeriesIterator(ss.series)
}

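
With StorageSeries.Iterator() now returning a chunkenc.Iterator, every consumer walks samples through the same interface. A minimal usage sketch (hypothetical dumpSamplesFrom helper, assuming the post-change storage.Series and chunkenc.Iterator shapes) might look like this:

package sketch

import (
	"fmt"

	"github.com/prometheus/prometheus/storage"
)

// dumpSamplesFrom prints every sample of the series at or after mint.
// Seek positions the iterator at the first sample with timestamp >= mint;
// Next then walks the remaining samples in order.
func dumpSamplesFrom(s storage.Series, mint int64) error {
	it := s.Iterator()
	for ok := it.Seek(mint); ok; ok = it.Next() {
		t, v := it.At()
		fmt.Printf("%s %d %g\n", s.Labels(), t, v)
	}
	return it.Err()
}
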
22 changes: 4 additions & 18 deletions rules/manager.go
@@ -587,12 +587,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
numDuplicates = 0
)

app, err := g.opts.Appendable.Appender()
if err != nil {
level.Warn(g.logger).Log("msg", "creating appender failed", "err", err)
return
}

app := g.opts.Appendable.Appender()
seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i]))
for _, s := range vector {
if _, err := app.Add(s.Metric, s.T, s.V); err != nil {
@@ -645,14 +640,10 @@ func (g *Group) cleanupStaleSeries(ts time.Time) {
if len(g.staleSeries) == 0 {
return
}
app, err := g.opts.Appendable.Appender()
if err != nil {
level.Warn(g.logger).Log("msg", "creating appender failed", "err", err)
return
}
app := g.opts.Appendable.Appender()
for _, s := range g.staleSeries {
// Rule that produced series no longer configured, mark it stale.
_, err = app.Add(s, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
_, err := app.Add(s, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
switch err {
case nil:
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
@@ -836,11 +827,6 @@ type Manager struct {
logger log.Logger
}

// Appendable returns an Appender.
type Appendable interface {
Appender() (storage.Appender, error)
}

// NotifyFunc sends notifications about a set of alerts generated by the given expression.
type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert)

@@ -850,7 +836,7 @@ type ManagerOptions struct {
QueryFunc QueryFunc
NotifyFunc NotifyFunc
Context context.Context
Appendable Appendable
Appendable storage.Appendable
TSDB storage.Storage
Logger log.Logger
Registerer prometheus.Registerer
4 changes: 2 additions & 2 deletions rules/manager_test.go
@@ -540,7 +540,7 @@ func TestStaleness(t *testing.T) {
})

// A time series that has two samples and then goes stale.
app, _ := storage.Appender()
app := storage.Appender()
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 0, 1)
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1000, 2)
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 2000, math.Float64frombits(value.StaleNaN))
@@ -868,7 +868,7 @@ func TestNotify(t *testing.T) {
Opts: opts,
})

app, _ := storage.Appender()
app := storage.Appender()
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1000, 2)
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 2000, 3)
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 5000, 3)
33 changes: 23 additions & 10 deletions scrape/helpers_test.go
@@ -20,16 +20,16 @@ import (

type nopAppendable struct{}

func (a nopAppendable) Appender() (storage.Appender, error) {
return nopAppender{}, nil
func (a nopAppendable) Appender() storage.Appender {
return nopAppender{}
}

type nopAppender struct{}

func (a nopAppender) Add(labels.Labels, int64, float64) (uint64, error) { return 0, nil }
func (a nopAppender) AddFast(labels.Labels, uint64, int64, float64) error { return nil }
func (a nopAppender) Commit() error { return nil }
func (a nopAppender) Rollback() error { return nil }
func (a nopAppender) Add(labels.Labels, int64, float64) (uint64, error) { return 0, nil }
func (a nopAppender) AddFast(uint64, int64, float64) error { return nil }
func (a nopAppender) Commit() error { return nil }
func (a nopAppender) Rollback() error { return nil }

type sample struct {
metric labels.Labels
@@ -42,18 +42,21 @@ type sample struct {
type collectResultAppender struct {
next storage.Appender
result []sample

mapper map[uint64]labels.Labels
}

func (a *collectResultAppender) AddFast(m labels.Labels, ref uint64, t int64, v float64) error {
func (a *collectResultAppender) AddFast(ref uint64, t int64, v float64) error {
if a.next == nil {
return storage.ErrNotFound
}
err := a.next.AddFast(m, ref, t, v)

err := a.next.AddFast(ref, t, v)
if err != nil {
return err
}
a.result = append(a.result, sample{
metric: m,
metric: a.mapper[ref],
t: t,
v: v,
})
@@ -69,7 +72,17 @@ func (a *collectResultAppender) Add(m labels.Labels, t int64, v float64) (uint64
if a.next == nil {
return 0, nil
}
return a.next.Add(m, t, v)

if a.mapper == nil {
a.mapper = map[uint64]labels.Labels{}
}

ref, err := a.next.Add(m, t, v)
if err != nil {
return 0, err
}
a.mapper[ref] = m
return ref, nil
}

func (a *collectResultAppender) Commit() error { return nil }
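
Because AddFast no longer receives the label set, a test appender that wants to record fully labelled samples has to remember the ref-to-labels mapping itself, which is what the mapper field above does. A standalone sketch of that pattern (hypothetical recordingAppender type, assuming the post-change storage.Appender signatures):

package sketch

import (
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/storage"
)

type recordedSample struct {
	metric labels.Labels
	t      int64
	v      float64
}

// recordingAppender wraps another Appender and keeps every sample it forwards,
// resolving AddFast refs back to labels via its own map.
type recordingAppender struct {
	next   storage.Appender
	mapper map[uint64]labels.Labels
	result []recordedSample
}

func (a *recordingAppender) Add(m labels.Labels, t int64, v float64) (uint64, error) {
	ref, err := a.next.Add(m, t, v)
	if err != nil {
		return 0, err
	}
	if a.mapper == nil {
		a.mapper = map[uint64]labels.Labels{}
	}
	a.mapper[ref] = m // remember which labels this ref stands for
	a.result = append(a.result, recordedSample{metric: m, t: t, v: v})
	return ref, nil
}

func (a *recordingAppender) AddFast(ref uint64, t int64, v float64) error {
	if err := a.next.AddFast(ref, t, v); err != nil {
		return err
	}
	// The labels are not passed in any more, so look them up from the map.
	a.result = append(a.result, recordedSample{metric: a.mapper[ref], t: t, v: v})
	return nil
}

func (a *recordingAppender) Commit() error   { return a.next.Commit() }
func (a *recordingAppender) Rollback() error { return a.next.Rollback() }
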
9 changes: 2 additions & 7 deletions scrape/manager.go
@@ -100,13 +100,8 @@ func (mc *MetadataMetricsCollector) Collect(ch chan<- prometheus.Metric) {
}
}

// Appendable returns an Appender.
type Appendable interface {
Appender() (storage.Appender, error)
}

// NewManager is the Manager constructor
func NewManager(logger log.Logger, app Appendable) *Manager {
func NewManager(logger log.Logger, app storage.Appendable) *Manager {
if logger == nil {
logger = log.NewNopLogger()
}
@@ -127,7 +122,7 @@ func NewManager(logger log.Logger, app Appendable) *Manager {
// when receiving new target groups form the discovery manager.
type Manager struct {
logger log.Logger
append Appendable
append storage.Appendable
graceShut chan struct{}

jitterSeed uint64 // Global jitterSeed seed is used to spread scrape workload across HA setup.
16 changes: 5 additions & 11 deletions scrape/scrape.go
@@ -156,7 +156,7 @@ func init() {

// scrapePool manages scrapes for sets of targets.
type scrapePool struct {
appendable Appendable
appendable storage.Appendable
logger log.Logger

mtx sync.RWMutex
@@ -187,7 +187,7 @@ const maxAheadTime = 10 * time.Minute

type labelsMutator func(labels.Labels) labels.Labels

func newScrapePool(cfg *config.ScrapeConfig, app Appendable, jitterSeed uint64, logger log.Logger) (*scrapePool, error) {
func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed uint64, logger log.Logger) (*scrapePool, error) {
targetScrapePools.Inc()
if logger == nil {
logger = log.NewNopLogger()
@@ -228,13 +228,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app Appendable, jitterSeed uint64,
return mutateSampleLabels(l, opts.target, opts.honorLabels, opts.mrc)
},
func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, opts.target) },
func() storage.Appender {
app, err := app.Appender()
if err != nil {
panic(err)
}
return appender(app, opts.limit)
},
func() storage.Appender { return appender(app.Appender(), opts.limit) },
cache,
jitterSeed,
opts.honorTimestamps,
@@ -1112,7 +1106,7 @@ loop:
}
ce, ok := sl.cache.get(yoloString(met))
if ok {
switch err = app.AddFast(ce.lset, ce.ref, t, v); err {
switch err = app.AddFast(ce.ref, t, v); err {
case nil:
if tp == nil {
sl.cache.trackStaleness(ce.hash, ce.lset)
@@ -1323,7 +1317,7 @@ func (sl *scrapeLoop) reportStale(start time.Time) error {
func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error {
ce, ok := sl.cache.get(s)
if ok {
err := app.AddFast(ce.lset, ce.ref, t, v)
err := app.AddFast(ce.ref, t, v)
switch err {
case nil:
return nil
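
The scrape-loop hunks above keep the cached fast path: AddFast is tried with the cached ref, and the loop falls back to a full Add when the series is unknown. A simplified sketch of that pattern (hypothetical addSample helper; the real loop additionally handles staleness tracking, limits, and cache metadata):

package sketch

import (
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/storage"
)

// addSample tries the fast path with a previously cached ref and falls back
// to a full Add, caching the new ref, when the series is not known yet.
func addSample(app storage.Appender, cache map[string]uint64, key string, lset labels.Labels, t int64, v float64) error {
	if ref, ok := cache[key]; ok {
		switch err := app.AddFast(ref, t, v); err {
		case nil:
			return nil
		case storage.ErrNotFound:
			// Stale ref (e.g. after a head truncation): fall back to Add below.
		default:
			return err
		}
	}
	ref, err := app.Add(lset, t, v)
	if err != nil {
		return err
	}
	cache[key] = ref
	return nil
}
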
33 changes: 13 additions & 20 deletions scrape/scrape_test.go
@@ -644,8 +644,7 @@ func TestScrapeLoopSeriesAdded(t *testing.T) {
s := teststorage.New(t)
defer s.Close()

app, err := s.Appender()
testutil.Ok(t, err)
app := s.Appender()

ctx, cancel := context.WithCancel(context.Background())
sl := newScrapeLoop(ctx,
@@ -788,8 +787,7 @@ func TestScrapeLoopCache(t *testing.T) {
s := teststorage.New(t)
defer s.Close()

sapp, err := s.Appender()
testutil.Ok(t, err)
sapp := s.Appender()

appender := &collectResultAppender{next: sapp}
var (
@@ -866,8 +864,7 @@ func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) {
s := teststorage.New(t)
defer s.Close()

sapp, err := s.Appender()
testutil.Ok(t, err)
sapp := s.Appender()

appender := &collectResultAppender{next: sapp}
var (
@@ -1092,8 +1089,7 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) {
s := teststorage.New(t)
defer s.Close()

app, err := s.Appender()
testutil.Ok(t, err)
app := s.Appender()

capp := &collectResultAppender{next: app}

@@ -1108,7 +1104,7 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) {
)

now := time.Now()
_, _, _, err = sl.append([]byte(`metric_a{a="1",b="1"} 1`), "", now)
_, _, _, err := sl.append([]byte(`metric_a{a="1",b="1"} 1`), "", now)
testutil.Ok(t, err)

_, _, _, err = sl.append([]byte(`metric_a{b="1",a="1"} 2`), "", now.Add(time.Minute))
@@ -1273,8 +1269,8 @@ func (app *errorAppender) Add(lset labels.Labels, t int64, v float64) (uint64, e
}
}

func (app *errorAppender) AddFast(lset labels.Labels, ref uint64, t int64, v float64) error {
return app.collectResultAppender.AddFast(lset, ref, t, v)
func (app *errorAppender) AddFast(ref uint64, t int64, v float64) error {
return app.collectResultAppender.AddFast(ref, t, v)
}

func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T) {
@@ -1498,8 +1494,7 @@ func TestScrapeLoop_RespectTimestamps(t *testing.T) {
s := teststorage.New(t)
defer s.Close()

app, err := s.Appender()
testutil.Ok(t, err)
app := s.Appender()

capp := &collectResultAppender{next: app}

@@ -1513,7 +1508,7 @@ func TestScrapeLoop_RespectTimestamps(t *testing.T) {
)

now := time.Now()
_, _, _, err = sl.append([]byte(`metric_a{a="1",b="1"} 1 0`), "", now)
_, _, _, err := sl.append([]byte(`metric_a{a="1",b="1"} 1 0`), "", now)
testutil.Ok(t, err)

want := []sample{
@@ -1530,8 +1525,7 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) {
s := teststorage.New(t)
defer s.Close()

app, err := s.Appender()
testutil.Ok(t, err)
app := s.Appender()

capp := &collectResultAppender{next: app}

@@ -1545,7 +1539,7 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) {
)

now := time.Now()
_, _, _, err = sl.append([]byte(`metric_a{a="1",b="1"} 1 0`), "", now)
_, _, _, err := sl.append([]byte(`metric_a{a="1",b="1"} 1 0`), "", now)
testutil.Ok(t, err)

want := []sample{
@@ -1562,8 +1556,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
s := teststorage.New(t)
defer s.Close()

app, err := s.Appender()
testutil.Ok(t, err)
app := s.Appender()

ctx, cancel := context.WithCancel(context.Background())
sl := newScrapeLoop(ctx,
@@ -1579,7 +1572,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
defer cancel()

// We add a good and a bad metric to check that both are discarded.
_, _, _, err = sl.append([]byte("test_metric{le=\"500\"} 1\ntest_metric{le=\"600\",le=\"700\"} 1\n"), "", time.Time{})
_, _, _, err := sl.append([]byte("test_metric{le=\"500\"} 1\ntest_metric{le=\"600\",le=\"700\"} 1\n"), "", time.Time{})
testutil.NotOk(t, err)

q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0)