forked from cloudflare/go-stream
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathbatcher.go
167 lines (149 loc) · 5.13 KB
/
batcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package stream
import (
"time"
"github.com/cevian/go-stream/util/slog"
)
// BatchContainer accumulates objects on behalf of a BatcherOperator and hands
// them downstream in batches when asked to flush.
type BatchContainer interface {
	// Add stores object in the container.
	Add(object Object)

	// HasItems reports whether the container holds anything not yet flushed.
	HasItems() bool

	// IsFull reports whether the container should be flushed before more input
	// is accepted. Invariant: IsFull() implies HasItems(), and the container
	// must not remain full after a flush.
	IsFull() bool

	// Flush is expected to send a batch on the given channel. BatcherOperator
	// counts one more outstanding batch each time Flush returns true.
	Flush(chan<- Object) bool

	// FlushAll is used for the final flush once input closes and must leave
	// the container empty. Its return value is interpreted like Flush's.
	FlushAll(chan<- Object) bool
}
// BatcherOperator is a stream operator that collects objects from its input
// into a BatchContainer and emits them on its output in batches. It caps how
// many flushed batches may await a downstream "processed" notification
// (MaxOutstanding) and uses timers to bound how long items wait.
//
// NOTE: NewBatchOperator may build this struct positionally, so field order
// is significant and must not change.
type BatcherOperator struct {
	*HardStopChannelCloser
	*BaseIn
	*BaseOut

	name      string         // shown in the debug message Run logs on shutdown
	container BatchContainer // accumulates objects between flushes

	// MaxOutstanding caps the number of flushed batches not yet acknowledged
	// through processedDownstream; 0 means no limit.
	MaxOutstanding      uint
	processedDownstream ProcessedNotifier

	// Batching delays used by Run; SetTimeouts sets all three at once.
	minWaitAfterFirstItem, minWaitBetweenFlushes, minWaitForLeftover time.Duration

	// outstanding counts flushed batches not yet acknowledged downstream;
	// total_flushes counts every Flush/LastFlush call.
	outstanding, total_flushes uint
}
// NewBatchOperator returns a BatcherOperator called name that accumulates
// input in container and learns through processedDownstream when flushed
// batches have been handled downstream.
//
// Defaults: MaxOutstanding is 1, so at most one flushed batch may await a
// downstream notification at a time (0 would mean unlimited). All three
// batching delays are one second; use SetTimeouts to change them.
//
// A keyed literal is used so this constructor keeps working if the struct's
// fields are ever reordered; unlisted fields (the counters) start at zero.
func NewBatchOperator(name string, container BatchContainer, processedDownstream ProcessedNotifier) *BatcherOperator {
	return &BatcherOperator{
		HardStopChannelCloser: NewHardStopChannelCloser(),
		BaseIn:                NewBaseIn(CHAN_SLACK),
		BaseOut:               NewBaseOut(CHAN_SLACK),
		name:                  name,
		container:             container,
		MaxOutstanding:        1,
		processedDownstream:   processedDownstream,
		minWaitAfterFirstItem: time.Second,
		minWaitBetweenFlushes: time.Second,
		minWaitForLeftover:    time.Second,
	}
}
// SetTimeouts sets all three batching delays used by Run to td: the wait
// after the first item of a batch, the minimum gap between flushes, and the
// wait before flushing leftovers.
func (op *BatcherOperator) SetTimeouts(td time.Duration) {
	op.minWaitAfterFirstItem, op.minWaitBetweenFlushes, op.minWaitForLeftover = td, td, td
}
// DownstreamCanAcceptFlush reports whether another batch may be flushed now.
// This is always true when MaxOutstanding is 0 (no limit). Otherwise it is
// true only while fewer than MaxOutstanding flushed batches are outstanding.
//
// INVARIANT CAN FLUSH OR WAIT: DownstreamCanAcceptFlush() ||
// DownstreamWillCallback() always holds; the two are exact complements.
func (op *BatcherOperator) DownstreamCanAcceptFlush() bool {
	if op.MaxOutstanding == 0 {
		return true
	}
	return op.outstanding < op.MaxOutstanding
}
// DownstreamWillCallback reports whether the outstanding-batch limit has been
// reached. In that state Run stops flushing and waits for a downstream
// processed notification. It is always false when MaxOutstanding is 0, and
// it is the exact negation of DownstreamCanAcceptFlush.
func (op *BatcherOperator) DownstreamWillCallback() bool {
	if op.MaxOutstanding == 0 {
		return false
	}
	return op.outstanding >= op.MaxOutstanding
}
// Flush asks the container to flush a batch to Out(). total_flushes is bumped
// on every call. When the container reports true, one more batch is counted
// as outstanding until a downstream notification in Run releases it.
func (op *BatcherOperator) Flush() {
	op.total_flushes++
	if sent := op.container.Flush(op.Out()); sent {
		op.outstanding++
	}
}
// LastFlush asks the container to flush everything it still holds to Out().
// Run calls it once the input channel has closed. Bookkeeping matches Flush:
// total_flushes is always incremented, and outstanding is incremented when
// the container reports true.
func (op *BatcherOperator) LastFlush() {
	op.total_flushes++
	sent := op.container.FlushAll(op.Out())
	if !sent {
		return
	}
	op.outstanding++
}
// Run is the operator's main loop. It adds each object read from In() to the
// container and flushes to Out() in three situations:
//   - the container is full;
//   - the batch timer fires;
//   - a downstream processed notification frees capacity.
//
// Out() is closed when Run returns. Run returns nil either after the input
// channel closes (following a final LastFlush) or when StopNotifier fires.
//
// Run panics if downstream reports more processed batches than are
// outstanding, or if the batcher finds itself full with no way to make
// progress.
func (op *BatcherOperator) Run() error {
	defer close(op.Out())

	// batchExpired puts a lower bound on how often flushes occur. A nil
	// channel never becomes ready in a select, so nil disables the timer.
	var batchExpired <-chan time.Time

	// INVARIANT: if container.HasItems() then it will be flushed eventually.
	// We model a state machine over three booleans:
	//   hi  = container.HasItems()
	//   wcb = op.DownstreamWillCallback()
	//   bne = batchExpired != nil (batch not expired)
	// Invariants required:
	//   LIMITED_DRCB: repeated DRCB events eventually make wcb == false.
	//   !wcb can only become wcb after a FLUSH.
	//   CAN_FLUSH_OR_WAIT: either DownstreamWillCallback or
	//     DownstreamCanAcceptFlush is true.
	// Cases with hi == true:
	//   wcb && bne:
	//     IN   => wcb && bne [BE or DRCB will eventually happen]
	//     BE   => PROGRESS 1 || wcb && !bne
	//     DRCB => wcb && bne [bounded by LIMITED_DRCB] || !wcb && bne
	//   wcb && !bne:
	//     IN   => wcb && !bne [DRCB will eventually happen]
	//     BE   => impossible
	//     DRCB => DownstreamCanAcceptFlush: PROGRESS 2
	//             otherwise: wcb && bne || wcb && !bne [bounded by
	//             LIMITED_DRCB] || !wcb && bne
	//   !wcb && bne:
	//     IN   => !wcb && bne [BE will eventually happen]
	//     BE   => !DownstreamCanAcceptFlush: impossible [CAN_FLUSH_OR_WAIT]
	//             otherwise: PROGRESS 2
	//     DRCB => impossible (!wcb)
	//   !wcb && !bne => impossible (every case avoids it)
	// Liveness: has items => batchExpired != nil or DownstreamWillCallback.
	for {
		in := op.In()
		if op.container.IsFull() {
			if op.DownstreamCanAcceptFlush() {
				// PROGRESS 1
				op.Flush()
				batchExpired = time.After(op.minWaitBetweenFlushes)
			} else {
				if !op.DownstreamWillCallback() && batchExpired == nil {
					panic("Batcher deadlocked. Should not happen")
				}
				// Full and downstream is saturated: stop reading input
				// (receiving from a nil channel blocks) until capacity frees.
				in = nil
			}
		}

		select {
		// case IN
		case obj, ok := <-in:
			if !ok {
				// Input closed: push out whatever is left. This final flush
				// does not consult MaxOutstanding.
				if op.container.HasItems() {
					op.LastFlush()
				}
				if op.container.HasItems() {
					slog.Fatalf("Last flush did not empty container, some stuff will never be sent")
				}
				slog.Debugf("Batch Operator %s flushed %d", op.name, op.total_flushes)
				return nil
			}
			op.container.Add(obj)
			// Arm the timer when the first item of a new batch arrives.
			if !op.DownstreamWillCallback() && op.container.HasItems() && batchExpired == nil {
				batchExpired = time.After(op.minWaitAfterFirstItem)
			}
			// IMPOSSIBLE: hi && !wcb && !bne

		// case BE
		case <-batchExpired:
			batchExpired = nil
			// Flush only when there is something to send. Otherwise an idle
			// batcher would flush an empty container and re-arm the timer
			// forever.
			if op.container.HasItems() && op.DownstreamCanAcceptFlush() {
				// PROGRESS 1
				op.Flush()
				batchExpired = time.After(op.minWaitBetweenFlushes)
			}
			if !op.DownstreamWillCallback() && op.container.HasItems() && batchExpired == nil {
				batchExpired = time.After(op.minWaitForLeftover)
			}
			// IMPOSSIBLE: hi && !wcb && !bne

		case <-op.StopNotifier:
			// INVARIANT and PROGRESS violated: hard stop. The deferred close
			// still closes Out().
			return nil

		// case DRCB: downstream reports count processed batches
		case count := <-op.processedDownstream.NotificationChannel():
			// outstanding is unsigned, so a count larger than it would wrap
			// around instead of going negative.
			if op.outstanding == 0 || count > op.outstanding {
				panic("Should never happen, will cause underflow")
			}
			op.outstanding -= count
			if op.DownstreamCanAcceptFlush() && op.container.HasItems() && batchExpired == nil {
				op.Flush()
				batchExpired = time.After(op.minWaitBetweenFlushes)
			}
			if !op.DownstreamWillCallback() && op.container.HasItems() && batchExpired == nil {
				batchExpired = time.After(op.minWaitForLeftover)
			}
			// IMPOSSIBLE: hi && !wcb && !bne
		}
	}
}