Force flush memtable on log file rotate (dgraph-io#766)
Add a new option which allows a memtable to be force flushed after N log files have been rotated.
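For context, a minimal sketch of how a caller might enable the new option (hypothetical placeholder paths; assumes the v1-style DefaultOptions variable shown in the options.go diff below):

package main

import (
	"log"

	"github.com/dgraph-io/badger"
)

func main() {
	opts := badger.DefaultOptions
	opts.Dir = "/tmp/badger"      // placeholder path
	opts.ValueDir = "/tmp/badger" // placeholder path
	// Force flush the memtable after every value log rotation, so the stored
	// head stays close to the log tail and crash replay touches fewer files.
	opts.LogRotatesToFlush = 1

	db, err := badger.Open(opts)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}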
manishrjain authored and ashish-goswami committed May 12, 2019
1 parent 82e4521 commit 2237832
Showing 5 changed files with 123 additions and 34 deletions.
60 changes: 42 additions & 18 deletions db.go
@@ -77,6 +77,11 @@ type DB struct {
writeCh chan *request
flushChan chan flushTask // For flushing memtables.

// Number of log rotates since the last memtable flush. We access this field via atomic
// functions. Since we do not use any 64-bit atomic functions, there is no need for 64-bit
// alignment of this struct (see #311).
logRotates int32

blockWrites int32

orc *oracle
@@ -761,21 +766,36 @@ func (db *DB) ensureRoomForWrite() error {
var err error
db.Lock()
defer db.Unlock()
if db.mt.MemSize() < db.opt.MaxTableSize {

// Here we determine if we need to force flush the memtable. Since we have just rotated the
// log file, it makes sense to force flush the memtable, so that the updated value head gets
// a chance to be pushed to L0. Otherwise, it would not reach L0 until the memtable fills up,
// which can take much longer if the write load has fewer keys and larger values. This force
// flush thus avoids the need to read through a lot of log files on a crash and restart.
// The approach is simple, with one small drawback: ensureRoomForWrite is called before
// inserting each entry into the memtable, while db.head is only updated after all entries of
// a request have been inserted. So once the rotation count reaches db.opt.LogRotatesToFlush,
// the condition below becomes true while inserting the first entry, and we end up flushing a
// slightly stale value of db.head. Even so, the number of value log files to read on replay
// remains bounded by roughly db.opt.LogRotatesToFlush.
forceFlush := atomic.LoadInt32(&db.logRotates) >= db.opt.LogRotatesToFlush

if !forceFlush && db.mt.MemSize() < db.opt.MaxTableSize {
return nil
}

y.AssertTrue(db.mt != nil) // A nil mt indicates that DB is being closed.
select {
case db.flushChan <- flushTask{mt: db.mt, vptr: db.vhead}:
db.elog.Printf("Flushing value log to disk if async mode.")
// After every memtable flush, let's reset the counter.
atomic.StoreInt32(&db.logRotates, 0)

// Ensure value log is synced to disk so this memtable's contents wouldn't be lost.
err = db.vlog.sync(db.vhead.Fid)
if err != nil {
return err
}

db.elog.Printf("Flushing memtable, mt.size=%d size of flushChan: %d\n",
db.opt.Debugf("Flushing memtable, mt.size=%d size of flushChan: %d\n",
db.mt.MemSize(), len(db.flushChan))
// We manage to push this task. Let's modify imm.
db.imm = append(db.imm, db.mt)
@@ -818,23 +838,27 @@ type flushTask struct {

// handleFlushTask must be run serially.
func (db *DB) handleFlushTask(ft flushTask) error {
if !ft.mt.Empty() {
// Store badger head even if vptr is zero, need it for readTs
db.opt.Debugf("Storing value log head: %+v\n", ft.vptr)
db.elog.Printf("Storing offset: %+v\n", ft.vptr)
offset := make([]byte, vptrSize)
ft.vptr.Encode(offset)

// Pick the max commit ts, so in case of crash, our read ts would be higher than all the
// commits.
headTs := y.KeyWithTs(head, db.orc.nextTs())
ft.mt.Put(headTs, y.ValueStruct{Value: offset})

// Also store lfDiscardStats before flushing memtables
discardStatsKey := y.KeyWithTs(lfDiscardStatsKey, 1)
ft.mt.Put(discardStatsKey, y.ValueStruct{Value: db.vlog.encodedDiscardStats()})
// There can be a scenario where an empty memtable gets flushed: for example, the memtable is
// empty and, after writing a request to the value log, the rotation count exceeds
// db.opt.LogRotatesToFlush.
if ft.mt.Empty() {
return nil
}

// Store badger head even if vptr is zero, need it for readTs
db.opt.Debugf("Storing value log head: %+v\n", ft.vptr)
db.elog.Printf("Storing offset: %+v\n", ft.vptr)
offset := make([]byte, vptrSize)
ft.vptr.Encode(offset)

// Pick the max commit ts, so in case of crash, our read ts would be higher than all the
// commits.
headTs := y.KeyWithTs(head, db.orc.nextTs())
ft.mt.Put(headTs, y.ValueStruct{Value: offset})

// Also store lfDiscardStats before flushing memtables
discardStatsKey := y.KeyWithTs(lfDiscardStatsKey, 1)
ft.mt.Put(discardStatsKey, y.ValueStruct{Value: db.vlog.encodedDiscardStats()})

fileID := db.lc.reserveFileID()
fd, err := y.CreateSyncedFile(table.NewFilename(fileID, db.opt.Dir), true)
if err != nil {
43 changes: 43 additions & 0 deletions db_test.go
@@ -35,6 +35,7 @@ import (
"time"

"github.com/dgraph-io/badger/options"
"github.com/dgraph-io/badger/skl"

"github.com/dgraph-io/badger/y"
"github.com/stretchr/testify/require"
@@ -1727,6 +1728,48 @@ func TestNoCrash(t *testing.T) {
require.NoError(t, db.Close())
}

func TestForceFlushMemtable(t *testing.T) {
dir, err := ioutil.TempDir("", "badger")
require.NoError(t, err, "temp dir for badger could not be created")

ops := getTestOptions(dir)
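// Rotate the value log after every entry, and force flush the memtable on each rotation.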
ops.ValueLogMaxEntries = 1
ops.LogRotatesToFlush = 1

db, err := Open(ops)
require.NoError(t, err, "error while opening db")
defer db.Close()

for i := 0; i < 3; i++ {
err = db.Update(func(txn *Txn) error {
return txn.Set([]byte(fmt.Sprintf("key-%d", i)), []byte(fmt.Sprintf("value-%d", i)))
})
require.NoError(t, err, "unable to set key and value")
}
time.Sleep(1 * time.Second)

// We want to make sure that the memtable is flushed to disk. While flushing a memtable to
// disk, the latest head is also stored in it. Hence we will try to read the head back from
// disk. To make sure the read goes to disk, we first drop all in-memory memtables.
db.Lock()
db.mt.DecrRef()
for _, mt := range db.imm {
mt.DecrRef()
}
db.imm = db.imm[:0]
db.mt = skl.NewSkiplist(arenaSize(db.opt)) // Set it up for future writes.
db.Unlock()

// Get the latest value of the value log head.
headKey := y.KeyWithTs(head, math.MaxUint64)
vs, err := db.get(headKey)
require.NoError(t, err, "unable to read head from disk")
var vptr valuePointer
vptr.Decode(vs.Value)
// Since we insert 3 entries and ValueLogMaxEntries is 1, there will be 3 rotations. The head
// flushed with the memtable has fid 0 for the first two flushes, and fid 1 for the last one.
require.True(t, vptr.Fid == 1, fmt.Sprintf("expected fid: %d, actual fid: %d", 1, vptr.Fid))
}

func TestMain(m *testing.M) {
// call flag.Parse() here if TestMain uses flags
go func() {
31 changes: 20 additions & 11 deletions options.go
@@ -53,6 +53,18 @@ type Options struct {
// How many versions to keep per key.
NumVersionsToKeep int

// Open the DB as read-only. With this set, multiple processes can
// open the same Badger DB. Note: if the DB being opened had crashed
// before and has vlog data to be replayed, ReadOnly will cause Open
// to fail with an appropriate message.
ReadOnly bool

// Truncate value log to delete corrupt data, if any. Would not truncate if ReadOnly is set.
Truncate bool

// DB-specific logger which will override the global logger.
Logger Logger

// 3. Flags that user might want to review
// ----------------------------------------
// The following affect all levels of LSM tree.
@@ -89,6 +101,13 @@ type Options struct {
// efficient when the DB is opened later.
CompactL0OnClose bool

// After this many value log file rotations, the memtable is force flushed to disk. This is
// useful for write loads with fewer keys and larger values, which fill up the value logs
// quickly while leaving the memtables mostly empty. Without the flush, a crash and restart
// would replay a good number of value log files from the stored value log head onwards,
// which can slow down startup.
LogRotatesToFlush int32

// Transaction start and commit timestamps are managed by end-user.
// This is only useful for databases built on top of Badger (like Dgraph).
// Not recommended for most users.
@@ -99,17 +118,6 @@ type Options struct {
maxBatchCount int64 // max entries in batch
maxBatchSize int64 // max batch size in bytes

// Open the DB as read-only. With this set, multiple processes can
// open the same Badger DB. Note: if the DB being opened had crashed
// before and has vlog data to be replayed, ReadOnly will cause Open
// to fail with an appropriate message.
ReadOnly bool

// Truncate value log to delete corrupt data, if any. Would not truncate if ReadOnly is set.
Truncate bool

// DB-specific logger which will override the global logger.
Logger Logger
}

// DefaultOptions sets a list of recommended options for good performance.
@@ -140,6 +148,7 @@ var DefaultOptions = Options{
ValueThreshold: 32,
Truncate: false,
Logger: defaultLogger,
LogRotatesToFlush: 2,
}

// LSMOnlyOptions follows from DefaultOptions, but sets a higher ValueThreshold
1 change: 1 addition & 0 deletions value.go
@@ -970,6 +970,7 @@ func (vlog *valueLog) write(reqs []*request) error {
return err
}
curlf = newlf
atomic.AddInt32(&vlog.db.logRotates, 1)
}
return nil
}
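Taken together: value.go increments db.logRotates on every log rotation, ensureRoomForWrite in db.go compares it against Options.LogRotatesToFlush before each memtable insert, and the counter is reset once a flush task is queued. A minimal standalone sketch of that counter pattern (hypothetical types, not Badger's actual ones):

package main

import (
	"fmt"
	"sync/atomic"
)

type db struct {
	logRotates        int32 // rotations since the last memtable flush
	logRotatesToFlush int32 // threshold, as in Options.LogRotatesToFlush
}

func (d *db) onLogRotate()     { atomic.AddInt32(&d.logRotates, 1) }
func (d *db) onMemtableFlush() { atomic.StoreInt32(&d.logRotates, 0) }

func (d *db) shouldForceFlush() bool {
	return atomic.LoadInt32(&d.logRotates) >= d.logRotatesToFlush
}

func main() {
	d := &db{logRotatesToFlush: 2}
	d.onLogRotate()
	fmt.Println(d.shouldForceFlush()) // false: 1 rotation < threshold 2
	d.onLogRotate()
	fmt.Println(d.shouldForceFlush()) // true: 2 rotations >= threshold 2
	d.onMemtableFlush()               // flushing resets the counter
	fmt.Println(d.shouldForceFlush()) // false again
}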
22 changes: 17 additions & 5 deletions value_test.go
@@ -24,6 +24,7 @@ import (
"reflect"
"sync"
"testing"
"time"

"github.com/dgraph-io/badger/options"
"github.com/dgraph-io/badger/y"
@@ -434,6 +435,8 @@ func TestPersistLFDiscardStats(t *testing.T) {
opt := getTestOptions(dir)
opt.ValueLogFileSize = 1 << 20
opt.Truncate = true
// Avoid compaction on close, so that the discard map remains the same.
opt.CompactL0OnClose = false

db, err := Open(opt)
require.NoError(t, err)
@@ -449,26 +452,34 @@ func TestPersistLFDiscardStats(t *testing.T) {
txn = db.NewTransaction(true)
}
}
require.NoError(t, txn.Commit())
require.NoError(t, txn.Commit(), "error while committing txn")

for i := 0; i < 500; i++ {
txnDelete(t, db, []byte(fmt.Sprintf("key%d", i)))
// Use SetWithDiscard to delete entries, because this causes data to be flushed to disk,
// creating SSTs. A simple Delete would keep the data in memtables only.
err = db.Update(func(txn *Txn) error {
return txn.SetWithDiscard([]byte(fmt.Sprintf("key%d", i)), v, byte(0))
})
require.NoError(t, err)
}

err = db.lc.doCompact(compactionPriority{level: 0, score: 1.73})
require.NoError(t, err)
// Wait for compaction to complete.
time.Sleep(1 * time.Second)

persistedMap := make(map[uint32]int64)
db.vlog.lfDiscardStats.Lock()
for k, v := range db.vlog.lfDiscardStats.m {
persistedMap[k] = v
}
db.vlog.lfDiscardStats.Unlock()
err = db.Close()
require.NoError(t, err)

db, err = Open(opt)
require.NoError(t, err)
defer db.Close()
require.True(t, reflect.DeepEqual(persistedMap, db.vlog.lfDiscardStats.m))
require.True(t, reflect.DeepEqual(persistedMap, db.vlog.lfDiscardStats.m),
"Discard maps are not equal")
}

func TestChecksums(t *testing.T) {
Expand Down Expand Up @@ -727,6 +738,7 @@ func TestPenultimateLogCorruption(t *testing.T) {
opt.ValueLogLoadingMode = options.FileIO
// Each txn generates at least two entries. 3 txns will fit each file.
opt.ValueLogMaxEntries = 5
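// Set the flush threshold high enough that this test never force flushes the memtable.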
opt.LogRotatesToFlush = 1000

db0, err := Open(opt)
require.NoError(t, err)
