Skip to content

Commit

Permalink
change y.Iterator interface to skip old versions. (pingcap#194)
Browse files Browse the repository at this point in the history
- Next only returns the latest version
- NextVersion returns the next older version.
  • Loading branch information
coocood authored May 28, 2020
1 parent 226126a commit c02ac36
Show file tree
Hide file tree
Showing 17 changed files with 469 additions and 393 deletions.
4 changes: 2 additions & 2 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ func (db *DB) DeleteFilesInRange(start, end []byte) {
for i, tbl := range pruneTbls {
it := tbl.NewIterator(false)
// TODO: use rate limiter to avoid burst IO.
for it.Rewind(); it.Valid(); it.Next() {
for it.Rewind(); it.Valid(); y.NextAllVersion(it) {
discardStats.collect(it.Value())
}
deletes[i] = tbl
Expand Down Expand Up @@ -846,7 +846,7 @@ func (db *DB) writeLevel0Table(s *memtable.Table, f *os.File) error {
b := sstable.NewTableBuilder(f, db.limiter, 0, db.opt.TableBuilderOptions)
defer b.Close()

for iter.Rewind(); iter.Valid(); iter.Next() {
for iter.Rewind(); iter.Valid(); y.NextAllVersion(iter) {
key := iter.Key()
value := iter.Value()
if db.opt.ValueThreshold > 0 && len(value.Value) > db.opt.ValueThreshold {
Expand Down
175 changes: 32 additions & 143 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,11 +197,11 @@ func (opts *IteratorOptions) OverlapMemTable(t *memtable.Table) bool {
return true
}
iter := t.NewIterator(false)
iter.Seek(opts.StartKey)
iter.Seek(opts.StartKey.UserKey)
if !iter.Valid() {
return false
}
if iter.Key().Compare(opts.EndKey) >= 0 {
if bytes.Compare(iter.Key().UserKey, opts.EndKey.UserKey) >= 0 {
return false
}
return true
Expand Down Expand Up @@ -259,8 +259,6 @@ type Iterator struct {
item *Item
itBuf Item
vs y.ValueStruct

lastUserKey []byte // Used to skip over multiple versions of the same key.
}

// NewIterator returns a new iterator. Depending upon the options, either only keys, or both
Expand Down Expand Up @@ -326,16 +324,26 @@ func (it *Iterator) Close() {
// Next would advance the iterator by one. Always check it.Valid() after a Next()
// to ensure you have access to a valid it.Item().
func (it *Iterator) Next() {
if !it.opt.Reverse {
it.iitr.Next()
it.parseItemForward()
if it.opt.AllVersions && it.Valid() && it.iitr.NextVersion() {
it.updateItem()
return
}
it.parseItemReverse()
it.iitr.Next()
it.parseItem()
return
}

func (it *Iterator) parseItemForward() {
func (it *Iterator) updateItem() {
it.iitr.FillValue(&it.vs)
item := &it.itBuf
item.key = it.iitr.Key()
item.meta = it.vs.Meta
item.userMeta = it.vs.UserMeta
item.vptr = it.vs.Value
it.item = item
}

func (it *Iterator) parseItem() {
iitr := it.iitr
for iitr.Valid() {
key := iitr.Key()
Expand All @@ -344,166 +352,47 @@ func (it *Iterator) parseItemForward() {
continue
}
if key.Version > it.readTs {
iitr.Next()
continue
}
if !it.opt.AllVersions {
if possibleSameKey(it.lastUserKey, key.UserKey) && bytes.Equal(it.lastUserKey, key.UserKey) {
iitr.Next()
continue
}
it.lastUserKey = y.SafeCopy(it.lastUserKey, key.UserKey)
iitr.FillValue(&it.vs)
if isDeleted(it.vs.Meta) {
if !y.SeekToVersion(iitr, it.readTs) {
iitr.Next()
continue
}
} else {
iitr.FillValue(&it.vs)
}
item := &it.itBuf
item.key = key
item.meta = it.vs.Meta
item.userMeta = it.vs.UserMeta
item.vptr = it.vs.Value
it.item = item
it.updateItem()
if !it.opt.AllVersions && isDeleted(it.vs.Meta) {
iitr.Next()
continue
}
return
}
it.item = nil
}

func possibleSameKey(aKey, bKey []byte) bool {
if len(aKey) != len(bKey) {
return false
}
lastIdx := len(aKey) - 1
if aKey[lastIdx] != bKey[lastIdx] {
return false
}
return true
}

func isDeleted(meta byte) bool {
return meta&bitDelete > 0
}

func (it *Iterator) setItem(item *Item) {
it.item = item
}

// parseItemReverseOnce handles reverse iteration
// implementation. We store keys such that their versions are sorted in descending order. This makes
// forward iteration efficient, but reverse iteration complicated. This tradeoff is better because
// forward iteration is more common than reverse.
//
// This function advances the iterator.
func (it *Iterator) parseItemReverseOnce() bool {
mi := it.iitr
key := mi.Key()

// Skip badger keys.
if !it.opt.internalAccess && key.UserKey[0] == '!' {
mi.Next()
return false
}

// Skip any versions which are beyond the readTs.
if key.Version > it.readTs {
mi.Next()
return false
}

if it.opt.AllVersions {
// Return deleted or expired values also, otherwise user can't figure out
// whether the key was deleted.
it.iitr.FillValue(&it.vs)
item := &it.itBuf
it.fill(item)
it.setItem(item)
mi.Next()
return true
}

FILL:
// If deleted, advance and return.
mi.FillValue(&it.vs)
if isDeleted(it.vs.Meta) {
mi.Next()
return false
}

item := &it.itBuf
it.fill(item)
// fill item based on current cursor position. All Next calls have returned, so reaching here
// means no Next was called.

mi.Next() // Advance but no fill item yet.
if !mi.Valid() {
it.setItem(item)
return true
}

// Reverse direction.
mik := mi.Key()
if mik.Version <= it.readTs && mik.SameUserKey(item.key) {
// This is a valid potential candidate.
goto FILL
}
// Ignore the next candidate. Return the current one.
it.setItem(item)
return true
}

func (it *Iterator) fill(item *Item) {
item.meta = it.vs.Meta
item.userMeta = y.SafeCopy(item.userMeta, it.vs.UserMeta)
item.key.Copy(it.iitr.Key())
item.vptr = y.SafeCopy(item.vptr, it.vs.Value)
}

func (it *Iterator) parseItemReverse() {
it.item = nil
for it.iitr.Valid() {
if it.parseItemReverseOnce() {
// parseItemReverseOnce calls one extra next.
// This is used to deal with the complexity of reverse iteration.
break
}
}
}

// Seek would seek to the provided key if present. If absent, it would seek to the next smallest key
// greater than provided if iterating in the forward direction. Behavior would be reversed is
// iterating backwards.
func (it *Iterator) Seek(key []byte) {
it.lastUserKey = it.lastUserKey[:0]
if !it.opt.Reverse {
it.iitr.Seek(y.KeyWithTs(key, it.txn.readTs))
it.parseItemForward()
return
}

if len(key) == 0 {
it.iitr.Rewind()
it.parseItemReverse()
return
it.iitr.Seek(key)
} else {
if len(key) == 0 {
it.iitr.Rewind()
} else {
it.iitr.Seek(key)
}
}
it.iitr.Seek(y.KeyWithTs(key, 0))
it.parseItemReverse()
it.parseItem()
}

// Rewind would rewind the iterator cursor all the way to zero-th position, which would be the
// smallest key if iterating forward, and largest if iterating backward. It does not keep track of
// whether the cursor started with a Seek().
func (it *Iterator) Rewind() {
it.lastUserKey = it.lastUserKey[:0]
if !it.opt.Reverse {
it.iitr.Rewind()
it.parseItemForward()
return
}
it.iitr.Rewind()
it.parseItemReverse()
it.parseItem()
}

func (it *Iterator) SetAllVersions(allVersions bool) {
Expand Down
2 changes: 1 addition & 1 deletion levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func (lc *levelsController) compactBuildTables(level int, cd *compactDef,
}
lastKey.Reset()
guard := searchGuard(it.Key().UserKey, guards)
for ; it.Valid(); it.Next() {
for ; it.Valid(); y.NextAllVersion(it) {
numRead++
vs := it.Value()
key := it.Key()
Expand Down
11 changes: 8 additions & 3 deletions table/concat_iterator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package table

import (
"bytes"
"sort"

"github.com/coocood/badger/y"
Expand Down Expand Up @@ -73,16 +74,16 @@ func (s *ConcatIterator) FillValue(vs *y.ValueStruct) {
}

// Seek brings us to element >= key if reversed is false. Otherwise, <= key.
func (s *ConcatIterator) Seek(key y.Key) {
func (s *ConcatIterator) Seek(key []byte) {
var idx int
if !s.reversed {
idx = sort.Search(len(s.tables), func(i int) bool {
return s.tables[i].Biggest().Compare(key) >= 0
return bytes.Compare(s.tables[i].Biggest().UserKey, key) >= 0
})
} else {
n := len(s.tables)
idx = n - 1 - sort.Search(n, func(i int) bool {
return s.tables[n-1-i].Smallest().Compare(key) <= 0
return bytes.Compare(s.tables[n-1-i].Smallest().UserKey, key) <= 0
})
}
if idx >= len(s.tables) || idx < 0 {
Expand Down Expand Up @@ -118,3 +119,7 @@ func (s *ConcatIterator) Next() {
}
}
}

func (s *ConcatIterator) NextVersion() bool {
return s.cur.NextVersion()
}
Loading

0 comments on commit c02ac36

Please sign in to comment.