Opt(stream): Use z.Buffer to stream data (dgraph-io#1606)
Stream.Send now sends out a z.Buffer instead of a pb.KVList. The z.Buffer holds each KV marshaled as a separate slice, which significantly reduces the memory required by the Stream framework. For memory-safety reasons, Stream no longer uses z.Allocator or tries to place pb.KV structs on the Allocator. A minimal consumer sketch follows the change list below.

Bring back the z.AllocatorPool for table.Builder.

Changes:
* Use z.Buffer for stream.Send
* Only use 8 streams in write bench
* Revert "Bug Fix: Fix up how we use z.Allocator"
This reverts commit 5ff9e1d.
* Bring allocator back. Use z.Buffer for send
* Add BufferToKVList function
* Print jemalloc while stream
* Bring in latest Ristretto
* Fix memory leak and benchmark read test
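
For reference, here is a minimal consumer sketch of the new callback shape, assuming the Badger v2 stream API and ristretto's z.Buffer as used in this commit (the key-counting logic and the countKeys name are illustrative only):

```go
package main

import (
	"context"
	"log"

	badger "github.com/dgraph-io/badger/v2"
	"github.com/dgraph-io/badger/v2/pb"
	"github.com/dgraph-io/ristretto/z"
)

// countKeys walks the whole DB with the Stream framework. Send now receives
// a z.Buffer in which every pb.KV is marshaled as its own slice, so the
// callback unmarshals slice by slice instead of ranging over a pb.KVList.
func countKeys(db *badger.DB) (int, error) {
	count := 0
	stream := db.NewStream()
	stream.Send = func(buf *z.Buffer) error {
		return buf.SliceIterate(func(s []byte) error {
			var kv pb.KV
			if err := kv.Unmarshal(s); err != nil {
				return err
			}
			count++
			return nil
		})
	}
	if err := stream.Orchestrate(context.Background()); err != nil {
		return 0, err
	}
	return count, nil
}

func main() {
	db, err := badger.Open(badger.DefaultOptions("/tmp/badger"))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	n, err := countKeys(db)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("total keys: %d", n)
}
```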

Co-authored-by: Ibrahim Jarif <[email protected]>
manishrjain and Ibrahim Jarif authored Nov 25, 2020
1 parent feb1f5f commit ab8b5d9
Showing 18 changed files with 259 additions and 178 deletions.
7 changes: 6 additions & 1 deletion backup.go
@@ -25,6 +25,7 @@ import (

"github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/ristretto/z"
"github.com/golang/protobuf/proto"
)

@@ -115,7 +116,11 @@ func (stream *Stream) Backup(w io.Writer, since uint64) (uint64, error) {
}

var maxVersion uint64
stream.Send = func(list *pb.KVList) error {
stream.Send = func(buf *z.Buffer) error {
list, err := BufferToKVList(buf)
if err != nil {
return err
}
out := list.Kv[:0]
for _, kv := range list.Kv {
if maxVersion < kv.Version {
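
Backup keeps its pb.KVList-based filtering by converting each buffer back with the new BufferToKVList helper mentioned in the commit message. As a rough idea of what that conversion involves, here is an equivalent stand-in written against the same SliceIterate pattern (a sketch, not necessarily the helper's actual implementation; the package name is illustrative):

```go
package badgerutil

import (
	"github.com/dgraph-io/badger/v2/pb"
	"github.com/dgraph-io/ristretto/z"
)

// bufferToKVList is a stand-in for the BufferToKVList helper added by this
// commit: each slice in the buffer is unmarshaled into a pb.KV and appended
// to a pb.KVList, reversing what KVToBuffer does on the producer side.
func bufferToKVList(buf *z.Buffer) (*pb.KVList, error) {
	list := &pb.KVList{}
	err := buf.SliceIterate(func(s []byte) error {
		kv := &pb.KV{}
		if err := kv.Unmarshal(s); err != nil {
			return err
		}
		list.Kv = append(list.Kv, kv)
		return nil
	})
	return list, err
}
```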
15 changes: 9 additions & 6 deletions badger/cmd/bank.go
@@ -33,6 +33,7 @@ import (
"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/ristretto/z"
"github.com/spf13/cobra"
)

@@ -359,7 +360,7 @@ func runTest(cmd *cobra.Command, args []string) error {
WithNumVersionsToKeep(int(math.MaxInt32)).
WithBlockCacheSize(1 << 30).
WithIndexCacheSize(1 << 30)

if verbose {
opts = opts.WithLoggingLevel(badger.DEBUG)
}
@@ -498,13 +499,15 @@ func runTest(cmd *cobra.Command, args []string) error {
batch := tmpDb.NewWriteBatch()

stream := db.NewStream()
stream.Send = func(list *pb.KVList) error {
for _, kv := range list.Kv {
if err := batch.Set(kv.Key, kv.Value); err != nil {
stream.Send = func(buf *z.Buffer) error {
err := buf.SliceIterate(func(s []byte) error {
var kv pb.KV
if err := kv.Unmarshal(s); err != nil {
return err
}
}
return nil
return batch.Set(kv.Key, kv.Value)
})
return err
}
y.Check(stream.Orchestrate(context.Background()))
y.Check(batch.Flush())
22 changes: 16 additions & 6 deletions badger/cmd/read_bench.go
@@ -25,6 +25,7 @@ import (
"time"

humanize "github.com/dustin/go-humanize"
"github.com/pkg/errors"
"github.com/spf13/cobra"

"github.com/dgraph-io/badger/v2"
@@ -79,7 +80,7 @@ func init() {

// Scan the whole database using the iterators
func fullScanDB(db *badger.DB) {
txn := db.NewTransaction(false)
txn := db.NewTransactionAt(math.MaxUint64, false)
defer txn.Discard()

startTime = time.Now()
@@ -111,7 +112,7 @@ func readBench(cmd *cobra.Command, args []string) error {
WithBlockCacheSize(blockCacheSize << 20).
WithIndexCacheSize(indexCacheSize << 20)
fmt.Printf("Opening badger with options = %+v\n", opt)
db, err := badger.Open(opt)
db, err := badger.OpenManaged(opt)
if err != nil {
return y.Wrapf(err, "unable to open DB")
}
@@ -205,21 +206,30 @@ func getSampleKeys(db *badger.DB) ([][]byte, error) {
return l, nil
}

errStop := errors.Errorf("Stop iterating")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream.Send = func(l *pb.KVList) error {
stream.Send = func(buf *z.Buffer) error {
if count >= sampleSize {
return nil
}
for _, kv := range l.Kv {
err := buf.SliceIterate(func(s []byte) error {
var kv pb.KV
if err := kv.Unmarshal(s); err != nil {
return err
}
keys = append(keys, kv.Key)
count++
if count >= sampleSize {
cancel()
return nil
return errStop
}
return nil
})
if err == errStop || err == nil {
return nil
}
return nil
return err
}

if err := stream.Orchestrate(ctx); err != nil && err != context.Canceled {
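
The sampling callback above stops early by returning a sentinel error from SliceIterate and cancelling the stream's context. Condensed into one place, the pattern looks roughly like this (sampleKeys, errStop and the package name are illustrative; the APIs are the ones used in the diff):

```go
package sample

import (
	"context"

	badger "github.com/dgraph-io/badger/v2"
	"github.com/dgraph-io/badger/v2/pb"
	"github.com/dgraph-io/ristretto/z"
	"github.com/pkg/errors"
)

// sampleKeys collects up to max keys and then stops the stream early.
func sampleKeys(db *badger.DB, max int) ([][]byte, error) {
	var keys [][]byte
	errStop := errors.New("stop iterating")

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	stream := db.NewStream()
	stream.Send = func(buf *z.Buffer) error {
		err := buf.SliceIterate(func(s []byte) error {
			var kv pb.KV
			if err := kv.Unmarshal(s); err != nil {
				return err
			}
			keys = append(keys, kv.Key)
			if len(keys) >= max {
				cancel() // stop producers; Orchestrate returns context.Canceled
				return errStop
			}
			return nil
		})
		if err == errStop { // sentinel, not a real failure
			return nil
		}
		return err
	}
	if err := stream.Orchestrate(ctx); err != nil && err != context.Canceled {
		return nil, err
	}
	return keys, nil
}
```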
19 changes: 11 additions & 8 deletions badger/cmd/write_bench.go
@@ -204,37 +204,38 @@ func writeSorted(db *badger.DB, num uint64) error {
}

wg := &sync.WaitGroup{}
writeCh := make(chan *pb.KVList, 3)
writeCh := make(chan *z.Buffer, 3)
writeRange := func(start, end uint64, streamId uint32) {
// end is not included.
defer wg.Done()
kvs := &pb.KVList{}
kvBuf := z.NewBuffer(5 << 20)
var sz int
for i := start; i < end; i++ {
key := make([]byte, 8)
binary.BigEndian.PutUint64(key, i)
kvs.Kv = append(kvs.Kv, &pb.KV{
kv := &pb.KV{
Key: key,
Value: value,
Version: 1,
StreamId: streamId,
})
}
badger.KVToBuffer(kv, kvBuf)

sz += es
atomic.AddUint64(&entriesWritten, 1)
atomic.AddUint64(&sizeWritten, uint64(es))

if sz >= 4<<20 { // 4 MB
writeCh <- kvs
kvs = &pb.KVList{}
writeCh <- kvBuf
kvBuf = z.NewBuffer(1 << 20)
sz = 0
}
}
writeCh <- kvs
writeCh <- kvBuf
}

// Let's create some streams.
width := num / 16
width := num / 4
streamID := uint32(0)
for start := uint64(0); start < num; start += width {
end := start + width
@@ -254,6 +255,7 @@ func writeSorted(db *badger.DB, num uint64) error {
if err := writer.Write(kvs); err != nil {
panic(err)
}
y.Check(kvs.Release())
}
log.Println("DONE streaming. Flushing...")
return writer.Flush()
@@ -395,6 +397,7 @@ func reportStats(c *z.Closer, db *badger.DB) {
y.FixedDuration(time.Since(startTime)),
humanize.Bytes(sz), humanize.Bytes(bytesRate), entries, entriesRate,
humanize.IBytes(uint64(z.NumAllocBytes())))

if count%10 == 0 {
fmt.Printf(db.LevelsToString())
}
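
On the producer side, write_bench now accumulates marshaled KVs in a z.Buffer and hands the buffer off once it grows past a threshold, while the consumer releases each buffer after passing it to the StreamWriter. A condensed sketch of that producer/consumer pair, assuming badger.KVToBuffer and the single-argument z.NewBuffer used in this commit (function names, package name and the 4 MB threshold are illustrative):

```go
package bench

import (
	"encoding/binary"

	badger "github.com/dgraph-io/badger/v2"
	"github.com/dgraph-io/badger/v2/pb"
	"github.com/dgraph-io/ristretto/z"
)

// produce writes keys in [start, end) for one stream, packing marshaled KVs
// into z.Buffers of roughly 4 MB before handing them off on writeCh.
func produce(writeCh chan<- *z.Buffer, start, end uint64, streamID uint32, value []byte) {
	buf := z.NewBuffer(4 << 20)
	sz := 0
	for i := start; i < end; i++ {
		key := make([]byte, 8)
		binary.BigEndian.PutUint64(key, i)
		kv := &pb.KV{Key: key, Value: value, Version: 1, StreamId: streamID}

		badger.KVToBuffer(kv, buf) // append the KV as one marshaled slice
		sz += kv.Size()
		if sz >= 4<<20 { // hand off roughly every 4 MB
			writeCh <- buf
			buf = z.NewBuffer(4 << 20)
			sz = 0
		}
	}
	writeCh <- buf
}

// consume feeds every buffer to the StreamWriter and releases it afterwards;
// buffers are caller-owned now that the stream no longer uses z.Allocator.
func consume(db *badger.DB, writeCh <-chan *z.Buffer) error {
	writer := db.NewStreamWriter()
	if err := writer.Prepare(); err != nil {
		return err
	}
	for buf := range writeCh {
		if err := writer.Write(buf); err != nil {
			return err
		}
		buf.Release() // release error ignored in this sketch
	}
	return writer.Flush()
}
```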
35 changes: 27 additions & 8 deletions batch.go
@@ -22,6 +22,7 @@ import (

"github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/ristretto/z"
"github.com/pkg/errors"
)

@@ -95,17 +96,35 @@ func (wb *WriteBatch) callback(err error) {
wb.err.Store(err)
}

func (wb *WriteBatch) Write(kvList *pb.KVList) error {
func (wb *WriteBatch) writeKV(kv *pb.KV) error {
e := Entry{Key: kv.Key, Value: kv.Value}
if len(kv.UserMeta) > 0 {
e.UserMeta = kv.UserMeta[0]
}
y.AssertTrue(kv.Version != 0)
e.version = kv.Version
return wb.handleEntry(&e)
}

func (wb *WriteBatch) Write(buf *z.Buffer) error {
wb.Lock()
defer wb.Unlock()
for _, kv := range kvList.Kv {
e := Entry{Key: kv.Key, Value: kv.Value}
if len(kv.UserMeta) > 0 {
e.UserMeta = kv.UserMeta[0]

err := buf.SliceIterate(func(s []byte) error {
kv := &pb.KV{}
if err := kv.Unmarshal(s); err != nil {
return err
}
y.AssertTrue(kv.Version != 0)
e.version = kv.Version
if err := wb.handleEntry(&e); err != nil {
return wb.writeKV(kv)
})
return err
}

func (wb *WriteBatch) WriteList(kvList *pb.KVList) error {
wb.Lock()
defer wb.Unlock()
for _, kv := range kvList.Kv {
if err := wb.writeKV(kv); err != nil {
return err
}
}
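
For callers, the batch API now has two entry points: Write consumes a z.Buffer of marshaled KVs, while the new WriteList keeps the old pb.KVList path. A small usage sketch under those assumptions (flushKVs and the package name are illustrative; each KV must carry a non-zero Version, which writeKV asserts):

```go
package batchutil

import (
	badger "github.com/dgraph-io/badger/v2"
	"github.com/dgraph-io/badger/v2/pb"
	"github.com/dgraph-io/ristretto/z"
)

// flushKVs packs the KVs into a z.Buffer and hands it to WriteBatch.Write.
// The commented-out line shows the equivalent list-based call via WriteList.
func flushKVs(wb *badger.WriteBatch, kvs []*pb.KV) error {
	buf := z.NewBuffer(1 << 20)
	defer buf.Release()

	for _, kv := range kvs {
		badger.KVToBuffer(kv, buf) // one marshaled slice per KV
	}
	if err := wb.Write(buf); err != nil {
		return err
	}
	// Alternatively: return wb.WriteList(&pb.KVList{Kv: kvs})
	return wb.Flush()
}
```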
8 changes: 6 additions & 2 deletions db.go
@@ -95,6 +95,7 @@ type DB struct {
registry *KeyRegistry
blockCache *ristretto.Cache
indexCache *ristretto.Cache
allocPool *z.AllocatorPool
}

const (
@@ -218,6 +219,7 @@ func Open(opt Options) (*DB, error) {
valueDirGuard: valueDirLockGuard,
orc: newOracle(opt),
pub: newPublisher(),
allocPool: z.NewAllocatorPool(8),
}
// Cleanup all the goroutines started by badger in case of an error.
defer func() {
@@ -476,6 +478,8 @@ func (db *DB) IsClosed() bool {
}

func (db *DB) close() (err error) {
defer db.allocPool.Release()

db.opt.Debugf("Closing database")
db.opt.Infof("Lifetime L0 stalled for: %s\n", time.Duration(atomic.LoadInt64(&db.lc.l0stallsMs)))

@@ -1779,8 +1783,8 @@ func (db *DB) StreamDB(outOptions Options) error {
stream := db.NewStreamAt(math.MaxUint64)
stream.LogPrefix = fmt.Sprintf("Streaming DB to new DB at %s", outDir)

stream.Send = func(kvs *pb.KVList) error {
return writer.Write(kvs)
stream.Send = func(buf *z.Buffer) error {
return writer.Write(buf)
}
if err := stream.Orchestrate(context.Background()); err != nil {
return y.Wrapf(err, "cannot stream DB to out DB at %s", outDir)
22 changes: 19 additions & 3 deletions db2_test.go
@@ -969,8 +969,20 @@ func TestKeyCount(t *testing.T) {
wg.Wait()
close(writeCh)
}()

write := func(kvs *pb.KVList) error {
buf := z.NewBuffer(1 << 20)
defer buf.Release()

for _, kv := range kvs.Kv {
KVToBuffer(kv, buf)
}
writer.Write(buf)
return nil
}

for kvs := range writeCh {
require.NoError(t, writer.Write(kvs))
require.NoError(t, write(kvs))
}
require.NoError(t, writer.Flush())
}
@@ -999,8 +1011,11 @@ func TestKeyCount(t *testing.T) {

streams := make(map[uint32]int)
stream := db2.NewStream()
stream.Send = func(list *pb.KVList) error {
count += len(list.Kv)
stream.Send = func(buf *z.Buffer) error {
list, err := BufferToKVList(buf)
if err != nil {
return err
}
for _, kv := range list.Kv {
last := streams[kv.StreamId]
key := binary.BigEndian.Uint64(kv.Key)
Expand All @@ -1010,6 +1025,7 @@ func TestKeyCount(t *testing.T) {
}
streams[kv.StreamId] = int(key)
}
count += len(list.Kv)
return nil
}
require.NoError(t, stream.Orchestrate(context.Background()))
11 changes: 7 additions & 4 deletions db_test.go
@@ -38,6 +38,7 @@ import (
"github.com/dgraph-io/badger/v2/options"
"github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/ristretto/z"
)

// summary is produced when DB is closed. Currently it is used only for testing.
@@ -2088,25 +2089,27 @@ func TestVerifyChecksum(t *testing.T) {
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
value := make([]byte, 32)
y.Check2(rand.Read(value))
l := &pb.KVList{}
st := 0

buf := z.NewBuffer(10 << 20)
defer buf.Release()
for i := 0; i < 1000; i++ {
key := make([]byte, 8)
binary.BigEndian.PutUint64(key, uint64(i))
l.Kv = append(l.Kv, &pb.KV{
KVToBuffer(&pb.KV{
Key: key,
Value: value,
StreamId: uint32(st),
Version: 1,
})
}, buf)
if i%100 == 0 {
st++
}
}

sw := db.NewStreamWriter()
require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
require.NoError(t, sw.Write(l), "sw.Write() failed")
require.NoError(t, sw.Write(buf), "sw.Write() failed")
require.NoError(t, sw.Flush(), "sw.Flush() failed")

require.NoError(t, db.VerifyChecksum(), "checksum verification failed for DB")
2 changes: 1 addition & 1 deletion go.mod
@@ -7,7 +7,7 @@ go 1.12
require (
github.com/DataDog/zstd v1.4.1
github.com/cespare/xxhash v1.1.0
github.com/dgraph-io/ristretto v0.0.4-0.20201123185045-68b18eb1b695
github.com/dgraph-io/ristretto v0.0.4-0.20201125174811-766bca5e9938
github.com/dustin/go-humanize v1.0.0
github.com/golang/protobuf v1.3.1
github.com/golang/snappy v0.0.1
4 changes: 2 additions & 2 deletions go.sum
@@ -17,8 +17,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/ristretto v0.0.4-0.20201123185045-68b18eb1b695 h1:UP7ZrWkI7Qnp0T2ejWq7HGJOfiTIUfLE58Jg862a1eQ=
github.com/dgraph-io/ristretto v0.0.4-0.20201123185045-68b18eb1b695/go.mod h1:tv2ec8nA7vRpSYX7/MbP52ihrUMXIHit54CQMq8npXQ=
github.com/dgraph-io/ristretto v0.0.4-0.20201125174811-766bca5e9938 h1:FdSJif9oUVeH+MpsScsrL6OAbdW0pUYvXmkdhDSWWcQ=
github.com/dgraph-io/ristretto v0.0.4-0.20201125174811-766bca5e9938/go.mod h1:tv2ec8nA7vRpSYX7/MbP52ihrUMXIHit54CQMq8npXQ=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
1 change: 1 addition & 0 deletions options.go
@@ -177,6 +177,7 @@ func buildTableOptions(db *DB) table.Options {
ZSTDCompressionLevel: opt.ZSTDCompressionLevel,
BlockCache: db.blockCache,
IndexCache: db.indexCache,
AllocPool: db.allocPool,
DataKey: dk,
}
}