Skip to content

Commit

Permalink
feat: fix bug with channel block
Browse files Browse the repository at this point in the history
  • Loading branch information
sjcsjc123 committed Aug 21, 2023
1 parent 447c743 commit 4e7b812
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 11 deletions.
65 changes: 59 additions & 6 deletions db/memory/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type Db struct {
oldListChan chan *MemTable
totalSize int64
activeSize int64
pool *sync.Pool
walTask chan func()
errMsgCh chan string
mux sync.RWMutex
walDataMtList []*MemTable
Expand Down Expand Up @@ -72,10 +72,11 @@ func NewDB(option config.DbMemoryOptions) (*Db, error) {
activeSize: 0,
totalSize: 0,
wal: w,
pool: &sync.Pool{New: func() interface{} { return make([]byte, 0, 1024) }},
walTask: make(chan func(), 10000000),
mux: sync.RWMutex{},
walDataMtList: make([]*MemTable, 0),
walDataMtListChan: make(chan *MemTable, 1000000),
errMsgCh: make(chan string, 1000000),
}

// when loading, the system will execute the every record in wal
Expand All @@ -86,6 +87,8 @@ func NewDB(option config.DbMemoryOptions) (*Db, error) {
go d.wal.AsyncSave()
// async handler error message
go d.handlerErrMsg()
// async worker
go d.work()
return d, nil
}

Expand All @@ -106,7 +109,12 @@ func (d *Db) Put(key []byte, value []byte) error {
keyLen := int64(len(key))
valueLen := int64(len(value))

d.pool.Put(func() {
//err := d.wal.Put(key, value)
//if err != nil {
// return err
//}

d.walTask <- func() {
// Write to wal, try 3 times
ok := false
for i := 0; i < 3; i++ {
Expand All @@ -122,7 +130,28 @@ func (d *Db) Put(key []byte, value []byte) error {
d.errMsgCh <- "write to wal error when delete the key: " + string(key) + " error: " + err.Error()
}
}
})
}

//err := d.pool.Submit(func() {
// // Write to wal, try 3 times
// ok := false
// for i := 0; i < 3; i++ {
// err := d.wal.Put(key, value)
// if err == nil {
// ok = true
// break
// }
// }
// if !ok {
// err := d.wal.Delete(key)
// if err != nil {
// d.errMsgCh <- "write to wal error when delete the key: " + string(key) + " error: " + err.Error()
// }
// }
//})
//if err != nil {
// return err
//}

// if sync write, save wal
if d.option.Option.SyncWrite {
Expand Down Expand Up @@ -187,7 +216,7 @@ func (d *Db) Delete(key []byte) error {
d.mux.Lock()
defer d.mux.Unlock()

d.pool.Put(func() {
d.walTask <- func() {
// Write to wal, try 3 times
ok := false
for i := 0; i < 3; i++ {
Expand All @@ -203,7 +232,25 @@ func (d *Db) Delete(key []byte) error {
d.errMsgCh <- "write to wal error when delete the key: " + string(key) + " error: " + err.Error()
}
}
})
}

//err := d.pool.Submit(func() {
// // Write to wal, try 3 times
// ok := false
// for i := 0; i < 3; i++ {
// err := d.wal.Delete(key)
// if err == nil {
// ok = true
// break
// }
// }
// if !ok {
// err := d.wal.Delete(key)
// if err != nil {
// d.errMsgCh <- "write to wal error when delete the key: " + string(key) + " error: " + err.Error()
// }
// }
//})
// get from active memTable
get, err := d.mem.Get(string(key))
if err == nil {
Expand Down Expand Up @@ -238,6 +285,12 @@ func (d *Db) Close() error {
return d.db.Close()
}

func (d *Db) work() {
for task := range d.walTask {
task()
}
}

func (d *Db) addOldMemTable(oldList *MemTable) {
d.oldListChan <- oldList
}
Expand Down
8 changes: 3 additions & 5 deletions db/memory/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,12 @@ import (
)

func TestPutAndGet(t *testing.T) {
opts := config.DefaultOptions
dir, _ := os.MkdirTemp("", "flydb-benchmark")
opts.DirPath = dir
opts.DataFileSize = 64 * 1024 * 1024
err := os.Mkdir("./flydb-benchmark", os.ModePerm)
memOpt := config.DefaultDbMemoryOptions
memOpt.LogNum = 100
memOpt.FileSize = 100 * 1024 * 1024
memOpt.FileSize = 256 * 1024 * 1024
memOpt.TotalMemSize = 2 * 1024 * 1024 * 1024
memOpt.Option.DirPath = "./flydb-benchmark"

db, err := NewDB(memOpt)
defer db.Clean()
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ require (
github.com/kr/pretty v0.3.0 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/panjf2000/ants v1.3.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nbutton23/zxcvbn-go v0.0.0-20180912185939-ae427f1e4c1d/go.mod h1:o96djdrsSGy3AWPyBgZMAGfxZNfgntdJG+11KU4QvbU=
github.com/panjf2000/ants v1.3.0 h1:8pQ+8leaLc9lys2viEEr8md0U4RN6uOSUCE9bOYjQ9M=
github.com/panjf2000/ants v1.3.0/go.mod h1:AaACblRPzq35m1g3enqYcxspbbiOJJYaxU2wMpm1cXY=
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down

0 comments on commit 4e7b812

Please sign in to comment.