Skip to content

Commit

Permalink
fix(drop): drain handed skiplists before stopping memory flush (dgraph-io#1762)
Browse files Browse the repository at this point in the history

Fixes the race condition. For details see PR dgraph-io#1762
  • Loading branch information
NamanJain8 authored Dec 2, 2021
1 parent 32912a9 commit 3f320f5
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 9 deletions.
60 changes: 51 additions & 9 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ type DB struct {
lc *levelsController
vlog valueLog
writeCh chan *request
sklCh chan *handoverRequest
flushChan chan flushTask // For flushing memtables.
closeOnce sync.Once // For closing DB only once.

Expand Down Expand Up @@ -245,6 +246,7 @@ func Open(opt Options) (*DB, error) {
imm: make([]*memTable, 0, opt.NumMemtables),
flushChan: make(chan flushTask, opt.NumMemtables),
writeCh: make(chan *request, kvWriteChCapacity),
sklCh: make(chan *handoverRequest),
opt: opt,
manifest: manifestFile,
dirLockGuard: dirLockGuard,
Expand Down Expand Up @@ -383,8 +385,9 @@ func Open(opt Options) (*DB, error) {
return db, errors.Wrapf(err, "While setting banned keys")
}

db.closers.writes = z.NewCloser(1)
db.closers.writes = z.NewCloser(2)
go db.doWrites(db.closers.writes)
go db.handleHandovers(db.closers.writes)

if !db.opt.InMemory {
db.closers.valueGC = z.NewCloser(1)
Expand Down Expand Up @@ -555,6 +558,7 @@ func (db *DB) close() (err error) {

// Don't accept any more write.
close(db.writeCh)
close(db.sklCh)

db.closers.pub.SignalAndWait()
db.closers.cacheHealth.Signal()
Expand Down Expand Up @@ -875,6 +879,19 @@ func (db *DB) sendToWriteCh(entries []*Entry) (*request, error) {
return req, nil
}

// handleHandovers serves skiplist handover requests arriving on db.sklCh until
// lc is signalled. Each request is processed with db.handoverSkiplist; the
// outcome is stored in the request's err field and the request's WaitGroup is
// released so that the goroutine waiting on it can proceed.
//
// lc.Done is called when the loop exits, whichever way it exits.
func (db *DB) handleHandovers(lc *z.Closer) {
	defer lc.Done()
	for {
		select {
		case r, ok := <-db.sklCh:
			if !ok {
				// A closed channel is always ready and yields a nil request.
				// Dereferencing it below would panic, so treat the close as a
				// shutdown signal instead.
				return
			}
			r.err = db.handoverSkiplist(r)
			r.wg.Done()
		case <-lc.HasBeenClosed():
			return
		}
	}
}

func (db *DB) doWrites(lc *z.Closer) {
defer lc.Done()
pendingCh := make(chan struct{}, 1)
Expand Down Expand Up @@ -1001,13 +1018,8 @@ func (db *DB) ensureRoomForWrite() error {
}
}

func (db *DB) HandoverSkiplist(skl *skl.Skiplist, callback func()) error {
if !db.opt.managedTxns {
panic("Handover Skiplist is only available in managed mode.")
}
db.lock.Lock()
defer db.lock.Unlock()

func (db *DB) handoverSkiplist(r *handoverRequest) error {
skl, callback := r.skl, r.callback
// If we have some data in db.mt, we should push that first, so the ordering of writes is
// maintained.
if !db.mt.sl.Empty() {
Expand Down Expand Up @@ -1057,6 +1069,25 @@ func (db *DB) HandoverSkiplist(skl *skl.Skiplist, callback func()) error {
}
}

// HandoverSkiplist submits skl, together with callback, as a handover request
// on db.sklCh and blocks until the goroutine that receives the request has
// finished processing it. The error recorded on the request is returned.
//
// It panics when the DB is not in managed mode, and returns ErrBlockedWrites
// without submitting anything when writes are currently blocked. db.lock is
// held for the whole submit-and-wait sequence.
func (db *DB) HandoverSkiplist(skl *skl.Skiplist, callback func()) error {
	switch {
	case !db.opt.managedTxns:
		panic("Handover Skiplist is only available in managed mode.")
	case atomic.LoadInt32(&db.blockWrites) == 1:
		return ErrBlockedWrites
	}

	db.lock.Lock()
	defer db.lock.Unlock()

	// The receiver fills in handover.err and then calls handover.wg.Done.
	handover := handoverRequest{skl: skl, callback: callback}
	handover.wg.Add(1)
	db.sklCh <- &handover
	handover.wg.Wait()
	return handover.err
}

// arenaSize returns opt.MemTableSize plus opt.maxBatchSize plus room for
// opt.maxBatchCount skiplist nodes of skl.MaxNodeSize bytes each.
// NOTE(review): presumably used to size the memtable skiplist arena — confirm
// against callers.
func arenaSize(opt Options) int64 {
	nodeBytes := opt.maxBatchCount * int64(skl.MaxNodeSize)
	return opt.MemTableSize + opt.maxBatchSize + nodeBytes
}
Expand Down Expand Up @@ -1726,8 +1757,9 @@ func (db *DB) blockWrite() error {
}

func (db *DB) unblockWrite() {
db.closers.writes = z.NewCloser(1)
db.closers.writes = z.NewCloser(2)
go db.doWrites(db.closers.writes)
go db.handleHandovers(db.closers.writes)

// Resume writes.
atomic.StoreInt32(&db.blockWrites, 0)
Expand All @@ -1744,14 +1776,24 @@ func (db *DB) prepareToDrop() (func(), error) {
return func() {}, err
}
reqs := make([]*request, 0, 10)
skls := make([]*handoverRequest, 0, 5)
for {
select {
case r := <-db.writeCh:
reqs = append(reqs, r)
case skl := <-db.sklCh:
skls = append(skls, skl)
default:
if err := db.writeRequests(reqs); err != nil {
db.opt.Errorf("writeRequests: %v", err)
}
for _, skl := range skls {
skl.err = db.handoverSkiplist(skl)
skl.wg.Done()
if skl.err != nil {
db.opt.Errorf("handoverSkiplists: %v", skl.err)
}
}
db.stopMemoryFlush()
return func() {
db.opt.Infof("Resuming writes")
Expand Down
8 changes: 8 additions & 0 deletions value.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"sync"
"sync/atomic"

"github.com/dgraph-io/badger/v3/skl"
"github.com/dgraph-io/badger/v3/y"
"github.com/dgraph-io/ristretto/z"
"github.com/pkg/errors"
Expand Down Expand Up @@ -677,6 +678,13 @@ type request struct {
ref int32
}

// handoverRequest carries one skiplist handover through db.sklCh. The sender
// adds one to wg before sending and waits on it; the receiver stores the result
// in err and calls wg.Done.
type handoverRequest struct {
// skl is the skiplist being handed over.
skl *skl.Skiplist
// callback is passed along with skl to handoverSkiplist.
// NOTE(review): presumably invoked once the skiplist has been flushed — confirm.
callback func()
// err is the result of processing the request; set by the receiver.
err error
// wg lets the sender block until the request has been processed.
wg sync.WaitGroup
}

func (req *request) reset() {
req.Entries = req.Entries[:0]
req.Ptrs = req.Ptrs[:0]
Expand Down

0 comments on commit 3f320f5

Please sign in to comment.