forked from grafana/k6
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhelpers.go
125 lines (110 loc) · 3.51 KB
/
helpers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
/*
*
* k6 - a next-generation load testing tool
* Copyright (C) 2021 Load Impact
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package output
import (
"fmt"
"sync"
"time"
"go.k6.io/k6/stats"
)
// SampleBuffer is a mutex-guarded accumulator for metric samples. Most outputs
// should use it: samples are normally pushed to the remote service
// asynchronously, only once every several seconds, and the Engine must not be
// blocked while that happens.
type SampleBuffer struct {
	// Mutex guards buffer and maxLen. It is embedded, so Lock and Unlock are
	// part of SampleBuffer's own method set.
	sync.Mutex

	// buffer holds the samples added since the last GetBufferedSamples call.
	buffer []stats.SampleContainer
	// maxLen is the largest buffer length GetBufferedSamples has seen so far.
	maxLen int
}
// AddMetricSamples appends the supplied metric samples to the internal buffer.
// An empty (or nil) slice is ignored without taking the lock.
func (sc *SampleBuffer) AddMetricSamples(samples []stats.SampleContainer) {
	if len(samples) > 0 {
		sc.Lock()
		defer sc.Unlock()
		sc.buffer = append(sc.buffer, samples...)
	}
}
// GetBufferedSamples hands the accumulated metric samples over to the caller
// and swaps in a freshly allocated internal buffer. When nothing has been
// buffered it returns nil and leaves the internal buffer as it is.
func (sc *SampleBuffer) GetBufferedSamples() []stats.SampleContainer {
	sc.Lock()
	defer sc.Unlock()

	n := len(sc.buffer)
	if n == 0 {
		return nil
	}
	pending := sc.buffer

	if sc.maxLen < n {
		sc.maxLen = n
	}

	// Size the replacement halfway between the number of samples just drained
	// and the largest batch seen so far, to hopefully reduce the amount of
	// reallocation and copying done by later appends.
	nextCap := (n + sc.maxLen) / 2
	sc.buffer = make([]stats.SampleContainer, 0, nextCap)

	return pending
}
// PeriodicFlusher is a small helper that invokes a flush callback from its own
// goroutine at a regular interval. Its biggest benefit is the Stop() method,
// which waits for one last flush before it returns.
type PeriodicFlusher struct {
	// period is the interval between two consecutive periodic flushes.
	period time.Duration
	// flushCallback is invoked on every tick and one final time on stop.
	flushCallback func()

	// stop is closed to ask the goroutine to exit; stopped is closed by the
	// goroutine once the final flush has completed.
	stop, stopped chan struct{}
	// once ensures that stop is closed at most one time.
	once *sync.Once
}
// run is the body of the flusher's goroutine. It calls flushCallback on every
// tick of a ticker firing once per period. When the stop channel is closed it
// calls flushCallback one more time, closes the stopped channel, and returns.
func (pf *PeriodicFlusher) run() {
	ticker := time.NewTicker(pf.period)
	defer ticker.Stop()

	// Both wake-up reasons lead to a flush; only a stop signal ends the loop.
	for stopping := false; !stopping; {
		select {
		case <-pf.stop:
			stopping = true
		case <-ticker.C:
		}
		pf.flushCallback()
	}

	// Closed only after the final flush has returned, so that anything
	// waiting on stopped observes a completed flush.
	close(pf.stopped)
}
// Stop signals the periodic flusher to exit and waits for it to flush one last
// time. You can safely call Stop() multiple times and from different
// goroutines; you just can't call it from inside of the flushing function,
// because the final flush has to return before Stop() can.
func (pf *PeriodicFlusher) Stop() {
	// Closing an already closed channel panics, so only the first caller may
	// close stop; every caller then waits for the goroutine to finish.
	signalStop := func() { close(pf.stop) }
	pf.once.Do(signalStop)

	<-pf.stopped
}
// NewPeriodicFlusher creates a new PeriodicFlusher and starts its goroutine,
// which calls flushCallback once every period and one final time when Stop()
// is called.
//
// It returns an error, without starting anything, when period is not positive
// or when flushCallback is nil.
func NewPeriodicFlusher(period time.Duration, flushCallback func()) (*PeriodicFlusher, error) {
	// time.NewTicker panics on a non-positive duration, so reject it up front.
	if period <= 0 {
		return nil, fmt.Errorf("metric flush period should be positive but was %s", period)
	}
	// Calling a nil function value panics. That panic would happen inside the
	// background goroutine, far from the faulty call site, and would bring the
	// whole process down, so report the problem to the caller instead.
	if flushCallback == nil {
		return nil, fmt.Errorf("metric flush callback should not be nil")
	}

	pf := &PeriodicFlusher{
		period:        period,
		flushCallback: flushCallback,
		stop:          make(chan struct{}),
		stopped:       make(chan struct{}),
		once:          &sync.Once{},
	}

	go pf.run()

	return pf, nil
}