Skip to content

Commit

Permalink
tsdb: error on series with duplicate labels (prometheus#6664)
Browse files Browse the repository at this point in the history
Signed-off-by: Julien Pivotto <[email protected]>
  • Loading branch information
roidelapluie authored and brian-brazil committed Jan 20, 2020
1 parent 08d6ddc commit 46d1811
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 2 deletions.
14 changes: 14 additions & 0 deletions pkg/labels/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,20 @@ func (ls Labels) Has(name string) bool {
return false
}

// HasDuplicateLabels returns whether ls has duplicate label names.
// It assumes that the labelset is sorted.
func (ls Labels) HasDuplicateLabelNames() (string, bool) {
for i, l := range ls {
if i == 0 {
continue
}
if l.Name == ls[i-1].Name {
return l.Name, true
}
}
return "", false
}

// WithoutEmpty returns the labelset without empty labels.
// May return the same labelset.
func (ls Labels) WithoutEmpty() Labels {
Expand Down
26 changes: 26 additions & 0 deletions pkg/labels/labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,29 @@ func TestLabels_MatchLabels(t *testing.T) {
testutil.Equals(t, test.expected, got, "unexpected labelset for test case %d", i)
}
}

func TestLabels_HasDuplicateLabelNames(t *testing.T) {
cases := []struct {
Input Labels
Duplicate bool
LabelName string
}{
{
Input: FromMap(map[string]string{"__name__": "up", "hostname": "localhost"}),
Duplicate: false,
}, {
Input: append(
FromMap(map[string]string{"__name__": "up", "hostname": "localhost"}),
FromMap(map[string]string{"hostname": "127.0.0.1"})...,
),
Duplicate: true,
LabelName: "hostname",
},
}

for i, c := range cases {
l, d := c.Input.HasDuplicateLabelNames()
testutil.Equals(t, c.Duplicate, d, "test %d: incorrect duplicate bool", i)
testutil.Equals(t, c.LabelName, l, "test %d: incorrect label name", i)
}
}
43 changes: 42 additions & 1 deletion scrape/scrape_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,6 @@ func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) {
}

func TestScrapeLoopAppend(t *testing.T) {

tests := []struct {
title string
honorLabels bool
Expand Down Expand Up @@ -1535,3 +1534,45 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) {
}
testutil.Equals(t, want, capp.result, "Appended samples not as expected")
}

func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
s := teststorage.New(t)
defer s.Close()

app, err := s.Appender()
testutil.Ok(t, err)

ctx, cancel := context.WithCancel(context.Background())
sl := newScrapeLoop(ctx,
&testScraper{},
nil, nil,
nopMutator,
nopMutator,
func() storage.Appender { return app },
nil,
0,
true,
)
defer cancel()

// We add a good and a bad metric to check that both are discarded.
_, _, _, err = sl.append([]byte("test_metric{le=\"500\"} 1\ntest_metric{le=\"600\",le=\"700\"} 1\n"), "", time.Time{})
testutil.NotOk(t, err)

q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0)
testutil.Ok(t, err)
series, _, err := q.Select(&storage.SelectParams{}, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
testutil.Ok(t, err)
testutil.Equals(t, false, series.Next(), "series found in tsdb")

// We add a good metric to check that it is recorded.
_, _, _, err = sl.append([]byte("test_metric{le=\"500\"} 1\n"), "", time.Time{})
testutil.Ok(t, err)

q, err = s.Querier(ctx, time.Time{}.UnixNano(), 0)
testutil.Ok(t, err)
series, _, err = q.Select(&storage.SelectParams{}, labels.MustNewMatcher(labels.MatchEqual, "le", "500"))
testutil.Ok(t, err)
testutil.Equals(t, true, series.Next(), "series not found in tsdb")
testutil.Equals(t, false, series.Next(), "more than one series found in tsdb")
}
8 changes: 8 additions & 0 deletions tsdb/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ var (
// writable time range.
ErrOutOfBounds = errors.New("out of bounds")

// ErrInvalidSample is returned if an appended sample is not valid and can't
// be ingested.
ErrInvalidSample = errors.New("invalid sample")

// emptyTombstoneReader is a no-op Tombstone Reader.
// This is used by head to satisfy the Tombstones() function call.
emptyTombstoneReader = tombstones.NewMemTombstones()
Expand Down Expand Up @@ -921,6 +925,10 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro
// Ensure no empty labels have gotten through.
lset = lset.WithoutEmpty()

if l, dup := lset.HasDuplicateLabelNames(); dup {
return 0, errors.Wrap(ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, l))
}

s, created := a.head.getOrCreate(lset.Hash(), lset)
if created {
a.series = append(a.series, record.RefSeries{
Expand Down
27 changes: 26 additions & 1 deletion tsdb/head_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1255,7 +1255,7 @@ func TestWalRepair_DecodingError(t *testing.T) {
}

func TestNewWalSegmentOnTruncate(t *testing.T) {
dir, err := ioutil.TempDir("", "test_wal_segements")
dir, err := ioutil.TempDir("", "test_wal_segments")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
Expand Down Expand Up @@ -1290,3 +1290,28 @@ func TestNewWalSegmentOnTruncate(t *testing.T) {
testutil.Ok(t, err)
testutil.Equals(t, 2, last)
}

func TestAddDuplicateLabelName(t *testing.T) {
dir, err := ioutil.TempDir("", "test_duplicate_label_name")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
wlog, err := wal.NewSize(nil, nil, dir, 32768, false)
testutil.Ok(t, err)

h, err := NewHead(nil, nil, wlog, 1000)
testutil.Ok(t, err)
defer h.Close()

add := func(labels labels.Labels, labelName string) {
app := h.Appender()
_, err = app.Add(labels, 0, 0)
testutil.NotOk(t, err)
testutil.Equals(t, fmt.Sprintf(`label name "%s" is not unique: invalid sample`, labelName), err.Error())
}

add(labels.Labels{{Name: "a", Value: "c"}, {Name: "a", Value: "b"}}, "a")
add(labels.Labels{{Name: "a", Value: "c"}, {Name: "a", Value: "c"}}, "a")
add(labels.Labels{{Name: "__name__", Value: "up"}, {Name: "job", Value: "prometheus"}, {Name: "le", Value: "500"}, {Name: "le", Value: "400"}, {Name: "unit", Value: "s"}}, "le")
}

0 comments on commit 46d1811

Please sign in to comment.