forked from hybridgroup/gobot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
robot_work.go
199 lines (173 loc) · 4.75 KB
/
robot_work.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
package gobot
import (
"context"
"fmt"
"sync"
"time"
"github.com/gofrs/uuid"
)
// RobotWorkRegistry contains all the work units registered on a Robot.
// The embedded RWMutex guards the map r; the registry methods in this file
// acquire it before reading or writing the map.
type RobotWorkRegistry struct {
sync.RWMutex
// r maps the string form of a RobotWork's UUID to the work unit itself.
r map[string]*RobotWork
}
// Kinds of RobotWork, stored in RobotWork.kind when a unit is registered.
const (
// EveryWorkKind marks work created by registerEvery (used by Robot.Every).
EveryWorkKind = "every"
// AfterWorkKind marks work created by registerAfter (used by Robot.After).
AfterWorkKind = "after"
)
// RobotWork and the RobotWork registry represent units of executing computation
// managed at the Robot level. Unlike the utility functions gobot.After and gobot.Every,
// RobotWork units require a context.Context, and can be cancelled externally by calling code.
//
// Usage:
//
//	someWork := myRobot.Every(context.Background(), time.Second * 2, func(){
//		fmt.Println("Here I am doing work")
//	})
//
//	someWork.CallCancelFunc() // Cancel next tick and remove from work registry
//
// The goroutines for Every and After are tracked by their own WaitGroups for synchronization:
//
//	someWork2 := myRobot.Every(context.Background(), time.Second * 2, func(){
//		fmt.Println("Here I am doing more work")
//	})
//
//	someWork2.CallCancelFunc()
//
//	// wait for both Every calls to finish
//	myRobot.WorkEveryWaitGroup.Wait()
type RobotWork struct {
// id uniquely identifies the work unit; its string form is the registry key.
id uuid.UUID
// kind is EveryWorkKind or AfterWorkKind, set at registration.
kind string
// tickCount is incremented by the Every goroutine each time the function returns.
tickCount int
// ctx is derived from the caller's context with context.WithCancel at registration.
ctx context.Context //nolint:containedctx // done by intention
// cancelFunc cancels ctx.
cancelFunc context.CancelFunc
// function is the work function supplied at registration.
function func()
// ticker is set only by registerEvery; it is left nil for After work.
ticker *time.Ticker
// duration is the delay of an After or the period of an Every.
duration time.Duration
}
// ID returns the UUID of the RobotWork. Its string form is the key under which
// the work is stored in the RobotWorkRegistry.
func (rw *RobotWork) ID() uuid.UUID {
return rw.id
}
// CancelFunc returns the context.CancelFunc used to cancel the work.
// It is the cancel function obtained from context.WithCancel when the work was registered.
func (rw *RobotWork) CancelFunc() context.CancelFunc {
return rw.cancelFunc
}
// CallCancelFunc calls the context.CancelFunc used to cancel the work.
// For work started through Robot.Every or Robot.After, the goroutine running it
// observes the cancelled context, removes the work from the registry and exits.
func (rw *RobotWork) CallCancelFunc() {
rw.cancelFunc()
}
// Ticker exposes the time.Ticker driving an Every unit so that calling code
// can synchronise on the same channel. Work of kind AfterWorkKind has no
// ticker, and nil is returned for it.
func (rw *RobotWork) Ticker() *time.Ticker {
	if rw.kind != AfterWorkKind {
		return rw.ticker
	}
	return nil
}
// TickCount returns the number of times the function successfully ran.
// The counter is incremented only by the goroutine started in Robot.Every, after
// each call of the function returns; Robot.After never increments it, so it
// stays 0 for After work.
// NOTE(review): this read takes no lock and is not otherwise synchronized with
// the increment performed in the Every goroutine.
func (rw *RobotWork) TickCount() int {
return rw.tickCount
}
// Duration returns the timeout until an After fires or the period of an Every.
// It is the duration value that was passed in when the work was registered.
func (rw *RobotWork) Duration() time.Duration {
return rw.duration
}
// String renders the work's ID, kind and tick count, one "Name: value" pair
// per line, each line terminated by a newline.
func (rw *RobotWork) String() string {
	const layout = "ID: %s\nKind: %s\nTickCount: %d\n"
	return fmt.Sprintf(layout, rw.id, rw.kind, rw.tickCount)
}
// WorkRegistry returns the Robot's WorkRegistry.
// The returned pointer is the registry the Robot itself uses in Every and After,
// not a copy.
func (r *Robot) WorkRegistry() *RobotWorkRegistry {
return r.workRegistry
}
// Every registers f as a repeating unit of work and runs it on every tick of
// a ticker with period d.
//
// The work is executed on a dedicated goroutine that is counted in
// r.WorkEveryWaitGroup. f is called synchronously on that goroutine, and the
// work's tick count is incremented each time f returns. When the work's
// context is cancelled (either the supplied ctx or via the returned
// RobotWork's cancel function) the goroutine removes the work from the
// registry, stops the ticker, marks the WaitGroup as done and exits.
//
// The returned RobotWork is the handle for inspecting or cancelling the work.
func (r *Robot) Every(ctx context.Context, d time.Duration, f func()) *RobotWork {
	work := r.workRegistry.registerEvery(ctx, d, f)
	r.WorkEveryWaitGroup.Add(1)

	go func() {
		for {
			select {
			case <-work.ticker.C:
				f()
				work.tickCount++
			case <-work.ctx.Done():
				// Cancelled: unregister, release the ticker and signal waiters.
				r.workRegistry.delete(work.id)
				work.ticker.Stop()
				r.WorkEveryWaitGroup.Done()
				return
			}
		}
	}()

	return work
}
// After calls the given function once, after the provided duration has elapsed.
//
// The work is registered in the Robot's work registry and executed on a
// dedicated goroutine that is counted in r.WorkAfterWaitGroup. If the work's
// context is cancelled before d elapses, f is never called and the underlying
// timer is stopped immediately, so it does not linger until d expires.
//
// After f has run, the work stays in the registry, and the goroutine stays
// alive, until the context is cancelled (the supplied ctx or the returned
// RobotWork's cancel function). Callers that no longer need the handle should
// therefore cancel it; r.WorkAfterWaitGroup.Wait() only returns once every
// After unit has been cancelled.
//
// The returned RobotWork is the handle for inspecting or cancelling the work.
func (r *Robot) After(ctx context.Context, d time.Duration, f func()) *RobotWork {
	rw := r.workRegistry.registerAfter(ctx, d, f)
	// A stoppable timer is used instead of time.After so that cancelling the
	// work releases the timer right away.
	timer := time.NewTimer(d)
	r.WorkAfterWaitGroup.Add(1)

	go func() {
		// Deferred calls run in reverse order: stop the timer, unregister the
		// work, then signal the WaitGroup.
		defer r.WorkAfterWaitGroup.Done()
		defer r.workRegistry.delete(rw.id)
		defer timer.Stop() // no-op if the timer already fired

		select {
		case <-rw.ctx.Done():
			return
		case <-timer.C:
			f()
		}

		// The timer fires only once, so all that is left is to wait for
		// cancellation before unregistering the work.
		<-rw.ctx.Done()
	}()

	return rw
}
// Get returns the RobotWork specified by the provided ID, or nil if no work
// with that ID is registered. To delete something from the registry, it's
// necessary to call its context.CancelFunc, which will perform a
// goroutine-safe delete on the underlying map.
//
// Only a read lock is taken, so concurrent Get calls do not block each other.
func (rwr *RobotWorkRegistry) Get(id uuid.UUID) *RobotWork {
	rwr.RLock()
	defer rwr.RUnlock()
	return rwr.r[id.String()]
}
// delete removes the RobotWork registered under the provided ID from the
// registry while holding the write lock. Removing an ID that is not
// registered is a no-op.
func (rwr *RobotWorkRegistry) delete(id uuid.UUID) {
	key := id.String()

	rwr.Lock()
	defer rwr.Unlock()
	delete(rwr.r, key)
}
// registerAfter builds a one-shot RobotWork of kind AfterWorkKind, derives a
// cancellable context for it from ctx and stores it in the registry under
// the string form of a freshly generated UUID. The whole operation runs
// under the registry's write lock.
//
// NOTE(review): assumes rwr.r was initialised where the registry is
// constructed (not visible here); writing to a nil map would panic.
func (rwr *RobotWorkRegistry) registerAfter(ctx context.Context, d time.Duration, f func()) *RobotWork {
	rwr.Lock()
	defer rwr.Unlock()

	// The error returned by uuid.NewV4 is discarded.
	workID, _ := uuid.NewV4()
	workCtx, cancel := context.WithCancel(ctx)

	work := &RobotWork{
		id:         workID,
		kind:       AfterWorkKind,
		ctx:        workCtx,
		cancelFunc: cancel,
		function:   f,
		duration:   d,
	}
	rwr.r[workID.String()] = work

	return work
}
// registerEvery builds a repeating RobotWork of kind EveryWorkKind with a
// new ticker of period d, derives a cancellable context for it from ctx and
// stores it in the registry under the string form of a freshly generated
// UUID. The whole operation runs under the registry's write lock.
//
// NOTE(review): assumes rwr.r was initialised where the registry is
// constructed (not visible here); writing to a nil map would panic.
func (rwr *RobotWorkRegistry) registerEvery(ctx context.Context, d time.Duration, f func()) *RobotWork {
	rwr.Lock()
	defer rwr.Unlock()

	// The error returned by uuid.NewV4 is discarded.
	workID, _ := uuid.NewV4()
	// The ticker is created before the context, and it starts running immediately.
	tick := time.NewTicker(d)
	workCtx, cancel := context.WithCancel(ctx)

	work := &RobotWork{
		id:         workID,
		kind:       EveryWorkKind,
		ctx:        workCtx,
		cancelFunc: cancel,
		function:   f,
		ticker:     tick,
		duration:   d,
	}
	rwr.r[workID.String()] = work

	return work
}