forked from alitto/pond
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgroup_blackbox_test.go
153 lines (119 loc) · 3.1 KB
/
group_blackbox_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package pond_test
import (
"context"
"errors"
"sync/atomic"
"testing"
"time"
"github.com/alitto/pond"
)
func TestGroupSubmit(t *testing.T) {
pool := pond.New(5, 1000)
assertEqual(t, 0, pool.RunningWorkers())
// Submit groups of tasks
var doneCount, taskCount int32
var groups []*pond.TaskGroup
for i := 0; i < 5; i++ {
group := pool.Group()
for j := 0; j < i+5; j++ {
group.Submit(func() {
time.Sleep(1 * time.Millisecond)
atomic.AddInt32(&doneCount, 1)
})
taskCount++
}
groups = append(groups, group)
}
// Wait for all groups to complete
for _, group := range groups {
group.Wait()
}
assertEqual(t, int32(taskCount), atomic.LoadInt32(&doneCount))
}
func TestGroupContext(t *testing.T) {
pool := pond.New(3, 100)
assertEqual(t, 0, pool.RunningWorkers())
// Submit a group of tasks
var doneCount, startedCount int32
group, ctx := pool.GroupContext(context.Background())
for i := 0; i < 10; i++ {
group.Submit(func() error {
atomic.AddInt32(&startedCount, 1)
select {
case <-time.After(5 * time.Millisecond):
atomic.AddInt32(&doneCount, 1)
case <-ctx.Done():
}
return nil
})
}
err := group.Wait()
assertEqual(t, nil, err)
assertEqual(t, int32(10), atomic.LoadInt32(&startedCount))
assertEqual(t, int32(10), atomic.LoadInt32(&doneCount))
}
func TestGroupContextWithError(t *testing.T) {
pool := pond.New(1, 100)
assertEqual(t, 0, pool.RunningWorkers())
expectedErr := errors.New("Something went wrong")
// Submit a group of tasks
var doneCount, startedCount int32
group, ctx := pool.GroupContext(context.Background())
for i := 0; i < 10; i++ {
n := i
group.Submit(func() error {
atomic.AddInt32(&startedCount, 1)
// Task number 5 fails
if n == 4 {
time.Sleep(10 * time.Millisecond)
return expectedErr
}
select {
case <-time.After(5 * time.Millisecond):
atomic.AddInt32(&doneCount, 1)
case <-ctx.Done():
}
return nil
})
}
err := group.Wait()
assertEqual(t, expectedErr, err)
pool.StopAndWait()
assertEqual(t, int32(5), atomic.LoadInt32(&startedCount))
assertEqual(t, int32(4), atomic.LoadInt32(&doneCount))
}
func TestGroupContextWithNilContext(t *testing.T) {
pool := pond.New(3, 100)
var thrownPanic interface{}
func() {
defer func() {
thrownPanic = recover()
}()
pool.GroupContext(nil)
}()
assertEqual(t, "a non-nil context needs to be specified when using GroupContext", thrownPanic)
}
func TestGroupContextWithCanceledContext(t *testing.T) {
pool := pond.New(3, 100)
assertEqual(t, 0, pool.RunningWorkers())
// Submit a group of tasks
var doneCount, startedCount int32
userCtx, cancel := context.WithCancel(context.Background())
group, ctx := pool.GroupContext(userCtx)
for i := 0; i < 10; i++ {
group.Submit(func() error {
atomic.AddInt32(&startedCount, 1)
select {
case <-time.After(10 * time.Millisecond):
atomic.AddInt32(&doneCount, 1)
case <-ctx.Done():
}
return nil
})
}
// Cancel context right after submitting tasks
cancel()
err := group.Wait()
assertEqual(t, context.Canceled, err)
assertEqual(t, int32(0), atomic.LoadInt32(&doneCount))
}