azure_eventhub_test.go

package scalers

import (
	"context"
	"fmt"
	"net/url"
	"os"
	"strings"
	"testing"

	"github.com/stretchr/testify/assert"

	eventhub "github.com/Azure/azure-event-hubs-go"
	"github.com/Azure/azure-storage-blob-go/azblob"
)

const (
	eventHubConsumerGroup     = "testEventHubConsumerGroup"
	eventHubConnectionSetting = "testEventHubConnectionSetting"
	storageConnectionSetting  = "testStorageConnectionSetting"
	testEventHubNamespace     = "kedatesteventhub"
	testEventHubName          = "eventhub1"
	checkpointFormat          = "{\"SequenceNumber\":%d,\"PartitionId\":\"%s\"}"
	testContainerName         = "azure-webjobs-eventhub"
)

type parseEventHubMetadataTestData struct {
	metadata map[string]string
	isError  bool
}

type resolvedEnvTestData struct {
	resolvedEnv map[string]string
	isError     bool
}

var sampleEventHubResolvedEnv = map[string]string{eventHubConnectionSetting: "none", storageConnectionSetting: "none"}

var parseEventHubMetadataDataset = []parseEventHubMetadataTestData{
	{map[string]string{}, true},
	// properly formed event hub metadata
	{map[string]string{"storageConnection": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connection": eventHubConnectionSetting, "unprocessedEventThreshold": "15"}, false},
	// missing event hub connection setting
	{map[string]string{"storageConnection": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15"}, true},
	// missing storage connection setting
	{map[string]string{"consumerGroup": eventHubConsumerGroup, "connection": eventHubConnectionSetting, "unprocessedEventThreshold": "15"}, true},
	// missing event hub consumer group - should replace with default
	{map[string]string{"storageConnection": storageConnectionSetting, "connection": eventHubConnectionSetting, "unprocessedEventThreshold": "15"}, false},
	// missing unprocessed event threshold - should replace with default
	{map[string]string{"storageConnection": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connection": eventHubConnectionSetting}, false},
	// added blob container details
	{map[string]string{"storageConnection": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connection": eventHubConnectionSetting, "blobContainer": testContainerName}, false},
}

var testEventHubScaler = AzureEventHubScaler{
	metadata: &EventHubMetadata{
		eventHubConnection: "none",
		storageConnection:  "none",
	},
}
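
// TestParseEventHubMetadata checks, for each entry in parseEventHubMetadataDataset, that
// parseAzureEventHubMetadata either succeeds or fails as the test case expects.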
func TestParseEventHubMetadata(t *testing.T) {
	// Test first with valid resolved environment
	for _, testData := range parseEventHubMetadataDataset {
		_, err := parseAzureEventHubMetadata(testData.metadata, sampleEventHubResolvedEnv)

		if err != nil && !testData.isError {
			t.Errorf("Expected success but got error: %s", err)
		}
		if testData.isError && err == nil {
			t.Error("Expected error and got success")
		}
	}
}
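
// TestGetUnprocessedEventCountInPartition is an integration-style test: its body only runs
// when both AZURE_EVENTHUB_KEY and TEST_STORAGE_CONNECTION_STRING are set. It sends one
// event to the hub, writes fake checkpoints to blob storage, and then expects one
// unprocessed event in partition 0 and none in partition 1.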
func TestGetUnprocessedEventCountInPartition(t *testing.T) {
	t.Log("This test uses the environment variables AZURE_EVENTHUB_KEY and TEST_STORAGE_CONNECTION_STRING if they are set.")
	t.Log("If both are set, it connects to the storage account and event hub to determine how many messages are unprocessed.")
	t.Log("The event hub should end up with 1 message in partition 0 and 0 messages in partition 1.")

	eventHubKey := os.Getenv("AZURE_EVENTHUB_KEY")
	storageConnectionString := os.Getenv("TEST_STORAGE_CONNECTION_STRING")

	if eventHubKey != "" && storageConnectionString != "" {
		eventHubConnectionString := fmt.Sprintf("Endpoint=sb://%s.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=%s;EntityPath=%s", testEventHubNamespace, eventHubKey, testEventHubName)
		storageAccountName := strings.Split(strings.Split(storageConnectionString, ";")[1], "=")[1]

		t.Log("Creating event hub client...")
		hubOption := eventhub.HubWithPartitionedSender("0")
		client, err := eventhub.NewHubFromConnectionString(eventHubConnectionString, hubOption)
		if err != nil {
			t.Errorf("Expected to create event hub client but got error: %s", err)
		}

		_, storageCredentials, err := GetStorageCredentials(storageConnectionString)
		if err != nil {
			t.Errorf("Expected to generate storage credentials but got error: %s", err)
		}

		if eventHubConnectionString == "" {
			t.Fatal("Event hub connection string needed for test")
		}

		if storageConnectionString == "" {
			t.Fatal("Storage connection string needed for test")
		}

		// Can actually test that numbers return
		testEventHubScaler.metadata.eventHubConnection = eventHubConnectionString
		testEventHubScaler.metadata.storageConnection = storageConnectionString
		testEventHubScaler.client = client
		testEventHubScaler.storageCredentials = storageCredentials
		testEventHubScaler.metadata.eventHubConsumerGroup = "$Default"

		// Send 1 message to event hub first
		t.Log("Sending message to event hub")
		err = SendMessageToEventHub(client)
		if err != nil {
			t.Error(err)
		}

		// Create fake checkpoint with path azure-webjobs-eventhub/<eventhub-namespace-name>.servicebus.windows.net/<eventhub-name>/$Default
		t.Log("Creating container...")
		ctx, err := CreateNewCheckpointInStorage(storageAccountName, storageCredentials, client)
		if err != nil {
			t.Errorf("err creating container: %s", err)
		}

		unprocessedEventCountInPartition0, err0 := testEventHubScaler.GetUnprocessedEventCountInPartition(ctx, "0")
		unprocessedEventCountInPartition1, err1 := testEventHubScaler.GetUnprocessedEventCountInPartition(ctx, "1")
		if err0 != nil {
			t.Errorf("Expected success but got error: %s", err0)
		}
		if err1 != nil {
			t.Errorf("Expected success but got error: %s", err1)
		}

		if unprocessedEventCountInPartition0 != 1 {
			t.Errorf("Expected 1 message in partition 0, got %d", unprocessedEventCountInPartition0)
		}
		if unprocessedEventCountInPartition1 != 0 {
			t.Errorf("Expected 0 messages in partition 1, got %d", unprocessedEventCountInPartition1)
		}

		// Delete container - this will also delete checkpoint
		t.Log("Deleting container...")
		err = DeleteContainerInStorage(ctx, storageAccountName, storageCredentials)
		if err != nil {
			t.Error(err)
		}
	}
}
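
// Sample checkpoint payloads in the C#-SDK-style and Python-SDK-style field naming; both
// are expected to deserialize to the same checkpoint.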
const csharpSdkCheckpoint = `{
	"Epoch": 123456,
	"Offset": "test offset",
	"Owner": "test owner",
	"PartitionId": "test partitionId",
	"SequenceNumber": 12345
}`

const pythonSdkCheckpoint = `{
	"epoch": 123456,
	"offset": "test offset",
	"owner": "test owner",
	"partition_id": "test partitionId",
	"sequence_number": 12345
}`
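
// TestGetCheckpoint parses both sample payloads with getCheckpoint and asserts that the
// resulting checkpoints are equal.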
func TestGetCheckpoint(t *testing.T) {
	cckp, err := getCheckpoint([]byte(csharpSdkCheckpoint))
	if err != nil {
		t.Error(err)
	}

	pckp, err := getCheckpoint([]byte(pythonSdkCheckpoint))
	if err != nil {
		t.Error(err)
	}

	assert.Equal(t, cckp, pckp)
}
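
// CreateNewCheckpointInStorage creates the azure-webjobs-eventhub container and uploads fake
// checkpoint blobs under <namespace>.servicebus.windows.net/<eventhub>/$Default/ so the
// scaler under test can read them. It returns the context used for the blob operations.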
func CreateNewCheckpointInStorage(storageAccountName string, credential *azblob.SharedKeyCredential, client *eventhub.Hub) (context.Context, error) {
	urlPath := fmt.Sprintf("%s.servicebus.windows.net/%s/$Default/", testEventHubNamespace, testEventHubName)

	// Create container
	ctx := context.Background()
	url, _ := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net/%s", storageAccountName, testContainerName))
	containerURL := azblob.NewContainerURL(*url, azblob.NewPipeline(credential, azblob.PipelineOptions{}))
	_, err := containerURL.Create(ctx, azblob.Metadata{}, azblob.PublicAccessNone)
	if err != nil {
		return ctx, fmt.Errorf("failed to create container: %s", err)
	}

	// Create directory checkpoints will be in
	err = os.MkdirAll(urlPath, 0777)
	if err != nil {
		return ctx, fmt.Errorf("Unable to create directory: %s", err)
	}
	defer os.RemoveAll(urlPath)

	file, err := os.Create(fmt.Sprintf("%s/file", urlPath))
	if err != nil {
		return ctx, fmt.Errorf("Unable to create file: %s", err)
	}
	defer file.Close()

	blobFolderURL := containerURL.NewBlockBlobURL(urlPath)

	// Upload file
	_, err = azblob.UploadFileToBlockBlob(ctx, file, blobFolderURL, azblob.UploadToBlockBlobOptions{
		BlockSize:   4 * 1024 * 1024,
		Parallelism: 16})
	if err != nil {
		return ctx, fmt.Errorf("Err uploading file to blob: %s", err)
	}

	// Make checkpoint blob files
	if err := CreatePartitionFile(ctx, urlPath, "0", containerURL, client); err != nil {
		return ctx, fmt.Errorf("failed to create partitionID 0 file: %s", err)
	}
	if err := CreatePartitionFile(ctx, urlPath, "1", containerURL, client); err != nil {
		return ctx, fmt.Errorf("failed to create partitionID 1 file: %s", err)
	}

	return ctx, nil
}
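
// CreatePartitionFile uploads a checkpoint blob for the given partition. For partition 0 the
// stored sequence number is one behind the partition's latest sequence number, so exactly one
// event appears unprocessed; for any other partition it matches the latest, so none do.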
func CreatePartitionFile(ctx context.Context, urlPathToPartition string, partitionID string, containerURL azblob.ContainerURL, client *eventhub.Hub) error {
	// Create folder structure
	filePath := urlPathToPartition + partitionID

	partitionInfo, err := client.GetPartitionInformation(ctx, partitionID)
	if err != nil {
		return fmt.Errorf("unable to get partition info: %s", err)
	}

	f, err := os.Create(partitionID)
	if err != nil {
		return fmt.Errorf("unable to create file: %s", err)
	}

	if partitionID == "0" {
		_, err = f.WriteString(fmt.Sprintf(checkpointFormat, partitionInfo.LastSequenceNumber-1, partitionID))
		if err != nil {
			return fmt.Errorf("unable to write to file: %s", err)
		}
	} else {
		_, err = f.WriteString(fmt.Sprintf(checkpointFormat, partitionInfo.LastSequenceNumber, partitionID))
		if err != nil {
			return fmt.Errorf("unable to write to file: %s", err)
		}
	}

	// Reopen the checkpoint file so it can be uploaded
	file, err := os.Open(partitionID)
	if err != nil {
		return fmt.Errorf("Unable to open file: %s", err)
	}
	defer file.Close()

	blobFileURL := containerURL.NewBlockBlobURL(filePath)

	// Upload checkpoint file
	_, err = azblob.UploadFileToBlockBlob(ctx, file, blobFileURL, azblob.UploadToBlockBlobOptions{
		BlockSize:   4 * 1024 * 1024,
		Parallelism: 16})
	if err != nil {
		return fmt.Errorf("Err uploading file to blob: %s", err)
	}
	return nil
}
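
// SendMessageToEventHub sends a single test event ("1") to the event hub.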
func SendMessageToEventHub(client *eventhub.Hub) error {
	ctx := context.Background()

	err := client.Send(ctx, eventhub.NewEventFromString("1"))
	if err != nil {
		return fmt.Errorf("Error sending msg: %s", err)
	}
	return nil
}
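
// DeleteContainerInStorage deletes the test blob container, which also removes the
// checkpoints created during the test.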
func DeleteContainerInStorage(ctx context.Context, storageAccountName string, credential *azblob.SharedKeyCredential) error {
	url, _ := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net/%s", storageAccountName, testContainerName))
	containerURL := azblob.NewContainerURL(*url, azblob.NewPipeline(credential, azblob.PipelineOptions{}))

	_, err := containerURL.Delete(ctx, azblob.ContainerAccessConditions{
		ModifiedAccessConditions: azblob.ModifiedAccessConditions{},
	})
	if err != nil {
		return fmt.Errorf("failed to delete container in blob storage: %s", err)
	}
	return nil
}