diff --git a/db.go b/db.go index ad4e9a235..1eef7a6f7 100644 --- a/db.go +++ b/db.go @@ -449,7 +449,7 @@ func (db *DB) DeleteFilesInRange(start, end []byte) { for i, tbl := range pruneTbls { it := tbl.NewIterator(false) // TODO: use rate limiter to avoid burst IO. - for it.Rewind(); it.Valid(); it.Next() { + for it.Rewind(); it.Valid(); y.NextAllVersion(it) { discardStats.collect(it.Value()) } deletes[i] = tbl @@ -846,7 +846,7 @@ func (db *DB) writeLevel0Table(s *memtable.Table, f *os.File) error { b := sstable.NewTableBuilder(f, db.limiter, 0, db.opt.TableBuilderOptions) defer b.Close() - for iter.Rewind(); iter.Valid(); iter.Next() { + for iter.Rewind(); iter.Valid(); y.NextAllVersion(iter) { key := iter.Key() value := iter.Value() if db.opt.ValueThreshold > 0 && len(value.Value) > db.opt.ValueThreshold { diff --git a/iterator.go b/iterator.go index 3b7fcf11b..bbd4eefd8 100644 --- a/iterator.go +++ b/iterator.go @@ -197,11 +197,11 @@ func (opts *IteratorOptions) OverlapMemTable(t *memtable.Table) bool { return true } iter := t.NewIterator(false) - iter.Seek(opts.StartKey) + iter.Seek(opts.StartKey.UserKey) if !iter.Valid() { return false } - if iter.Key().Compare(opts.EndKey) >= 0 { + if bytes.Compare(iter.Key().UserKey, opts.EndKey.UserKey) >= 0 { return false } return true @@ -259,8 +259,6 @@ type Iterator struct { item *Item itBuf Item vs y.ValueStruct - - lastUserKey []byte // Used to skip over multiple versions of the same key. } // NewIterator returns a new iterator. Depending upon the options, either only keys, or both @@ -326,16 +324,26 @@ func (it *Iterator) Close() { // Next would advance the iterator by one. Always check it.Valid() after a Next() // to ensure you have access to a valid it.Item(). 
func (it *Iterator) Next() { - if !it.opt.Reverse { - it.iitr.Next() - it.parseItemForward() + if it.opt.AllVersions && it.Valid() && it.iitr.NextVersion() { + it.updateItem() return } - it.parseItemReverse() + it.iitr.Next() + it.parseItem() return } -func (it *Iterator) parseItemForward() { +func (it *Iterator) updateItem() { + it.iitr.FillValue(&it.vs) + item := &it.itBuf + item.key = it.iitr.Key() + item.meta = it.vs.Meta + item.userMeta = it.vs.UserMeta + item.vptr = it.vs.Value + it.item = item +} + +func (it *Iterator) parseItem() { iitr := it.iitr for iitr.Valid() { key := iitr.Key() @@ -344,166 +352,47 @@ func (it *Iterator) parseItemForward() { continue } if key.Version > it.readTs { - iitr.Next() - continue - } - if !it.opt.AllVersions { - if possibleSameKey(it.lastUserKey, key.UserKey) && bytes.Equal(it.lastUserKey, key.UserKey) { - iitr.Next() - continue - } - it.lastUserKey = y.SafeCopy(it.lastUserKey, key.UserKey) - iitr.FillValue(&it.vs) - if isDeleted(it.vs.Meta) { + if !y.SeekToVersion(iitr, it.readTs) { iitr.Next() continue } - } else { - iitr.FillValue(&it.vs) } - item := &it.itBuf - item.key = key - item.meta = it.vs.Meta - item.userMeta = it.vs.UserMeta - item.vptr = it.vs.Value - it.item = item + it.updateItem() + if !it.opt.AllVersions && isDeleted(it.vs.Meta) { + iitr.Next() + continue + } return } it.item = nil } -func possibleSameKey(aKey, bKey []byte) bool { - if len(aKey) != len(bKey) { - return false - } - lastIdx := len(aKey) - 1 - if aKey[lastIdx] != bKey[lastIdx] { - return false - } - return true -} - func isDeleted(meta byte) bool { return meta&bitDelete > 0 } -func (it *Iterator) setItem(item *Item) { - it.item = item -} - -// parseItemReverseOnce handles reverse iteration -// implementation. We store keys such that their versions are sorted in descending order. This makes -// forward iteration efficient, but reverse iteration complicated. This tradeoff is better because -// forward iteration is more common than reverse. 
-// -// This function advances the iterator. -func (it *Iterator) parseItemReverseOnce() bool { - mi := it.iitr - key := mi.Key() - - // Skip badger keys. - if !it.opt.internalAccess && key.UserKey[0] == '!' { - mi.Next() - return false - } - - // Skip any versions which are beyond the readTs. - if key.Version > it.readTs { - mi.Next() - return false - } - - if it.opt.AllVersions { - // Return deleted or expired values also, otherwise user can't figure out - // whether the key was deleted. - it.iitr.FillValue(&it.vs) - item := &it.itBuf - it.fill(item) - it.setItem(item) - mi.Next() - return true - } - -FILL: - // If deleted, advance and return. - mi.FillValue(&it.vs) - if isDeleted(it.vs.Meta) { - mi.Next() - return false - } - - item := &it.itBuf - it.fill(item) - // fill item based on current cursor position. All Next calls have returned, so reaching here - // means no Next was called. - - mi.Next() // Advance but no fill item yet. - if !mi.Valid() { - it.setItem(item) - return true - } - - // Reverse direction. - mik := mi.Key() - if mik.Version <= it.readTs && mik.SameUserKey(item.key) { - // This is a valid potential candidate. - goto FILL - } - // Ignore the next candidate. Return the current one. - it.setItem(item) - return true -} - -func (it *Iterator) fill(item *Item) { - item.meta = it.vs.Meta - item.userMeta = y.SafeCopy(item.userMeta, it.vs.UserMeta) - item.key.Copy(it.iitr.Key()) - item.vptr = y.SafeCopy(item.vptr, it.vs.Value) -} - -func (it *Iterator) parseItemReverse() { - it.item = nil - for it.iitr.Valid() { - if it.parseItemReverseOnce() { - // parseItemReverseOnce calls one extra next. - // This is used to deal with the complexity of reverse iteration. - break - } - } -} - // Seek would seek to the provided key if present. If absent, it would seek to the next smallest key // greater than provided if iterating in the forward direction. Behavior would be reversed is // iterating backwards. 
func (it *Iterator) Seek(key []byte) { - it.lastUserKey = it.lastUserKey[:0] if !it.opt.Reverse { - it.iitr.Seek(y.KeyWithTs(key, it.txn.readTs)) - it.parseItemForward() - return - } - - if len(key) == 0 { - it.iitr.Rewind() - it.parseItemReverse() - return + it.iitr.Seek(key) + } else { + if len(key) == 0 { + it.iitr.Rewind() + } else { + it.iitr.Seek(key) + } } - it.iitr.Seek(y.KeyWithTs(key, 0)) - it.parseItemReverse() + it.parseItem() } // Rewind would rewind the iterator cursor all the way to zero-th position, which would be the // smallest key if iterating forward, and largest if iterating backward. It does not keep track of // whether the cursor started with a Seek(). func (it *Iterator) Rewind() { - it.lastUserKey = it.lastUserKey[:0] - if !it.opt.Reverse { - it.iitr.Rewind() - it.parseItemForward() - return - } it.iitr.Rewind() - it.parseItemReverse() + it.parseItem() } func (it *Iterator) SetAllVersions(allVersions bool) { diff --git a/levels.go b/levels.go index 83a998fee..eae7cd7b7 100644 --- a/levels.go +++ b/levels.go @@ -415,7 +415,7 @@ func (lc *levelsController) compactBuildTables(level int, cd *compactDef, } lastKey.Reset() guard := searchGuard(it.Key().UserKey, guards) - for ; it.Valid(); it.Next() { + for ; it.Valid(); y.NextAllVersion(it) { numRead++ vs := it.Value() key := it.Key() diff --git a/table/concat_iterator.go b/table/concat_iterator.go index 926c3b05f..7c1c9ceec 100644 --- a/table/concat_iterator.go +++ b/table/concat_iterator.go @@ -1,6 +1,7 @@ package table import ( + "bytes" "sort" "github.com/coocood/badger/y" @@ -73,16 +74,16 @@ func (s *ConcatIterator) FillValue(vs *y.ValueStruct) { } // Seek brings us to element >= key if reversed is false. Otherwise, <= key. 
-func (s *ConcatIterator) Seek(key y.Key) { +func (s *ConcatIterator) Seek(key []byte) { var idx int if !s.reversed { idx = sort.Search(len(s.tables), func(i int) bool { - return s.tables[i].Biggest().Compare(key) >= 0 + return bytes.Compare(s.tables[i].Biggest().UserKey, key) >= 0 }) } else { n := len(s.tables) idx = n - 1 - sort.Search(n, func(i int) bool { - return s.tables[n-1-i].Smallest().Compare(key) <= 0 + return bytes.Compare(s.tables[n-1-i].Smallest().UserKey, key) <= 0 }) } if idx >= len(s.tables) || idx < 0 { @@ -118,3 +119,7 @@ func (s *ConcatIterator) Next() { } } } + +func (s *ConcatIterator) NextVersion() bool { + return s.cur.NextVersion() +} diff --git a/table/memtable/skl.go b/table/memtable/skl.go index 14a6bcb2c..16965a2b3 100644 --- a/table/memtable/skl.go +++ b/table/memtable/skl.go @@ -578,43 +578,29 @@ func (s *Iterator) FillValue(vs *y.ValueStruct) { // Next advances to the next position. func (s *Iterator) Next() { y.Assert(s.Valid()) + s.n = s.list.getNext(s.n, 0) + s.loadNode() +} + +func (s *Iterator) NextVersion() bool { if s.valListIdx+1 < len(s.valList) { s.setValueListIdx(s.valListIdx + 1) - } else { - s.n = s.list.getNext(s.n, 0) - s.loadNode() + return true } + return false } // Prev advances to the previous position. func (s *Iterator) Prev() { y.Assert(s.Valid()) - if s.valListIdx > 0 { - s.setValueListIdx(s.valListIdx - 1) - return - } s.n, _ = s.list.findNear(s.uk, true, false) // find <. No equality allowed. s.loadNode() - if len(s.valList) > 0 { - s.setValueListIdx(len(s.valList) - 1) - } } // Seek advances to the first entry with a key >= target. -func (s *Iterator) Seek(target y.Key) { - s.n, _ = s.list.findNear(target.UserKey, false, true) // find >=. +func (s *Iterator) Seek(target []byte) { + s.n, _ = s.list.findNear(target, false, true) // find >=. 
s.loadNode() - if target.Version < s.v.Version && bytes.Equal(target.UserKey, s.uk) { - if len(s.valList) > 0 { - for i := 1; i < len(s.valList); i++ { - s.setValueListIdx(i) - if target.Version >= s.v.Version { - return - } - } - } - s.Next() - } } func (s *Iterator) loadNode() { @@ -650,20 +636,9 @@ func (s *Iterator) setValueListIdx(idx int) { } // SeekForPrev finds an entry with key <= target. -func (s *Iterator) SeekForPrev(target y.Key) { - s.n, _ = s.list.findNear(target.UserKey, true, true) // find <=. +func (s *Iterator) SeekForPrev(target []byte) { + s.n, _ = s.list.findNear(target, true, true) // find <=. s.loadNode() - if target.Version > s.v.Version { - if len(s.valList) > 0 { - for i := len(s.valList) - 1; i >= 0; i-- { - s.setValueListIdx(i) - if target.Version <= s.v.Version { - return - } - } - } - s.Prev() - } } // SeekToFirst seeks position at the first entry in list. @@ -678,9 +653,6 @@ func (s *Iterator) SeekToFirst() { func (s *Iterator) SeekToLast() { s.n = s.list.findLast() s.loadNode() - if len(s.valList) > 0 { - s.setValueListIdx(len(s.valList) - 1) - } } // UniIterator is a unidirectional memtable iterator. 
It is a thin wrapper around @@ -708,6 +680,10 @@ func (s *UniIterator) Next() { } } +func (s *UniIterator) NextVersion() bool { + return s.iter.NextVersion() +} + // Rewind implements y.Interface func (s *UniIterator) Rewind() { if !s.reversed { @@ -718,7 +694,7 @@ func (s *UniIterator) Rewind() { } // Seek implements y.Interface -func (s *UniIterator) Seek(key y.Key) { +func (s *UniIterator) Seek(key []byte) { if !s.reversed { s.iter.Seek(key) } else { diff --git a/table/memtable/skl_test.go b/table/memtable/skl_test.go index 01cf51ff4..2518fe0f1 100644 --- a/table/memtable/skl_test.go +++ b/table/memtable/skl_test.go @@ -49,15 +49,15 @@ func length(s *skiplist) int { } func TestEmpty(t *testing.T) { - key := y.Key{UserKey: []byte("aaa"), Version: 1} + key := []byte("aaa") l := newSkiplist(arenaSize) - v := l.Get(key.UserKey, key.Version) + v := l.Get(key, 1) require.True(t, v.Value == nil) // Cannot use require.Nil for unsafe.Pointer nil. for _, less := range []bool{true, false} { for _, allowEqual := range []bool{true, false} { - n, found := l.findNear(key.UserKey, less, allowEqual) + n, found := l.findNear(key, less, allowEqual) require.Nil(t, n) require.False(t, found) } @@ -317,44 +317,44 @@ func TestIteratorSeek(t *testing.T) { v := it.Value() require.EqualValues(t, "01000", v.Value) - it.Seek(y.KeyWithTs([]byte("01000"), 0)) + it.Seek([]byte("01000")) require.True(t, it.Valid()) v = it.Value() require.EqualValues(t, "01000", v.Value) - it.Seek(y.KeyWithTs([]byte("01005"), 0)) + it.Seek([]byte("01005")) require.True(t, it.Valid()) v = it.Value() require.EqualValues(t, "01010", v.Value) - it.Seek(y.KeyWithTs([]byte("01010"), 0)) + it.Seek([]byte("01010")) require.True(t, it.Valid()) v = it.Value() require.EqualValues(t, "01010", v.Value) - it.Seek(y.KeyWithTs([]byte("99999"), 0)) + it.Seek([]byte("99999")) require.False(t, it.Valid()) // Try SeekForPrev. 
- it.SeekForPrev(y.KeyWithTs([]byte("00"), 0)) + it.SeekForPrev([]byte("00")) require.False(t, it.Valid()) - it.SeekForPrev(y.KeyWithTs([]byte("01000"), 0)) + it.SeekForPrev([]byte("01000")) require.True(t, it.Valid()) v = it.Value() require.EqualValues(t, "01000", v.Value) - it.SeekForPrev(y.KeyWithTs([]byte("01005"), 0)) + it.SeekForPrev([]byte("01005")) require.True(t, it.Valid()) v = it.Value() require.EqualValues(t, "01000", v.Value) - it.SeekForPrev(y.KeyWithTs([]byte("01010"), 0)) + it.SeekForPrev([]byte("01010")) require.True(t, it.Valid()) v = it.Value() require.EqualValues(t, "01010", v.Value) - it.SeekForPrev(y.KeyWithTs([]byte("99999"), 0)) + it.SeekForPrev([]byte("99999")) require.True(t, it.Valid()) v = it.Value() require.EqualValues(t, "01990", v.Value) @@ -459,7 +459,7 @@ func TestIterateMultiVersion(t *testing.T) { var lastKey y.Key for it.SeekToFirst(); it.Valid(); it.Next() { if !lastKey.IsEmpty() { - require.True(t, lastKey.Compare(it.Key()) < 0) + require.True(t, bytes.Compare(lastKey.UserKey, it.Key().UserKey) < 0) } lastKey.Copy(it.Key()) } @@ -471,14 +471,14 @@ func TestIterateMultiVersion(t *testing.T) { valStr := fmt.Sprintf("%d_%d", id, k.Version) require.Equal(t, valStr, string(val.Value)) } else { - it.Seek(k) + it.Seek(k.UserKey) if it.Valid() { - require.True(t, it.Key().Compare(k) >= 0) + require.True(t, bytes.Compare(it.Key().UserKey, k.UserKey) >= 0) var cpKey y.Key cpKey.Copy(it.Key()) it.Prev() if it.Valid() { - require.True(t, it.Key().Compare(k) < 0, "%s %s %s", it.Key(), cpKey, k) + require.True(t, bytes.Compare(it.Key().UserKey, k.UserKey) < 0, "%s %s %s", it.Key(), cpKey, k) } } } @@ -487,18 +487,18 @@ func TestIterateMultiVersion(t *testing.T) { lastKey.Reset() for revIt.Rewind(); revIt.Valid(); revIt.Next() { if !lastKey.IsEmpty() { - require.Truef(t, lastKey.Compare(revIt.Key()) > 0, "%v %v", lastKey.String(), revIt.Key().String()) + require.Truef(t, bytes.Compare(lastKey.UserKey, revIt.Key().UserKey) > 0, "%v %v", 
lastKey.String(), revIt.Key().String()) } lastKey.Copy(revIt.Key()) } for i := 0; i < 1000; i++ { k := y.KeyWithTs([]byte(key("key", int(z.FastRand()%4000))), uint64(5+z.FastRand()%5)) // reverse iterator never seek to the same key with smaller version. - revIt.Seek(k) + revIt.Seek(k.UserKey) if !revIt.Valid() { continue } - require.True(t, revIt.Key().Compare(k) <= 0, "%s %s", revIt.Key(), k) + require.True(t, bytes.Compare(revIt.Key().UserKey, k.UserKey) <= 0, "%s %s", revIt.Key(), k) } } diff --git a/table/memtable/table.go b/table/memtable/table.go index 8f6f78b3a..aee664021 100644 --- a/table/memtable/table.go +++ b/table/memtable/table.go @@ -103,7 +103,7 @@ func (t *Table) Biggest() y.Key { func (t *Table) HasOverlap(start, end y.Key, includeEnd bool) bool { it := t.NewIterator(false) - it.Seek(start) + it.Seek(start.UserKey) if !it.Valid() { return false } @@ -172,16 +172,22 @@ func (t *Table) putToList(entries []Entry) { } type listNode struct { - next unsafe.Pointer // *listNode - entries []Entry - memSize int64 + next unsafe.Pointer // *listNode + entries []Entry + latestOffs []uint32 + memSize int64 } func newListNode(entries []Entry) *listNode { - n := &listNode{entries: entries} - for _, e := range n.entries { + n := &listNode{entries: entries, latestOffs: make([]uint32, 0, len(entries))} + var curKey []byte + for i, e := range n.entries { sz := e.EstimateSize() n.memSize += sz + if !bytes.Equal(e.Key, curKey) { + n.latestOffs = append(n.latestOffs, uint32(i)) + curKey = e.Key + } } return n } @@ -219,6 +225,7 @@ func (n *listNode) newIterator(reverse bool) *listNodeIterator { type listNodeIterator struct { idx int + verIdx uint32 n *listNode reversed bool } @@ -226,33 +233,54 @@ type listNodeIterator struct { func (it *listNodeIterator) Next() { if !it.reversed { it.idx++ + it.verIdx = 0 } else { it.idx-- + it.verIdx = 0 + } +} + +func (it *listNodeIterator) NextVersion() bool { + nextKeyEntryOff := uint32(len(it.n.entries)) + if it.idx+1 < 
len(it.n.latestOffs) { + nextKeyEntryOff = it.n.latestOffs[it.idx+1] } + if it.getEntryIdx()+1 < nextKeyEntryOff { + it.verIdx++ + return true + } + return false +} + +func (it *listNodeIterator) getEntryIdx() uint32 { + return it.n.latestOffs[it.idx] + it.verIdx } func (it *listNodeIterator) Rewind() { if !it.reversed { it.idx = 0 + it.verIdx = 0 } else { - it.idx = len(it.n.entries) - 1 + it.idx = len(it.n.latestOffs) - 1 + it.verIdx = 0 } } -func (it *listNodeIterator) Seek(key y.Key) { - it.idx = sort.Search(len(it.n.entries), func(i int) bool { - e := it.n.entries[i] - return y.KeyWithTs(e.Key, e.Value.Version).Compare(key) >= 0 +func (it *listNodeIterator) Seek(key []byte) { + it.idx = sort.Search(len(it.n.latestOffs), func(i int) bool { + e := &it.n.entries[it.n.latestOffs[i]] + return bytes.Compare(e.Key, key) >= 0 }) + it.verIdx = 0 if it.reversed { - if !it.Valid() || !it.Key().Equal(key) { + if !it.Valid() || !bytes.Equal(it.Key().UserKey, key) { it.idx-- } } } func (it *listNodeIterator) Key() y.Key { - e := it.n.entries[it.idx] + e := &it.n.entries[it.getEntryIdx()] return y.KeyWithTs(e.Key, e.Value.Version) } @@ -260,4 +288,4 @@ func (it *listNodeIterator) Value() y.ValueStruct { return it.n.entries[it.idx]. 
func (it *listNodeIterator) FillValue(vs *y.ValueStruct) { *vs = it.Value() } -func (it *listNodeIterator) Valid() bool { return it.idx >= 0 && it.idx < len(it.n.entries) } +func (it *listNodeIterator) Valid() bool { return it.idx >= 0 && it.idx < len(it.n.latestOffs) } diff --git a/table/memtable/table_test.go b/table/memtable/table_test.go new file mode 100644 index 000000000..842eee0bd --- /dev/null +++ b/table/memtable/table_test.go @@ -0,0 +1,58 @@ +package memtable + +import ( + "fmt" + "github.com/coocood/badger/y" + "github.com/stretchr/testify/require" + "testing" +) + +func TestListNodeIterator(t *testing.T) { + var entries []Entry + for i := 0; i < 100; i++ { + numVer := i%10 + 1 + for j := 0; j < numVer; j++ { + entries = append(entries, newTestEntry(newKey(i), 10-j)) + } + } + ln := newListNode(entries) + it := ln.newIterator(true) + require.Equal(t, len(ln.latestOffs), 100) + for i := 0; i < 100; i++ { + key := newKey(i) + it.Seek(key) + require.EqualValues(t, it.Key().UserKey, key) + numVer := i%10 + 1 + require.True(t, it.Key().Version == 10) + for j := 1; j < numVer; j++ { + require.True(t, it.NextVersion()) + require.True(t, it.Key().Version == uint64(10-j)) + } + } + it.Rewind() + for i := 98; i >= 0; i-- { + it.Next() + key := newKey(i) + require.EqualValues(t, it.Key().UserKey, key) + } + it = ln.newIterator(false) + for i := 1; i < 100; i++ { + it.Next() + key := newKey(i) + require.EqualValues(t, it.Key().UserKey, key) + } +} + +func newKey(i int) []byte { + return []byte(fmt.Sprintf("key%.3d", i)) +} + +func newTestEntry(key []byte, version int) Entry { + return Entry{ + Key: key, + Value: y.ValueStruct{ + Value: key, + Version: uint64(version), + }, + } +} diff --git a/table/merge_iterator.go b/table/merge_iterator.go index 7d907bf12..4b034e3d0 100644 --- a/table/merge_iterator.go +++ b/table/merge_iterator.go @@ -1,18 +1,21 @@ package table import ( + "bytes" + "github.com/coocood/badger/y" ) // MergeTowIterator is a specialized 
MergeIterator that only merge tow iterators. // It is an optimization for compaction. type MergeIterator struct { - smaller mergeIteratorChild - bigger mergeIteratorChild + smaller *mergeIteratorChild + bigger *mergeIteratorChild // when the two iterators has the same value, the value in the second iterator is ignored. second y.Iterator reverse bool + sameKey bool } type mergeIteratorChild struct { @@ -55,26 +58,15 @@ func (mt *MergeIterator) fix() { return } for mt.smaller.valid { - cmp := mt.smaller.key.Compare(mt.bigger.key) + cmp := bytes.Compare(mt.smaller.key.UserKey, mt.bigger.key.UserKey) if cmp == 0 { - // Ignore the value in second iterator. - mt.second.Next() - var secondValid bool - if mt.second == mt.smaller.iter { - mt.smaller.reset() - secondValid = mt.smaller.valid - } else { - mt.bigger.reset() - secondValid = mt.bigger.valid - } - if !secondValid { - if mt.second == mt.smaller.iter && mt.bigger.valid { - mt.swap() - } - return + mt.sameKey = true + if mt.smaller.iter == mt.second { + mt.swap() } - continue + return } + mt.sameKey = false if mt.reverse { if cmp < 0 { mt.swap() @@ -103,9 +95,48 @@ func (mt *MergeIterator) Next() { mt.smaller.iter.Next() } mt.smaller.reset() + if mt.sameKey && mt.bigger.valid { + if mt.bigger.merge != nil { + mt.bigger.merge.Next() + } else if mt.bigger.concat != nil { + mt.bigger.concat.Next() + } else { + mt.bigger.iter.Next() + } + mt.bigger.reset() + } mt.fix() } +func (mt *MergeIterator) NextVersion() bool { + if mt.smaller.iter.NextVersion() { + mt.smaller.reset() + return true + } + if !mt.sameKey { + return false + } + if !mt.bigger.valid { + return false + } + if mt.smaller.key.Version < mt.bigger.key.Version { + // The smaller is second iterator, the bigger is first iterator. + // We have swapped already, no more versions. + return false + } + if mt.smaller.key.Version == mt.bigger.key.Version { + // have duplicated key in the two iterators. 
+ if mt.bigger.iter.NextVersion() { + mt.bigger.reset() + mt.swap() + return true + } + return false + } + mt.swap() + return true +} + // Rewind seeks to first element (or last element for reverse iterator). func (mt *MergeIterator) Rewind() { mt.smaller.iter.Rewind() @@ -116,7 +147,7 @@ func (mt *MergeIterator) Rewind() { } // Seek brings us to element with key >= given key. -func (mt *MergeIterator) Seek(key y.Key) { +func (mt *MergeIterator) Seek(key []byte) { mt.smaller.iter.Seek(key) mt.smaller.reset() mt.bigger.iter.Seek(key) @@ -159,6 +190,8 @@ func NewMergeIterator(iters []y.Iterator, reverse bool) y.Iterator { mi := &MergeIterator{ second: iters[1], reverse: reverse, + smaller: new(mergeIteratorChild), + bigger: new(mergeIteratorChild), } mi.smaller.setIterator(iters[0]) mi.bigger.setIterator(iters[1]) @@ -172,9 +205,13 @@ type EmptyIterator struct{} func (e *EmptyIterator) Next() {} +func (e *EmptyIterator) NextVersion() bool { + return false +} + func (e *EmptyIterator) Rewind() {} -func (e *EmptyIterator) Seek(key y.Key) {} +func (e *EmptyIterator) Seek(key []byte) {} func (e *EmptyIterator) Key() y.Key { return y.Key{} diff --git a/table/merge_iterator_test.go b/table/merge_iterator_test.go index ab3b83f0d..3533743e4 100644 --- a/table/merge_iterator_test.go +++ b/table/merge_iterator_test.go @@ -17,10 +17,12 @@ package table import ( + "bytes" "fmt" "sort" "testing" + "github.com/coocood/badger/cache/z" "github.com/coocood/badger/y" "github.com/stretchr/testify/require" ) @@ -30,6 +32,9 @@ type SimpleIterator struct { vals [][]byte idx int reversed bool + + latestOffs []int + verIdx int } var ( @@ -39,62 +44,111 @@ var ( func (s *SimpleIterator) Next() { if !s.reversed { s.idx++ + s.verIdx = 0 } else { s.idx-- + s.verIdx = 0 + } +} + +func (s *SimpleIterator) NextVersion() bool { + nextEntryOff := len(s.keys) + if s.idx+1 < len(s.latestOffs) { + nextEntryOff = s.latestOffs[s.idx+1] + } + if s.entryIdx()+1 < nextEntryOff { + s.verIdx++ + return true } 
+ return false } func (s *SimpleIterator) Rewind() { if !s.reversed { s.idx = 0 + s.verIdx = 0 } else { - s.idx = len(s.keys) - 1 + s.idx = len(s.latestOffs) - 1 + s.verIdx = 0 } } -func (s *SimpleIterator) Seek(key y.Key) { - if !s.reversed { - s.idx = sort.Search(len(s.keys), func(i int) bool { - return s.keys[i].Compare(key) >= 0 - }) - } else { - n := len(s.keys) - s.idx = n - 1 - sort.Search(n, func(i int) bool { - return s.keys[n-1-i].Compare(key) <= 0 - }) +func (s *SimpleIterator) Seek(key []byte) { + s.idx = sort.Search(len(s.latestOffs), func(i int) bool { + return bytes.Compare(s.keys[s.latestOffs[i]].UserKey, key) >= 0 + }) + s.verIdx = 0 + if s.reversed { + if !s.Valid() || !bytes.Equal(s.Key().UserKey, key) { + s.idx-- + } } } -func (s *SimpleIterator) Key() y.Key { return s.keys[s.idx] } +func (s *SimpleIterator) Key() y.Key { + return s.keys[s.entryIdx()] +} + +func (s *SimpleIterator) entryIdx() int { + return s.latestOffs[s.idx] + s.verIdx +} + func (s *SimpleIterator) Value() y.ValueStruct { return y.ValueStruct{ - Value: s.vals[s.idx], + Value: s.vals[s.entryIdx()], UserMeta: []byte{55}, Meta: 0, } } func (s *SimpleIterator) FillValue(vs *y.ValueStruct) { - vs.Value = s.vals[s.idx] + vs.Value = s.vals[s.entryIdx()] vs.UserMeta = []byte{55} // arbitrary value for test vs.Meta = 0 } func (s *SimpleIterator) Valid() bool { - return s.idx >= 0 && s.idx < len(s.keys) + return s.idx >= 0 && s.idx < len(s.latestOffs) } func newSimpleIterator(keys []string, vals []string, reversed bool) *SimpleIterator { k := make([]y.Key, len(keys)) v := make([][]byte, len(vals)) + lastestOffs := make([]int, len(keys)) y.Assert(len(keys) == len(vals)) for i := 0; i < len(keys); i++ { k[i] = y.KeyWithTs([]byte(keys[i]), 0) v[i] = []byte(vals[i]) + lastestOffs[i] = i } return &SimpleIterator{ - keys: k, - vals: v, - idx: -1, - reversed: reversed, + keys: k, + vals: v, + idx: -1, + reversed: reversed, + latestOffs: lastestOffs, + } +} + +func 
newMultiVersionSimpleIterator(maxVersion, minVersion uint64, reversed bool) *SimpleIterator { + var latestOffs []int + var keys []y.Key + var vals [][]byte + for i := 0; i < 100; i++ { + latestOffs = append(latestOffs, len(keys)) + key := []byte(fmt.Sprintf("key%.3d", i)) + for j := maxVersion; j >= minVersion; j-- { + keys = append(keys, y.KeyWithTs(key, j)) + vals = append(vals, key) + if z.FastRand()%4 == 0 { + break + } + } + } + return &SimpleIterator{ + keys: keys, + vals: vals, + idx: 0, + reversed: reversed, + latestOffs: latestOffs, } } @@ -183,7 +237,7 @@ func TestMergeIteratorSeek(t *testing.T) { it3 := newSimpleIterator([]string{"1"}, []string{"c1"}, false) it4 := newSimpleIterator([]string{"1", "7", "9"}, []string{"d1", "d7", "d9"}, false) mergeIt := NewMergeIterator([]y.Iterator{it, it2, it3, it4}, false) - mergeIt.Seek(y.KeyWithTs([]byte("4"), 0)) + mergeIt.Seek([]byte("4")) k, v := getAll(mergeIt) require.EqualValues(t, []string{"5", "7", "9"}, k) require.EqualValues(t, []string{"b5", "a7", "d9"}, v) @@ -195,7 +249,7 @@ func TestMergeIteratorSeekReversed(t *testing.T) { it3 := newSimpleIterator([]string{"1"}, []string{"c1"}, true) it4 := newSimpleIterator([]string{"1", "7", "9"}, []string{"d1", "d7", "d9"}, true) mergeIt := NewMergeIterator([]y.Iterator{it, it2, it3, it4}, true) - mergeIt.Seek(y.KeyWithTs([]byte("5"), 0)) + mergeIt.Seek([]byte("5")) k, v := getAll(mergeIt) require.EqualValues(t, []string{"5", "3", "2", "1"}, k) require.EqualValues(t, []string{"b5", "a3", "b2", "a1"}, v) @@ -207,7 +261,7 @@ func TestMergeIteratorSeekInvalid(t *testing.T) { it3 := newSimpleIterator([]string{"1"}, []string{"c1"}, false) it4 := newSimpleIterator([]string{"1", "7", "9"}, []string{"d1", "d7", "d9"}, false) mergeIt := NewMergeIterator([]y.Iterator{it, it2, it3, it4}, false) - mergeIt.Seek(y.KeyWithTs([]byte("f"), 0)) + mergeIt.Seek([]byte("f")) require.False(t, mergeIt.Valid()) } @@ -217,7 +271,7 @@ func TestMergeIteratorSeekInvalidReversed(t *testing.T) { 
it3 := newSimpleIterator([]string{"1"}, []string{"c1"}, true) it4 := newSimpleIterator([]string{"1", "7", "9"}, []string{"d1", "d7", "d9"}, true) mergeIt := NewMergeIterator([]y.Iterator{it, it2, it3, it4}, true) - mergeIt.Seek(y.KeyWithTs([]byte("0"), 0)) + mergeIt.Seek([]byte("0")) require.False(t, mergeIt.Valid()) } @@ -235,6 +289,42 @@ func TestMergeIteratorDuplicate(t *testing.T) { require.Equal(t, 3, cnt) } +func TestMultiVersionMergeIterator(t *testing.T) { + for _, reverse := range []bool{true, false} { + it1 := newMultiVersionSimpleIterator(100, 90, reverse) + it2 := newMultiVersionSimpleIterator(90, 80, reverse) + it3 := newMultiVersionSimpleIterator(80, 70, reverse) + it4 := newMultiVersionSimpleIterator(70, 60, reverse) + it := NewMergeIterator([]y.Iterator{it1, it2, it3, it4}, reverse) + + it.Rewind() + curKey := it.Key().UserKey + for i := 1; i < 100; i++ { + it.Next() + require.True(t, it.Valid()) + require.False(t, bytes.Equal(curKey, it.Key().UserKey)) + curKey = it.Key().UserKey + curVer := it.Key().Version + for it.NextVersion() { + require.True(t, it.Key().Version < curVer) + curVer = it.Key().Version + } + } + for i := 0; i < 100; i++ { + key := []byte(fmt.Sprintf("key%.3d", z.FastRand()%100)) + it.Seek(key) + require.True(t, it.Valid()) + require.EqualValues(t, it.Key().UserKey, key) + curVer := it.Key().Version + for it.NextVersion() { + require.True(t, it.Key().Version < curVer) + curVer = it.Key().Version + } + require.True(t, curVer <= 70) + } + } +} + func BenchmarkMergeIterator(b *testing.B) { num := 2 simpleIters := make([]y.Iterator, num) diff --git a/table/sstable/iterator.go b/table/sstable/iterator.go index 4e0f42938..eec7d8807 100644 --- a/table/sstable/iterator.go +++ b/table/sstable/iterator.go @@ -29,6 +29,7 @@ import ( type singleKeyIterator struct { oldOffset uint32 + loaded bool latestVal []byte oldVals entrySlice idx int @@ -37,13 +38,9 @@ type singleKeyIterator struct { func (ski *singleKeyIterator) set(oldOffset uint32, 
latestVal []byte) { ski.oldOffset = oldOffset - numEntries := bytesToU32(ski.oldBlock[oldOffset:]) - endOffsStartIdx := oldOffset + 4 - endOffsEndIdx := endOffsStartIdx + 4*numEntries - ski.oldVals.endOffs = bytesToU32Slice(ski.oldBlock[endOffsStartIdx:endOffsEndIdx]) - valueEndOff := endOffsEndIdx + ski.oldVals.endOffs[numEntries-1] - ski.oldVals.data = ski.oldBlock[endOffsEndIdx:valueEndOff] ski.latestVal = latestVal + ski.loaded = false + ski.idx = 0 } func (ski *singleKeyIterator) getVal() (val []byte) { @@ -54,18 +51,18 @@ func (ski *singleKeyIterator) getVal() (val []byte) { return oldEntry } -func (ski *singleKeyIterator) length() int { - return ski.oldVals.length() + 1 +func (ski *singleKeyIterator) loadOld() { + numEntries := bytesToU32(ski.oldBlock[ski.oldOffset:]) + endOffsStartIdx := ski.oldOffset + 4 + endOffsEndIdx := endOffsStartIdx + 4*numEntries + ski.oldVals.endOffs = bytesToU32Slice(ski.oldBlock[endOffsStartIdx:endOffsEndIdx]) + valueEndOff := endOffsEndIdx + ski.oldVals.endOffs[numEntries-1] + ski.oldVals.data = ski.oldBlock[endOffsEndIdx:valueEndOff] + ski.loaded = true } -func (ski *singleKeyIterator) seekVersion(sVer uint64) (val []byte) { - for ski.idx = 0; ski.idx < ski.length(); ski.idx++ { - val = ski.getVal() - if sVer >= binary.LittleEndian.Uint64(val) { - return - } - } - return +func (ski *singleKeyIterator) length() int { + return ski.oldVals.length() + 1 } type blockIterator struct { @@ -113,24 +110,12 @@ func (itr *blockIterator) loadEntries(data []byte) { // Seek brings us to the first block element that is >= input key. // The binary search will begin at `start`, you can use it to skip some items. 
-func (itr *blockIterator) seek(key y.Key) { +func (itr *blockIterator) seek(key []byte) { foundEntryIdx := sort.Search(itr.entries.length(), func(idx int) bool { itr.setIdx(idx) - return bytes.Compare(itr.key.UserKey, key.UserKey) >= 0 + return bytes.Compare(itr.key.UserKey, key) >= 0 }) itr.setIdx(foundEntryIdx) - if itr.err != nil { - return - } - if itr.key.Version > key.Version && bytes.Equal(key.UserKey, itr.key.UserKey) { - if itr.hasOldVersion() { - itr.val = itr.ski.seekVersion(key.Version) - itr.key.Version = binary.LittleEndian.Uint64(itr.val) - } - if itr.key.Version > key.Version { - itr.setIdx(foundEntryIdx + 1) - } - } } // seekToFirst brings us to the first element. Valid should return true. @@ -141,7 +126,6 @@ func (itr *blockIterator) seekToFirst() { // seekToLast brings us to the last element. Valid should return true. func (itr *blockIterator) seekToLast() { itr.setIdx(itr.entries.length() - 1) - itr.seekToLastVersion() } // setIdx sets the iterator to the entry index and set the current key and value. @@ -181,45 +165,11 @@ func (itr *blockIterator) hasOldVersion() bool { } func (itr *blockIterator) next() { - if itr.hasOldVersion() { - if itr.setVersionIdx(itr.ski.idx + 1) { - return - } - } itr.setIdx(itr.idx + 1) } func (itr *blockIterator) prev() { - if itr.prevVersion() { - return - } itr.setIdx(itr.idx - 1) - itr.seekToLastVersion() -} - -func (itr *blockIterator) prevVersion() bool { - if itr.hasOldVersion() { - if itr.setVersionIdx(itr.ski.idx - 1) { - return true - } - } - return false -} - -func (itr *blockIterator) seekToLastVersion() { - if itr.hasOldVersion() { - itr.setVersionIdx(itr.ski.length() - 1) - } -} - -func (itr *blockIterator) setVersionIdx(idx int) bool { - if idx >= 0 && idx < itr.ski.length() { - itr.ski.idx = idx - itr.val = itr.ski.getVal() - itr.key.Version = bytesToU64(itr.val) - return true - } - return false } // Iterator is an iterator for a Table. 
@@ -310,7 +260,7 @@ func (itr *Iterator) seekToLast() { itr.err = itr.bi.Error() } -func (itr *Iterator) seekInBlock(blockIdx int, key y.Key) { +func (itr *Iterator) seekInBlock(blockIdx int, key []byte) { itr.bpos = blockIdx block, err := itr.t.block(blockIdx, itr.tIdx) if err != nil { @@ -322,7 +272,7 @@ func (itr *Iterator) seekInBlock(blockIdx int, key y.Key) { itr.err = itr.bi.Error() } -func (itr *Iterator) seekFromOffset(blockIdx int, offset int, key y.Key) { +func (itr *Iterator) seekFromOffset(blockIdx int, offset int, key []byte) { itr.bpos = blockIdx block, err := itr.t.block(blockIdx, itr.tIdx) if err != nil { @@ -331,22 +281,22 @@ func (itr *Iterator) seekFromOffset(blockIdx int, offset int, key y.Key) { } itr.bi.setBlock(block) itr.bi.setIdx(offset) - if itr.bi.key.Compare(key) >= 0 { + if bytes.Compare(itr.bi.key.UserKey, key) >= 0 { return } itr.bi.seek(key) itr.err = itr.bi.err } -func (itr *Iterator) seekBlock(key y.Key) int { +func (itr *Iterator) seekBlock(key []byte) int { return sort.Search(len(itr.tIdx.blockEndOffsets), func(idx int) bool { blockBaseKey := itr.tIdx.baseKeys.getEntry(idx) - return bytes.Compare(blockBaseKey, key.UserKey) > 0 + return bytes.Compare(blockBaseKey, key) > 0 }) } // seekFrom brings us to a key that is >= input key. -func (itr *Iterator) seekFrom(key y.Key) { +func (itr *Iterator) seekFrom(key []byte) { itr.err = nil itr.reset() @@ -382,7 +332,7 @@ func (itr *Iterator) seekFrom(key y.Key) { } // seek will reset iterator and seek to >= key. -func (itr *Iterator) seek(key y.Key) { +func (itr *Iterator) seek(key []byte) { itr.err = nil itr.reset() if itr.surf == nil { @@ -391,7 +341,7 @@ func (itr *Iterator) seek(key y.Key) { } sit := itr.surf - sit.Seek(key.UserKey) + sit.Seek(key) if !sit.Valid() { itr.err = io.EOF return @@ -403,10 +353,10 @@ func (itr *Iterator) seek(key y.Key) { } // seekForPrev will reset iterator and seek to <= key. 
-func (itr *Iterator) seekForPrev(key y.Key) { +func (itr *Iterator) seekForPrev(key []byte) { // TODO: Optimize this. We shouldn't have to take a Prev step. itr.seekFrom(key) - if !itr.Key().Equal(key) { + if !bytes.Equal(itr.Key().UserKey, key) { itr.prev() } } @@ -493,6 +443,22 @@ func (itr *Iterator) Next() { } } +func (itr *Iterator) NextVersion() bool { + if itr.bi.ski.oldOffset == 0 { + return false + } + if !itr.bi.ski.loaded { + itr.bi.ski.loadOld() + } + if itr.bi.ski.idx+1 < itr.bi.ski.length() { + itr.bi.ski.idx++ + itr.bi.val = itr.bi.ski.getVal() + itr.bi.key.Version = bytesToU64(itr.bi.val) + return true + } + return false +} + // Rewind follows the y.Iterator interface func (itr *Iterator) Rewind() { if !itr.reversed { @@ -503,7 +469,7 @@ func (itr *Iterator) Rewind() { } // Seek follows the y.Iterator interface -func (itr *Iterator) Seek(key y.Key) { +func (itr *Iterator) Seek(key []byte) { if !itr.reversed { itr.seek(key) } else { diff --git a/table/sstable/table.go b/table/sstable/table.go index 0cc6a50d2..c9b2c8d1b 100644 --- a/table/sstable/table.go +++ b/table/sstable/table.go @@ -189,7 +189,7 @@ func (t *Table) Get(key y.Key, keyHash uint64) (y.ValueStruct, error) { } if !ok { it := t.NewIterator(false) - it.Seek(key) + it.Seek(key.UserKey) if !it.Valid() { return y.ValueStruct{}, nil } @@ -239,11 +239,14 @@ func (t *Table) pointGet(key y.Key, keyHash uint64) (y.Key, y.ValueStruct, bool, } it := t.newIterator(false) - it.seekFromOffset(int(blkIdx), int(offset), key) + it.seekFromOffset(int(blkIdx), int(offset), key.UserKey) if !it.Valid() || !key.SameUserKey(it.Key()) { return y.Key{}, y.ValueStruct{}, true, it.Error() } + if !y.SeekToVersion(it, key.Version) { + return y.Key{}, y.ValueStruct{}, true, it.Error() + } return it.Key(), it.Value(), true, nil } @@ -511,7 +514,7 @@ func (t *Table) HasOverlap(start, end y.Key, includeEnd bool) bool { // If there are errors occurred during seeking, // we assume the table has overlapped with the range 
to prevent data loss. it := t.newIteratorWithIdx(false, idx) - it.Seek(start) + it.Seek(start.UserKey) if !it.Valid() { return it.Error() != nil } diff --git a/table/sstable/table_test.go b/table/sstable/table_test.go index ccdfecd59..0d2a0d78d 100644 --- a/table/sstable/table_test.go +++ b/table/sstable/table_test.go @@ -344,7 +344,7 @@ func TestSeekBasic(t *testing.T) { } for _, tt := range data { - it.seek(y.KeyWithTs([]byte(tt.in), 0)) + it.seek([]byte(tt.in)) if !tt.valid { require.False(t, it.Valid()) continue @@ -378,7 +378,7 @@ func TestSeekForPrev(t *testing.T) { } for _, tt := range data { - it.seekForPrev(y.KeyWithTs([]byte(tt.in), 0)) + it.seekForPrev([]byte(tt.in)) if !tt.valid { require.False(t, it.Valid()) continue @@ -425,7 +425,7 @@ func TestIterateFromEnd(t *testing.T) { defer table.Delete() ti := table.newIterator(false) ti.reset() - ti.seek(y.KeyWithTs([]byte("zzzzzz"), 0)) // Seek to end, an invalid element. + ti.seek([]byte("zzzzzz")) // Seek to end, an invalid element. require.False(t, ti.Valid()) ti.seekToLast() for i := n - 1; i >= 0; i-- { @@ -449,7 +449,7 @@ func TestTable(t *testing.T) { ti := table.newIterator(false) kid := 1010 seek := y.KeyWithTs([]byte(key("key", kid)), 0) - for ti.seek(seek); ti.Valid(); ti.next() { + for ti.seek(seek.UserKey); ti.Valid(); ti.next() { k := ti.Key() require.EqualValues(t, string(k.UserKey), key("key", kid)) kid++ @@ -458,10 +458,10 @@ func TestTable(t *testing.T) { t.Errorf("Expected kid: 10000. 
Got: %v", kid) } - ti.seek(y.KeyWithTs([]byte(key("key", 99999)), 0)) + ti.seek([]byte(key("key", 99999))) require.False(t, ti.Valid()) - ti.seek(y.KeyWithTs([]byte(key("key", -1)), 0)) + ti.seek([]byte(key("key", -1))) require.True(t, ti.Valid()) k := ti.Key() require.EqualValues(t, string(k.UserKey), key("key", 0)) @@ -475,7 +475,7 @@ func TestIterateBackAndForth(t *testing.T) { seek := y.KeyWithTs([]byte(key("key", 1010)), 0) it := table.newIterator(false) - it.seek(seek) + it.seek(seek.UserKey) require.True(t, it.Valid()) k := it.Key() require.EqualValues(t, seek, k) @@ -492,7 +492,7 @@ func TestIterateBackAndForth(t *testing.T) { k = it.Key() require.EqualValues(t, key("key", 1010), k.UserKey) - it.seek(y.KeyWithTs([]byte(key("key", 2000)), 0)) + it.seek([]byte(key("key", 2000))) require.True(t, it.Valid()) k = it.Key() require.EqualValues(t, key("key", 2000), k.UserKey) @@ -525,17 +525,17 @@ func TestIterateMultiVersion(t *testing.T) { kHash := farm.Fingerprint64(k.UserKey) gotKey, _, ok, _ := table.pointGet(k, kHash) if ok { - require.True(t, gotKey.SameUserKey(k)) - require.True(t, gotKey.Compare(k) >= 0) + if !gotKey.IsEmpty() { + require.True(t, gotKey.SameUserKey(k)) + require.True(t, gotKey.Compare(k) >= 0) + } } else { - it.Seek(k) + it.Seek(k.UserKey) if it.Valid() { - require.True(t, it.Key().Compare(k) >= 0) - var cpKey y.Key - cpKey.Copy(it.Key()) - it.prev() - if it.Valid() { - require.True(t, it.Key().Compare(k) < 0, "%s %s %s", it.Key(), cpKey, k) + require.True(t, it.Key().Version == 9) + require.True(t, bytes.Compare(it.Key().UserKey, k.UserKey) >= 0) + if y.SeekToVersion(it, k.Version) { + require.True(t, it.Key().Version <= k.Version) } } } @@ -551,10 +551,11 @@ func TestIterateMultiVersion(t *testing.T) { for i := 0; i < 1000; i++ { k := y.KeyWithTs([]byte(key("key", int(z.FastRand()%4000))), uint64(5+z.FastRand()%5)) // reverse iterator never seek to the same key with smaller version. 
- revIt.Seek(k) + revIt.Seek(k.UserKey) if !revIt.Valid() { continue } + require.True(t, revIt.Key().Version == 9) require.True(t, revIt.Key().Compare(k) <= 0, "%s %s", revIt.Key(), k) } } @@ -638,22 +639,22 @@ func TestConcatIterator(t *testing.T) { } require.EqualValues(t, 30000, count) - it.Seek(y.KeyWithTs([]byte("a"), 0)) + it.Seek([]byte("a")) require.EqualValues(t, "keya0000", string(it.Key().UserKey)) vs := it.Value() require.EqualValues(t, "0", string(vs.Value)) - it.Seek(y.KeyWithTs([]byte("keyb"), 0)) + it.Seek([]byte("keyb")) require.EqualValues(t, "keyb0000", string(it.Key().UserKey)) vs = it.Value() require.EqualValues(t, "0", string(vs.Value)) - it.Seek(y.KeyWithTs([]byte("keyb9999b"), 0)) + it.Seek([]byte("keyb9999b")) require.EqualValues(t, "keyc0000", string(it.Key().UserKey)) vs = it.Value() require.EqualValues(t, "0", string(vs.Value)) - it.Seek(y.KeyWithTs([]byte("keyd"), 0)) + it.Seek([]byte("keyd")) require.False(t, it.Valid()) } { @@ -669,20 +670,20 @@ func TestConcatIterator(t *testing.T) { } require.EqualValues(t, 30000, count) - it.Seek(y.KeyWithTs([]byte("a"), 0)) + it.Seek([]byte("a")) require.False(t, it.Valid()) - it.Seek(y.KeyWithTs([]byte("keyb"), 0)) + it.Seek([]byte("keyb")) require.EqualValues(t, "keya9999", string(it.Key().UserKey)) vs := it.Value() require.EqualValues(t, "9999", string(vs.Value)) - it.Seek(y.KeyWithTs([]byte("keyb9999b"), 0)) + it.Seek([]byte("keyb9999b")) require.EqualValues(t, "keyb9999", string(it.Key().UserKey)) vs = it.Value() require.EqualValues(t, "9999", string(vs.Value)) - it.Seek(y.KeyWithTs([]byte("keyd"), 0)) + it.Seek([]byte("keyd")) require.EqualValues(t, "keyc9999", string(it.Key().UserKey)) vs = it.Value() require.EqualValues(t, "9999", string(vs.Value)) @@ -973,7 +974,7 @@ func BenchmarkPointGet(b *testing.B) { for i := 0; i < n; i++ { k := keys[rand.Intn(n)] it := tbl.newIterator(false) - it.Seek(k) + it.Seek(k.UserKey) if !it.Valid() { continue } @@ -1001,7 +1002,7 @@ func BenchmarkPointGet(b 
*testing.B) { resultKey, resultVs, ok, _ = tbl.pointGet(k, keyHash) if !ok { it := tbl.newIterator(false) - it.Seek(k) + it.Seek(k.UserKey) if !it.Valid() { continue } @@ -1108,7 +1109,7 @@ func BenchmarkRandomRead(b *testing.B) { for i := 0; i < b.N; i++ { itr := tbl.newIterator(false) no := r.Intn(n) - k := y.KeyWithTs([]byte(fmt.Sprintf("%016x", no)), 0) + k := []byte(fmt.Sprintf("%016x", no)) v := []byte(fmt.Sprintf("%d", no)) itr.Seek(k) if !itr.Valid() { diff --git a/transaction.go b/transaction.go index 679911589..d5c8a3d6d 100644 --- a/transaction.go +++ b/transaction.go @@ -173,13 +173,18 @@ func (pi *pendingWritesIterator) Next() { pi.nextIdx++ } +func (pi *pendingWritesIterator) NextVersion() bool { + // We do not support adding multiple versions in a transaction. + return false +} + func (pi *pendingWritesIterator) Rewind() { pi.nextIdx = 0 } -func (pi *pendingWritesIterator) Seek(key y.Key) { +func (pi *pendingWritesIterator) Seek(key []byte) { pi.nextIdx = sort.Search(len(pi.entries), func(idx int) bool { - cmp := pi.entries[idx].Key.Compare(key) + cmp := bytes.Compare(pi.entries[idx].Key.UserKey, key) if !pi.reversed { return cmp >= 0 } diff --git a/transaction_test.go b/transaction_test.go index a025b6395..af019cf34 100644 --- a/transaction_test.go +++ b/transaction_test.go @@ -171,12 +171,7 @@ func TestTxnVersions(t *testing.T) { } checkAllVersions := func(itr *Iterator, i int) { - var version uint64 - if itr.opt.Reverse { - version = 1 - } else { - version = uint64(i) - } + version := uint64(i) count := 0 for itr.Rewind(); itr.Valid(); itr.Next() { @@ -189,12 +184,7 @@ func TestTxnVersions(t *testing.T) { exp := fmt.Sprintf("valversion=%d", version) require.Equal(t, exp, string(val), "v=%d", version) count++ - - if itr.opt.Reverse { - version++ - } else { - version-- - } + version-- } require.Equal(t, i, count, "i=%d", i) // Should loop as many times as i. 
} diff --git a/writer_ingest.go b/writer_ingest.go index 0aa539444..361d403bc 100644 --- a/writer_ingest.go +++ b/writer_ingest.go @@ -73,7 +73,7 @@ func (w *writeWorker) prepareIngestTask(task *ingestTask) (ts uint64, wg *sync.W y.Assert(mTbls.tables[0] != nil) it := mTbls.getMutable().NewIterator(false) for _, t := range task.tbls { - it.Seek(t.Smallest()) + it.Seek(t.Smallest().UserKey) if it.Valid() && it.Key().Compare(t.Biggest()) <= 0 { wg = w.flushMemTable() break @@ -172,7 +172,7 @@ func (w *writeWorker) overlapWithFlushingMemTables(kr keyRange) bool { imms := tbls.tables[:atomic.LoadUint32(&tbls.length)] for _, mt := range imms { it := mt.NewIterator(false) - it.Seek(kr.left) + it.Seek(kr.left.UserKey) if !it.Valid() || it.Key().Compare(kr.right) <= 0 { return true } @@ -204,7 +204,7 @@ func (w *writeWorker) checkRangeInLevel(kr keyRange, level int) (overlappingTabl for i := left; i < right; i++ { it := handler.tables[i].NewIterator(false) - it.Seek(kr.left) + it.Seek(kr.left.UserKey) if it.Valid() && it.Key().Compare(kr.right) <= 0 { overlap = true break diff --git a/y/iterator.go b/y/iterator.go index 244864b2c..9e4adb831 100644 --- a/y/iterator.go +++ b/y/iterator.go @@ -16,7 +16,9 @@ package y -import "encoding/binary" +import ( + "encoding/binary" +) // ValueStruct represents the value info that can be associated with a key, but also the internal // Meta field. @@ -76,11 +78,37 @@ func (v *ValueStruct) EncodeTo(buf []byte) []byte { // Iterator is an interface for a basic iterator. type Iterator interface { + // Next advances to the next entry with a different key, positioned on its latest version. + // If an older version is needed, call NextVersion. Next() + // NextVersion sets the current entry to an older version. + // The iterator must be valid to call this method. + // It returns true if there is an older version, and false if there is none. + // In either case the iterator remains valid and on the same key.
+ NextVersion() bool Rewind() - Seek(key Key) + Seek(key []byte) Key() Key Value() ValueStruct FillValue(vs *ValueStruct) Valid() bool } + +// SeekToVersion moves a valid Iterator to the newest version of its current key that is <= the given version, returning false if there is none. +func SeekToVersion(it Iterator, version uint64) bool { + if version >= it.Key().Version { + return true + } + for it.NextVersion() { + if version >= it.Key().Version { + return true + } + } + return false +} + +func NextAllVersion(it Iterator) { + if !it.NextVersion() { + it.Next() + } +}