forked from evcc-io/evcc
-
Notifications
You must be signed in to change notification settings - Fork 0
/
monitor.go
102 lines (83 loc) · 1.88 KB
/
monitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package util
import (
"sync"
"time"
"github.com/evcc-io/evcc/api"
)
// Monitor holds the most recently received value of type T together with the
// time it was received, so that readers can tell whether it is still fresh.
type Monitor[T any] struct {
	// timeout is the maximum age of val before reads report it as outdated.
	// With a zero timeout reads only require that a value was received at all.
	timeout time.Duration

	// done is closed on the first update; once ensures it is closed only once.
	done chan struct{}
	once sync.Once

	// mu guards val and updated.
	mu      sync.RWMutex
	val     T
	updated time.Time
}
// NewMonitor creates a new monitor with the given timeout. The returned
// monitor has not received a value yet and its done channel is still open.
func NewMonitor[T any](timeout time.Duration) *Monitor[T] {
	m := new(Monitor[T])
	m.timeout = timeout
	m.done = make(chan struct{})
	return m
}
// Set stores val as the current value and refreshes the update timestamp.
func (m *Monitor[T]) Set(val T) {
	// the previous value is irrelevant, it is simply replaced
	overwrite := func(T) T {
		return val
	}
	m.SetFunc(overwrite)
}
// SetFunc replaces the current value with the result of calling set on it and
// records the current time as the moment of the update. The write lock is held
// for the whole call, including the invocation of set. The first update
// additionally closes the done channel.
func (m *Monitor[T]) SetFunc(set func(T) T) {
	m.mu.Lock()
	defer m.mu.Unlock()

	next := set(m.val)
	m.val, m.updated = next, time.Now()

	// signal the first update; once keeps later updates from closing again
	markDone := func() { close(m.done) }
	m.once.Do(markDone)
}
// Get returns a copy of the current value together with the error reported by
// GetFunc, i.e. api.ErrOutdated if the value is missing or older than the
// timeout.
func (m *Monitor[T]) Get() (T, error) {
	var current T

	// copy the value out while GetFunc holds the lock
	capture := func(v T) {
		current = v
	}

	err := m.GetFunc(capture)
	return current, err
}
// GetFunc passes the current value to get while holding the read lock and
// reports whether that value is up to date.
//
// Without a timeout, get is only called once a value has been received;
// otherwise api.ErrOutdated is returned and get is not called.
//
// With a timeout, get is always called. The result is nil if the value was
// updated within the timeout and api.ErrOutdated otherwise. If no value has
// been received at all, the first call blocks for up to timeout waiting for
// the first update; subsequent calls return immediately.
func (m *Monitor[T]) GetFunc(get func(T)) error {
	m.mu.RLock()
	defer m.mu.RUnlock()

	// without timeout set, error if not yet received
	if m.timeout == 0 {
		select {
		case <-m.done:
			get(m.val)
			return nil
		default:
			return api.ErrOutdated
		}
	}

	// value is recent enough
	if time.Since(m.updated) <= m.timeout {
		get(m.val)
		return nil
	}

	err := api.ErrOutdated

	// wait once on very first call
	if m.updated.IsZero() {
		// a read lock cannot be upgraded, release it before taking the write lock
		m.mu.RUnlock()

		// mark as waited once: Time.Add returns a new value, so the result has
		// to be stored. Leave the timestamp alone if an update arrived while no
		// lock was held.
		m.mu.Lock()
		if m.updated.IsZero() {
			m.updated = m.updated.Add(time.Nanosecond)
		}
		m.mu.Unlock()

		select {
		case <-m.done:
			// got value and updated timestamp
			err = nil
		case <-time.After(m.timeout):
		}

		// re-acquire for get and for the deferred RUnlock
		m.mu.RLock()
	}

	get(m.val)
	return err
}
// Done returns a channel that is closed once the monitor has been updated for
// the first time.
func (m *Monitor[T]) Done() <-chan struct{} {
	var updated <-chan struct{} = m.done
	return updated
}