From c40b2e9af90225eebbfe1adbf36c6c8fa74c38d4 Mon Sep 17 00:00:00 2001 From: Naman Jain Date: Wed, 7 Jul 2021 14:12:05 +0530 Subject: [PATCH] fix(stream): ensure that the stream level do not have any data (#1723) While doing an incremental stream write, we should look at the first level on which there is no data. Earlier, due to a bug we were writing to a level that already has some tables. --- stream_writer.go | 1 + stream_writer_test.go | 41 ++++++++++++++++++++++++++++++++--------- 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/stream_writer.go b/stream_writer.go index 08735a453..ee0564edc 100644 --- a/stream_writer.go +++ b/stream_writer.go @@ -115,6 +115,7 @@ func (sw *StreamWriter) PrepareIncremental() error { if level.NumTables > 0 { sw.prevLevel = level.Level isEmptyDB = false + break } } if isEmptyDB { diff --git a/stream_writer_test.go b/stream_writer_test.go index eb82b8c9f..61c8861cc 100644 --- a/stream_writer_test.go +++ b/stream_writer_test.go @@ -604,14 +604,16 @@ func TestStreamWriterWithLargeValue(t *testing.T) { } func TestStreamWriterIncremental(t *testing.T) { - addIncremtal := func(t *testing.T, db *DB) { + addIncremtal := func(t *testing.T, db *DB, keys [][]byte) { buf := z.NewBuffer(10<<20, "test") defer buf.Release() - KVToBuffer(&pb.KV{ - Key: []byte("key-2"), - Value: []byte("val"), - Version: 1, - }, buf) + for _, key := range keys { + KVToBuffer(&pb.KV{ + Key: key, + Value: []byte("val"), + Version: 1, + }, buf) + } // Now do an incremental stream write. sw := db.NewStreamWriter() require.NoError(t, sw.PrepareIncremental(), "sw.PrepareIncremental() failed") @@ -633,7 +635,7 @@ func TestStreamWriterIncremental(t *testing.T) { require.NoError(t, sw.Write(buf), "sw.Write() failed") require.NoError(t, sw.Flush(), "sw.Flush() failed") - addIncremtal(t, db) + addIncremtal(t, db, [][]byte{[]byte("key-2")}) txn := db.NewTransaction(false) defer txn.Discard() @@ -645,10 +647,31 @@ func TestStreamWriterIncremental(t *testing.T) { }) t.Run("incremental on empty DB", func(t *testing.T) { runBadgerTest(t, nil, func(t *testing.T, db *DB) { - addIncremtal(t, db) + addIncremtal(t, db, [][]byte{[]byte("key-1")}) txn := db.NewTransaction(false) defer txn.Discard() - _, err := txn.Get([]byte("key-2")) + _, err := txn.Get([]byte("key-1")) + require.NoError(t, err) + }) + }) + t.Run("multiple incremental", func(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + addIncremtal(t, db, [][]byte{[]byte("a1"), []byte("c1")}) + addIncremtal(t, db, [][]byte{[]byte("a2"), []byte("c2")}) + addIncremtal(t, db, [][]byte{[]byte("a3"), []byte("c3")}) + txn := db.NewTransaction(false) + defer txn.Discard() + _, err := txn.Get([]byte("a1")) + require.NoError(t, err) + _, err = txn.Get([]byte("c1")) + require.NoError(t, err) + _, err = txn.Get([]byte("a2")) + require.NoError(t, err) + _, err = txn.Get([]byte("c2")) + require.NoError(t, err) + _, err = txn.Get([]byte("a3")) + require.NoError(t, err) + _, err = txn.Get([]byte("c3")) require.NoError(t, err) }) })