Skip to content

Commit

Permalink
Use incrementing 64-bit CAS counter
Browse files Browse the repository at this point in the history
We ended up implementing it with an atomic variable.

!badger!head is used to store a sufficiently large CAS value that,
after replaying the value log, it is accurately greater than or equal
to the previously used CAS values.
  • Loading branch information
Sam Hughes committed Jul 28, 2017
1 parent 835ef24 commit 6abc548
Show file tree
Hide file tree
Showing 13 changed files with 145 additions and 93 deletions.
2 changes: 1 addition & 1 deletion compact_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func buildTable(t *testing.T, keyValues [][]string) *os.File {
})
for i, kv := range keyValues {
y.AssertTrue(len(kv) == 2)
err := b.Add([]byte(kv[0]), y.ValueStruct{[]byte(kv[1]), 'A', uint16(i), 0})
err := b.Add([]byte(kv[0]), y.ValueStruct{[]byte(kv[1]), 'A', 0, uint64(i)})
if t != nil {
require.NoError(t, err)
} else {
Expand Down
4 changes: 2 additions & 2 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type KVItem struct {
meta byte
userMeta byte
val []byte
casCounter uint16
casCounter uint64
slice *y.Slice
next *KVItem
}
Expand All @@ -51,7 +51,7 @@ func (item *KVItem) Value() []byte {
}

// Counter returns the CAS counter associated with the value.
func (item *KVItem) Counter() uint16 {
func (item *KVItem) Counter() uint64 {
return item.casCounter
}

Expand Down
65 changes: 51 additions & 14 deletions kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"

"golang.org/x/net/trace"
Expand Down Expand Up @@ -112,10 +113,9 @@ var DefaultOptions = Options{

func (opt *Options) estimateSize(entry *Entry) int {
if len(entry.Value) < opt.ValueThreshold {
// 4 is for cas + meta
return len(entry.Key) + len(entry.Value) + 4
return len(entry.Key) + len(entry.Value) + y.MetaSize + y.UserMetaSize + y.CasSize
}
return len(entry.Key) + 16 + 4
return len(entry.Key) + 16 + y.MetaSize + y.UserMetaSize + y.CasSize
}

// KV provides the various functions required to interact with Badger.
Expand All @@ -137,6 +137,10 @@ type KV struct {
vptr valuePointer
writeCh chan *request
flushChan chan flushTask // For flushing memtables.

// Incremented in the non-concurrently accessed write loop. But also accessed outside. So
// we use an atomic op.
lastUsedCasCounter uint64
}

var ErrInvalidDir error = errors.New("Invalid Dir, directory does not exist")
Expand Down Expand Up @@ -223,6 +227,11 @@ func NewKV(opt *Options) (out *KV, err error) {
return nil, errors.Wrap(err, "Retrieving head")
}
val := item.Value()
// lastUsedCasCounter will either be the value stored in !badger!head, or some subsequently
// written value log entry that we replay. (Subsequent value log entries might be _less_
// than lastUsedCasCounter, if there was value log gc so we have to max() values while
// replaying.)
out.lastUsedCasCounter = item.casCounter

var vptr valuePointer
if len(val) > 0 {
Expand All @@ -238,6 +247,9 @@ func NewKV(opt *Options) (out *KV, err error) {
out.elog.Printf("First key=%s\n", e.Key)
}
first = false
if out.lastUsedCasCounter < e.casCounter {
out.lastUsedCasCounter = e.casCounter
}

if e.CASCounterCheck != 0 {
oldValue, err := out.get(e.Key)
Expand Down Expand Up @@ -584,6 +596,18 @@ func (s *KV) writeToLSM(b *request) error {
return nil
}

// lastCASCounter returns the last-used cas counter.
func (s *KV) lastCASCounter() uint64 {
return atomic.LoadUint64(&s.lastUsedCasCounter)
}

// newCASCounters generates a set of unique CAS counters -- the interval [x, x + howMany) where x
// is the return value.
func (s *KV) newCASCounters(howMany uint64) uint64 {
last := atomic.AddUint64(&s.lastUsedCasCounter, howMany)
return last - howMany + 1
}

// writeRequests is called serially by only one goroutine.
func (s *KV) writeRequests(reqs []*request) error {
if len(reqs) == 0 {
Expand All @@ -599,12 +623,17 @@ func (s *KV) writeRequests(reqs []*request) error {

s.elog.Printf("writeRequests called. Writing to value log")

// CAS counter for all operations has to go onto value log. Otherwise, if it is just in memtable for
// a long time, and following CAS operations use that as a check, when replaying, we will think that
// these CAS operations should fail, when they are actually valid.
// CAS counter for all operations has to go onto value log. Otherwise, if it is just in
// memtable for a long time, and following CAS operations use that as a check, when
// replaying, we will think that these CAS operations should fail, when they are actually
// valid.

// There is code (in flushMemtable) whose correctness depends on us generating CAS Counter
// values _before_ we modify s.vptr here.
for _, req := range reqs {
for _, e := range req.Entries {
e.casCounter = newCASCounter()
counterBase := s.newCASCounters(uint64(len(req.Entries)))
for i, e := range req.Entries {
e.casCounter = counterBase + uint64(i)
}
}
err := s.vlog.write(reqs)
Expand Down Expand Up @@ -816,7 +845,7 @@ func EntriesSet(s []*Entry, key, val []byte) []*Entry {
// CompareAndSet sets the given value, ensuring that the no other Set operation has happened,
// since last read. If the key has a different casCounter, this would not update the key
// and return an error.
func (s *KV) CompareAndSet(key []byte, val []byte, casCounter uint16) error {
func (s *KV) CompareAndSet(key []byte, val []byte, casCounter uint64) error {
e := &Entry{
Key: key,
Value: val,
Expand Down Expand Up @@ -847,7 +876,7 @@ func (s *KV) compareAsync(e *Entry, f func(error)) {
// CompareAndSetAsync is the asynchronous version of CompareAndSet. It accepts a callback function
// which is called when the CompareAndSet completes. Any error encountered during execution is
// passed as an argument to the callback function.
func (s *KV) CompareAndSetAsync(key []byte, val []byte, casCounter uint16, f func(error)) {
func (s *KV) CompareAndSetAsync(key []byte, val []byte, casCounter uint64, f func(error)) {
e := &Entry{
Key: key,
Value: val,
Expand Down Expand Up @@ -889,7 +918,7 @@ func EntriesDelete(s []*Entry, key []byte) []*Entry {

// CompareAndDelete deletes a key ensuring that it has not been changed since last read.
// If existing key has different casCounter, this would not delete the key and return an error.
func (s *KV) CompareAndDelete(key []byte, casCounter uint16) error {
func (s *KV) CompareAndDelete(key []byte, casCounter uint64) error {
e := &Entry{
Key: key,
Meta: BitDelete,
Expand All @@ -904,7 +933,7 @@ func (s *KV) CompareAndDelete(key []byte, casCounter uint16) error {
// CompareAndDeleteAsync is the asynchronous version of CompareAndDelete. It accepts a callback
// function which is called when the CompareAndDelete completes. Any error encountered during
// execution is passed as an argument to the callback function.
func (s *KV) CompareAndDeleteAsync(key []byte, casCounter uint16, f func(error)) {
func (s *KV) CompareAndDeleteAsync(key []byte, casCounter uint64, f func(error)) {
e := &Entry{
Key: key,
Meta: BitDelete,
Expand Down Expand Up @@ -983,10 +1012,18 @@ func (s *KV) flushMemtable(lc *y.LevelCloser) error {
if ft.vptr.Fid > 0 || ft.vptr.Offset > 0 || ft.vptr.Len > 0 {
s.elog.Printf("Storing offset: %+v\n", ft.vptr)
offset := make([]byte, 10)
s.Lock() // For vptr.
s.Lock() // For vptr and casAsOfVptr.
s.vptr.Encode(offset)
s.Unlock()
ft.mt.Put(head, y.ValueStruct{Value: offset}) // casCounter not needed.
// CAS counter is needed and is desirable -- it's the first value log entry
// we replay, so to speak, perhaps the only, and we use it to re-initialize
// the CAS counter.
//
// The write loop generates CAS counter values _before_ it sets vptr. It
// is crucial that we read the cas counter here _after_ reading vptr. That
// way, our value here is guaranteed to be >= the CASCounter values written
// before vptr (because they don't get replayed).
ft.mt.Put(head, y.ValueStruct{Value: offset, CASCounter: s.lastCASCounter()})
}
fileID, _ := s.lc.reserveFileIDs(1)
fd, err := y.OpenSyncedFile(table.NewFilename(fileID, s.opt.Dir), true)
Expand Down
21 changes: 6 additions & 15 deletions skl/arena.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package skl

import (
"encoding/binary"
"sync/atomic"

"github.com/dgraph-io/badger/y"
Expand Down Expand Up @@ -50,16 +49,13 @@ func (s *Arena) Reset() {
// size of val. We could also store this size inside arena but the encoding and
// decoding will incur some overhead.
func (s *Arena) PutVal(v y.ValueStruct) uint32 {
l := uint32(len(v.Value)) + 4
l := uint32(v.EncodedSize())
n := atomic.AddUint32(&s.n, l)
y.AssertTruef(int(n) <= len(s.buf),
"Arena too small, toWrite:%d newTotal:%d limit:%d",
l, n, len(s.buf))
m := n - l
s.buf[m] = v.Meta
s.buf[m+1] = v.UserMeta
binary.BigEndian.PutUint16(s.buf[m+2:m+4], v.CASCounter)
copy(s.buf[m+4:n], v.Value)
v.Encode(s.buf[m:])
return m
}

Expand All @@ -80,13 +76,8 @@ func (s *Arena) GetKey(offset uint32, size uint16) []byte {
}

// GetVal returns byte slice at offset. The given size should be just the value
// size and should NOT include the meta byte.
func (s *Arena) GetVal(offset uint32, size uint16) y.ValueStruct {
out := y.ValueStruct{
Value: s.buf[offset+4 : offset+4+uint32(size)],
Meta: s.buf[offset],
UserMeta: s.buf[offset+1],
CASCounter: binary.BigEndian.Uint16(s.buf[offset+2 : offset+4]),
}
return out
// size and should NOT include the meta bytes.
func (s *Arena) GetVal(offset uint32, size uint16) (ret y.ValueStruct) {
ret.DecodeEntireSlice(s.buf[offset : offset+uint32(y.ValueStructSerializedSize(size))])
return
}
20 changes: 10 additions & 10 deletions skl/skl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ func TestBasic(t *testing.T) {

// Try inserting values.
// Somehow require.Nil doesn't work when checking for unsafe.Pointer(nil).
l.Put([]byte("key1"), y.ValueStruct{val1, 55, 60000, 0})
l.Put([]byte("key3"), y.ValueStruct{val3, 56, 60001, 0})
l.Put([]byte("key2"), y.ValueStruct{val2, 57, 60002, 0})
l.Put([]byte("key1"), y.ValueStruct{val1, 55, 0, 60000})
l.Put([]byte("key3"), y.ValueStruct{val3, 56, 0, 60001})
l.Put([]byte("key2"), y.ValueStruct{val2, 57, 0, 60002})

v := l.Get([]byte("key"))
require.True(t, v.Value == nil)
Expand All @@ -116,7 +116,7 @@ func TestBasic(t *testing.T) {
require.EqualValues(t, 56, v.Meta)
require.EqualValues(t, 60001, v.CASCounter)

l.Put([]byte("key2"), y.ValueStruct{val4, 12, 50000, 0})
l.Put([]byte("key2"), y.ValueStruct{val4, 12, 0, 50000})
v = l.Get([]byte("key2"))
require.True(t, v.Value != nil)
require.EqualValues(t, "00072", string(v.Value))
Expand All @@ -134,7 +134,7 @@ func TestConcurrentBasic(t *testing.T) {
go func(i int) {
defer wg.Done()
l.Put([]byte(fmt.Sprintf("%05d", i)),
y.ValueStruct{newValue(i), 0, uint16(i), 0})
y.ValueStruct{newValue(i), 0, 0, uint64(i)})
}(i)
}
wg.Wait()
Expand Down Expand Up @@ -165,7 +165,7 @@ func TestOneKey(t *testing.T) {
wg.Add(1)
go func(i int) {
defer wg.Done()
l.Put(key, y.ValueStruct{newValue(i), 0, uint16(i), 0})
l.Put(key, y.ValueStruct{newValue(i), 0, 0, uint64(i)})
}(i)
}
// We expect that at least some write made it such that some read returns a value.
Expand Down Expand Up @@ -195,7 +195,7 @@ func TestFindNear(t *testing.T) {
defer l.DecrRef()
for i := 0; i < 1000; i++ {
key := fmt.Sprintf("%05d", i*10+5)
l.Put([]byte(key), y.ValueStruct{newValue(i), 0, uint16(i), 0})
l.Put([]byte(key), y.ValueStruct{newValue(i), 0, 0, uint64(i)})
}

n, eq := l.findNear([]byte("00001"), false, false)
Expand Down Expand Up @@ -307,7 +307,7 @@ func TestIteratorNext(t *testing.T) {
require.False(t, it.Valid())
for i := n - 1; i >= 0; i-- {
l.Put([]byte(fmt.Sprintf("%05d", i)),
y.ValueStruct{newValue(i), 0, uint16(i), 0})
y.ValueStruct{newValue(i), 0, 0, uint64(i)})
}
it.SeekToFirst()
for i := 0; i < n; i++ {
Expand All @@ -331,7 +331,7 @@ func TestIteratorPrev(t *testing.T) {
require.False(t, it.Valid())
for i := 0; i < n; i++ {
l.Put([]byte(fmt.Sprintf("%05d", i)),
y.ValueStruct{newValue(i), 0, uint16(i), 0})
y.ValueStruct{newValue(i), 0, 0, uint64(i)})
}
it.SeekToLast()
for i := n - 1; i >= 0; i-- {
Expand Down Expand Up @@ -359,7 +359,7 @@ func TestIteratorSeek(t *testing.T) {
// 1000, 1010, 1020, ..., 1990.
for i := n - 1; i >= 0; i-- {
v := i*10 + 1000
l.Put([]byte(fmt.Sprintf("%05d", i*10+1000)), y.ValueStruct{newValue(v), 0, 555, 0})
l.Put([]byte(fmt.Sprintf("%05d", i*10+1000)), y.ValueStruct{newValue(v), 0, 0, 555})
}
it.Seek([]byte(""))
require.True(t, it.Valid())
Expand Down
8 changes: 4 additions & 4 deletions table/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ func (b *TableBuilder) addHelper(key []byte, v y.ValueStruct) {
h := header{
plen: uint16(len(key) - len(diffKey)),
klen: uint16(len(diffKey)),
vlen: uint16(len(v.Value) + 1 + 2 + 1), // Include meta byte and casCounter.
prev: b.prevOffset, // prevOffset is the location of the last key-value added.
vlen: uint16(len(v.Value) + y.MetaSize + y.UserMetaSize + y.CasSize),
prev: b.prevOffset, // prevOffset is the location of the last key-value added.
}
b.prevOffset = uint32(b.buf.Len()) - b.baseOffset // Remember current offset for the next Add call.

Expand All @@ -148,8 +148,8 @@ func (b *TableBuilder) addHelper(key []byte, v y.ValueStruct) {
b.buf.Write(diffKey) // We only need to store the key difference.
b.buf.WriteByte(v.Meta) // Meta byte precedes actual value.
b.buf.WriteByte(v.UserMeta)
var casBytes [2]byte
binary.BigEndian.PutUint16(casBytes[:], v.CASCounter)
var casBytes [y.CasSize]byte
binary.BigEndian.PutUint64(casBytes[:], v.CASCounter)
b.buf.Write(casBytes[:])
b.buf.Write(v.Value)
b.counter++ // Increment number of keys added for this current block.
Expand Down
12 changes: 3 additions & 9 deletions table/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package table

import (
"bytes"
"encoding/binary"
"io"
"math"
"sort"
Expand Down Expand Up @@ -391,14 +390,9 @@ func (itr *TableIterator) Key() []byte {
return itr.bi.Key()
}

func (itr *TableIterator) Value() y.ValueStruct {
v := itr.bi.Value()
return y.ValueStruct{
Value: v[4:],
Meta: v[0],
UserMeta: v[1],
CASCounter: binary.BigEndian.Uint16(v[2:4]),
}
func (itr *TableIterator) Value() (ret y.ValueStruct) {
ret.DecodeEntireSlice(itr.bi.Value())
return
}

func (s *TableIterator) Next() {
Expand Down
8 changes: 4 additions & 4 deletions table/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func buildTable(t *testing.T, keyValues [][]string) *os.File {
})
for i, kv := range keyValues {
y.AssertTrue(len(kv) == 2)
err := b.Add([]byte(kv[0]), y.ValueStruct{[]byte(kv[1]), 'A', uint16(i), 0})
err := b.Add([]byte(kv[0]), y.ValueStruct{[]byte(kv[1]), 'A', 0, uint64(i)})
if t != nil {
require.NoError(t, err)
} else {
Expand Down Expand Up @@ -617,7 +617,7 @@ func BenchmarkRead(b *testing.B) {
for i := 0; i < n; i++ {
k := fmt.Sprintf("%016x", i)
v := fmt.Sprintf("%d", i)
y.Check(builder.Add([]byte(k), y.ValueStruct{[]byte(v), 123, 5555, 0}))
y.Check(builder.Add([]byte(k), y.ValueStruct{[]byte(v), 123, 0, 5555}))
}

f.Write(builder.Finish([]byte("somemetadata")))
Expand Down Expand Up @@ -647,7 +647,7 @@ func BenchmarkReadAndBuild(b *testing.B) {
for i := 0; i < n; i++ {
k := fmt.Sprintf("%016x", i)
v := fmt.Sprintf("%d", i)
y.Check(builder.Add([]byte(k), y.ValueStruct{[]byte(v), 123, 5555, 0}))
y.Check(builder.Add([]byte(k), y.ValueStruct{[]byte(v), 123, 0, 5555}))
}

f.Write(builder.Finish([]byte("somemetadata")))
Expand Down Expand Up @@ -687,7 +687,7 @@ func BenchmarkReadMerged(b *testing.B) {
// id := i*tableSize+j (not interleaved)
k := fmt.Sprintf("%016x", id)
v := fmt.Sprintf("%d", id)
y.Check(builder.Add([]byte(k), y.ValueStruct{[]byte(v), 123, 5555, 0}))
y.Check(builder.Add([]byte(k), y.ValueStruct{[]byte(v), 123, 0, 5555}))
}
f.Write(builder.Finish([]byte("somemetadata")))
tbl, err := OpenTable(f, MemoryMap)
Expand Down
Loading

0 comments on commit 6abc548

Please sign in to comment.