Skip to content

Commit

Permalink
fix(stream): ensure that the stream level do not have any data (dgrap…
Browse files Browse the repository at this point in the history
…h-io#1723)

While doing an incremental stream write, we should look at the first level on which there is no data. Earlier, due to a bug we were writing to a level that already has some tables.
  • Loading branch information
NamanJain8 authored Jul 7, 2021
1 parent 725913b commit c40b2e9
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 9 deletions.
1 change: 1 addition & 0 deletions stream_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func (sw *StreamWriter) PrepareIncremental() error {
if level.NumTables > 0 {
sw.prevLevel = level.Level
isEmptyDB = false
break
}
}
if isEmptyDB {
Expand Down
41 changes: 32 additions & 9 deletions stream_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,14 +604,16 @@ func TestStreamWriterWithLargeValue(t *testing.T) {
}

func TestStreamWriterIncremental(t *testing.T) {
addIncremtal := func(t *testing.T, db *DB) {
addIncremtal := func(t *testing.T, db *DB, keys [][]byte) {
buf := z.NewBuffer(10<<20, "test")
defer buf.Release()
KVToBuffer(&pb.KV{
Key: []byte("key-2"),
Value: []byte("val"),
Version: 1,
}, buf)
for _, key := range keys {
KVToBuffer(&pb.KV{
Key: key,
Value: []byte("val"),
Version: 1,
}, buf)
}
// Now do an incremental stream write.
sw := db.NewStreamWriter()
require.NoError(t, sw.PrepareIncremental(), "sw.PrepareIncremental() failed")
Expand All @@ -633,7 +635,7 @@ func TestStreamWriterIncremental(t *testing.T) {
require.NoError(t, sw.Write(buf), "sw.Write() failed")
require.NoError(t, sw.Flush(), "sw.Flush() failed")

addIncremtal(t, db)
addIncremtal(t, db, [][]byte{[]byte("key-2")})

txn := db.NewTransaction(false)
defer txn.Discard()
Expand All @@ -645,10 +647,31 @@ func TestStreamWriterIncremental(t *testing.T) {
})
t.Run("incremental on empty DB", func(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
addIncremtal(t, db)
addIncremtal(t, db, [][]byte{[]byte("key-1")})
txn := db.NewTransaction(false)
defer txn.Discard()
_, err := txn.Get([]byte("key-2"))
_, err := txn.Get([]byte("key-1"))
require.NoError(t, err)
})
})
t.Run("multiple incremental", func(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
addIncremtal(t, db, [][]byte{[]byte("a1"), []byte("c1")})
addIncremtal(t, db, [][]byte{[]byte("a2"), []byte("c2")})
addIncremtal(t, db, [][]byte{[]byte("a3"), []byte("c3")})
txn := db.NewTransaction(false)
defer txn.Discard()
_, err := txn.Get([]byte("a1"))
require.NoError(t, err)
_, err = txn.Get([]byte("c1"))
require.NoError(t, err)
_, err = txn.Get([]byte("a2"))
require.NoError(t, err)
_, err = txn.Get([]byte("c2"))
require.NoError(t, err)
_, err = txn.Get([]byte("a3"))
require.NoError(t, err)
_, err = txn.Get([]byte("c3"))
require.NoError(t, err)
})
})
Expand Down

0 comments on commit c40b2e9

Please sign in to comment.