Skip to content

Commit

Permalink
Do not return flushMemtable goroutine if there are errors. Try indefi…
Browse files Browse the repository at this point in the history
…nitely until success.
  • Loading branch information
manishrjain committed Oct 4, 2018
1 parent 3240293 commit 8b6445e
Showing 1 changed file with 71 additions and 58 deletions.
129 changes: 71 additions & 58 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,73 +813,86 @@ type flushTask struct {
vptr valuePointer
}

// TODO: Ensure that this function doesn't return, or is handled by another wrapper function.
// Otherwise, we would have no goroutine which can flush memtables.
func (db *DB) flushMemtable(lc *y.Closer) error {
defer lc.Done()

for ft := range db.flushChan {
if ft.mt == nil {
return nil
}
// handleFlushTask must be run serially.
func (db *DB) handleFlushTask(ft flushTask) error {
if !ft.mt.Empty() {
// Store badger head even if vptr is zero, need it for readTs
db.elog.Printf("Storing offset: %+v\n", ft.vptr)
offset := make([]byte, vptrSize)
ft.vptr.Encode(offset)

// Pick the max commit ts, so in case of crash, our read ts would be higher than all the
// commits.
headTs := y.KeyWithTs(head, db.orc.nextTs())
ft.mt.Put(headTs, y.ValueStruct{Value: offset})
}

fileID := db.lc.reserveFileID()
fd, err := y.CreateSyncedFile(table.NewFilename(fileID, db.opt.Dir), true)
if err != nil {
return y.Wrap(err)
}

if !ft.mt.Empty() {
// Store badger head even if vptr is zero, need it for readTs
db.elog.Printf("Storing offset: %+v\n", ft.vptr)
offset := make([]byte, vptrSize)
ft.vptr.Encode(offset)
// Don't block just to sync the directory entry.
dirSyncCh := make(chan error)
go func() { dirSyncCh <- syncDir(db.opt.Dir) }()

// Pick the max commit ts, so in case of crash, our read ts would be higher than all the
// commits.
headTs := y.KeyWithTs(head, db.orc.nextTs())
ft.mt.Put(headTs, y.ValueStruct{Value: offset})
}
err = writeLevel0Table(ft.mt, fd)
dirSyncErr := <-dirSyncCh

fileID := db.lc.reserveFileID()
fd, err := y.CreateSyncedFile(table.NewFilename(fileID, db.opt.Dir), true)
if err != nil {
return y.Wrap(err)
}
if err != nil {
db.elog.Errorf("ERROR while writing to level 0: %v", err)
return err
}
if dirSyncErr != nil {
db.elog.Errorf("ERROR while syncing level directory: %v", dirSyncErr)
return err
}

// Don't block just to sync the directory entry.
dirSyncCh := make(chan error)
go func() { dirSyncCh <- syncDir(db.opt.Dir) }()
tbl, err := table.OpenTable(fd, db.opt.TableLoadingMode)
if err != nil {
db.elog.Printf("ERROR while opening table: %v", err)
return err
}
// We own a ref on tbl.
err = db.lc.addLevel0Table(tbl) // This will incrRef (if we don't error, sure)
tbl.DecrRef() // Releases our ref.
if err != nil {
return err
}

err = writeLevel0Table(ft.mt, fd)
dirSyncErr := <-dirSyncCh
// Update s.imm. Need a lock.
db.Lock()
defer db.Unlock()
// This is a single-threaded operation. ft.mt corresponds to the head of
// db.imm list. Once we flush it, we advance db.imm. The next ft.mt
// which would arrive here would match db.imm[0], because we acquire a
// lock over DB when pushing to flushChan.
// TODO: This logic is dirty AF. Any change and this could easily break.
y.AssertTrue(ft.mt == db.imm[0])
db.imm = db.imm[1:]
ft.mt.DecrRef() // Return memory.
return nil
}

if err != nil {
db.elog.Errorf("ERROR while writing to level 0: %v", err)
return err
}
if dirSyncErr != nil {
db.elog.Errorf("ERROR while syncing level directory: %v", dirSyncErr)
return err
}
// flushMemtable must keep running until we send it an empty flushTask. If there
// are errors during handling the flush task, we'll retry indefinitely.
func (db *DB) flushMemtable(lc *y.Closer) error {
defer lc.Done()

tbl, err := table.OpenTable(fd, db.opt.TableLoadingMode)
if err != nil {
db.elog.Printf("ERROR while opening table: %v", err)
return err
for ft := range db.flushChan {
if ft.mt == nil {
return nil // Stop this goroutine.
}
// We own a ref on tbl.
err = db.lc.addLevel0Table(tbl) // This will incrRef (if we don't error, sure)
tbl.DecrRef() // Releases our ref.
if err != nil {
return err
for {
err := db.handleFlushTask(ft)
if err == nil {
break
}
// Encounterd error. Retry indefinitely.
log.Printf("Error while flushing memtable to disk: %v. Retrying...\n", err)
time.Sleep(time.Second)
}

// Update s.imm. Need a lock.
db.Lock()
// This is a single-threaded operation. ft.mt corresponds to the head of
// db.imm list. Once we flush it, we advance db.imm. The next ft.mt
// which would arrive here would match db.imm[0], because we acquire a
// lock over DB when pushing to flushChan.
// TODO: This logic is dirty AF. Any change and this could easily break.
y.AssertTrue(ft.mt == db.imm[0])
db.imm = db.imm[1:]
ft.mt.DecrRef() // Return memory.
db.Unlock()
}
return nil
}
Expand Down

0 comments on commit 8b6445e

Please sign in to comment.