Skip to content

Commit

Permalink
Merge pull request hardcore-os#6 from hardcore-os/develop_for_neil
Browse files Browse the repository at this point in the history
fea: skiplist concurrent ops
  • Loading branch information
chenat9 authored and sunquan.logic committed Aug 27, 2021
2 parents 2177aed + 2d780c0 commit c1658f9
Show file tree
Hide file tree
Showing 26 changed files with 565 additions and 136 deletions.
8 changes: 4 additions & 4 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ type (
}
)

func Open(options *Options) *DB {
db := &DB{opt: options}
func Open(opt *Options) *DB {
db := &DB{opt: opt}
// 初始化LSM结构
db.lsm = lsm.NewLSM(&lsm.Options{})
db.lsm = lsm.NewLSM(&lsm.Options{WorkDir: opt.WorkDir, MemTableSize: opt.MemTableSize})
// 初始化vlog结构
db.vlog = vlog.NewVLog(&vlog.Options{})
// 初始化统计信息
db.stats = newStats(options)
db.stats = newStats(opt)
// 启动 sstable 的合并压缩过程
go db.lsm.StartMerge()
// 启动 vlog gc 过程
Expand Down
6 changes: 4 additions & 2 deletions db_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package corekv

import (
"github.com/hardcore-os/corekv/iterator"
"github.com/hardcore-os/corekv/utils/codec"
"testing"
"time"

"github.com/hardcore-os/corekv/iterator"
"github.com/hardcore-os/corekv/utils/codec"
)

func TestAPI(t *testing.T) {
Expand All @@ -28,6 +29,7 @@ func TestAPI(t *testing.T) {
IsAsc: false,
})
defer func() { _ = iter.Close() }()
defer func() { _ = iter.Close() }()
for iter.Rewind(); iter.Valid(); iter.Next() {
it := iter.Item()
t.Logf("db.NewIterator key=%s, value=%s, expiresAt=%d", it.Entry().Key, it.Entry().Value, it.Entry().ExpiresAt)
Expand Down
2 changes: 2 additions & 0 deletions file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ type CoreFile interface {
Write(b []byte) (n int, err error)
Read(b []byte) (n int, err error)
Close() error
Truncature(n int64) error
ReName(name string) error
}
83 changes: 58 additions & 25 deletions file/manifest.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
package file

import (
"bufio"
"encoding/csv"
"io"
"encoding/json"
"fmt"
"io/ioutil"

"github.com/hardcore-os/corekv/utils"
)

// Manifest 维护sst文件元信息的文件
type Manifest struct {
opt *Options
f CoreFile
tables [][]string // l0-l7 的sst file name
tables [][]*Cell // l0-l7 的sst file name
}

// WalFile
// Cell 是一行Manifest的封装
type Cell struct {
SSTName string
}

// Close
func (mf *Manifest) Close() error {
if err := mf.f.Close(); err != nil {
return err
Expand All @@ -22,35 +29,61 @@ func (mf *Manifest) Close() error {
}

// Tables 获取table的list
func (mf *Manifest) Tables() [][]string {
func (mf *Manifest) Tables() [][]*Cell {
return mf.tables
}

// OpenManifest
func OpenManifest(opt *Options) *Manifest {
mf := &Manifest{
f: OpenMockFile(opt),
tables: make([][]string, utils.MaxLevelNum),
tables: make([][]*Cell, utils.MaxLevelNum),
opt: opt,
}
reader := csv.NewReader(bufio.NewReader(mf.f))
level := 0
for {
if level > utils.MaxLevelNum {
break
}
line, err := reader.Read()
if err == io.EOF {
break
} else if err != nil {
panic(err)
}
if len(mf.tables[level]) == 0 {
mf.tables[level] = make([]string, len(line))
}
for j, tableName := range line {
mf.tables[level][j] = tableName
data, err := ioutil.ReadAll(mf.f)
utils.Panic(err)
// 如果是新创建的数据库则直接启动,不需要加载sst
if len(data) == 0 {
return mf
}
tables := make([][]string, 0)
utils.Panic(json.Unmarshal(data, &tables))
// TODO 如果文件损坏或者为空则根据之前的检查点来恢复较旧的manifest文件
for i, ts := range tables {
mf.tables[i] = make([]*Cell, 0)
for _, name := range ts {
mf.tables[i] = append(mf.tables[i], &Cell{SSTName: name})
}
level++
}
return mf
}

// AppendSST 存储level表到manifest的level中
func (mf *Manifest) AppendSST(levelNum int, cell *Cell) (err error) {
mf.tables[levelNum] = append(mf.tables[levelNum], cell)
// TODO 保留旧的MANIFEST文件作为检查点,当前直接截断
err = mf.f.Truncature(0)
if err != nil {
return err
}
res := make([][]string, len(mf.tables))
for i, cells := range mf.tables {
res[i] = make([]string, 0)
for _, cell := range cells {
res[i] = append(res[i], cell.SSTName)
}
}
data, err := json.Marshal(res)
if err != nil {
return err
}
// fmt.Println(string(data))
// panic(data)
// err = mf.f.Delete()
// if err != nil {
// return err
// }
ioutil.WriteFile(fmt.Sprintf("%s/%s", mf.opt.Dir, mf.opt.Name), data, 0666)
//_, err = mf.f.Write(data)
return err
}
18 changes: 16 additions & 2 deletions file/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import (

// MockFile
type MockFile struct {
f *os.File
f *os.File
opt *Options
}

// Close
Expand All @@ -27,6 +28,18 @@ func (lf *MockFile) Read(bytes []byte) (int, error) {
return lf.f.Read(bytes)
}

// Truncature 截断
func (lf *MockFile) Truncature(n int64) error {
return lf.f.Truncate(n)
}

// ReName 重命名
func (lf *MockFile) ReName(name string) error {
err := os.Rename(fmt.Sprintf("%s/%s", lf.opt.Dir, lf.opt.Name), fmt.Sprintf("%s/%s", lf.opt.Dir, name))
lf.opt.Name = name
return err
}

// Options
type Options struct {
Name string
Expand All @@ -37,7 +50,8 @@ type Options struct {
func OpenMockFile(opt *Options) *MockFile {
var err error
lf := &MockFile{}
lf.f, err = os.Open(fmt.Sprintf("%s/%s", opt.Dir, opt.Name))
lf.opt = opt
lf.f, err = os.OpenFile(fmt.Sprintf("%s/%s", opt.Dir, opt.Name), os.O_CREATE|os.O_RDWR, 0666)
utils.Panic(err)
return lf
}
63 changes: 63 additions & 0 deletions file/sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,20 @@ package file

import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"strings"

"github.com/hardcore-os/corekv/iterator"
"github.com/hardcore-os/corekv/utils"
)

// SSTable 文件的内存封装
type SSTable struct {
f *MockFile
maxKey []byte
minKey []byte
indexs []byte
fid string
}
Expand All @@ -30,12 +36,69 @@ func (ss *SSTable) Indexs() []byte {
} else {
dataStr, _ := idx.(string) // hello,0
ss.indexs = []byte(dataStr)
tmp := strings.Split(dataStr, ",")
ss.maxKey = []byte(tmp[len(tmp)-1])
ss.minKey = []byte(tmp[0])
}
}
return ss.indexs
}

// MaxKey 当前最大的key
func (ss *SSTable) MaxKey() []byte {
return ss.maxKey
}

// MinKey 当前最小的key
func (ss *SSTable) MinKey() []byte {
return ss.minKey
}

// FID 获取fid
func (ss *SSTable) FID() string {
return ss.fid
}

// SaveSkipListToSSTable 将跳表序列化到sst文件中
// TODO 设计sst文件格式,并重写此处flush逻辑
func (ss *SSTable) SaveSkipListToSSTable(sl *utils.SkipList) error {
iter := sl.NewIterator(&iterator.Options{})
indexs, datas, idx := make([]string, 0), make([]string, 0), 0
for iter.Rewind(); iter.Valid(); iter.Next() {
item := iter.Item().Entry()
indexs = append(indexs, string(item.Key))
indexs = append(indexs, fmt.Sprintf("%d", idx))
datas = append(datas, string(item.Value))
idx++
}
ssData := make(map[string]string, 0)
ssData["idx"] = strings.Join(indexs, ",")
ssData["data"] = strings.Join(datas, ",")
bData, err := json.Marshal(ssData)
if err != nil {
return err
}
if _, err := ss.f.Write(bData); err != nil {
return err
}
ss.indexs = []byte(ssData["idx"])
return nil
}

// LoadData 加载数据块
func (ss *SSTable) LoadData() (blocks [][]byte, offsets []int) {
ss.f.f.Seek(0, io.SeekStart)
bv, err := ioutil.ReadAll(ss.f)
utils.Panic(err)
m := make(map[string]interface{}, 0)
json.Unmarshal(bv, &m)
if data, ok := m["data"]; !ok {
panic("sst data is nil")
} else {
// TODO 所有的数据都放在一个 block中
dd := data.(string)
blocks = append(blocks, []byte(dd))
offsets = append(offsets, 0)
}
return blocks, offsets
}
3 changes: 1 addition & 2 deletions lsm/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import "github.com/hardcore-os/corekv/utils"

type cache struct {
indexs *utils.CoreMap // key fid, value table
blocks *utils.CoreMap // key cacheID_blockOffset value block []byte
blocks *utils.CoreMap // key fid_blockOffset value block []byte
}
type blockBuffer struct {
b []byte
Expand All @@ -20,7 +20,6 @@ func newCache(opt *Options) *cache {
return &cache{indexs: utils.NewMap(), blocks: utils.NewMap()}
}


// TODO fid 使用字符串是不是会有性能损耗
func (c *cache) addIndex(fid string, t *table) {
c.indexs.Set(fid, t)
Expand Down
18 changes: 7 additions & 11 deletions lsm/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package lsm

import (
"github.com/hardcore-os/corekv/iterator"
"github.com/hardcore-os/corekv/utils"
"github.com/hardcore-os/corekv/utils/codec"
)

Expand Down Expand Up @@ -47,29 +46,26 @@ func (iter *Iterator) Close() error {

// 内存表迭代器
type memIterator struct {
it iterator.Item
iters []*Iterator
sl *utils.SkipList
innerIter iterator.Iterator
}

func (m *memTable) NewIterator(opt *iterator.Options) iterator.Iterator {
return &memIterator{sl: m.sl}
return &memIterator{innerIter: m.sl.NewSkipListIterator()}
}
func (iter *memIterator) Next() {
iter.it = nil
iter.innerIter.Next()
}
func (iter *memIterator) Valid() bool {
return iter.it != nil
return iter.innerIter.Valid()
}
func (iter *memIterator) Rewind() {
entry := iter.sl.Search([]byte("hello"))
iter.it = &Item{e: entry}
iter.innerIter.Rewind()
}
func (iter *memIterator) Item() iterator.Item {
return iter.it
return iter.innerIter.Item()
}
func (iter *memIterator) Close() error {
return nil
return iter.innerIter.Close()
}

// levelManager上的迭代器
Expand Down
Loading

0 comments on commit c1658f9

Please sign in to comment.