Skip to content

Commit

Permalink
Include vhost for RabbitMQ when retrieving queue info with useRegex (k…
Browse files Browse the repository at this point in the history
  • Loading branch information
Photonios authored Mar 4, 2022
1 parent ff145ae commit 7eeb9ba
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
- **Azure Queue:** Don't call Azure queue GetProperties API unnecessarily ([#2613](https://github.com/kedacore/keda/pull/2613))
- **Datadog Scaler:** Validate query to contain `{` to prevent panic on invalid query ([#2625](https://github.com/kedacore/keda/issues/2625))
- **Kafka Scaler** Make "disable" a valid value for tls auth parameter ([#2608](https://github.com/kedacore/keda/issues/2608))
- **RabbitMQ Scaler:** Include `vhost` for RabbitMQ when retrieving queue info with `useRegex` ([#2498](https://github.com/kedacore/keda/issues/2498))

### Breaking Changes

Expand Down
24 changes: 20 additions & 4 deletions pkg/scalers/rabbitmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
rabbitModeMessageRate = "MessageRate"
defaultRabbitMQQueueLength = 20
rabbitMetricType = "External"
rabbitRootVhostPath = "/%2F"
)

const (
Expand Down Expand Up @@ -412,21 +413,36 @@ func (s *rabbitMQScaler) getQueueInfoViaHTTP() (*queueInfo, error) {
return nil, err
}

// Extract vhost from URL's path.
vhost := parsedURL.Path

// If the URL's path only contains a slash, it represents the trailing slash and
// must be ignored because it may cause confusion with the '/' vhost.
if vhost == "/" {
vhost = ""
}

// Override vhost if requested.
if s.metadata.vhostName != nil {
vhost = "/" + url.QueryEscape(*s.metadata.vhostName)
// If the desired vhost is "All" vhosts, no path is necessary
if *s.metadata.vhostName == "" {
vhost = ""
} else {
vhost = "/" + url.QueryEscape(*s.metadata.vhostName)
}
}

if vhost == "" || vhost == "/" || vhost == "//" {
vhost = "/%2F"
// Encode the '/' vhost if necessary.
if vhost == "//" {
vhost = rabbitRootVhostPath
}

// Clear URL path to get the correct host.
parsedURL.Path = ""

var getQueueInfoManagementURI string
if s.metadata.useRegex {
getQueueInfoManagementURI = fmt.Sprintf("%s/api/queues?page=1&use_regex=true&pagination=false&name=%s&page_size=%d", parsedURL.String(), url.QueryEscape(s.metadata.queueName), s.metadata.pageSize)
getQueueInfoManagementURI = fmt.Sprintf("%s/api/queues%s?page=1&use_regex=true&pagination=false&name=%s&page_size=%d", parsedURL.String(), vhost, url.QueryEscape(s.metadata.queueName), s.metadata.pageSize)
} else {
getQueueInfoManagementURI = fmt.Sprintf("%s/api/queues%s/%s", parsedURL.String(), vhost, url.QueryEscape(s.metadata.queueName))
}
Expand Down
45 changes: 35 additions & 10 deletions pkg/scalers/rabbitmq_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,14 +192,14 @@ var testQueueInfoTestData = []getQueueInfoTestData{
{`Password is incorrect`, http.StatusUnauthorized, false, nil, ""},
}

var vhostPathes = []string{"/myhost", "", "/", "//", "/%2F"}
var vhostPathes = []string{"/myhost", "", "/", "//", rabbitRootVhostPath}

var testQueueInfoTestDataSingleVhost = []getQueueInfoTestData{
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "myhost"}, "/myhost"},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "/"}, "/"},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": ""}, "/"},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "/"}, "//"},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": ""}, ""},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "myhost"}, "/myhost"},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "/"}, "/"},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "/"}, rabbitRootVhostPath},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": ""}, "/"},
}

Expand All @@ -216,14 +216,19 @@ func TestGetQueueInfo(t *testing.T) {

for _, testData := range allTestData {
testData := testData
expectedVhost := "myhost"

if testData.vhostPath != "/myhost" {
expectedVhost = "%2F"
var expectedVhostPath string
switch testData.vhostPath {
case "/myhost":
expectedVhostPath = "/myhost"
case rabbitRootVhostPath, "//":
expectedVhostPath = rabbitRootVhostPath
default:
expectedVhostPath = ""
}

var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
expectedPath := "/api/queues/" + expectedVhost + "/evaluate_trials"
expectedPath := fmt.Sprintf("/api/queues%s/evaluate_trials", expectedVhostPath)
if r.RequestURI != expectedPath {
t.Error("Expect request path to =", expectedPath, "but it is", r.RequestURI)
}
Expand Down Expand Up @@ -325,10 +330,21 @@ var testRegexQueueInfoTestData = []getQueueInfoTestData{
{`{"items":[]}`, http.StatusOK, false, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "avg"}, ""},
}

var vhostPathesForRegex = []string{"", "/test-vh", rabbitRootVhostPath}

func TestGetQueueInfoWithRegex(t *testing.T) {
allTestData := []getQueueInfoTestData{}
for _, testData := range testRegexQueueInfoTestData {
for _, vhostPath := range vhostPathesForRegex {
testData := testData
testData.vhostPath = vhostPath
allTestData = append(allTestData, testData)
}
}

for _, testData := range allTestData {
var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
expectedPath := "/api/queues?page=1&use_regex=true&pagination=false&name=%5Eevaluate_trials%24&page_size=100"
expectedPath := fmt.Sprintf("/api/queues%s?page=1&use_regex=true&pagination=false&name=%%5Eevaluate_trials%%24&page_size=100", testData.vhostPath)
if r.RequestURI != expectedPath {
t.Error("Expect request path to =", expectedPath, "but it is", r.RequestURI)
}
Expand Down Expand Up @@ -397,9 +413,18 @@ var testRegexPageSizeTestData = []getRegexPageSizeTestData{
}

func TestGetPageSizeWithRegex(t *testing.T) {
allTestData := []getRegexPageSizeTestData{}
for _, testData := range testRegexPageSizeTestData {
for _, vhostPath := range vhostPathesForRegex {
testData := testData
testData.queueInfo.vhostPath = vhostPath
allTestData = append(allTestData, testData)
}
}

for _, testData := range allTestData {
var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
expectedPath := fmt.Sprintf("/api/queues?page=1&use_regex=true&pagination=false&name=%%5Eevaluate_trials%%24&page_size=%d", testData.pageSize)
expectedPath := fmt.Sprintf("/api/queues%s?page=1&use_regex=true&pagination=false&name=%%5Eevaluate_trials%%24&page_size=%d", testData.queueInfo.vhostPath, testData.pageSize)
if r.RequestURI != expectedPath {
t.Error("Expect request path to =", expectedPath, "but it is", r.RequestURI)
}
Expand Down
27 changes: 27 additions & 0 deletions tests/scalers/rabbitmq-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,19 @@ export class RabbitMQHelper {
)
}

static createVhost(t, namespace: string, host: string, username: string, password: string, vhostName: string) {
const tmpFile = tmp.fileSync()
fs.writeFileSync(tmpFile.name, createVhostYaml.replace('{{HOST}}', host)
.replace('{{USERNAME_PASSWORD}}', `${username}:${password}`)
.replace('{{VHOST_NAME}}', vhostName)
.replace('{{VHOST_NAME}}', vhostName))
t.is(
0,
sh.exec(`kubectl apply -f ${tmpFile.name} --namespace ${namespace}`).code,
'creating a vhost should work.'
)
}

static publishMessages(t, namespace: string, connectionString: string, messageCount: number, queueName: string) {
// publish messages
const tmpFile = tmp.fileSync()
Expand Down Expand Up @@ -66,6 +79,20 @@ spec:
command: ["send", "{{CONNECTION_STRING}}", "{{MESSAGE_COUNT}}", "{{QUEUE_NAME}}"]
restartPolicy: Never`

const createVhostYaml = `apiVersion: batch/v1
kind: Job
metadata:
name: rabbitmq-create-vhost-{{VHOST_NAME}}
spec:
template:
spec:
containers:
- name: curl-client
image: curlimages/curl
imagePullPolicy: Always
command: ["curl", "-u", "{{USERNAME_PASSWORD}}", "-X", "PUT", "http://{{HOST}}/api/vhosts/{{VHOST_NAME}}"]
restartPolicy: Never`

const rabbitmqDeployYaml = `apiVersion: v1
kind: ConfigMap
metadata:
Expand Down
126 changes: 126 additions & 0 deletions tests/scalers/rabbitmq-queue-http-regex-vhost.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import * as async from 'async'
import * as fs from 'fs'
import * as sh from 'shelljs'
import * as tmp from 'tmp'
import test from 'ava'
import { RabbitMQHelper } from './rabbitmq-helpers'
import {waitForDeploymentReplicaCount} from "./helpers";

const testNamespace = 'rabbitmq-queue-http-regex-vhost-test'
const rabbitmqNamespace = 'rabbitmq-http-regex-vhost-test'
const queueName = 'hello'
const dummyQueueName1 = 'hello-1'
const dummyQueueName2 = 'hellohellohello'
const username = "test-user"
const password = "test-password"
const vhost = "test-vh-regex"
const dummyVhost1 = "test-vh-regex-dummy-one"
const dummyVhost2 = "test-vh-regex-dummy-two"
const connectionHost = `rabbitmq.${rabbitmqNamespace}.svc.cluster.local`
const connectionHostWithAuth = `${username}:${password}@${connectionHost}`
const connectionString = `amqp://${connectionHostWithAuth}/${vhost}`
const connectionStringDummy1 = `amqp://${connectionHostWithAuth}/${dummyVhost1}`
const connectionStringDummy2 = `amqp://${connectionHostWithAuth}/${dummyVhost2}`
const messageCount = 500

test.before(t => {
RabbitMQHelper.installRabbit(t, username, password, vhost, rabbitmqNamespace)

sh.config.silent = true
// create deployment
const httpConnectionString = `http://${connectionHostWithAuth}/${vhost}`

RabbitMQHelper.createDeployment(t, testNamespace, deployYaml, connectionString, httpConnectionString, queueName)

RabbitMQHelper.createVhost(t, testNamespace, connectionHost, username, password, dummyVhost1)
RabbitMQHelper.createVhost(t, testNamespace, connectionHost, username, password, dummyVhost2)
})

test.serial('Deployment should have 0 replicas on start', t => {
const replicaCount = sh.exec(
`kubectl get deployment.apps/test-deployment --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"`
).stdout
t.is(replicaCount, '0', 'replica count should start out as 0')
})

test.serial(`Deployment should scale to 4 with ${messageCount} messages on the queue then back to 0`, async t => {
RabbitMQHelper.publishMessages(t, testNamespace, connectionStringDummy1, messageCount, dummyQueueName1)
RabbitMQHelper.publishMessages(t, testNamespace, connectionStringDummy2, messageCount, dummyQueueName2)
RabbitMQHelper.publishMessages(t, testNamespace, connectionString, messageCount, queueName)

// with messages published, the consumer deployment should start receiving the messages
t.true(await waitForDeploymentReplicaCount(4, 'test-deployment', testNamespace, 20, 5000), 'Replica count should be 4 after 10 seconds')
t.true(await waitForDeploymentReplicaCount(0, 'test-deployment', testNamespace, 50, 5000), 'Replica count should be 0 after 3 minutes')
})

test.after.always.cb('clean up rabbitmq-queue deployment', t => {
const resources = [
'scaledobject.keda.sh/test-scaledobject',
'secret/test-secrets-api',
'deployment.apps/test-deployment',
]

for (const resource of resources) {
sh.exec(`kubectl delete ${resource} --namespace ${testNamespace}`)
}
sh.exec(`kubectl delete namespace ${testNamespace}`)
// remove rabbitmq
RabbitMQHelper.uninstallRabbit(rabbitmqNamespace)
t.end()
})

const deployYaml = `apiVersion: v1
kind: Secret
metadata:
name: test-secrets-api
data:
RabbitApiHost: {{CONNECTION_STRING_BASE64}}
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: test-deployment
labels:
app: test-deployment
spec:
replicas: 0
selector:
matchLabels:
app: test-deployment
template:
metadata:
labels:
app: test-deployment
spec:
containers:
- name: rabbitmq-consumer
image: ghcr.io/kedacore/tests-rabbitmq
imagePullPolicy: Always
command:
- receive
args:
- '{{CONNECTION_STRING}}'
envFrom:
- secretRef:
name: test-secrets-api
---
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: test-scaledobject
spec:
scaleTargetRef:
name: test-deployment
pollingInterval: 5
cooldownPeriod: 10
minReplicaCount: 0
maxReplicaCount: 4
triggers:
- type: rabbitmq
metadata:
queueName: "^hell.{1}$"
hostFromEnv: RabbitApiHost
protocol: http
useRegex: 'true'
operation: sum
queueLength: '50'`

0 comments on commit 7eeb9ba

Please sign in to comment.