Skip to content

Commit

Permalink
Reuse buffer and fix a deadlock in levelsController (pingcap#213)
Browse files Browse the repository at this point in the history
  • Loading branch information
hslam authored Mar 31, 2021
1 parent d2b18ae commit 0885bbf
Show file tree
Hide file tree
Showing 19 changed files with 204 additions and 23 deletions.
3 changes: 3 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ func Open(opt Options) (db *DB, err error) {
NumCounters: opt.MaxBlockCacheSize / int64(opt.TableBuilderOptions.BlockSize) * 10,
MaxCost: opt.MaxBlockCacheSize,
BufferItems: 64,
OnEvict: sstable.OnEvict,
})
if err != nil {
return nil, errors.Wrap(err, "failed to create block cache")
Expand Down Expand Up @@ -453,6 +454,7 @@ func (db *DB) DeleteFilesInRange(start, end []byte) {
discardStats.collect(it.Value())
}
deletes[i] = tbl
it.Close()
}
if len(discardStats.ptrs) > 0 {
db.blobManger.discardCh <- &discardStats
Expand Down Expand Up @@ -835,6 +837,7 @@ func arenaSize(opt Options) int64 {
// WriteLevel0Table flushes memtable. It drops deleteValues.
func (db *DB) writeLevel0Table(s *memtable.Table, f *os.File) error {
iter := s.NewIterator(false)
defer iter.Close()
var (
bb *blobFileBuilder
numWrite, bytesWrite int
Expand Down
8 changes: 4 additions & 4 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,6 @@ func TestIterateDeleted(t *testing.T) {

txn := db.NewTransaction(false)
idxIt := txn.NewIterator(DefaultIteratorOptions)
defer idxIt.Close()

count := 0
txn2 := db.NewTransaction(true)
Expand All @@ -646,11 +645,10 @@ func TestIterateDeleted(t *testing.T) {
}
require.Equal(t, 2, count)
require.NoError(t, txn2.Commit())

idxIt.Close()
t.Run(fmt.Sprintf("Prefetch=%t", false), func(t *testing.T) {
txn := db.NewTransaction(false)
idxIt = txn.NewIterator(DefaultIteratorOptions)

idxIt := txn.NewIterator(DefaultIteratorOptions)
var estSize int64
var idxKeys []string
for idxIt.Seek(prefix); idxIt.Valid(); idxIt.Next() {
Expand All @@ -665,6 +663,7 @@ func TestIterateDeleted(t *testing.T) {
}
require.Equal(t, 0, len(idxKeys))
require.Equal(t, int64(0), estSize)
idxIt.Close()
})
})
}
Expand Down Expand Up @@ -762,6 +761,7 @@ func TestSetIfAbsentAsync(t *testing.T) {
txn := kv.NewTransaction(false)
var count int
it := txn.NewIterator(DefaultIteratorOptions)
defer it.Close()
{
t.Log("Starting first basic iteration")
for it.Rewind(); it.Valid(); it.Next() {
Expand Down
8 changes: 8 additions & 0 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ func (opts *IteratorOptions) OverlapMemTable(t *memtable.Table) bool {
return true
}
iter := t.NewIterator(false)
defer iter.Close()
iter.Seek(opts.StartKey.UserKey)
if !iter.Valid() {
return false
Expand Down Expand Up @@ -259,6 +260,8 @@ type Iterator struct {
item *Item
itBuf Item
vs y.ValueStruct

closed bool
}

// NewIterator returns a new iterator. Depending upon the options, either only keys, or both
Expand Down Expand Up @@ -318,6 +321,11 @@ func (it *Iterator) ValidForPrefix(prefix []byte) bool {

// Close would close the iterator. It is important to call this when you're done with iteration.
func (it *Iterator) Close() {
if it.closed {
return
}
it.closed = true
it.iitr.Close()
atomic.AddInt32(&it.txn.numIterators, -1)
}

Expand Down
8 changes: 5 additions & 3 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ func (lc *levelsController) compactBuildTables(cd *CompactDef) (newTables []tabl
func CompactTables(cd *CompactDef, stats *y.CompactionStats, discardStats *DiscardStats) ([]*sstable.BuildResult, error) {
var buildResults []*sstable.BuildResult
it := cd.buildIterator()
defer it.Close()

skippedTbls := cd.SkippedTbls
splitHints := cd.splitHints
Expand Down Expand Up @@ -687,12 +688,13 @@ func (lc *levelsController) addLevel0Table(t table.Table, head *protos.HeadInfo)
var timeStart time.Time
{
log.Warn("STALLED STALLED STALLED", zap.Duration("duration", time.Since(lastUnstalled)))
lc.cstatus.RLock()
for i := 0; i < lc.kv.opt.TableBuilderOptions.MaxLevels; i++ {
log.Warn("dump level status", zap.Int("level", i), zap.String("status", lc.cstatus.levels[i].debug()),
lc.cstatus.RLock()
status := lc.cstatus.levels[i].debug()
lc.cstatus.RUnlock()
log.Warn("dump level status", zap.Int("level", i), zap.String("status", status),
zap.Int64("size", lc.levels[i].getTotalSize()))
}
lc.cstatus.RUnlock()
timeStart = time.Now()
}
// Before we unstall, we need to make sure that level 0 is healthy. Otherwise, we
Expand Down
15 changes: 13 additions & 2 deletions options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"io/ioutil"

"github.com/golang/snappy"
"github.com/pingcap/badger/buffer"
"github.com/klauspost/compress/zstd"
"github.com/pingcap/badger/buffer"
)

// CompressionType specifies how a block should be compressed.
Expand Down Expand Up @@ -69,8 +69,19 @@ func (c CompressionType) Decompress(data []byte) ([]byte, error) {
case None:
return data, nil
case Snappy:
return snappy.Decode(nil, data)
defer buffer.PutBuffer(data)
length, err := snappy.DecodedLen(data)
if err != nil {
return nil, err
}
dst := buffer.GetBuffer(length)
res, err := snappy.Decode(dst, data)
if &res[0] != &dst[0] {
buffer.PutBuffer(dst)
}
return res, err
case ZSTD:
defer buffer.PutBuffer(data)
reader := bytes.NewBuffer(data)
r, err := zstd.NewReader(reader)
if err != nil {
Expand Down
13 changes: 13 additions & 0 deletions table/concat_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,16 @@ func (s *ConcatIterator) Next() {
func (s *ConcatIterator) NextVersion() bool {
return s.cur.NextVersion()
}

// Close implements y.Interface.
func (s *ConcatIterator) Close() error {
for _, it := range s.iters {
if it == nil {
continue
}
if err := it.Close(); err != nil {
return y.Wrapf(err, "ConcatIterator")
}
}
return nil
}
6 changes: 6 additions & 0 deletions table/memtable/skl.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,10 @@ func (s *Iterator) SeekToLast() {
s.loadNode()
}

func (s *Iterator) Close() error {
return nil
}

// UniIterator is a unidirectional memtable iterator. It is a thin wrapper around
// Iterator. We like to keep Iterator as before, because it is more powerful and
// we might support bidirectional iterators in the future.
Expand Down Expand Up @@ -713,3 +717,5 @@ func (s *UniIterator) FillValue(vs *y.ValueStruct) { s.iter.FillValue(vs) }

// Valid implements y.Interface
func (s *UniIterator) Valid() bool { return s.iter.Valid() }

func (s *UniIterator) Close() error { return s.iter.Close() }
8 changes: 7 additions & 1 deletion table/memtable/skl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func TestEmpty(t *testing.T) {

it.Seek(key)
require.False(t, it.Valid())
it.Close()
}

// TestBasic tests single-threaded inserts and updates and gets.
Expand Down Expand Up @@ -255,6 +256,7 @@ func TestIteratorNext(t *testing.T) {
l := newSkiplist(arenaSize)
defer l.Delete()
it := l.NewIterator()
defer it.Close()
require.False(t, it.Valid())
it.SeekToFirst()
require.False(t, it.Valid())
Expand All @@ -278,6 +280,7 @@ func TestIteratorPrev(t *testing.T) {
l := newSkiplist(arenaSize)
defer l.Delete()
it := l.NewIterator()
defer it.Close()
require.False(t, it.Valid())
it.SeekToFirst()
require.False(t, it.Valid())
Expand All @@ -302,7 +305,7 @@ func TestIteratorSeek(t *testing.T) {
defer l.Delete()

it := l.NewIterator()

defer it.Close()
require.False(t, it.Valid())
it.SeekToFirst()
require.False(t, it.Valid())
Expand Down Expand Up @@ -373,6 +376,7 @@ func TestPutWithHint(t *testing.T) {
cnt++
}
it := l.NewIterator()
defer it.Close()
var lastKey y.Key
cntGot := 0
for it.SeekToFirst(); it.Valid(); it.Next() {
Expand Down Expand Up @@ -456,6 +460,7 @@ func TestIterateMultiVersion(t *testing.T) {
keyVals := generateKeyValues("key", 4000)
skl := buildMultiVersionSkiopList(keyVals)
it := skl.NewIterator()
defer it.Close()
var lastKey y.Key
for it.SeekToFirst(); it.Valid(); it.Next() {
if !lastKey.IsEmpty() {
Expand Down Expand Up @@ -484,6 +489,7 @@ func TestIterateMultiVersion(t *testing.T) {
}
}
revIt := skl.NewUniIterator(true)
defer revIt.Close()
lastKey.Reset()
for revIt.Rewind(); revIt.Valid(); revIt.Next() {
if !lastKey.IsEmpty() {
Expand Down
2 changes: 2 additions & 0 deletions table/memtable/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,3 +289,5 @@ func (it *listNodeIterator) Value() y.ValueStruct { return it.n.entries[it.idx].
func (it *listNodeIterator) FillValue(vs *y.ValueStruct) { *vs = it.Value() }

func (it *listNodeIterator) Valid() bool { return it.idx >= 0 && it.idx < len(it.n.latestOffs) }

func (it *listNodeIterator) Close() error { return nil }
2 changes: 2 additions & 0 deletions table/memtable/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@ func TestListNodeIterator(t *testing.T) {
key := newKey(i)
require.EqualValues(t, it.Key().UserKey, key)
}
it.Close()
it = ln.newIterator(false)
for i := 1; i < 100; i++ {
it.Next()
key := newKey(i)
require.EqualValues(t, it.Key().UserKey, key)
}
it.Close()
}

func newKey(i int) []byte {
Expand Down
17 changes: 17 additions & 0 deletions table/merge_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,19 @@ func (mt *MergeIterator) FillValue(vs *y.ValueStruct) {
}
}

// Close implements y.Iterator.
func (mt *MergeIterator) Close() error {
smallerErr := mt.smaller.iter.Close()
biggerErr := mt.bigger.iter.Close()
if smallerErr != nil {
return y.Wrapf(smallerErr, "MergeIterator")
}
if biggerErr != nil {
return y.Wrapf(biggerErr, "MergeIterator")
}
return nil
}

// NewMergeIterator creates a merge iterator
func NewMergeIterator(iters []y.Iterator, reverse bool) y.Iterator {
if len(iters) == 0 {
Expand Down Expand Up @@ -226,3 +239,7 @@ func (e *EmptyIterator) FillValue(vs *y.ValueStruct) {}
func (e *EmptyIterator) Valid() bool {
return false
}

func (e *EmptyIterator) Close() error {
return nil
}
17 changes: 16 additions & 1 deletion table/merge_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ func (s *SimpleIterator) Valid() bool {
return s.idx >= 0 && s.idx < len(s.latestOffs)
}

func (s *SimpleIterator) Close() error {
return nil
}

func newSimpleIterator(keys []string, vals []string, reversed bool) *SimpleIterator {
k := make([]y.Key, len(keys))
v := make([][]byte, len(vals))
Expand Down Expand Up @@ -167,6 +171,7 @@ func TestSimpleIterator(t *testing.T) {
keys := []string{"1", "2", "3"}
vals := []string{"v1", "v2", "v3"}
it := newSimpleIterator(keys, vals, false)
defer it.Close()
it.Rewind()
k, v := getAll(it)
require.EqualValues(t, keys, k)
Expand All @@ -186,6 +191,7 @@ func TestMergeSingle(t *testing.T) {
vals := []string{"v1", "v2", "v3"}
it := newSimpleIterator(keys, vals, false)
mergeIt := NewMergeIterator([]y.Iterator{it}, false)
defer mergeIt.Close()
mergeIt.Rewind()
k, v := getAll(mergeIt)
require.EqualValues(t, keys, k)
Expand All @@ -197,6 +203,7 @@ func TestMergeSingleReversed(t *testing.T) {
vals := []string{"v1", "v2", "v3"}
it := newSimpleIterator(keys, vals, true)
mergeIt := NewMergeIterator([]y.Iterator{it}, true)
defer mergeIt.Close()
mergeIt.Rewind()
k, v := getAll(mergeIt)
require.EqualValues(t, reversed(keys), k)
Expand All @@ -210,6 +217,7 @@ func TestMergeMore(t *testing.T) {
it4 := newSimpleIterator([]string{"1", "7", "9"}, []string{"d1", "d7", "d9"}, false)

mergeIt := NewMergeIterator([]y.Iterator{it1, it2, it3, it4}, false)
defer mergeIt.Close()
expectedKeys := []string{"1", "2", "3", "5", "7", "9"}
expectedVals := []string{"a1", "b2", "a3", "b5", "a7", "d9"}
mergeIt.Rewind()
Expand All @@ -225,6 +233,7 @@ func TestMergeIteratorNested(t *testing.T) {
it := newSimpleIterator(keys, vals, false)
mergeIt := NewMergeIterator([]y.Iterator{it}, false)
mergeIt2 := NewMergeIterator([]y.Iterator{mergeIt}, false)
defer mergeIt2.Close()
mergeIt2.Rewind()
k, v := getAll(mergeIt2)
require.EqualValues(t, keys, k)
Expand All @@ -237,6 +246,7 @@ func TestMergeIteratorSeek(t *testing.T) {
it3 := newSimpleIterator([]string{"1"}, []string{"c1"}, false)
it4 := newSimpleIterator([]string{"1", "7", "9"}, []string{"d1", "d7", "d9"}, false)
mergeIt := NewMergeIterator([]y.Iterator{it, it2, it3, it4}, false)
defer mergeIt.Close()
mergeIt.Seek([]byte("4"))
k, v := getAll(mergeIt)
require.EqualValues(t, []string{"5", "7", "9"}, k)
Expand All @@ -249,6 +259,7 @@ func TestMergeIteratorSeekReversed(t *testing.T) {
it3 := newSimpleIterator([]string{"1"}, []string{"c1"}, true)
it4 := newSimpleIterator([]string{"1", "7", "9"}, []string{"d1", "d7", "d9"}, true)
mergeIt := NewMergeIterator([]y.Iterator{it, it2, it3, it4}, true)
defer mergeIt.Close()
mergeIt.Seek([]byte("5"))
k, v := getAll(mergeIt)
require.EqualValues(t, []string{"5", "3", "2", "1"}, k)
Expand All @@ -261,6 +272,7 @@ func TestMergeIteratorSeekInvalid(t *testing.T) {
it3 := newSimpleIterator([]string{"1"}, []string{"c1"}, false)
it4 := newSimpleIterator([]string{"1", "7", "9"}, []string{"d1", "d7", "d9"}, false)
mergeIt := NewMergeIterator([]y.Iterator{it, it2, it3, it4}, false)
defer mergeIt.Close()
mergeIt.Seek([]byte("f"))
require.False(t, mergeIt.Valid())
}
Expand All @@ -271,6 +283,7 @@ func TestMergeIteratorSeekInvalidReversed(t *testing.T) {
it3 := newSimpleIterator([]string{"1"}, []string{"c1"}, true)
it4 := newSimpleIterator([]string{"1", "7", "9"}, []string{"d1", "d7", "d9"}, true)
mergeIt := NewMergeIterator([]y.Iterator{it, it2, it3, it4}, true)
defer mergeIt.Close()
mergeIt.Seek([]byte("0"))
require.False(t, mergeIt.Valid())
}
Expand All @@ -280,7 +293,7 @@ func TestMergeIteratorDuplicate(t *testing.T) {
it2 := newSimpleIterator([]string{"1"}, []string{"1"}, false)
it3 := newSimpleIterator([]string{"2"}, []string{"2"}, false)
it := NewMergeIterator([]y.Iterator{it3, it2, it1}, false)

defer it.Close()
var cnt int
for it.Rewind(); it.Valid(); it.Next() {
require.EqualValues(t, cnt+48, it.Key().UserKey[0])
Expand Down Expand Up @@ -322,6 +335,7 @@ func TestMultiVersionMergeIterator(t *testing.T) {
}
require.True(t, curVer <= 70)
}
it.Close()
}
}

Expand All @@ -338,6 +352,7 @@ func BenchmarkMergeIterator(b *testing.B) {
}
}
mergeIter := NewMergeIterator(simpleIters, false)
defer mergeIter.Close()
b.ResetTimer()
for i := 0; i < b.N; i++ {
mergeIter.Rewind()
Expand Down
Loading

0 comments on commit 0885bbf

Please sign in to comment.