// Copyright 2018-2020 (c) Cognizant Digital Business, Evolutionary AI. All rights reserved. Issued under the Apache 2.0 License.
package main

import (
	"flag"
	"testing"

	"github.com/leaf-ai/go-service/pkg/server" // MIT License
)

var (
	useK8s       = flag.Bool("use-k8s", false, "Enables any Kubernetes cluster specific tests, even if Kubernetes is not present")
	skipCheckK8s = flag.Bool("skip-k8s", false, "Skip Kubernetes liveness checks for tests that can run with or without it")
)
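
// Example invocations, as a sketch: the -run pattern and the -args separator are
// standard go test usage, and the custom flags are the ones declared above:
//
//	go test -run TestStates -args -use-k8s
//	go test -run TestStates -args -skip-k8s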

// This file implements a test that simulates a state change for the server and
// verifies that the schedulers respond appropriately.  States are controlled
// using Kubernetes, so this test exercises the state management without using
// the k8s modules; those are tested separately.

// TestStates exercises the internal state changes within the queue processing
// of the server.  It tests the state transitions without using the Kubernetes
// side; the k8s testing is done in a dedicated test case that exercises that
// component only when the test is run inside a working cluster.  To do this
// properly, k8s should be used together with a bundled RabbitMQ server.
func TestStates(t *testing.T) {
	if err := server.IsAliveK8s(); err != nil && !*useK8s {
		t.Skip("kubernetes specific testing disabled")
	}

	// The body of this test is currently disabled; the early return prevents
	// anything below from running until the commented-out code is reinstated.
	return
	//// We need a queuing system up and running because the states and queue
	//// states that are tracked in prometheus will only update in our
	//// production code when the scheduler actually finds a reference to some
	//// queuing
	//if err := runner.PingRMQServer(*amqpURL, *amqpMgtURL); err != nil {
	//	t.Fatal(err)
	//}
	//
	//// send bogus updates by instrumenting the lifecycle listeners in cmd/runner/k8s.go
	//select {
	//case server.K8sStateUpdates().Master <- server.K8sStateUpdate{State: types.K8sRunning}:
	//case <-time.After(time.Second):
	//	t.Fatal("state change could not be sent, no master was listening")
	//}
	//
	//pClient := NewPrometheusClient("http://localhost:9090/metrics")
	//
	//foundRefreshers := false
	//// The Stop is deferred so that the one minute timeout below remains able
	//// to fire while the loop is waiting on it
	//timeout := time.NewTicker(time.Minute)
	//defer timeout.Stop()
	//
	//for !foundRefreshers {
	//	select {
	//	case <-timeout.C:
	//		t.Fatal("timed out waiting for the runner_queue_checked metric to appear")
	//	case <-time.After(2 * time.Second):
	//		metrics, err := pClient.Fetch("runner_queue_")
	//		if err != nil {
	//			t.Fatal(err)
	//		}
	//		for k := range metrics {
	//			if strings.Contains(k, "runner_queue_checked") {
	//				foundRefreshers = true
	//			}
	//		}
	//	}
	//}
	//timeout.Stop()
	//
	//// send bogus updates by instrumenting the lifecycle listeners in cmd/runner/k8s.go
	//select {
	//case server.K8sStateUpdates().Master <- server.K8sStateUpdate{State: types.K8sDrainAndSuspend}:
	//case <-time.After(time.Second):
	//	t.Fatal("state change could not be sent, no master was listening")
	//}
	//
	//// Retrieve the prometheus counters for the AWS and RabbitMQ queue implementations
	//
	//defer func() {
	//	logger.Info("server state returning to running")
	//
	//	select {
	//	case server.K8sStateUpdates().Master <- server.K8sStateUpdate{State: types.K8sRunning}:
	//	case <-time.After(time.Second):
	//		logger.Warn("state reset could not be sent, no master was listening")
	//	}
	//}()
	//
	//timer := time.NewTicker(time.Second)
	//
	//// see what the prometheus counters do and make sure they match our drained state
	//func() {
	//	defer timer.Stop()
	//
	//	ignoredChanged := time.Now() // Tracks when the ignored metric changed
	//	ignored := 0                 // This counts all of the ignored queues
	//	ignoredSinceLastChecked := 0 // This counts the times the ignored counter was bumped since the last checked counter change
	//
	//	checkedChanged := time.Now() // Tracks when the checked metric changed
	//	checked := 0                 // This counts the last known number of checked queues
	//
	//	for {
	//		select {
	//		case <-timer.C:
	//			metrics, err := pClient.Fetch("runner_queue_")
	//			if err != nil {
	//				t.Fatal(err)
	//			}
	//			for k, metric := range metrics {
	//				switch k {
	//				case "runner_queue_ignored":
	//					// Track the number of ignored queue checks. We want this
	//					// to increase in this case for a time without the
	//					// successful checks increasing. This will validate that
	//					// the drained state is being respected by the server
	//					total := 0
	//					for _, m := range metric.GetMetric() {
	//						total += int(*m.GetCounter().Value)
	//					}
	//					// If we have yet to get any ignored tracking we initialize it
	//					if ignored == 0 {
	//						ignoredChanged = time.Now()
	//						ignored = total
	//						continue
	//					}
	//					// Track the number of times that the ignored count changes
	//					if ignored != total {
	//						ignoredChanged = time.Now()
	//						ignored = total
	//						ignoredSinceLastChecked++
	//						continue
	//					}
	//
	//				case "runner_queue_checked":
	//					total := 0
	//					for _, m := range metric.GetMetric() {
	//						total += int(*m.GetCounter().Value)
	//					}
	//					// If we have yet to get any checked tracking we initialize it
	//					if checked == 0 {
	//						checkedChanged = time.Now()
	//						checked = total
	//						continue
	//					}
	//					// Track the number of times that the checked count changes
	//					if checked != total {
	//						ignoredSinceLastChecked = 0
	//						checkedChanged = time.Now()
	//						checked = total
	//						continue
	//					}
	//				}
	//			} // End of for k, metric := range metrics
	//		}
	//		// The checked counter should not have changed after the ignored
	//		// counter did; if it has, the server has not yet respected the
	//		// change in state
	//		if ignoredChanged.Before(checkedChanged) {
	//			continue
	//		}
	//
	//		// If the ignored counter was modified at least twice since the last
	//		// checked total changed then we assume the server has respected the change
	//		if ignoredSinceLastChecked >= 2 {
	//			return
	//		}
	//	}
	//}()
	// Consider some way of combining some elements of the three of them
	// Consider splitting the lifecycle listeners channel side out into a channel
	// pattern library; a sketch of that pattern appears after this function
	// done
}
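
// The channel pattern library idea above could look something like the
// following.  This is only a hypothetical, stdlib-only sketch of a broadcast
// helper for lifecycle state updates, not the server package's actual
// implementation; every name in it is illustrative:
//
//	// state is a stand-in for the project's own lifecycle state type
//	type state int
//
//	// listeners fans state updates out to a dynamic set of subscribers
//	type listeners struct {
//		sync.Mutex
//		subs []chan state
//	}
//
//	// add registers a subscriber channel that will receive future updates
//	func (l *listeners) add(c chan state) {
//		l.Lock()
//		defer l.Unlock()
//		l.subs = append(l.subs, c)
//	}
//
//	// send delivers an update to every subscriber, dropping it for any
//	// subscriber that is not ready within the supplied timeout, mirroring
//	// the select-with-time.After pattern used in the test above
//	func (l *listeners) send(update state, timeout time.Duration) {
//		l.Lock()
//		defer l.Unlock()
//		for _, c := range l.subs {
//			select {
//			case c <- update:
//			case <-time.After(timeout):
//			}
//		}
//	}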