Skip to content

Commit

Permalink
Merge branch 'main' into fix_skiplist_search_valts
Browse files Browse the repository at this point in the history
  • Loading branch information
chenat9 authored Aug 22, 2022
2 parents 9634bda + 13859a7 commit b839803
Show file tree
Hide file tree
Showing 14 changed files with 84 additions and 41 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.vscode
work_test
testdata
testdata
.idea
17 changes: 4 additions & 13 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ func (db *DB) Get(key []byte) (*utils.Entry, error) {
if len(key) == 0 {
return nil, utils.ErrEmptyKey
}

originKey := key
var (
entry *utils.Entry
err error
Expand All @@ -162,24 +164,13 @@ func (db *DB) Get(key []byte) (*utils.Entry, error) {
entry.Value = utils.SafeCopy(nil, result)
}

if isDeletedOrExpired(entry) {
if lsm.IsDeletedOrExpired(entry) {
return nil, utils.ErrKeyNotFound
}
entry.Key = originKey
return entry, nil
}

// 判断是否过期 是可删除
func isDeletedOrExpired(e *utils.Entry) bool {
if e.Value == nil {
return true
}
if e.ExpiresAt == 0 {
return false
}

return e.ExpiresAt <= uint64(time.Now().Unix())
}

func (db *DB) Info() *Stats {
// 读取stats结构,打包数据并返回
return db.stats
Expand Down
14 changes: 8 additions & 6 deletions lsm/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,7 @@ func (lm *levelManager) subcompact(it utils.Iterator, kr keyRange, cd compactDef
for ; it.Valid(); it.Next() {
key := it.Item().Entry().Key
//version := utils.ParseTs(key)
isExpired := isDeletedOrExpired(0, it.Item().Entry().ExpiresAt)
isExpired := IsDeletedOrExpired(it.Item().Entry())
if !utils.SameKey(key, lastKey) {
// 如果迭代器返回的key大于当前key的范围就不用执行了
if len(kr.right) > 0 && utils.CompareKeys(key, kr.right) >= 0 {
Expand Down Expand Up @@ -953,11 +953,15 @@ func (lm *levelManager) checkOverlap(tables []*table, lev int) bool {
}

// 判断是否过期 是可删除
func isDeletedOrExpired(meta byte, expiresAt uint64) bool {
if expiresAt == 0 {
func IsDeletedOrExpired(e *utils.Entry) bool {
if e.Value == nil {
return true
}
if e.ExpiresAt == 0 {
return false
}
return expiresAt <= uint64(time.Now().Unix())

return e.ExpiresAt <= uint64(time.Now().Unix())
}

// compactStatus
Expand Down Expand Up @@ -1117,7 +1121,6 @@ func (r keyRange) equals(dst keyRange) bool {
}

func (r *keyRange) extend(kr keyRange) {
// TODO(ibrahim): Is this needed?
if kr.isEmpty() {
return
}
Expand All @@ -1140,7 +1143,6 @@ func (r keyRange) overlapsWith(dst keyRange) bool {
if r.isEmpty() {
return true
}
// TODO(ibrahim): Do you need this?
// Empty dst doesn't overlap with anything.
if dst.isEmpty() {
return false
Expand Down
12 changes: 8 additions & 4 deletions lsm/levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,14 @@ func (lh *levelHandler) searchLNSST(key []byte) (*utils.Entry, error) {
return nil, utils.ErrKeyNotFound
}
func (lh *levelHandler) getTable(key []byte) *table {
for i := len(lh.tables) - 1; i >= 0; i-- {
if bytes.Compare(key, lh.tables[i].ss.MinKey()) > -1 &&
bytes.Compare(key, lh.tables[i].ss.MaxKey()) < 1 {
return lh.tables[i]
if len(lh.tables) > 0 && (bytes.Compare(key, lh.tables[0].ss.MinKey()) < 0 || bytes.Compare(key, lh.tables[len(lh.tables)-1].ss.MaxKey()) > 0) {
return nil
} else {
for i := len(lh.tables) - 1; i >= 0; i-- {
if bytes.Compare(key, lh.tables[i].ss.MinKey()) > -1 &&
bytes.Compare(key, lh.tables[i].ss.MaxKey()) < 1 {
return lh.tables[i]
}
}
}
return nil
Expand Down
6 changes: 6 additions & 0 deletions lsm/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,9 @@ func (it *tableIterator) Seek(key []byte) {
var ko pb.BlockOffset
idx := sort.Search(len(it.t.ss.Indexs().GetOffsets()), func(idx int) bool {
utils.CondPanic(!it.t.offsets(&ko, idx), fmt.Errorf("tableutils.Seek idx < 0 || idx > len(index.GetOffsets()"))
if idx == len(it.t.ss.Indexs().GetOffsets()) {
return true
}
return utils.CompareKeys(ko.GetKey(), key) > 0
})
if idx == 0 {
Expand Down Expand Up @@ -347,6 +350,9 @@ func (t *table) offsets(ko *pb.BlockOffset, i int) bool {
if i < 0 || i > len(index.GetOffsets()) {
return false
}
if i == len(index.GetOffsets()) {
return true
}
*ko = *index.GetOffsets()[i]
return true
}
Expand Down
10 changes: 7 additions & 3 deletions utils/arena.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
package utils

import (
"github.com/pkg/errors"
"log"
"sync/atomic"
"unsafe"

"github.com/pkg/errors"
)

const (
Expand Down Expand Up @@ -76,6 +77,7 @@ func (s *Arena) allocate(sz uint32) uint32 {
newBuf := make([]byte, len(s.buf)+int(growBy))
AssertTrue(len(s.buf) == copy(newBuf, s.buf))
s.buf = newBuf
// fmt.Print(len(s.buf), " ")
}
return offset - sz
}
Expand Down Expand Up @@ -144,9 +146,11 @@ func (s *Arena) getVal(offset uint32, size uint32) (ret ValueStruct) {
// nil, then the zero offset is returned.
func (s *Arena) getNodeOffset(nd *node) uint32 {
if nd == nil {
return 0
return 0 //返回空指针
}

//implement me here!!!
//获取某个节点,在 arena 当中的偏移量
//unsafe.Pointer等价于void*,uintptr可以专门把void*的对于地址转化为数值型变量
return uint32(uintptr(unsafe.Pointer(nd)) - uintptr(unsafe.Pointer(&s.buf[0])))
}

Expand Down
14 changes: 11 additions & 3 deletions utils/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func NewCache(size int) *Cache {
slru: newSLRU(data, slruO, slruSz-slruO),
door: newFilter(size, 0.01), //布隆过滤器设置误差率为0.01
c: newCmSketch(int64(size)),
data: data, //共用同一个 slice 存储数据
data: data, //共用同一个 map 存储数据
}

}
Expand Down Expand Up @@ -97,7 +97,7 @@ func (c *Cache) set(key, value interface{}) bool {

// 这里进行 PK,必须在 bloomfilter 中出现过一次,才允许 PK
// 在 bf 中出现,说明访问频率 >= 2
if !c.door.Allow(uint32(keyHash)) {
if !c.door.Allow(uint32(eitem.key)) {
return true
}

Expand Down Expand Up @@ -133,17 +133,19 @@ func (c *Cache) get(key interface{}) (interface{}, bool) {

val, ok := c.data[keyHash]
if !ok {
c.door.Allow(uint32(keyHash))
c.c.Increment(keyHash)
return nil, false
}

item := val.Value.(*storeItem)

if item.conflict != conflictHash {
c.door.Allow(uint32(keyHash))
c.c.Increment(keyHash)
return nil, false
}

c.door.Allow(uint32(keyHash))
c.c.Increment(item.key)

v := item.value
Expand Down Expand Up @@ -229,3 +231,9 @@ func MemHash(data []byte) uint64 {
ss := (*stringStruct)(unsafe.Pointer(&data))
return uint64(memhash(ss.str, 0, uintptr(ss.len)))
}

func (c *Cache) String() string {
var s string
s += c.lru.String() + " | " + c.slru.String()
return s
}
7 changes: 5 additions & 2 deletions utils/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package cache

import (
"fmt"
"github.com/stretchr/testify/assert"
"testing"

"github.com/stretchr/testify/assert"
)

func TestCacheBasicCRUD(t *testing.T) {
Expand All @@ -12,17 +13,19 @@ func TestCacheBasicCRUD(t *testing.T) {
key := fmt.Sprintf("key%d", i)
val := fmt.Sprintf("val%d", i)
cache.Set(key, val)
fmt.Printf("set %s: %s\n", key, cache)
}

for i := 0; i < 1000; i++ {
key := fmt.Sprintf("key%d", i)
val := fmt.Sprintf("val%d", i)
res, ok := cache.Get(key)
if ok {
fmt.Printf("get %s: %s\n", key, cache)
assert.Equal(t, val, res)
continue
}
assert.Equal(t, res, nil)

}
fmt.Printf("at last: %s\n", cache)
}
13 changes: 12 additions & 1 deletion utils/cache/lru.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package cache

import "container/list"
import (
"container/list"
"fmt"
)

type windowLRU struct {
data map[uint64]*list.Element
Expand Down Expand Up @@ -47,3 +50,11 @@ func (lru *windowLRU) add(newitem storeItem) (eitem storeItem, evicted bool) {
func (lru *windowLRU) get(v *list.Element) {
lru.list.MoveToFront(v)
}

func (lru *windowLRU) String() string {
var s string
for e := lru.list.Front(); e != nil; e = e.Next() {
s += fmt.Sprintf("%v,", e.Value.(*storeItem).value)
}
return s
}
19 changes: 17 additions & 2 deletions utils/cache/s2lru.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package cache

import "container/list"
import (
"container/list"
"fmt"
)

type segmentedLRU struct {
data map[uint64]*list.Element
Expand All @@ -9,7 +12,7 @@ type segmentedLRU struct {
}

const (
STAGE_ONE = iota
STAGE_ONE = iota + 1
STAGE_TWO
)

Expand Down Expand Up @@ -96,3 +99,15 @@ func (slru *segmentedLRU) victim() *storeItem {
v := slru.stageOne.Back()
return v.Value.(*storeItem)
}

func (slru *segmentedLRU) String() string {
var s string
for e := slru.stageTwo.Front(); e != nil; e = e.Next() {
s += fmt.Sprintf("%v,", e.Value.(*storeItem).value)
}
s += fmt.Sprintf(" | ")
for e := slru.stageOne.Front(); e != nil; e = e.Next() {
s += fmt.Sprintf("%v,", e.Value.(*storeItem).value)
}
return s
}
1 change: 1 addition & 0 deletions utils/skiplist.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Key differences:
package utils

import (
"fmt"
"log"
"math"
"strings"
Expand Down
2 changes: 1 addition & 1 deletion utils/skiplist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestSkipListBasicCRUD(t *testing.T) {
assert.Nil(t, list.Search([]byte(RandString(10))).Value)

//Update a entry
entry2_new := NewEntry([]byte(RandString(10)), []byte("Val1+1"))
entry2_new := NewEntry(entry1.Key, []byte("Val1+1"))
list.Add(entry2_new)
assert.Equal(t, entry2_new.Value, list.Search(entry2_new.Key).Value)
}
Expand Down
3 changes: 0 additions & 3 deletions vlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,6 @@ func (vlog *valueLog) close() error {
for id, f := range vlog.filesMap {
f.Lock.Lock() // We won’t release the lock.
maxFid := vlog.maxFid
// TODO(ibrahim) - Do we need the following truncations on non-windows
// platforms? We expand the file only on windows and the vlog.woffset()
// should point to end of file on all other platforms.
if id == maxFid {
// truncate writable log file to correct offset.
if truncErr := f.Truncate(int64(vlog.woffset())); truncErr != nil && err == nil {
Expand Down
4 changes: 2 additions & 2 deletions vlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ func TestValueGC(t *testing.T) {
require.NoError(t, err)
val := getItemValue(t, item)
require.NotNil(t, val)
require.True(t, bytes.Equal(item.Key, e.Key), "key not equal: e:%s, v:%s", e.Key, e.Key)
require.True(t, bytes.Equal(item.Value, e.Value), "value not equal: e:%s, v:%s", e.Value, e.Value)
require.True(t, bytes.Equal(item.Key, e.Key), "key not equal: e:%s, v:%s", e.Key, item.Key)
require.True(t, bytes.Equal(item.Value, e.Value), "value not equal: e:%s, v:%s", e.Value, item.Key)
}
}

Expand Down

0 comments on commit b839803

Please sign in to comment.