Skip to content

Commit

Permalink
feat: add load wal method
Browse files Browse the repository at this point in the history
  • Loading branch information
sjcsjc123 committed Aug 20, 2023
1 parent a72b7f9 commit 8ed2029
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 15 deletions.
11 changes: 7 additions & 4 deletions db/column/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,14 @@ func NewColumn(option config.ColumnOptions) (Column, error) {
}, nil
}

// column is a column family, it contains a wal and a map of column family
// the map of column family is a map of column family name and column family
// the wal is a global wal of all column family
type column struct {
mux sync.RWMutex
wal *wal.Wal
columnFamily map[string]*memory.Db
option config.ColumnOptions
mux sync.RWMutex // protect column family
wal *wal.Wal // wal of all column family
columnFamily map[string]*memory.Db // column family map
option config.ColumnOptions // column family options
}

func (c *column) CreateColumnFamily(name string) error {
Expand Down
80 changes: 73 additions & 7 deletions db/memory/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,18 @@ import (
"github.com/ByteStorage/FlyDB/config"
"github.com/ByteStorage/FlyDB/db/engine"
"github.com/ByteStorage/FlyDB/lib/wal"
"io"
"log"
"os"
"sync"
)

const (
// Record types
putType = byte(1)
deleteType = byte(2)
)

type Db struct {
option config.DbMemoryOptions
db *engine.DB
Expand All @@ -25,13 +33,16 @@ type Db struct {
func NewDB(option config.DbMemoryOptions) (*Db, error) {
// create a new memTable
mem := NewMemTable()

// dir path has been changed to dir path + column name
option.Option.DirPath = option.Option.DirPath + "/" + option.ColumnName
db, err := engine.NewDB(option.Option)
if err != nil {
return nil, err
}

w := option.Wal

// if wal is nil, create a new wal
// if wal is not nil, the wal was created by column family
if option.Wal == nil {
Expand All @@ -47,6 +58,7 @@ func NewDB(option config.DbMemoryOptions) (*Db, error) {
}
}

// initialize db
d := &Db{
mem: mem,
db: db,
Expand All @@ -58,17 +70,23 @@ func NewDB(option config.DbMemoryOptions) (*Db, error) {
wal: w,
pool: &sync.Pool{New: func() interface{} { return make([]byte, 0, 1024) }},
}

// when loading, the system will execute the every record in wal
d.load()
// async write to db
go d.async()
// async save wal
go d.wal.AsyncSave()
// async handler error message
go d.handlerErrMsg()
return d, nil
}

func (d *Db) handlerErrMsg() {
log := d.option.Option.DirPath + "/error.log"
msgLog := d.option.Option.DirPath + "/error.log"
for msg := range d.errMsgCh {
// write to log
_ = os.WriteFile(log, []byte(msg), 0666)
_ = os.WriteFile(msgLog, []byte(msg), 0666)
}
}

Expand Down Expand Up @@ -111,7 +129,7 @@ func (d *Db) Put(key []byte, value []byte) error {
// if active memTable size > define size, change to immutable memTable
if d.activeSize+keyLen+valueLen > d.option.MemSize {
// add to immutable memTable list
d.AddOldMemTable(d.mem)
d.addOldMemTable(d.mem)
// create new active memTable
d.mem = NewMemTable()
d.activeSize = 0
Expand Down Expand Up @@ -161,16 +179,27 @@ func (d *Db) Close() error {
return d.db.Close()
}

func (d *Db) AddOldMemTable(oldList *MemTable) {
func (d *Db) addOldMemTable(oldList *MemTable) {
d.oldListChan <- oldList
}

func (d *Db) async() {
for oldList := range d.oldListChan {
for key, value := range oldList.table {
err := d.db.Put([]byte(key), value)
if err != nil {
// TODO handle error: either log it, retry, or whatever makes sense for your application
// Write to db, try 3 times
ok := false
for i := 0; i < 3; i++ {
err := d.db.Put([]byte(key), value)
if err == nil {
ok = true
break
}
}
if !ok {
err := d.wal.Delete([]byte(key))
if err != nil {
d.errMsgCh <- "write to wal error when delete the key: " + string(key) + " error: " + err.Error()
}
}
d.totalSize -= int64(len(key) + len(value))
}
Expand All @@ -180,3 +209,40 @@ func (d *Db) async() {
func (d *Db) Clean() {
d.db.Clean()
}

func (d *Db) load() {
// Initialize reading from the start of the WAL.
d.wal.InitReading()

for {
record, err := d.wal.ReadNext()
if err == io.EOF {
break
}
if err != nil {
// Handle the error: log it, panic, return, etc.
log.Printf("Error reading from WAL: %v", err)
return
}

switch record.Type {
case putType:
// Assuming Db has a Put method
err := d.Put(record.Key, record.Value)
if err != nil {
// Handle the error: log it, panic, return, etc.
log.Printf("Error applying PUT from WAL: %v", err)
}
case deleteType:
// Assuming Db has a Delete method
err := d.Delete(record.Key)
if err != nil {
// Handle the error: log it, panic, return, etc.
log.Printf("Error applying DELETE from WAL: %v", err)
}
default:
// Handle unknown type.
log.Printf("Unknown record type in WAL: %v", record.Type)
}
}
}
67 changes: 63 additions & 4 deletions lib/wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/binary"
"errors"
"hash/crc32"
"io"
"os"
"time"

Expand All @@ -21,10 +22,11 @@ const (

// Wal is a write-ahead log.
type Wal struct {
m *fileio.MMapIO // MMapIOManager
logNum uint32 // Log number
saveTime int64 // Save time
dirPath string
m *fileio.MMapIO // MMapIOManager
logNum uint32 // Log number
saveTime int64 // Save time
dirPath string // Dir path
readOffset int64 // Read offset
}

// NewWal creates a new WAL.
Expand Down Expand Up @@ -109,6 +111,63 @@ func (w *Wal) Delete(key []byte) error {
return w.writeRecord(deleteType, key, nil)
}

// Record is a structure that holds information about a record from the WAL.
type Record struct {
Type byte
Key []byte
Value []byte
}

// InitReading Initializes the WAL reading position to the start of the file.
func (w *Wal) InitReading() {
w.readOffset = 0
}

// ReadNext reads the next operation from the WAL.
func (w *Wal) ReadNext() (*Record, error) {
buffer := make([]byte, 4+2+1+4) // Buffer size to read headers
_, err := w.m.Read(buffer, w.readOffset)
if err == io.EOF {
return nil, io.EOF
}
if err != nil {
return nil, err
}

// Move readOffset
w.readOffset += int64(len(buffer))

// Verify CRC
expectedCRC := binary.LittleEndian.Uint32(buffer)
if crc32.ChecksumIEEE(buffer[4:]) != expectedCRC {
return nil, errors.New("corrupted record found")
}

// Get record size and type
size := binary.LittleEndian.Uint16(buffer[4:])
recordType := buffer[4+2]

// Read the payload
payload := make([]byte, size-4) // Subtract 4 for log number
_, err = w.m.Read(payload, w.readOffset)
if err != nil {
return nil, err
}

// Move readOffset again
w.readOffset += int64(len(payload))

// Parse based on record type
switch recordType {
case putType:
return &Record{Type: putType, Key: payload[:len(payload)-len(buffer)], Value: payload[len(payload)-len(buffer):]}, nil
case deleteType:
return &Record{Type: deleteType, Key: payload, Value: nil}, nil
default:
return nil, errors.New("unknown record type")
}
}

// Save flushes the WAL to disk.
func (w *Wal) Save() error {
return w.m.Sync()
Expand Down

0 comments on commit 8ed2029

Please sign in to comment.