Skip to content

Commit

Permalink
opt(stream): add option to directly copy over tables from lower levels (
Browse files Browse the repository at this point in the history
dgraph-io#1700)

This PR adds a FullCopy option to Stream. This allows sending tables in their entirety to the writer. If this option is set to true, we directly copy over the tables from the last 2 levels. This option increases the stream speed while also lowering the memory consumption on the DB that is streaming the KVs.
For a 71GB compressed and encrypted DB, we observed a 3x improvement in speed. The DB contained ~65GB in the last 2 levels, with the remaining data in the levels above.

To use this option, the following options should be set in Stream.

stream.KeyToList = nil
stream.ChooseKey = nil
stream.SinceTs = 0
db.managedTxns = true

If we use stream writer for receiving the KVs, the encryption mode has to be the same in sender and receiver. This will restrict db.StreamDB() to use the same encryption mode in both input and output DB. Added TODO for allowing different encryption modes.
  • Loading branch information
manishrjain authored May 21, 2021
1 parent 59c069f commit 74ade98
Show file tree
Hide file tree
Showing 14 changed files with 706 additions and 166 deletions.
1 change: 1 addition & 0 deletions badger/cmd/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func stream(cmd *cobra.Command, args []string) error {
WithValueDir(so.outDir).
WithNumVersionsToKeep(so.numVersions).
WithCompression(options.CompressionType(so.compressionType)).
WithEncryptionKey(encKey).
WithReadOnly(false)
err = inDB.StreamDB(outOpt)

Expand Down
12 changes: 5 additions & 7 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,13 +813,10 @@ func (db *DB) writeRequests(reqs []*request) error {
}
}
db.opt.Debugf("writeRequests called. Writing to value log")
if !db.opt.managedTxns {
// Don't do value log writes in managed mode.
err := db.vlog.write(reqs)
if err != nil {
done(err)
return err
}
err := db.vlog.write(reqs)
if err != nil {
done(err)
return err
}

db.opt.Debugf("Sending updates to subscribers")
Expand Down Expand Up @@ -2194,6 +2191,7 @@ func (db *DB) StreamDB(outOptions Options) error {
// Stream contents of DB to the output DB.
stream := db.NewStreamAt(math.MaxUint64)
stream.LogPrefix = fmt.Sprintf("Streaming DB to new DB at %s", outDir)
stream.FullCopy = true

stream.Send = func(buf *z.Buffer) error {
return writer.Write(buf)
Expand Down
20 changes: 10 additions & 10 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,17 +368,17 @@ func (opt *IteratorOptions) pickTable(t table.TableInterface) bool {
// that the tables are sorted in the right order.
func (opt *IteratorOptions) pickTables(all []*table.Table) []*table.Table {
filterTables := func(tables []*table.Table) []*table.Table {
if opt.SinceTs > 0 {
tmp := tables[:0]
for _, t := range tables {
if t.MaxVersion() < opt.SinceTs {
continue
}
tmp = append(tmp, t)
if opt.SinceTs == 0 {
return tables
}
out := tables[:0]
for _, t := range tables {
if t.MaxVersion() < opt.SinceTs {
continue
}
tables = tmp
out = append(out, t)
}
return tables
return out
}

if len(opt.Prefix) == 0 {
Expand Down Expand Up @@ -492,7 +492,7 @@ func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator {
for i := 0; i < len(tables); i++ {
iters = append(iters, tables[i].sl.NewUniIterator(opt.Reverse))
}
iters = txn.db.lc.appendIterators(iters, &opt) // This will increment references.
iters = append(iters, txn.db.lc.iterators(&opt)...) // This will increment references.
res := &Iterator{
txn: txn,
iitr: table.NewMergeIterator(iters, opt.Reverse),
Expand Down
85 changes: 46 additions & 39 deletions key_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/dgraph-io/badger/v3/pb"
"github.com/dgraph-io/badger/v3/y"
"github.com/pkg/errors"
)

const (
Expand Down Expand Up @@ -264,7 +265,7 @@ func WriteKeyRegistry(reg *KeyRegistry, opt KeyRegistryOptions) error {
// Write all the datakeys to the buf.
for _, k := range reg.dataKeys {
// Writing the datakey to the given buffer.
if err := storeDataKey(buf, opt.EncryptionKey, k); err != nil {
if err := storeDataKey(buf, opt.EncryptionKey, *k); err != nil {
return y.Wrapf(err, "Error while storing datakey in WriteKeyRegistry")
}
}
Expand Down Expand Up @@ -338,44 +339,58 @@ func (kr *KeyRegistry) LatestDataKey() (*pb.DataKey, error) {
defer kr.Unlock()
// Key might have generated by another go routine. So,
// checking once again.
key, valid = validKey()
if valid {
if key, valid := validKey(); valid {
return key, nil
}
k := make([]byte, len(kr.opt.EncryptionKey))
iv, err := y.GenerateIV()
if err != nil {
return nil, err
}
_, err = rand.Read(k)
if err != nil {

if _, err = rand.Read(k); err != nil {
return nil, err
}
// Otherwise Increment the KeyID and generate new datakey.
kr.nextKeyID++
dk := &pb.DataKey{
dk := pb.DataKey{
KeyId: kr.nextKeyID,
Data: k,
CreatedAt: time.Now().Unix(),
Iv: iv,
}
kr.lastCreated = dk.CreatedAt
kr.dataKeys[kr.nextKeyID] = &dk
// Don't store the datakey on file if badger is running in InMemory mode.
if !kr.opt.InMemory {
// Store the datekey.
buf := &bytes.Buffer{}
if err = storeDataKey(buf, kr.opt.EncryptionKey, dk); err != nil {
return nil, err
}
// Persist the datakey to the disk
if _, err = kr.fp.Write(buf.Bytes()); err != nil {
return nil, err
}
if kr.opt.InMemory {
return &dk, nil

}
// storeDatakey encrypts the datakey So, placing un-encrypted key in the memory.
dk.Data = k
kr.lastCreated = dk.CreatedAt
kr.dataKeys[kr.nextKeyID] = dk
return dk, nil
// Store the datekey.
if err = storeDataKey(kr.fp, kr.opt.EncryptionKey, dk); err != nil {
return nil, err
}
return &dk, nil
}

// AddKey stores the given datakey in the registry, assigning it the next
// available KeyId when its current KeyId is not already registered, and
// persists it to disk unless the registry runs in in-memory mode.
// It returns the KeyId under which the datakey was stored.
// NOTE(review): unlike LatestDataKey, no kr lock is taken here — presumably
// the caller serializes access; confirm.
func (kr *KeyRegistry) AddKey(dk pb.DataKey) (uint64, error) {
	// If we don't have an encryption key, we cannot store the datakey.
	if len(kr.opt.EncryptionKey) == 0 {
		return 0, errors.New("No encryption key found. Cannot add data key")
	}

	if _, ok := kr.dataKeys[dk.KeyId]; !ok {
		// If the KeyId does not exist already, then use the next available KeyId
		// to store the data key.
		kr.nextKeyID++
		dk.KeyId = kr.nextKeyID
	}
	kr.dataKeys[dk.KeyId] = &dk

	// In-memory mode: nothing to persist on disk.
	if kr.opt.InMemory {
		return dk.KeyId, nil
	}
	// Store the datakey. storeDataKey takes dk by value, so the unencrypted
	// copy kept in kr.dataKeys above is not modified by the encryption step.
	return dk.KeyId, storeDataKey(kr.fp, kr.opt.EncryptionKey, dk)
}

// Close closes the key registry.
Expand All @@ -387,38 +402,30 @@ func (kr *KeyRegistry) Close() error {
}

// storeDataKey stores datakey in an encrypted format in the given buffer. If storage key preset.
func storeDataKey(buf *bytes.Buffer, storageKey []byte, k *pb.DataKey) error {
// DO NOT use a pointer for key. storeDataKey modifies the kv.Data field.
func storeDataKey(w io.Writer, storageKey []byte, key pb.DataKey) error {
// xor will encrypt the IV and xor with the given data.
// It'll used for both encryption and decryption.
xor := func() error {
if len(storageKey) == 0 {
return nil
}
var err error
k.Data, err = y.XORBlockAllocate(k.Data, storageKey, k.Iv)
key.Data, err = y.XORBlockAllocate(key.Data, storageKey, key.Iv)
return err
}
// In memory datakey will be plain text so encrypting before storing to the disk.
var err error
if err = xor(); err != nil {
if err := xor(); err != nil {
return y.Wrapf(err, "Error while encrypting datakey in storeDataKey")
}
var data []byte
if data, err = k.Marshal(); err != nil {
err = y.Wrapf(err, "Error while marshaling datakey in storeDataKey")
var err2 error
// decrypting the datakey back.
if err2 = xor(); err2 != nil {
return y.Wrapf(err,
y.Wrapf(err2, "Error while decrypting datakey in storeDataKey").Error())
}
return err
data, err := key.Marshal()
if err != nil {
return y.Wrapf(err, "Error while marshaling datakey in storeDataKey")
}
var lenCrcBuf [8]byte
binary.BigEndian.PutUint32(lenCrcBuf[0:4], uint32(len(data)))
binary.BigEndian.PutUint32(lenCrcBuf[4:8], crc32.Checksum(data, y.CastagnoliCrcTable))
y.Check2(buf.Write(lenCrcBuf[:]))
y.Check2(buf.Write(data))
// Decrypting the datakey back since we're using the pointer.
return xor()
y.Check2(w.Write(lenCrcBuf[:]))
y.Check2(w.Write(data))
return nil
}
37 changes: 32 additions & 5 deletions level_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,9 +304,9 @@ func (s *levelHandler) get(key []byte) (y.ValueStruct, error) {
return maxVs, decr()
}

// appendIterators appends iterators to an array of iterators, for merging.
// iterators returns an array of iterators, for merging.
// Note: This obtains references for the table handlers. Remember to close these iterators.
func (s *levelHandler) appendIterators(iters []y.Iterator, opt *IteratorOptions) []y.Iterator {
func (s *levelHandler) iterators(opt *IteratorOptions) []y.Iterator {
s.RLock()
defer s.RUnlock()

Expand All @@ -324,14 +324,41 @@ func (s *levelHandler) appendIterators(iters []y.Iterator, opt *IteratorOptions)
out = append(out, t)
}
}
return appendIteratorsReversed(iters, out, topt)
return iteratorsReversed(out, topt)
}

tables := opt.pickTables(s.tables)
if len(tables) == 0 {
return iters
return nil
}
return append(iters, table.NewConcatIterator(tables, topt))
return []y.Iterator{table.NewConcatIterator(tables, topt)}
}

// getTables returns the tables of this level that match opt, taking a
// reference (IncrRef) on each returned table; callers own those references.
// Reverse iteration is not supported by this helper.
func (s *levelHandler) getTables(opt *IteratorOptions) []*table.Table {
	if opt.Reverse {
		panic("Invalid option for getTables")
	}

	// Typically this would only be called for the last level.
	s.RLock()
	defer s.RUnlock()

	if s.level > 0 {
		// Non-zero levels hold sorted, non-overlapping tables, so they can
		// be filtered in bulk.
		picked := opt.pickTables(s.tables)
		for _, tbl := range picked {
			tbl.IncrRef()
		}
		return picked
	}

	// Level 0 tables may overlap; each table is checked individually.
	var picked []*table.Table
	for _, tbl := range s.tables {
		if opt.pickTable(tbl) {
			tbl.IncrRef()
			picked = append(picked, tbl)
		}
	}
	return picked
}

type levelHandlerRLocked struct{}
Expand Down
68 changes: 61 additions & 7 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

otrace "go.opencensus.io/trace"

"github.com/dgraph-io/badger/v3/options"
"github.com/dgraph-io/badger/v3/pb"
"github.com/dgraph-io/badger/v3/table"
"github.com/dgraph-io/badger/v3/y"
Expand Down Expand Up @@ -900,7 +901,7 @@ func (s *levelsController) compactBuildTables(
var iters []y.Iterator
switch {
case lev == 0:
iters = appendIteratorsReversed(iters, topTables, table.NOCACHE)
iters = append(iters, iteratorsReversed(topTables, table.NOCACHE)...)
case len(topTables) > 0:
y.AssertTrue(len(topTables) == 1)
iters = []y.Iterator{topTables[0].NewIterator(table.NOCACHE)}
Expand Down Expand Up @@ -1606,24 +1607,34 @@ func (s *levelsController) get(key []byte, maxVs y.ValueStruct, startLevel int)
return maxVs, nil
}

func appendIteratorsReversed(out []y.Iterator, th []*table.Table, opt int) []y.Iterator {
// iteratorsReversed builds one iterator per table in th, walking the slice
// from back to front so the most recent table comes first. Each NewIterator
// call takes a reference on its table; remember to close the iterators.
func iteratorsReversed(th []*table.Table, opt int) []y.Iterator {
	res := make([]y.Iterator, 0, len(th))
	for idx := range th {
		tbl := th[len(th)-1-idx]
		res = append(res, tbl.NewIterator(opt))
	}
	return res
}

// appendIterators appends iterators to an array of iterators, for merging.
// getTables returns the matching tables from every level, ordered from level
// 0 downward. It calls IncrRef on all returned tables; callers must release
// those references when done.
func (s *levelsController) getTables(opt *IteratorOptions) [][]*table.Table {
	all := make([][]*table.Table, 0, len(s.levels))
	for _, lh := range s.levels {
		all = append(all, lh.getTables(opt))
	}
	return all
}

// iterators returns an array of iterators, for merging.
// Note: This obtains references for the table handlers. Remember to close these iterators.
func (s *levelsController) appendIterators(
iters []y.Iterator, opt *IteratorOptions) []y.Iterator {
func (s *levelsController) iterators(opt *IteratorOptions) []y.Iterator {
// Just like with get, it's important we iterate the levels from 0 on upward, to avoid missing
// data when there's a compaction.
itrs := make([]y.Iterator, 0, len(s.levels))
for _, level := range s.levels {
iters = level.appendIterators(iters, opt)
itrs = append(itrs, level.iterators(opt)...)
}
return iters
return itrs
}

// TableInfo represents the information about a table.
Expand Down Expand Up @@ -1750,3 +1761,46 @@ func (s *levelsController) keySplits(numPerTable int, prefix []byte) []string {
sort.Strings(splits)
return splits
}

// AddTable builds the table from the KV.value options passed through the KV.Key.
// It materializes the raw table bytes in kv.Value as a new table file (or an
// in-memory table), adds it to level lev, and records a CREATE entry in the
// manifest. Used by the stream writer's FullCopy path, which receives whole
// tables rather than individual keys.
func (lc *levelsController) AddTable(
	kv *pb.KV, lev int, dk *pb.DataKey, change *pb.ManifestChange) error {
	// TODO: Encryption / Decryption might be required for the table, if the sender and receiver
	// don't have same encryption mode. See if inplace encryption/decryption can be done.
	// Tables are sent in the sorted order, so no need to sort them here.
	encrypted := len(lc.kv.opt.EncryptionKey) > 0
	// A datakey must be present if and only if this DB is encrypted.
	y.AssertTrue((dk != nil && encrypted) || (dk == nil && !encrypted))
	// The keyId is zero if there is no encryption.
	opts := buildTableOptions(lc.kv)
	// Honor the compression mode the sender used for this table.
	opts.Compression = options.CompressionType(change.Compression)
	opts.DataKey = dk

	fileID := lc.reserveFileID()
	fname := table.NewFilename(fileID, lc.kv.opt.Dir)

	// kv.Value is owned by the z.buffer. Ensure that we copy this buffer.
	var tbl *table.Table
	var err error
	if lc.kv.opt.InMemory {
		if tbl, err = table.OpenInMemoryTable(y.Copy(kv.Value), fileID, &opts); err != nil {
			return errors.Wrap(err, "while creating in-memory table from buffer")
		}
	} else {
		if tbl, err = table.CreateTableFromBuffer(fname, kv.Value, opts); err != nil {
			return errors.Wrap(err, "while creating table from buffer")
		}
	}

	lc.levels[lev].addTable(tbl)
	// Release the ref held by OpenTable. addTable would add a reference.
	_ = tbl.DecrRef()

	// Fill in the fields the receiver owns before committing the change.
	change.Id = fileID
	change.Level = uint32(lev)
	if dk != nil {
		change.KeyId = dk.KeyId
	}
	// We use the same data KeyId. So, change.KeyId remains the same.
	y.AssertTrue(change.Op == pb.ManifestChange_CREATE)
	return lc.kv.manifest.addChanges([]*pb.ManifestChange{change})
}
Loading

0 comments on commit 74ade98

Please sign in to comment.