Skip to content

Commit

Permalink
Fix(OOM): Reuse pb.KVs in Stream (hypermodeinc#1609)
Browse files Browse the repository at this point in the history
We store a slice of pb.KVs in Iterator, so it can be used by Stream users.
manishrjain authored Dec 3, 2020
1 parent 6e7f078 commit 70088c6
Showing 4 changed files with 25 additions and 3 deletions.
3 changes: 2 additions & 1 deletion backup.go
Original file line number Diff line number Diff line change
@@ -87,7 +87,8 @@ func (stream *Stream) Backup(w io.Writer, since uint64) (uint64, error) {

// clear txn bits
meta := item.meta &^ (bitTxn | bitFinTxn)
kv := &pb.KV{
kv := itr.NewKV()
*kv = pb.KV{
Key: item.KeyCopy(nil),
Value: valCopy,
UserMeta: []byte{item.UserMeta()},
17 changes: 17 additions & 0 deletions iterator.go
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@ import (
"sync/atomic"
"time"

"github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/badger/v2/table"

"github.com/dgraph-io/badger/v2/y"
@@ -421,6 +422,8 @@ type Iterator struct {
// the iterator. It can be used, for example, to uniquely identify each of the
// iterators created by the stream interface
ThreadId int

reuse []*pb.KV
}

// NewIterator returns a new iterator. Depending upon the options, either only keys, or both
@@ -480,6 +483,20 @@ func (txn *Txn) NewKeyIterator(key []byte, opt IteratorOptions) *Iterator {
return txn.NewIterator(opt)
}

// NewKV must be called serially. It is NOT thread-safe.
func (it *Iterator) NewKV() *pb.KV {
if len(it.reuse) == 0 {
return &pb.KV{}
}
kv := it.reuse[len(it.reuse)-1]
it.reuse = it.reuse[:len(it.reuse)-1]
if kv == nil {
kv = &pb.KV{}
}
kv.Reset()
return kv
}

func (it *Iterator) newItem() *Item {
item := it.waste.pop()
if item == nil {
5 changes: 4 additions & 1 deletion stream.go
Original file line number Diff line number Diff line change
@@ -110,7 +110,7 @@ func (st *Stream) ToList(key []byte, itr *Iterator) (*pb.KVList, error) {
break
}

kv := &pb.KV{}
kv := itr.NewKV()
kv.Key = ka

if err := item.Value(func(val []byte) error {
@@ -245,6 +245,9 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error {
return err
}
}
if len(itr.reuse) < 100 {
itr.reuse = append(itr.reuse, list.Kv...)
}
}
// Mark the stream as done.
if st.doneMarkers {
3 changes: 2 additions & 1 deletion stream_test.go
Original file line number Diff line number Diff line change
@@ -300,7 +300,8 @@ func TestStreamCustomKeyToList(t *testing.T) {
if err != nil {
return nil, err
}
kv := &pb.KV{
kv := itr.NewKV()
*kv = pb.KV{
Key: y.Copy(item.Key()),
Value: val,
}

0 comments on commit 70088c6

Please sign in to comment.