add nats-streaming e2e tests. (kedacore#1304)
* Signed-off-by: balchua <[email protected]>
balchua authored Nov 1, 2020
1 parent 5c760cc commit a9e118f
Showing 2 changed files with 285 additions and 0 deletions.
189 changes: 189 additions & 0 deletions tests/scalers/stan-helpers.ts
@@ -0,0 +1,189 @@
import * as sh from 'shelljs'
import * as tmp from 'tmp'
import * as fs from 'fs'

export class StanHelper {
  // Deploys the single-node STAN (NATS Streaming) StatefulSet and waits for the pod to be ready.
  static install(t, stanNamespace: string) {
    const tmpFile = tmp.fileSync()
    // deploy stan
    fs.writeFileSync(tmpFile.name, stanManifest)
    sh.exec('kubectl create namespace stan')
    t.is(
      0,
      sh.exec(`kubectl -n ${stanNamespace} apply -f ${tmpFile.name}`).code, 'creating stan statefulset should work.'
    )
    t.is(
      0,
      sh.exec(`kubectl -n ${stanNamespace} wait --for=condition=Ready --timeout=600s po/stan-nats-ss-0`).code, 'Stan pod should be available.'
    )
  }

  // Removes the STAN server resources.
  static uninstall(t, stanNamespace: string) {
    const tmpFile = tmp.fileSync()
    fs.writeFileSync(tmpFile.name, stanManifest)
    sh.exec(`kubectl -n ${stanNamespace} delete -f ${tmpFile.name}`)
  }

  // Deploys the publisher, which pushes messages onto the "Test" subject.
  static publishMessages(t, testNamespace: string) {
    const tmpFile = tmp.fileSync()
    fs.writeFileSync(tmpFile.name, pubYaml)
    t.is(
      0,
      sh.exec(`kubectl -n ${testNamespace} apply -f ${tmpFile.name}`).code, 'creating stan producer should work.'
    )
  }

  // Deploys the consumer with 0 replicas so KEDA can scale it out.
  static installConsumer(t, testNamespace: string) {
    const tmpFile = tmp.fileSync()
    fs.writeFileSync(tmpFile.name, subYaml)
    t.is(
      0,
      sh.exec(`kubectl -n ${testNamespace} apply -f ${tmpFile.name}`).code, 'creating stan consumer deployment should work.'
    )
  }

  // Removes the publisher and consumer deployments.
  static uninstallWorkloads(t, testNamespace: string) {
    const tmpFile = tmp.fileSync()
    fs.writeFileSync(tmpFile.name, subYaml)
    sh.exec(`kubectl -n ${testNamespace} delete -f ${tmpFile.name}`)
    fs.writeFileSync(tmpFile.name, pubYaml)
    sh.exec(`kubectl -n ${testNamespace} delete -f ${tmpFile.name}`)
  }
}

const stanManifest = `
# Source: nats-ss/templates/service.yaml
apiVersion: v1
kind: Service
metadata:
  name: stan-nats-ss
  labels:
    app: nats-ss
    chart: nats-ss-0.0.1
    release: stan
    heritage: Helm
spec:
  type: ClusterIP
  ports:
    - name: client
      port: 4222
      targetPort: 4222
      protocol: TCP
    - name: monitor
      port: 8222
      targetPort: 8222
      protocol: TCP
  selector:
    app: nats-ss
    release: stan
---
# Source: nats-ss/templates/statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: stan-nats-ss
  labels:
    app: nats-ss
    chart: nats-ss-0.0.1
    release: stan
    heritage: Helm
spec:
  serviceName: nats-ss
  replicas: 1
  selector:
    matchLabels:
      app: nats-ss
  template:
    metadata:
      labels:
        app: nats-ss
        release: stan
    spec:
      containers:
        - name: nats-ss
          image: nats-streaming:0.16.2
          imagePullPolicy: IfNotPresent
          command:
            - /nats-streaming-server
          args:
            - -m=8222
            - -st=FILE
            - --dir=/nats-datastore
            - --cluster_id=local-stan
          env:
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
          volumeMounts:
            - mountPath: /nats-datastore
              name: nats-datastore
      volumes:
        - name: nats-datastore
          emptyDir: {}
`

const pubYaml = `
apiVersion: apps/v1
kind: Deployment
metadata:
  name: pub
  labels:
    app.kubernetes.io/name: pub
    helm.sh/chart: pub-0.0.3
    app.kubernetes.io/instance: pub
    app.kubernetes.io/version: "0.0.3"
    app.kubernetes.io/managed-by: Helm
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: pub
      app.kubernetes.io/instance: pub
  template:
    metadata:
      labels:
        app.kubernetes.io/name: pub
        app.kubernetes.io/instance: pub
    spec:
      containers:
        - name: pub
          image: "balchu/gonuts-pub:c02e4ee-dirty"
          imagePullPolicy: Always
          command: ["/app"]
          args: ["-s", "nats://stan-nats-ss.stan:4222", "-d", "10", "-limit", "1000", "Test"]
`

const subYaml = `
apiVersion: apps/v1
kind: Deployment
metadata:
  name: sub
  labels:
    app.kubernetes.io/name: sub
    helm.sh/chart: sub-0.0.3
    app.kubernetes.io/instance: sub
    app.kubernetes.io/version: "0.0.3"
    app.kubernetes.io/managed-by: Helm
spec:
  replicas: 0
  selector:
    matchLabels:
      app.kubernetes.io/name: sub
      app.kubernetes.io/instance: sub
  template:
    metadata:
      labels:
        app.kubernetes.io/name: sub
        app.kubernetes.io/instance: sub
    spec:
      containers:
        - name: sub
          image: "balchu/gonuts-sub:c02e4ee"
          imagePullPolicy: Always
          command: ["/app"]
          args: ["-d", "5000", "-s", "nats://stan-nats-ss.stan:4222", "-d", "10", "--durable", "ImDurable", "--qgroup", "grp1", "Test"]
`
96 changes: 96 additions & 0 deletions tests/scalers/stan.test.ts
@@ -0,0 +1,96 @@
import * as async from 'async'
import * as fs from 'fs'
import * as sh from 'shelljs'
import * as tmp from 'tmp'
import test from 'ava'
import { StanHelper } from './stan-helpers'

const testNamespace = 'gonuts'
const stanNamespace = 'stan'
const queueName = 'test'

test.before(t => {
  sh.config.silent = true
  sh.exec(`kubectl create namespace ${testNamespace}`)
  StanHelper.install(t, stanNamespace)
  StanHelper.installConsumer(t, testNamespace)
  StanHelper.publishMessages(t, testNamespace)
})

test.serial('Deployment should have 0 replicas on start', t => {
  const replicaCount = sh.exec(`kubectl get deployment.apps/sub --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"`).stdout

  t.log('replica count: %s', replicaCount)
  t.is(replicaCount, '0', 'replica count should start out as 0')
})

test.serial(`Deployment should scale to 5 with 1000 messages on the queue then back to 0`, t => {
  // deploy scaler
  const tmpFile = tmp.fileSync()
  fs.writeFileSync(tmpFile.name, scaledObjectYaml)
  t.is(
    0,
    sh.exec(`kubectl -n ${testNamespace} apply -f ${tmpFile.name}`).code, 'creating scaledObject should work.'
  )

  // with messages published, the consumer deployment should start receiving the messages
  // and KEDA should scale it out; poll up to 10 times with a 5 second wait
  let replicaCount = '0'
  for (let i = 0; i < 10 && replicaCount !== '5'; i++) {
    replicaCount = sh.exec(
      `kubectl get deployment.apps/sub --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"`
    ).stdout
    t.log('replica count is: ' + replicaCount)
    if (replicaCount !== '5') {
      sh.exec('sleep 5s')
    }
  }

  t.is('5', replicaCount, 'Replica count should be 5 within 50 seconds')

  // once the queue is drained, KEDA should scale the deployment back to 0;
  // poll up to 50 times with a 5 second wait
  for (let i = 0; i < 50 && replicaCount !== '0'; i++) {
    replicaCount = sh.exec(
      `kubectl get deployment.apps/sub --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"`
    ).stdout
    if (replicaCount !== '0') {
      sh.exec('sleep 5s')
    }
  }

  t.is('0', replicaCount, 'Replica count should be 0 within 250 seconds')
})

test.after.always.cb('clean up stan deployment', t => {
  sh.exec(`kubectl -n ${testNamespace} delete scaledobject.keda.sh/stan-scaledobject`)

  StanHelper.uninstall(t, stanNamespace)
  sh.exec(`kubectl delete namespace ${stanNamespace}`)
  StanHelper.uninstallWorkloads(t, testNamespace)
  sh.exec(`kubectl delete namespace ${testNamespace}`)
  t.end()
})


const scaledObjectYaml = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: stan-scaledobject
spec:
  pollingInterval: 3
  cooldownPeriod: 10
  minReplicaCount: 0
  maxReplicaCount: 5
  scaleTargetRef:
    name: sub
  triggers:
    - type: stan
      metadata:
        natsServerMonitoringEndpoint: "stan-nats-ss.stan:8222"
        queueGroup: "grp1"
        durableName: "ImDurable"
        subject: "Test"
        lagThreshold: "10"
`
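A quick sanity check on the numbers above (a sketch added for clarity, not part of the commit, and assuming the usual HPA behaviour of targeting roughly one replica per lagThreshold of pending messages): the publisher pushes 1000 messages, lagThreshold is 10, and maxReplicaCount is 5, so the scale-out is capped at 5 replicas, which is what the first assertion checks; once the consumers drain the queue, minReplicaCount 0 lets KEDA scale the deployment back down, which is what the second assertion checks.

// Back-of-the-envelope check of the asserted replica counts (illustrative only).
const pendingMessages = 1000                               // published by the pub deployment (-limit 1000)
const lagThreshold = 10                                    // trigger metadata in the ScaledObject
const maxReplicaCount = 5
const minReplicaCount = 0

const desired = Math.ceil(pendingMessages / lagThreshold)            // 100 replicas wanted
console.log(Math.min(desired, maxReplicaCount))                      // 5 -> scale-out assertion
console.log(Math.max(Math.ceil(0 / lagThreshold), minReplicaCount))  // 0 -> scale-in assertion once the queue is drained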
