Skip to content

Commit

Permalink
Support for recovery measure/buffer using wal (apache#313)
Browse files Browse the repository at this point in the history
* Update wal seriesID datatype to bytes
* Integrate wal into measure/buffer for memory recovery
* update bytes/string convert

---------

Co-authored-by: 吴晟 Wu Sheng <[email protected]>
  • Loading branch information
hailin0 and wu-sheng authored Aug 9, 2023
1 parent 4d105d0 commit 4655b73
Show file tree
Hide file tree
Showing 10 changed files with 518 additions and 184 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Release Notes.
- Implement Write-ahead Logging
- Document the clustering.
- Support multiple roles for banyand server.
- Support for recovery buffer using wal.

### Bugs

Expand Down
26 changes: 0 additions & 26 deletions api/common/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,32 +40,6 @@ func (s SeriesID) Marshal() []byte {
return convert.Uint64ToBytes(uint64(s))
}

// GlobalSeriesID identities a series in a shard.
type GlobalSeriesID struct {
Name string
SeriesID SeriesID
}

// Marshal encodes global series id to bytes.
func (s GlobalSeriesID) Marshal() []byte {
seriesIDBytes := convert.Uint64ToBytes(uint64(s.SeriesID))
nameBytes := []byte(s.Name)
return append(seriesIDBytes, nameBytes...)
}

// Volume returns the estimated bytes volume of global series id.
func (s GlobalSeriesID) Volume() int {
return 8 + len(s.Name)
}

// ParseGlobalSeriesID parses global series id from bytes.
func ParseGlobalSeriesID(b []byte) GlobalSeriesID {
return GlobalSeriesID{
SeriesID: SeriesID(convert.BytesToUint64(b[:8])),
Name: string(b[8:]),
}
}

// positionKey is a context key to store the module position.
var positionKey = contextPositionKey{}

Expand Down
24 changes: 12 additions & 12 deletions banyand/measure/tstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
const (
defaultNumBufferShards = 2
defaultWriteConcurrency = 1000
defaultWriteWal = true
plain = "tst"
encoded = "encoded"
)
Expand All @@ -56,6 +57,7 @@ type tsTable struct {
buffer *tsdb.Buffer
closeBufferTimer *time.Timer
position common.Position
path string
bufferSize int64
encoderBufferSize int64
lock sync.Mutex
Expand All @@ -72,13 +74,13 @@ func (t *tsTable) openBuffer() (err error) {
return nil
}
bufferSize := int(t.encoderBufferSize / defaultNumBufferShards)
if t.encoderBuffer, err = tsdb.NewBuffer(t.l, t.position, bufferSize,
defaultWriteConcurrency, defaultNumBufferShards, t.encoderFlush); err != nil {
if t.encoderBuffer, err = tsdb.NewBufferWithWal(t.l, t.position, bufferSize,
defaultWriteConcurrency, defaultNumBufferShards, t.encoderFlush, defaultWriteWal, &t.path); err != nil {
return fmt.Errorf("failed to create encoder buffer: %w", err)
}
bufferSize = int(t.bufferSize / defaultNumBufferShards)
if t.buffer, err = tsdb.NewBuffer(t.l, t.position, bufferSize,
defaultWriteConcurrency, defaultNumBufferShards, t.flush); err != nil {
if t.buffer, err = tsdb.NewBufferWithWal(t.l, t.position, bufferSize,
defaultWriteConcurrency, defaultNumBufferShards, t.flush, defaultWriteWal, &t.path); err != nil {
return fmt.Errorf("failed to create buffer: %w", err)
}
end := t.EndTime()
Expand Down Expand Up @@ -153,22 +155,19 @@ func (t *tsTable) Get(key []byte, ts time.Time) ([]byte, error) {

func (t *tsTable) Put(key []byte, val []byte, ts time.Time) error {
if t.encoderBuffer != nil {
t.writeToBuffer(key, val, ts)
return nil
return t.writeToBuffer(key, val, ts)
}
if err := t.openBuffer(); err != nil {
return err
}
t.writeToBuffer(key, val, ts)
return nil
return t.writeToBuffer(key, val, ts)
}

func (t *tsTable) writeToBuffer(key []byte, val []byte, ts time.Time) {
func (t *tsTable) writeToBuffer(key []byte, val []byte, ts time.Time) error {
if t.toEncode(key) {
t.encoderBuffer.Write(key, val, ts)
} else {
t.buffer.Write(key, val, ts)
return t.encoderBuffer.Write(key, val, ts)
}
return t.buffer.Write(key, val, ts)
}

func (t *tsTable) encoderFlush(shardIndex int, skl *skl.Skiplist) error {
Expand Down Expand Up @@ -228,6 +227,7 @@ func (ttf *tsTableFactory) NewTSTable(blockExpiryTracker tsdb.BlockExpiryTracker
encoderSST: encoderSST,
sst: sst,
BlockExpiryTracker: &blockExpiryTracker,
path: root,
}
if table.IsActive() {
if err := table.openBuffer(); err != nil {
Expand Down
6 changes: 2 additions & 4 deletions banyand/stream/tstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,13 @@ func (t *tsTable) Get(key []byte, ts time.Time) ([]byte, error) {

func (t *tsTable) Put(key []byte, val []byte, ts time.Time) error {
if t.buffer != nil {
t.buffer.Write(key, val, ts)
return nil
return t.buffer.Write(key, val, ts)
}

if err := t.openBuffer(); err != nil {
return err
}
t.buffer.Write(key, val, ts)
return nil
return t.buffer.Write(key, val, ts)
}

func (t *tsTable) flush(shardIndex int, skl *skl.Skiplist) error {
Expand Down
Loading

0 comments on commit 4655b73

Please sign in to comment.