Skip to content

Commit

Permalink
Create a WriteBatch API to allow efficient serial writes (dgraph-io#608)
Browse files Browse the repository at this point in the history
**Breaking change**

- This change removes the ability to provide a callback in Commit. Instead, it creates another function called CommitWith, which takes a callback.
- Previously, a user-specified callback might or might not get executed. This PR fixes that by guaranteeing that the commit callback is always executed.
- Create a WriteBatch API to batch up multiple updates into a single txn. The txn gets Committed via callback, so the user gets a simple efficient way to do a lot of writes.
- Run only a few goroutines to execute these user callbacks. Previously, we were shooting off one goroutine per Commit callback.
  • Loading branch information
manishrjain authored Oct 9, 2018
1 parent fc94c57 commit 6daccf9
Show file tree
Hide file tree
Showing 10 changed files with 383 additions and 150 deletions.
53 changes: 13 additions & 40 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -614,52 +614,25 @@ There are multiple workarounds during iteration:

Are you creating a new transaction for every single key update, and waiting for
it to `Commit` fully before creating a new one? This will lead to very low
throughput. To get best write performance, batch up multiple writes inside a
transaction using single `DB.Update()` call. You could also have multiple such
`DB.Update()` calls being made concurrently from multiple goroutines.
throughput.

The way to achieve the highest write throughput via Badger, is to do serial
writes and use callbacks in `txn.Commit`, like so:
We have created `WriteBatch` API which provides a way to batch up
many updates into a single transaction and `Commit` that transaction using
callbacks to avoid blocking. This amortizes the cost of a transaction really
well, and provides the most efficient way to do bulk writes.

```go
che := make(chan error, 1)
storeErr := func(err error) {
if err == nil {
return
}
select {
case che <- err:
default:
}
}
Note that `WriteBatch` API does not allow any reads. For read-modify-write
workloads, you should be using the `Transaction` API.

getErr := func() error {
select {
case err := <-che:
return err
default:
return nil
}
}

var wg sync.WaitGroup
for _, kv := range kvs {
wg.Add(1)
txn := db.NewTransaction(true)
handle(txn.Set(kv.Key, kv.Value))
handle(txn.Commit(func(err error) {
storeErr(err)
wg.Done()
}))
```go
wb := db.NewWriteBatch()
for i := 0; i < N; i++ {
err := wb.Set(key(i), value(i), 0) // Will create txns as needed.
handle(err)
}
wg.Wait()
return getErr()
handle(wb.Flush()) // Wait for all txns to finish.
```

In this code, we passed a callback function to `txn.Commit`, which can pick up
and return the first error encountered, if any. Callbacks can be made to do more
things, like retrying commits etc.

- **I don't see any disk write. Why?**

If you're using Badger with `SyncWrites=false`, then your writes might not be written to value log
Expand Down
23 changes: 5 additions & 18 deletions badger/cmd/bank.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,27 +377,13 @@ func runTest(cmd *cobra.Command, args []string) error {
}
defer db.Close()

var wg sync.WaitGroup
var txns []*badger.Txn
wb := db.NewWriteBatch()
for i := 0; i < numAccounts; i++ {
wg.Add(1)
txn := db.NewTransaction(true)
y.Check(putBalance(txn, i, initialBal))
txns = append(txns, txn)
}
for _, txn := range txns {
y.Check(txn.Commit(func(err error) {
y.Check(err)
wg.Done()
}))
y.Check(wb.Set(key(i), toSlice(initialBal), 0))
}
log.Println("Waiting for writes to be done")
wg.Wait()
log.Println("Waiting for writes to be done...")
y.Check(wb.Flush())

y.Check(db.View(func(txn *badger.Txn) error {
log.Printf("LowTs: %d\n", txn.ReadTs())
return nil
}))
log.Println("Bank initialization OK. Commencing test.")
log.Printf("Running with %d accounts, and %d goroutines.\n", numAccounts, numGoroutines)
log.Printf("Using keyPrefix: %s\n", keyPrefix)
Expand All @@ -409,6 +395,7 @@ func runTest(cmd *cobra.Command, args []string) error {
endTs := time.Now().Add(dur)
var total, errors, reads uint64

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
Expand Down
126 changes: 126 additions & 0 deletions batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright 2018 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package badger

import (
"sync"
)

type WriteBatch struct {
sync.Mutex
txn *Txn
db *DB
wg sync.WaitGroup
err error
}

// NewWriteBatch creates a new WriteBatch. This provides a way to conveniently do a lot of writes,
// batching them up as tightly as possible in a single transaction and using callbacks to avoid
// waiting for them to commit, thus achieving good performance. This API hides away the logic of
// creating and committing transactions. Due to the nature of SSI guaratees provided by Badger,
// blind writes can never encounter transaction conflicts (ErrConflict).
func (db *DB) NewWriteBatch() *WriteBatch {
return &WriteBatch{db: db, txn: db.newTransaction(true, true)}
}

func (wb *WriteBatch) callback(err error) {
// sync.WaitGroup is thread-safe, so it doesn't need to be run inside wb.Lock.
defer wb.wg.Done()
if err == nil {
return
}

wb.Lock()
defer wb.Unlock()
if wb.err != nil {
return
}
wb.err = err
}

// Set is equivalent of Txn.SetWithMeta.
func (wb *WriteBatch) Set(k, v []byte, meta byte) error {
wb.Lock()
defer wb.Unlock()

if err := wb.txn.SetWithMeta(k, v, meta); err != ErrTxnTooBig {
return err
}
// Txn has reached it's zenith. Commit now.
if cerr := wb.commit(); cerr != nil {
return cerr
}
// This time the error must not be ErrTxnTooBig, otherwise, we make the
// error permanent.
if err := wb.txn.SetWithMeta(k, v, meta); err != nil {
wb.err = err
return err
}
return nil
}

// Delete is equivalent of Txn.Delete.
func (wb *WriteBatch) Delete(k []byte) error {
wb.Lock()
defer wb.Unlock()

if err := wb.txn.Delete(k); err != ErrTxnTooBig {
return err
}
if err := wb.commit(); err != nil {
return err
}
if err := wb.txn.Delete(k); err != nil {
wb.err = err
return err
}
return nil
}

// Caller to commit must hold a write lock.
func (wb *WriteBatch) commit() error {
if wb.err != nil {
return wb.err
}
// Get a new txn before we commit this one. So, the new txn doesn't need
// to wait for this one to commit.
wb.wg.Add(1)
wb.txn.CommitWith(wb.callback)
wb.txn = wb.db.newTransaction(true, true)
wb.txn.readTs = 0 // We're not reading anything.
return wb.err
}

// Flush must be called at the end to ensure that any pending writes get committed to Badger. Flush
// returns any error stored by WriteBatch.
func (wb *WriteBatch) Flush() error {
wb.Lock()
_ = wb.commit()
wb.txn.Discard()
wb.Unlock()

wb.wg.Wait()
// Safe to access error without any synchronization here.
return wb.err
}

// Error returns any errors encountered so far. No commits would be run once an error is detected.
func (wb *WriteBatch) Error() error {
wb.Lock()
defer wb.Unlock()
return wb.err
}
70 changes: 70 additions & 0 deletions batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2018 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package badger

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestWriteBatch(t *testing.T) {
key := func(i int) []byte {
// b := make([]byte, 8)
// binary.BigEndian.PutUint64(b, uint64(i))
// return b
return []byte(fmt.Sprintf("%10d", i))
}
val := func(i int) []byte {
return []byte(fmt.Sprintf("%128d", i))
}

runBadgerTest(t, nil, func(t *testing.T, db *DB) {
wb := db.NewWriteBatch()
N, M := 50000, 1000
start := time.Now()

for i := 0; i < N; i++ {
require.NoError(t, wb.Set(key(i), val(i), 0))
}
for i := 0; i < M; i++ {
require.NoError(t, wb.Delete(key(i)))
}
require.NoError(t, wb.Flush())
t.Logf("Time taken for %d writes (w/ test options): %s\n", N+M, time.Since(start))

err := db.View(func(txn *Txn) error {
itr := txn.NewIterator(DefaultIteratorOptions)
defer itr.Close()

i := M
for itr.Rewind(); itr.Valid(); itr.Next() {
item := itr.Item()
require.Equal(t, string(key(i)), string(item.Key()))
valcopy, err := item.ValueCopy(nil)
require.NoError(t, err)
require.Equal(t, val(i), valcopy)
i++
}
require.Equal(t, N, i)
return nil
})
require.NoError(t, err)
})
}
45 changes: 29 additions & 16 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,12 @@ var (
)

type closers struct {
updateSize *y.Closer
compactors *y.Closer
memtable *y.Closer
writes *y.Closer
valueGC *y.Closer
updateSize *y.Closer
compactors *y.Closer
memtable *y.Closer
writes *y.Closer
txnCallback *y.Closer
valueGC *y.Closer
}

// DB provides the various functions required to interact with Badger.
Expand All @@ -63,17 +64,18 @@ type DB struct {
// nil if Dir and ValueDir are the same
valueDirGuard *directoryLockGuard

closers closers
elog trace.EventLog
mt *skl.Skiplist // Our latest (actively written) in-memory table
imm []*skl.Skiplist // Add here only AFTER pushing to flushChan.
opt Options
manifest *manifestFile
lc *levelsController
vlog valueLog
vptr valuePointer // less than or equal to a pointer to the last vlog value put into mt
writeCh chan *request
flushChan chan flushTask // For flushing memtables.
closers closers
elog trace.EventLog
mt *skl.Skiplist // Our latest (actively written) in-memory table
imm []*skl.Skiplist // Add here only AFTER pushing to flushChan.
opt Options
manifest *manifestFile
lc *levelsController
vlog valueLog
vptr valuePointer // less than or equal to a pointer to the last vlog value put into mt
writeCh chan *request
flushChan chan flushTask // For flushing memtables.
txnCallbackCh chan *txnCb // For running txn callbacks.

blockWrites int32

Expand Down Expand Up @@ -246,6 +248,7 @@ func Open(opt Options) (db *DB, err error) {
imm: make([]*skl.Skiplist, 0, opt.NumMemtables),
flushChan: make(chan flushTask, opt.NumMemtables),
writeCh: make(chan *request, kvWriteChCapacity),
txnCallbackCh: make(chan *txnCb, 100),
opt: opt,
manifest: manifestFile,
elog: trace.NewEventLog("Badger", "DB"),
Expand Down Expand Up @@ -309,6 +312,12 @@ func Open(opt Options) (db *DB, err error) {
return db, errors.Wrapf(err, "Unable to mmap RDWR log file")
}

// These goroutines run the user specified callbacks passed during txn.CommitWith.
db.closers.txnCallback = y.NewCloser(3)
for i := 0; i < 3; i++ {
go db.runTxnCallbacks(db.closers.txnCallback)
}

db.writeCh = make(chan *request, kvWriteChCapacity)
db.closers.writes = y.NewCloser(1)
go db.doWrites(db.closers.writes)
Expand All @@ -333,6 +342,10 @@ func (db *DB) Close() (err error) {
// Stop writes next.
db.closers.writes.SignalAndWait()

// Wait for callbacks to be run.
close(db.txnCallbackCh)
db.closers.txnCallback.SignalAndWait()

// Now close the value log.
if vlogErr := db.vlog.Close(); err == nil {
err = errors.Wrap(vlogErr, "DB.Close")
Expand Down
Loading

0 comments on commit 6daccf9

Please sign in to comment.