Skip to content

Commit

Permalink
add head compaction test (prometheus#7338)
Browse files Browse the repository at this point in the history
  • Loading branch information
krasi-georgiev authored Jun 12, 2020
1 parent b5d61fb commit ab6203b
Showing 1 changed file with 92 additions and 0 deletions.
92 changes: 92 additions & 0 deletions tsdb/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2858,3 +2858,95 @@ func TestChunkReader_ConcurrentReads(t *testing.T) {
}
testutil.Ok(t, r.Close())
}

// TestCompactHead ensures that the head compaction
// creates a block that is ready for loading and
// does not cause data loss.
// This test:
// * opens a storage;
// * appends values;
// * compacts the head; and
// * queries the db to ensure the samples are present from the compacted head.
func TestCompactHead(t *testing.T) {
dbDir, err := ioutil.TempDir("", "testFlush")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dbDir)) }()

// Open a DB and append data to the WAL.
tsdbCfg := &Options{
RetentionDuration: int64(time.Hour * 24 * 15 / time.Millisecond),
NoLockfile: true,
MinBlockDuration: int64(time.Hour * 2 / time.Millisecond),
MaxBlockDuration: int64(time.Hour * 2 / time.Millisecond),
WALCompression: true,
}

db, err := Open(dbDir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg)
testutil.Ok(t, err)
app := db.Appender()
var expSamples []sample
maxt := 100
for i := 0; i < maxt; i++ {
val := rand.Float64()
_, err := app.Add(labels.FromStrings("a", "b"), int64(i), val)
testutil.Ok(t, err)
expSamples = append(expSamples, sample{int64(i), val})
}
testutil.Ok(t, app.Commit())

// Compact the Head to create a new block.
testutil.Ok(t, db.CompactHead(NewRangeHead(db.Head(), 0, int64(maxt)-1)))
testutil.Ok(t, db.Close())

// Delete everything but the new block and
// reopen the db to query it to ensure it includes the head data.
testutil.Ok(t, deleteNonBlocks(db.Dir()))
db, err = Open(dbDir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg)
testutil.Ok(t, err)
testutil.Equals(t, 1, len(db.Blocks()))
testutil.Equals(t, int64(maxt), db.Head().MinTime())
defer func() { testutil.Ok(t, db.Close()) }()
querier, err := db.Querier(context.Background(), 0, int64(maxt)-1)
testutil.Ok(t, err)
defer func() { testutil.Ok(t, querier.Close()) }()

seriesSet, _, err := querier.Select(false, nil, &labels.Matcher{Type: labels.MatchEqual, Name: "a", Value: "b"})
testutil.Ok(t, err)
var actSamples []sample

for seriesSet.Next() {
series := seriesSet.At().Iterator()
for series.Next() {
time, val := series.At()
actSamples = append(actSamples, sample{int64(time), val})
}
testutil.Ok(t, series.Err())
}
testutil.Equals(t, expSamples, actSamples)
testutil.Ok(t, seriesSet.Err())
}

func deleteNonBlocks(dbDir string) error {
dirs, err := ioutil.ReadDir(dbDir)
if err != nil {
return err
}
for _, dir := range dirs {
if ok := isBlockDir(dir); !ok {
if err := os.RemoveAll(filepath.Join(dbDir, dir.Name())); err != nil {
return err
}
}
}
dirs, err = ioutil.ReadDir(dbDir)
if err != nil {
return err
}
for _, dir := range dirs {
if ok := isBlockDir(dir); !ok {
return errors.Errorf("root folder:%v still hase non block directory:%v", dbDir, dir.Name())
}
}

return nil
}

0 comments on commit ab6203b

Please sign in to comment.