Skip to content

Commit

Permalink
Merge pull request coocood#35 from jb0n/master
Browse files Browse the repository at this point in the history
fixes for various race conditions
  • Loading branch information
coocood authored Feb 13, 2018
2 parents a955dc1 + e2d3298 commit f3233c8
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 28 deletions.
11 changes: 7 additions & 4 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ import (
"github.com/cespare/xxhash"
)

const (
minBufSize = 512 * 1024
)

type Cache struct {
locks [256]sync.Mutex
segments [256]segment
Expand All @@ -22,8 +26,8 @@ func hashFunc(data []byte) uint64 {
// `debug.SetGCPercent()`, set it to a much smaller value
// to limit the memory consumption and GC pause time.
func NewCache(size int) (cache *Cache) {
if size < 512*1024 {
size = 512 * 1024
if size < minBufSize {
size = minBufSize
}
cache = new(Cache)
for i := 0; i < 256; i++ {
Expand Down Expand Up @@ -179,8 +183,7 @@ func (cache *Cache) OverwriteCount() (overwriteCount int64) {
func (cache *Cache) Clear() {
for i := 0; i < 256; i++ {
cache.locks[i].Lock()
newSeg := newSegment(len(cache.segments[i].rb.data), i)
cache.segments[i] = newSeg
cache.segments[i].clear()
cache.locks[i].Unlock()
}
}
Expand Down
71 changes: 71 additions & 0 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"crypto/rand"
"encoding/binary"
"fmt"
mrand "math/rand"
"strings"
"sync"
"testing"
"time"
)
Expand Down Expand Up @@ -421,6 +423,75 @@ func TestSetLargerEntryDeletesWrongEntry(t *testing.T) {
}
}

func TestRace(t *testing.T) {
cache := NewCache(minBufSize)
inUse := 8
wg := sync.WaitGroup{}
var iters int64 = 1000

wg.Add(6)
addFunc := func() {
var i int64
for i = 0; i < iters; i++ {
err := cache.SetInt(int64(mrand.Intn(inUse)), []byte("abc"), 1)
if err != nil {
t.Errorf("err: %s", err)
}
}
wg.Done()
}
getFunc := func() {
var i int64
for i = 0; i < iters; i++ {
_, _ = cache.GetInt(int64(mrand.Intn(inUse))) //it will likely error w/ delFunc running too
}
wg.Done()
}
delFunc := func() {
var i int64
for i = 0; i < iters; i++ {
cache.DelInt(int64(mrand.Intn(inUse)))
}
wg.Done()
}
evacFunc := func() {
var i int64
for i = 0; i < iters; i++ {
_ = cache.EvacuateCount()
_ = cache.ExpiredCount()
_ = cache.EntryCount()
_ = cache.AverageAccessTime()
_ = cache.HitCount()
_ = cache.LookupCount()
_ = cache.HitRate()
_ = cache.OverwriteCount()
}
wg.Done()
}
resetFunc := func() {
var i int64
for i = 0; i < iters; i++ {
cache.ResetStatistics()
}
wg.Done()
}
clearFunc := func() {
var i int64
for i = 0; i < iters; i++ {
cache.Clear()
}
wg.Done()
}

go addFunc()
go getFunc()
go delFunc()
go evacFunc()
go resetFunc()
go clearFunc()
wg.Wait()
}

func BenchmarkCacheSet(b *testing.B) {
cache := NewCache(256 * 1024 * 1024)
var key [8]byte
Expand Down
68 changes: 44 additions & 24 deletions segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package freecache

import (
"errors"
"sync/atomic"
"time"
"unsafe"
)
Expand Down Expand Up @@ -98,10 +99,10 @@ func (seg *segment) set(key, value []byte, hashVal uint64, expireSeconds int) (e
hdr.valLen = uint32(len(value))
if hdr.valCap >= hdr.valLen {
//in place overwrite
seg.totalTime += int64(hdr.accessTime) - int64(originAccessTime)
atomic.AddInt64(&seg.totalTime, int64(hdr.accessTime)-int64(originAccessTime))
seg.rb.WriteAt(hdrBuf[:], matchedPtr.offset)
seg.rb.WriteAt(value, matchedPtr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen))
seg.overwrites++
atomic.AddInt64(&seg.overwrites, 1)
return
}
// avoid unnecessary memory copy.
Expand Down Expand Up @@ -143,8 +144,8 @@ func (seg *segment) set(key, value []byte, hashVal uint64, expireSeconds int) (e
seg.rb.Write(key)
seg.rb.Write(value)
seg.rb.Skip(int64(hdr.valCap - hdr.valLen))
seg.totalTime += int64(now)
seg.totalCount++
atomic.AddInt64(&seg.totalTime, int64(now))
atomic.AddInt64(&seg.totalCount, 1)
seg.vacuumLen -= entryLen
return
}
Expand All @@ -159,31 +160,31 @@ func (seg *segment) evacuate(entryLen int64, slotId uint8, now uint32) (slotModi
oldEntryLen := ENTRY_HDR_SIZE + int64(oldHdr.keyLen) + int64(oldHdr.valCap)
if oldHdr.deleted {
consecutiveEvacuate = 0
seg.totalTime -= int64(oldHdr.accessTime)
seg.totalCount--
atomic.AddInt64(&seg.totalTime, -int64(oldHdr.accessTime))
atomic.AddInt64(&seg.totalCount, -1)
seg.vacuumLen += oldEntryLen
continue
}
expired := oldHdr.expireAt != 0 && oldHdr.expireAt < now
leastRecentUsed := int64(oldHdr.accessTime)*seg.totalCount <= seg.totalTime
leastRecentUsed := int64(oldHdr.accessTime)*atomic.LoadInt64(&seg.totalCount) <= atomic.LoadInt64(&seg.totalTime)
if expired || leastRecentUsed || consecutiveEvacuate > 5 {
seg.delEntryPtr(oldHdr.slotId, oldHdr.hash16, oldOff)
if oldHdr.slotId == slotId {
slotModified = true
}
consecutiveEvacuate = 0
seg.totalTime -= int64(oldHdr.accessTime)
seg.totalCount--
atomic.AddInt64(&seg.totalTime, -int64(oldHdr.accessTime))
atomic.AddInt64(&seg.totalCount, -1)
seg.vacuumLen += oldEntryLen
if expired {
seg.totalExpired++
atomic.AddInt64(&seg.totalExpired, 1)
}
} else {
// evacuate an old entry that has been accessed recently for better cache hit rate.
newOff := seg.rb.Evacuate(oldOff, int(oldEntryLen))
seg.updateEntryPtr(oldHdr.slotId, oldHdr.hash16, oldOff, newOff)
consecutiveEvacuate++
seg.totalEvacuate++
atomic.AddInt64(&seg.totalEvacuate, 1)
}
}
return
Expand All @@ -197,7 +198,7 @@ func (seg *segment) get(key []byte, hashVal uint64) (value []byte, expireAt uint
idx, match := seg.lookup(slot, hash16, key)
if !match {
err = ErrNotFound
seg.missCount += 1
atomic.AddInt64(&seg.missCount, 1)
return
}
ptr := &slot[idx]
Expand All @@ -210,19 +211,18 @@ func (seg *segment) get(key []byte, hashVal uint64) (value []byte, expireAt uint

if hdr.expireAt != 0 && hdr.expireAt <= now {
seg.delEntryPtr(slotId, hash16, ptr.offset)
seg.totalExpired++
atomic.AddInt64(&seg.totalExpired, 1)
err = ErrNotFound
seg.missCount += 1
atomic.AddInt64(&seg.missCount, 1)
return
}

seg.totalTime += int64(now - hdr.accessTime)
atomic.AddInt64(&seg.totalTime, int64(now-hdr.accessTime))
hdr.accessTime = now
seg.rb.WriteAt(hdrBuf[:], ptr.offset)
value = make([]byte, hdr.valLen)

seg.rb.ReadAt(value, ptr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen))
seg.hitCount += 1
atomic.AddInt64(&seg.hitCount, 1)
return
}

Expand Down Expand Up @@ -296,7 +296,7 @@ func (seg *segment) insertEntryPtr(slotId uint8, hash16 uint16, offset int64, id
slotOff *= 2
}
seg.slotLens[slotId]++
seg.entryCount++
atomic.AddInt64(&seg.entryCount, 1)
slot := seg.slotsData[slotOff : slotOff+seg.slotLens[slotId] : slotOff+seg.slotCap]
copy(slot[idx+1:], slot[idx:])
slot[idx].offset = offset
Expand All @@ -318,7 +318,7 @@ func (seg *segment) delEntryPtr(slotId uint8, hash16 uint16, offset int64) {
seg.rb.WriteAt(entryHdrBuf[:], offset)
copy(slot[idx:], slot[idx+1:])
seg.slotLens[slotId]--
seg.entryCount--
atomic.AddInt64(&seg.entryCount, -1)
}

func entryPtrIdx(slot []entryPtr, hash16 uint16) (idx int) {
Expand Down Expand Up @@ -368,9 +368,29 @@ func (seg *segment) lookupByOff(slot []entryPtr, hash16 uint16, offset int64) (i
}

func (seg *segment) resetStatistics() {
seg.totalEvacuate = 0
seg.totalExpired = 0
seg.overwrites = 0
seg.hitCount = 0
seg.missCount = 0
atomic.StoreInt64(&seg.totalEvacuate, 0)
atomic.StoreInt64(&seg.totalExpired, 0)
atomic.StoreInt64(&seg.overwrites, 0)
atomic.StoreInt64(&seg.hitCount, 0)
atomic.StoreInt64(&seg.missCount, 0)
}

func (seg *segment) clear() {
bufSize := len(seg.rb.data)
seg.rb = NewRingBuf(bufSize, 0)
seg.vacuumLen = int64(bufSize)
seg.slotCap = 1
seg.slotsData = make([]entryPtr, 256*seg.slotCap)
for i := 0; i < len(seg.slotLens); i++ {
seg.slotLens[i] = 0
}

atomic.StoreInt64(&seg.hitCount, 0)
atomic.StoreInt64(&seg.missCount, 0)
atomic.StoreInt64(&seg.entryCount, 0)
atomic.StoreInt64(&seg.totalCount, 0)
atomic.StoreInt64(&seg.totalTime, 0)
atomic.StoreInt64(&seg.totalEvacuate, 0)
atomic.StoreInt64(&seg.totalExpired, 0)
atomic.StoreInt64(&seg.overwrites, 0)
}

0 comments on commit f3233c8

Please sign in to comment.