Skip to content

Commit

Permalink
Performance: Run each txn callback in a goroutine.
Browse files Browse the repository at this point in the history
Run each txn callback in goroutine, like we used to do before. Sending
them all over one channel to execute via a few goroutines slows down
things quite a lot.
  • Loading branch information
manishrjain committed Oct 27, 2018
1 parent 758c4ba commit 6c8b15c
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 58 deletions.
45 changes: 16 additions & 29 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,11 @@ var (
)

type closers struct {
updateSize *y.Closer
compactors *y.Closer
memtable *y.Closer
writes *y.Closer
txnCallback *y.Closer
valueGC *y.Closer
updateSize *y.Closer
compactors *y.Closer
memtable *y.Closer
writes *y.Closer
valueGC *y.Closer
}

// DB provides the various functions required to interact with Badger.
Expand All @@ -61,18 +60,17 @@ type DB struct {
// nil if Dir and ValueDir are the same
valueDirGuard *directoryLockGuard

closers closers
elog trace.EventLog
mt *skl.Skiplist // Our latest (actively written) in-memory table
imm []*skl.Skiplist // Add here only AFTER pushing to flushChan.
opt Options
manifest *manifestFile
lc *levelsController
vlog valueLog
vptr valuePointer // less than or equal to a pointer to the last vlog value put into mt
writeCh chan *request
flushChan chan flushTask // For flushing memtables.
txnCallbackCh chan *txnCb // For running txn callbacks.
closers closers
elog trace.EventLog
mt *skl.Skiplist // Our latest (actively written) in-memory table
imm []*skl.Skiplist // Add here only AFTER pushing to flushChan.
opt Options
manifest *manifestFile
lc *levelsController
vlog valueLog
vptr valuePointer // less than or equal to a pointer to the last vlog value put into mt
writeCh chan *request
flushChan chan flushTask // For flushing memtables.

blockWrites int32

Expand Down Expand Up @@ -249,7 +247,6 @@ func Open(opt Options) (db *DB, err error) {
imm: make([]*skl.Skiplist, 0, opt.NumMemtables),
flushChan: make(chan flushTask, opt.NumMemtables),
writeCh: make(chan *request, kvWriteChCapacity),
txnCallbackCh: make(chan *txnCb, 100),
opt: opt,
manifest: manifestFile,
elog: trace.NewEventLog("Badger", "DB"),
Expand Down Expand Up @@ -302,12 +299,6 @@ func Open(opt Options) (db *DB, err error) {
db.orc.txnMark.Done(db.orc.nextTxnTs)
db.orc.nextTxnTs++

// These goroutines run the user specified callbacks passed during txn.CommitWith.
db.closers.txnCallback = y.NewCloser(3)
for i := 0; i < 3; i++ {
go db.runTxnCallbacks(db.closers.txnCallback)
}

db.writeCh = make(chan *request, kvWriteChCapacity)
db.closers.writes = y.NewCloser(1)
go db.doWrites(db.closers.writes)
Expand All @@ -332,10 +323,6 @@ func (db *DB) Close() (err error) {
// Stop writes next.
db.closers.writes.SignalAndWait()

// Wait for callbacks to be run.
close(db.txnCallbackCh)
db.closers.txnCallback.SignalAndWait()

// Now close the value log.
if vlogErr := db.vlog.Close(); err == nil {
err = errors.Wrap(vlogErr, "DB.Close")
Expand Down
45 changes: 17 additions & 28 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,31 +604,19 @@ type txnCb struct {
err error
}

func (db *DB) runTxnCallbacks(closer *y.Closer) {
defer closer.Done()

run := func(cb *txnCb) {
switch {
case cb == nil:
panic("txn callback is nil")
case cb.err != nil:
cb.user(cb.err)
case cb.commit != nil:
err := cb.commit()
cb.user(err)
case cb.user == nil:
panic("Must have caught a nil callback for txn.CommitWith")
default:
cb.user(nil)
}
}

// We don't check closer.HasBeenClosed because we must empty out the txnCallbackCh completely
// before exiting here. During db.Close, txnCallbackCh is closed, so this loop below would
// automatically exit. We do still need the closer, so we can block until all these callbacks
// are finished running.
for cb := range db.txnCallbackCh {
run(cb)
func runTxnCallback(cb *txnCb) {
switch {
case cb == nil:
panic("txn callback is nil")
case cb.err != nil:
cb.user(cb.err)
case cb.commit != nil:
err := cb.commit()
cb.user(err)
case cb.user == nil:
panic("Must have caught a nil callback for txn.CommitWith")
default:
cb.user(nil)
}
}

Expand All @@ -648,16 +636,17 @@ func (txn *Txn) CommitWith(cb func(error)) {
// Do not run these callbacks from here, because the CommitWith and the
// callback might be acquiring the same locks. Instead run the callback
// from another goroutine.
txn.db.txnCallbackCh <- &txnCb{user: cb, err: nil}
go runTxnCallback(&txnCb{user: cb, err: nil})
return
}

commitCb, err := txn.commitAndSend()
if err != nil {
txn.db.txnCallbackCh <- &txnCb{user: cb, err: err}
go runTxnCallback(&txnCb{user: cb, err: err})
return
}
txn.db.txnCallbackCh <- &txnCb{user: cb, commit: commitCb}

go runTxnCallback(&txnCb{user: cb, commit: commitCb})
}

// ReadTs returns the read timestamp of the transaction.
Expand Down
2 changes: 1 addition & 1 deletion value_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,7 +772,7 @@ func (th *testHelper) readRange(from, to int) {
}
return nil
})
require.NoErrorf(th.t, err, "key=%q", th.key(i))
require.NoError(th.t, err, "key=%q", th.key(i))
}
}

Expand Down

0 comments on commit 6c8b15c

Please sign in to comment.