diff --git a/db.go b/db.go index 6c6c1799a..b64fa5c65 100644 --- a/db.go +++ b/db.go @@ -44,12 +44,11 @@ var ( ) type closers struct { - updateSize *y.Closer - compactors *y.Closer - memtable *y.Closer - writes *y.Closer - txnCallback *y.Closer - valueGC *y.Closer + updateSize *y.Closer + compactors *y.Closer + memtable *y.Closer + writes *y.Closer + valueGC *y.Closer } // DB provides the various functions required to interact with Badger. @@ -61,18 +60,17 @@ type DB struct { // nil if Dir and ValueDir are the same valueDirGuard *directoryLockGuard - closers closers - elog trace.EventLog - mt *skl.Skiplist // Our latest (actively written) in-memory table - imm []*skl.Skiplist // Add here only AFTER pushing to flushChan. - opt Options - manifest *manifestFile - lc *levelsController - vlog valueLog - vptr valuePointer // less than or equal to a pointer to the last vlog value put into mt - writeCh chan *request - flushChan chan flushTask // For flushing memtables. - txnCallbackCh chan *txnCb // For running txn callbacks. + closers closers + elog trace.EventLog + mt *skl.Skiplist // Our latest (actively written) in-memory table + imm []*skl.Skiplist // Add here only AFTER pushing to flushChan. + opt Options + manifest *manifestFile + lc *levelsController + vlog valueLog + vptr valuePointer // less than or equal to a pointer to the last vlog value put into mt + writeCh chan *request + flushChan chan flushTask // For flushing memtables. blockWrites int32 @@ -249,7 +247,6 @@ func Open(opt Options) (db *DB, err error) { imm: make([]*skl.Skiplist, 0, opt.NumMemtables), flushChan: make(chan flushTask, opt.NumMemtables), writeCh: make(chan *request, kvWriteChCapacity), - txnCallbackCh: make(chan *txnCb, 100), opt: opt, manifest: manifestFile, elog: trace.NewEventLog("Badger", "DB"), @@ -302,12 +299,6 @@ func Open(opt Options) (db *DB, err error) { db.orc.txnMark.Done(db.orc.nextTxnTs) db.orc.nextTxnTs++ - // These goroutines run the user specified callbacks passed during txn.CommitWith. - db.closers.txnCallback = y.NewCloser(3) - for i := 0; i < 3; i++ { - go db.runTxnCallbacks(db.closers.txnCallback) - } - db.writeCh = make(chan *request, kvWriteChCapacity) db.closers.writes = y.NewCloser(1) go db.doWrites(db.closers.writes) @@ -332,10 +323,6 @@ func (db *DB) Close() (err error) { // Stop writes next. db.closers.writes.SignalAndWait() - // Wait for callbacks to be run. - close(db.txnCallbackCh) - db.closers.txnCallback.SignalAndWait() - // Now close the value log. if vlogErr := db.vlog.Close(); err == nil { err = errors.Wrap(vlogErr, "DB.Close") diff --git a/txn.go b/txn.go index d370ea97d..a284cae72 100644 --- a/txn.go +++ b/txn.go @@ -604,31 +604,19 @@ type txnCb struct { err error } -func (db *DB) runTxnCallbacks(closer *y.Closer) { - defer closer.Done() - - run := func(cb *txnCb) { - switch { - case cb == nil: - panic("txn callback is nil") - case cb.err != nil: - cb.user(cb.err) - case cb.commit != nil: - err := cb.commit() - cb.user(err) - case cb.user == nil: - panic("Must have caught a nil callback for txn.CommitWith") - default: - cb.user(nil) - } - } - - // We don't check closer.HasBeenClosed because we must empty out the txnCallbackCh completely - // before exiting here. During db.Close, txnCallbackCh is closed, so this loop below would - // automatically exit. We do still need the closer, so we can block until all these callbacks - // are finished running. - for cb := range db.txnCallbackCh { - run(cb) +func runTxnCallback(cb *txnCb) { + switch { + case cb == nil: + panic("txn callback is nil") + case cb.err != nil: + cb.user(cb.err) + case cb.commit != nil: + err := cb.commit() + cb.user(err) + case cb.user == nil: + panic("Must have caught a nil callback for txn.CommitWith") + default: + cb.user(nil) } } @@ -648,16 +636,17 @@ func (txn *Txn) CommitWith(cb func(error)) { // Do not run these callbacks from here, because the CommitWith and the // callback might be acquiring the same locks. Instead run the callback // from another goroutine. - txn.db.txnCallbackCh <- &txnCb{user: cb, err: nil} + go runTxnCallback(&txnCb{user: cb, err: nil}) return } commitCb, err := txn.commitAndSend() if err != nil { - txn.db.txnCallbackCh <- &txnCb{user: cb, err: err} + go runTxnCallback(&txnCb{user: cb, err: err}) return } - txn.db.txnCallbackCh <- &txnCb{user: cb, commit: commitCb} + + go runTxnCallback(&txnCb{user: cb, commit: commitCb}) } // ReadTs returns the read timestamp of the transaction. diff --git a/value_test.go b/value_test.go index 8b78c6fd9..27796be43 100644 --- a/value_test.go +++ b/value_test.go @@ -772,7 +772,7 @@ func (th *testHelper) readRange(from, to int) { } return nil }) - require.NoErrorf(th.t, err, "key=%q", th.key(i)) + require.NoError(th.t, err, "key=%q", th.key(i)) } }