Skip to content

Commit

Permalink
lock count when multiple goroutines
Browse files Browse the repository at this point in the history
  • Loading branch information
omigo committed Aug 25, 2017
1 parent 1c391e4 commit 9ed4ee8
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 10 deletions.
15 changes: 12 additions & 3 deletions sketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package sketch
import (
"fmt"
"math"
"sync"
"unsafe"

"github.com/arstd/log"
"github.com/spaolacci/murmur3"
)

Expand All @@ -17,6 +17,7 @@ type Sketch struct {
width uint32
depth uint32
count [][]CountType
mutex sync.RWMutex
}

// WidthDepth returns width and depth by formula in paper.
Expand All @@ -34,11 +35,11 @@ func WidthDepth(errorRatio, uncertainty float64) (width, depth uint32) {
defaultUncertainty = 1.0 / 1e3 // 0.1%
)
if errorRatio < 1.0/1e9 || errorRatio > 0.1 {
log.Warnf("error ratio %g not in [1e-9,0.1], use default %g", errorRatio, defaultErrorRatio)
fmt.Printf("error ratio %g not in [1e-9,0.1], use default %g\n", errorRatio, defaultErrorRatio)
errorRatio = defaultErrorRatio
}
if uncertainty < 1.0/1e9 || uncertainty > 0.1 {
log.Warnf("certainty %g not in [1e-9,0.1], use default %g", uncertainty, defaultUncertainty)
fmt.Printf("certainty %g not in [1e-9,0.1], use default %g\n", uncertainty, defaultUncertainty)
uncertainty = defaultUncertainty
}

Expand Down Expand Up @@ -69,11 +70,13 @@ func (sk *Sketch) String() string {
}

func (sk *Sketch) Clear() {
sk.mutex.Lock()
for i := uint32(0); i < sk.depth; i++ {
for j := uint32(0); j < sk.width; j++ {
sk.count[i][j] = 0
}
}
sk.mutex.Unlock()
}

func (sk *Sketch) Incr(dat []byte) (min CountType) {
Expand All @@ -86,12 +89,14 @@ func (sk *Sketch) Add(dat []byte, cnt CountType) (min CountType) {

min += cnt

sk.mutex.Lock()
for i := uint32(0); i < sk.depth; i++ {
v := sk.count[i][pos[i]]
if v < min {
sk.count[i][pos[i]] = min
}
}
sk.mutex.Unlock()

return min
}
Expand All @@ -114,11 +119,15 @@ func (sk *Sketch) positions(dat []byte) (pos []uint32) {

func (sk *Sketch) query(pos []uint32) (min CountType) {
min = Max

sk.mutex.RLock()
for i := uint32(0); i < sk.depth; i++ {
v := sk.count[i][pos[i]]
if min > v {
min = v
}
}
sk.mutex.RUnlock()

return min
}
42 changes: 35 additions & 7 deletions sketch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ package sketch
import (
"encoding/binary"
"fmt"
"sync"
"testing"

"innotechx.com/go-common/misc/random"
)

func TestWidthDepth(t *testing.T) {
Expand Down Expand Up @@ -58,7 +57,7 @@ func TestSketch(t *testing.T) {
}

sk := New(WidthDepth(1.0/float64(len(cases)), 0.001))
t.Log(sk.String())
fmt.Println(sk.String())
for _, c := range cases {
for j := CountType(0); j < c.times; j++ {
sk.Add(c.dat, c.cnt)
Expand All @@ -74,10 +73,37 @@ func TestSketch(t *testing.T) {
}
}

func TestRace(t *testing.T) {
sk := New(WidthDepth(0.0001, 0.001))
fmt.Println(sk.String())

var wg sync.WaitGroup
for g := 0; g < 100; g++ {
wg.Add(2)
go func() {
bs := make([]byte, 8)
for i := 0; i < 10000; i++ {
binary.BigEndian.PutUint64(bs, uint64(i)*65537)
sk.Incr(bs)
}
wg.Done()
}()
go func() {
bs := make([]byte, 8)
for i := 0; i < 10000; i++ {
binary.BigEndian.PutUint64(bs, uint64(i)*65537)
sk.Query(bs)
}
wg.Done()
}()
}
wg.Wait()
}

const nsample = 7e6

func TestErrors(t *testing.T) {
sk := New(WidthDepth(0.7/7e6, 0.001))
sk := New(WidthDepth(0.8/nsample, 0.001))
fmt.Println(sk.String())
bs := make([]byte, 4)
for i := uint32(0); i < nsample; i++ {
Expand All @@ -102,23 +128,25 @@ func TestErrors(t *testing.T) {
func BenchmarkIncr(b *testing.B) {
sk := New(WidthDepth(1.0/nsample, 0.001))
b.Log(sk.String())
bs := make([]byte, 8)
b.ResetTimer()
for i := 0; i < b.N; i++ {
bs := random.Bytes(16)
binary.BigEndian.PutUint64(bs, uint64(i)*65537)
sk.Incr(bs)
}
}

func BenchmarkQuery(b *testing.B) {
sk := New(WidthDepth(1.0/nsample, 0.001))
b.Log(sk.String())
bs := make([]byte, 8)
for i := 0; i < b.N; i++ {
bs := random.Bytes(16)
binary.BigEndian.PutUint64(bs, uint64(i)*65537)
sk.Incr(bs)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
bs := random.Bytes(16)
binary.BigEndian.PutUint64(bs, uint64(i)*65537)
sk.Query(bs)
}
}

0 comments on commit 9ed4ee8

Please sign in to comment.