Skip to content

Commit

Permalink
mock: add Counters, Gauges and Timers methods to get a map of all sta…
Browse files Browse the repository at this point in the history
…ts (lyft#90)

* mock: Sink replace mutex with atomic.Value

This improves Sink performance when called in parallel.

```
benchmark                             old ns/op     new ns/op     delta
BenchmarkFlushCounter-12              50.4          51.9          +2.98%
BenchmarkFlushCounter_Parallel-12     36.5          10.5          -71.23%
BenchmarkFlushTimer-12                52.1          49.8          -4.41%
BenchmarkFlushTimer_Parallel-12       42.0          13.5          -67.86%

benchmark                             old allocs     new allocs     delta
BenchmarkFlushCounter-12              0              0              +0.00%
BenchmarkFlushCounter_Parallel-12     0              0              +0.00%
BenchmarkFlushTimer-12                0              0              +0.00%
BenchmarkFlushTimer_Parallel-12       0              0              +0.00%

benchmark                             old bytes     new bytes     delta
BenchmarkFlushCounter-12              0             0             +0.00%
BenchmarkFlushCounter_Parallel-12     0             0             +0.00%
BenchmarkFlushTimer-12                0             0             +0.00%
BenchmarkFlushTimer_Parallel-12       0             0             +0.00%
```

* mock: add Counters, Gauges and Timers methods to get a map of all stats

This PR also increases test coverage.
  • Loading branch information
charlievieth authored Jun 18, 2020
1 parent d95cbc0 commit f9add79
Show file tree
Hide file tree
Showing 3 changed files with 291 additions and 50 deletions.
121 changes: 75 additions & 46 deletions mock/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,42 +13,51 @@ type entry struct {
count int64
}

// A Sink is a mock sink meant for testing that is safe for concurrent use.
type Sink struct {
type sink struct {
counters sync.Map
timers sync.Map
gauges sync.Map
// write held only during Reset(), all map accesses must hold the read lock
mu sync.RWMutex
}

// A Sink is a mock sink meant for testing that is safe for concurrent use.
type Sink struct {
store atomic.Value
once sync.Once
}

func (s *Sink) sink() *sink {
s.once.Do(func() { s.store.Store(new(sink)) })
return s.store.Load().(*sink)
}

func (s *Sink) counters() *sync.Map { return &s.sink().counters }
func (s *Sink) timers() *sync.Map { return &s.sink().timers }
func (s *Sink) gauges() *sync.Map { return &s.sink().gauges }

// NewSink returns a new Sink which implements the stats.Sink interface and is
// suitable for testing.
func NewSink() *Sink {
return new(Sink)
s := &Sink{}
s.sink() // lazy init
return s
}

// Flush is a no-op method
func (*Sink) Flush() {}

// Reset resets the Sink's counters, timers and gauges to zero.
func (s *Sink) Reset() {
s.mu.Lock()
s.counters = sync.Map{}
s.timers = sync.Map{}
s.gauges = sync.Map{}
s.mu.Unlock()
s.store.Store(new(sink))
}

// FlushCounter implements the stats.Sink.FlushCounter method and adds val to
// stat name.
func (s *Sink) FlushCounter(name string, val uint64) {
s.mu.RLock()
v, ok := s.counters.Load(name)
counters := s.counters()
v, ok := counters.Load(name)
if !ok {
v, _ = s.counters.LoadOrStore(name, new(entry))
v, _ = counters.LoadOrStore(name, new(entry))
}
s.mu.RUnlock()
p := v.(*entry)
atomic.AddUint64(&p.val, val)
atomic.AddInt64(&p.count, 1)
Expand All @@ -57,12 +66,11 @@ func (s *Sink) FlushCounter(name string, val uint64) {
// FlushGauge implements the stats.Sink.FlushGauge method and adds val to
// stat name.
func (s *Sink) FlushGauge(name string, val uint64) {
s.mu.RLock()
v, ok := s.gauges.Load(name)
gauges := s.gauges()
v, ok := gauges.Load(name)
if !ok {
v, _ = s.gauges.LoadOrStore(name, new(entry))
v, _ = gauges.LoadOrStore(name, new(entry))
}
s.mu.RUnlock()
p := v.(*entry)
atomic.AddUint64(&p.val, val)
atomic.AddInt64(&p.count, 1)
Expand All @@ -83,22 +91,19 @@ func atomicAddFloat64(dest *uint64, delta float64) {
// FlushTimer implements the stats.Sink.FlushTimer method and adds val to
// stat name.
func (s *Sink) FlushTimer(name string, val float64) {
s.mu.RLock()
v, ok := s.timers.Load(name)
timers := s.timers()
v, ok := timers.Load(name)
if !ok {
v, _ = s.timers.LoadOrStore(name, new(entry))
v, _ = timers.LoadOrStore(name, new(entry))
}
s.mu.RUnlock()
p := v.(*entry)
atomicAddFloat64(&p.val, val)
atomic.AddInt64(&p.count, 1)
}

// LoadCounter returns the value for stat name and if it was found.
func (s *Sink) LoadCounter(name string) (uint64, bool) {
s.mu.RLock()
v, ok := s.counters.Load(name)
s.mu.RUnlock()
v, ok := s.counters().Load(name)
if ok {
p := v.(*entry)
return atomic.LoadUint64(&p.val), true
Expand All @@ -108,9 +113,7 @@ func (s *Sink) LoadCounter(name string) (uint64, bool) {

// LoadGauge returns the value for stat name and if it was found.
func (s *Sink) LoadGauge(name string) (uint64, bool) {
s.mu.RLock()
v, ok := s.gauges.Load(name)
s.mu.RUnlock()
v, ok := s.gauges().Load(name)
if ok {
p := v.(*entry)
return atomic.LoadUint64(&p.val), true
Expand All @@ -120,9 +123,7 @@ func (s *Sink) LoadGauge(name string) (uint64, bool) {

// LoadTimer returns the value for stat name and if it was found.
func (s *Sink) LoadTimer(name string) (float64, bool) {
s.mu.RLock()
v, ok := s.timers.Load(name)
s.mu.RUnlock()
v, ok := s.timers().Load(name)
if ok {
p := v.(*entry)
bits := atomic.LoadUint64(&p.val)
Expand All @@ -131,6 +132,40 @@ func (s *Sink) LoadTimer(name string) (float64, bool) {
return 0, false
}

// Counters returns all the counters currently stored by the sink.
func (s *Sink) Counters() map[string]uint64 {
m := make(map[string]uint64)
s.counters().Range(func(k, v interface{}) bool {
p := v.(*entry)
m[k.(string)] = atomic.LoadUint64(&p.val)
return true
})
return m
}

// Gauges returns all the gauges currently stored by the sink.
func (s *Sink) Gauges() map[string]uint64 {
m := make(map[string]uint64)
s.gauges().Range(func(k, v interface{}) bool {
p := v.(*entry)
m[k.(string)] = atomic.LoadUint64(&p.val)
return true
})
return m
}

// Timers returns all the timers currently stored by the sink.
func (s *Sink) Timers() map[string]float64 {
m := make(map[string]float64)
s.timers().Range(func(k, v interface{}) bool {
p := v.(*entry)
bits := atomic.LoadUint64(&p.val)
m[k.(string)] = math.Float64frombits(bits)
return true
})
return m
}

// short-hand methods

// Counter is shorthand for LoadCounter, zero is returned if the stat is not found.
Expand All @@ -155,9 +190,7 @@ func (s *Sink) Timer(name string) float64 {

// CounterCallCount returns the number of times stat name has been called/updated.
func (s *Sink) CounterCallCount(name string) int64 {
s.mu.RLock()
v, ok := s.counters.Load(name)
s.mu.RUnlock()
v, ok := s.counters().Load(name)
if ok {
return atomic.LoadInt64(&v.(*entry).count)
}
Expand All @@ -166,9 +199,7 @@ func (s *Sink) CounterCallCount(name string) int64 {

// GaugeCallCount returns the number of times stat name has been called/updated.
func (s *Sink) GaugeCallCount(name string) int64 {
s.mu.RLock()
v, ok := s.gauges.Load(name)
s.mu.RUnlock()
v, ok := s.gauges().Load(name)
if ok {
return atomic.LoadInt64(&v.(*entry).count)
}
Expand All @@ -177,9 +208,7 @@ func (s *Sink) GaugeCallCount(name string) int64 {

// TimerCallCount returns the number of times stat name has been called/updated.
func (s *Sink) TimerCallCount(name string) int64 {
s.mu.RLock()
v, ok := s.timers.Load(name)
s.mu.RUnlock()
v, ok := s.timers().Load(name)
if ok {
return atomic.LoadInt64(&v.(*entry).count)
}
Expand Down Expand Up @@ -231,23 +260,23 @@ func (s *Sink) AssertTimerEquals(tb testing.TB, name string, exp float64) {
func (s *Sink) AssertCounterExists(tb testing.TB, name string) {
tb.Helper()
if _, ok := s.LoadCounter(name); !ok {
tb.Errorf("gostats/mock: Counter (%q): should exist", name)
tb.Errorf("gostats/mock: Counter (%q): not found", name)
}
}

// AssertGaugeExists asserts that Gauge name exists.
func (s *Sink) AssertGaugeExists(tb testing.TB, name string) {
tb.Helper()
if _, ok := s.LoadGauge(name); !ok {
tb.Errorf("gostats/mock: Gauge (%q): should exist", name)
tb.Errorf("gostats/mock: Gauge (%q): not found", name)
}
}

// AssertTimerExists asserts that Timer name exists.
func (s *Sink) AssertTimerExists(tb testing.TB, name string) {
tb.Helper()
if _, ok := s.LoadTimer(name); !ok {
tb.Errorf("gostats/mock: Timer (%q): should exist", name)
tb.Errorf("gostats/mock: Timer (%q): not found", name)
}
}

Expand Down Expand Up @@ -278,7 +307,7 @@ func (s *Sink) AssertTimerNotExists(tb testing.TB, name string) {
// AssertCounterCallCount asserts that Counter name was called exp times.
func (s *Sink) AssertCounterCallCount(tb testing.TB, name string, exp int) {
tb.Helper()
v, ok := s.counters.Load(name)
v, ok := s.counters().Load(name)
if !ok {
tb.Errorf("gostats/mock: Counter (%q): not found", name)
return
Expand All @@ -294,7 +323,7 @@ func (s *Sink) AssertCounterCallCount(tb testing.TB, name string, exp int) {
// AssertGaugeCallCount asserts that Gauge name was called exp times.
func (s *Sink) AssertGaugeCallCount(tb testing.TB, name string, exp int) {
tb.Helper()
v, ok := s.gauges.Load(name)
v, ok := s.gauges().Load(name)
if !ok {
tb.Errorf("gostats/mock: Gauge (%q): not found", name)
return
Expand All @@ -310,7 +339,7 @@ func (s *Sink) AssertGaugeCallCount(tb testing.TB, name string, exp int) {
// AssertTimerCallCount asserts that Timer name was called exp times.
func (s *Sink) AssertTimerCallCount(tb testing.TB, name string, exp int) {
tb.Helper()
v, ok := s.timers.Load(name)
v, ok := s.timers().Load(name)
if !ok {
tb.Errorf("gostats/mock: Timer (%q): not found", name)
return
Expand Down
2 changes: 1 addition & 1 deletion mock/sink_interface_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package mock_test

import (
"github.com/lyft/gostats"
stats "github.com/lyft/gostats"
"github.com/lyft/gostats/mock"
)

Expand Down
Loading

0 comments on commit f9add79

Please sign in to comment.