-
Notifications
You must be signed in to change notification settings - Fork 5
/
monitoring_service_test.go
124 lines (104 loc) · 2.57 KB
/
monitoring_service_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package sqsd
import (
"context"
"fmt"
"sort"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
// TestMonitoringService feeds a worker several batches of messages and checks
// what the monitoring service reports as in flight at each stage.
//
// The test invoker records every message on rcvCh and then parks until a token
// arrives on nextCh, so the test itself decides when each handled message
// completes. Three batches of three messages are pushed through in turn; each
// batch must be reported as exactly three tasks whose sorted IDs differ from
// every earlier batch. Finally a single message is queued and
// WaitUntilAllEnds is expected to return without error once that last task is
// released.
func TestMonitoringService(t *testing.T) {
	const (
		// settle is how long the test waits for queued messages to be
		// picked up before asking the monitor what is running.
		settle = 100 * time.Millisecond
		// batchSize is the number of messages queued per batch.
		batchSize = 3
	)

	rcvCh := make(chan Message, 100)
	nextCh := make(chan struct{}, 100)
	testInvokerFn := func(ctx context.Context, q Message) error {
		rcvCh <- q
		<-nextCh
		return nil
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// Only this goroutine sends on nextCh. Closing it on the way out releases
	// any handler still parked on the receive when the test stops early.
	defer close(nextCh)

	broker := make(chan Message, batchSize)
	w := startWorker(ctx, testInvoker(testInvokerFn), broker, &Gateway{})
	monitor := NewMonitoringService(w)

	// Nothing has been queued yet, so no task may be reported.
	resp, err := monitor.CurrentWorkings(ctx, nil)
	assert.NoError(t, err)
	assert.NotNil(t, resp)
	assert.Empty(t, resp.GetTasks())

	// enqueue sends messages with IDs "id:<from>" through "id:<to>".
	enqueue := func(from, to int) {
		for i := from; i <= to; i++ {
			broker <- Message{
				ID: fmt.Sprintf("id:%d", i),
			}
		}
	}

	// inFlightIDs returns the sorted IDs of the tasks currently reported by
	// the monitor. It stops the test when the count is not want, because the
	// assertions that follow are meaningless for a snapshot of the wrong size.
	inFlightIDs := func(want int) []string {
		resp, err := monitor.CurrentWorkings(ctx, nil)
		assert.NoError(t, err)
		assert.NotNil(t, resp)
		tasks := resp.GetTasks()
		if !assert.Len(t, tasks, want) {
			t.FailNow()
		}
		ids := make([]string, 0, len(tasks))
		for i := range tasks {
			ids = append(ids, tasks[i].GetId())
		}
		sort.Slice(ids, func(i, j int) bool {
			return strings.Compare(ids[i], ids[j]) < 0
		})
		return ids
	}

	// release lets n parked handlers return.
	release := func(n int) {
		for i := 0; i < n; i++ {
			nextCh <- struct{}{}
		}
	}

	batches := []struct {
		from, to int
		drain    time.Duration // pause after releasing the batch
	}{
		{from: 1, to: 3, drain: 100 * time.Millisecond},
		{from: 4, to: 6, drain: 50 * time.Millisecond},
		{from: 7, to: 9, drain: 50 * time.Millisecond},
	}

	seen := make([][]string, 0, len(batches))
	for _, b := range batches {
		enqueue(b.from, b.to)
		time.Sleep(settle)

		ids := inFlightIDs(batchSize)
		release(len(ids))

		// Every batch must show a different set of tasks than the ones before.
		for _, prev := range seen {
			assert.NotEqual(t, prev, ids)
		}
		seen = append(seen, ids)
		time.Sleep(b.drain)
	}

	// One last message: WaitUntilAllEnds must still be waiting while it is in
	// flight and return without error once it has been released.
	enqueue(10, 10)

	errCh := make(chan error, 1)
	go func() {
		defer close(errCh)
		errCh <- monitor.WaitUntilAllEnds(time.Hour)
	}()

	time.Sleep(settle)
	inFlightIDs(1)
	release(1)

	assert.NoError(t, <-errCh)
}