forked from hashicorp/consul
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathretry.go
156 lines (133 loc) · 3.81 KB
/
retry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package lib
import (
"time"
)
const (
defaultMinFailures = 0
defaultMaxWait = 2 * time.Minute
)
// Interface used for offloading jitter calculations from the RetryWaiter
type Jitter interface {
AddJitter(baseTime time.Duration) time.Duration
}
// Calculates a random jitter between 0 and up to a specific percentage of the baseTime
type JitterRandomStagger struct {
// int64 because we are going to be doing math against an int64 to represent nanoseconds
percent int64
}
// Creates a new JitterRandomStagger
func NewJitterRandomStagger(percent int) *JitterRandomStagger {
if percent < 0 {
percent = 0
}
return &JitterRandomStagger{
percent: int64(percent),
}
}
// Implments the Jitter interface
func (j *JitterRandomStagger) AddJitter(baseTime time.Duration) time.Duration {
if j.percent == 0 {
return baseTime
}
// time.Duration is actually a type alias for int64 which is why casting
// to the duration type and then dividing works
return baseTime + RandomStagger((baseTime*time.Duration(j.percent))/100)
}
// RetryWaiter will record failed and successful operations and provide
// a channel to wait on before a failed operation can be retried.
type RetryWaiter struct {
minFailures uint
minWait time.Duration
maxWait time.Duration
jitter Jitter
failures uint
}
// Creates a new RetryWaiter
func NewRetryWaiter(minFailures int, minWait, maxWait time.Duration, jitter Jitter) *RetryWaiter {
if minFailures < 0 {
minFailures = defaultMinFailures
}
if maxWait <= 0 {
maxWait = defaultMaxWait
}
if minWait <= 0 {
minWait = 0 * time.Nanosecond
}
return &RetryWaiter{
minFailures: uint(minFailures),
minWait: minWait,
maxWait: maxWait,
failures: 0,
jitter: jitter,
}
}
// calculates the necessary wait time before the
// next operation should be allowed.
func (rw *RetryWaiter) calculateWait() time.Duration {
waitTime := rw.minWait
if rw.failures > rw.minFailures {
shift := rw.failures - rw.minFailures - 1
waitTime = rw.maxWait
if shift < 31 {
waitTime = (1 << shift) * time.Second
}
if waitTime > rw.maxWait {
waitTime = rw.maxWait
}
if rw.jitter != nil {
waitTime = rw.jitter.AddJitter(waitTime)
}
}
if waitTime < rw.minWait {
waitTime = rw.minWait
}
return waitTime
}
// calculates the waitTime and returns a chan
// that will become selectable once that amount
// of time has elapsed.
func (rw *RetryWaiter) wait() <-chan struct{} {
waitTime := rw.calculateWait()
ch := make(chan struct{})
if waitTime > 0 {
time.AfterFunc(waitTime, func() { close(ch) })
} else {
// if there should be 0 wait time then we ensure
// that the chan will be immediately selectable
close(ch)
}
return ch
}
// Marks that an operation is successful which resets the failure count.
// The chan that is returned will be immediately selectable
func (rw *RetryWaiter) Success() <-chan struct{} {
rw.Reset()
return rw.wait()
}
// Marks that an operation failed. The chan returned will be selectable
// once the calculated retry wait amount of time has elapsed
func (rw *RetryWaiter) Failed() <-chan struct{} {
rw.failures += 1
ch := rw.wait()
return ch
}
// Resets the internal failure counter
func (rw *RetryWaiter) Reset() {
rw.failures = 0
}
// WaitIf is a convenice method to record whether the last
// operation was a success or failure and return a chan that
// will be selectablw when the next operation can be done.
func (rw *RetryWaiter) WaitIf(failure bool) <-chan struct{} {
if failure {
return rw.Failed()
}
return rw.Success()
}
// WaitIfErr is a convenience method to record whether the last
// operation was a success or failure based on whether the err
// is nil and then return a chan that will be selectable when
// the next operation can be done.
func (rw *RetryWaiter) WaitIfErr(err error) <-chan struct{} {
return rw.WaitIf(err != nil)
}