Skip to content

Commit

Permalink
refactor aws sqs queue scaler configuration (kedacore#6358)
Browse files Browse the repository at this point in the history
Signed-off-by: Omer Aplatony <[email protected]>
  • Loading branch information
omerap12 authored Nov 27, 2024
1 parent 29400ed commit 5c58849
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 103 deletions.
115 changes: 23 additions & 92 deletions pkg/scalers/aws_sqs_queue_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,6 @@ import (
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

const (
defaultTargetQueueLength = 5
targetQueueLengthDefault = 5
activationTargetQueueLengthDefault = 0
defaultScaleOnInFlight = true
defaultScaleOnDelayed = false
)

type awsSqsQueueScaler struct {
metricType v2.MetricTargetType
metadata *awsSqsQueueMetadata
Expand All @@ -35,16 +27,16 @@ type awsSqsQueueScaler struct {
}

type awsSqsQueueMetadata struct {
targetQueueLength int64
activationTargetQueueLength int64
queueURL string
TargetQueueLength int64 `keda:"name=queueLength, order=triggerMetadata, default=5"`
ActivationTargetQueueLength int64 `keda:"name=activationQueueLength, order=triggerMetadata, default=0"`
QueueURL string `keda:"name=queueURL;queueURLFromEnv, order=triggerMetadata;resolvedEnv"`
queueName string
awsRegion string
awsEndpoint string
AwsRegion string `keda:"name=awsRegion, order=triggerMetadata"`
AwsEndpoint string `keda:"name=awsEndpoint, order=triggerMetadata, optional"`
awsAuthorization awsutils.AuthorizationMetadata
triggerIndex int
scaleOnInFlight bool
scaleOnDelayed bool
ScaleOnInFlight bool `keda:"name=scaleOnInFlight, order=triggerMetadata, default=true"`
ScaleOnDelayed bool `keda:"name=scaleOnDelayed, order=triggerMetadata, default=false"`
awsSqsQueueMetricNames []types.QueueAttributeName
}

Expand All @@ -57,7 +49,7 @@ func NewAwsSqsQueueScaler(ctx context.Context, config *scalersconfig.ScalerConfi

logger := InitializeLogger(config, "aws_sqs_queue_scaler")

meta, err := parseAwsSqsQueueMetadata(config, logger)
meta, err := parseAwsSqsQueueMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing SQS queue metadata: %w", err)
}
Expand Down Expand Up @@ -87,77 +79,26 @@ func (w sqsWrapperClient) GetQueueAttributes(ctx context.Context, params *sqs.Ge
return w.sqsClient.GetQueueAttributes(ctx, params, optFns...)
}

func parseAwsSqsQueueMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) (*awsSqsQueueMetadata, error) {
meta := awsSqsQueueMetadata{}
meta.targetQueueLength = defaultTargetQueueLength
meta.scaleOnInFlight = defaultScaleOnInFlight
meta.scaleOnDelayed = defaultScaleOnDelayed

if val, ok := config.TriggerMetadata["queueLength"]; ok && val != "" {
queueLength, err := strconv.ParseInt(val, 10, 64)
if err != nil {
meta.targetQueueLength = targetQueueLengthDefault
logger.Error(err, "Error parsing SQS queue metadata queueLength, using default %n", targetQueueLengthDefault)
} else {
meta.targetQueueLength = queueLength
}
}

if val, ok := config.TriggerMetadata["activationQueueLength"]; ok && val != "" {
activationQueueLength, err := strconv.ParseInt(val, 10, 64)
if err != nil {
meta.activationTargetQueueLength = activationTargetQueueLengthDefault
logger.Error(err, "Error parsing SQS queue metadata activationQueueLength, using default %n", activationTargetQueueLengthDefault)
} else {
meta.activationTargetQueueLength = activationQueueLength
}
}

if val, ok := config.TriggerMetadata["scaleOnDelayed"]; ok && val != "" {
scaleOnDelayed, err := strconv.ParseBool(val)
if err != nil {
meta.scaleOnDelayed = defaultScaleOnDelayed
logger.Error(err, "Error parsing SQS queue metadata scaleOnDelayed, using default %n", defaultScaleOnDelayed)
} else {
meta.scaleOnDelayed = scaleOnDelayed
}
}
func parseAwsSqsQueueMetadata(config *scalersconfig.ScalerConfig) (*awsSqsQueueMetadata, error) {
meta := &awsSqsQueueMetadata{}

if val, ok := config.TriggerMetadata["scaleOnInFlight"]; ok && val != "" {
scaleOnInFlight, err := strconv.ParseBool(val)
if err != nil {
meta.scaleOnInFlight = defaultScaleOnInFlight
logger.Error(err, "Error parsing SQS queue metadata scaleOnInFlight, using default %n", defaultScaleOnInFlight)
} else {
meta.scaleOnInFlight = scaleOnInFlight
}
if err := config.TypedConfig(meta); err != nil {
return nil, fmt.Errorf("error parsing SQS queue metadata: %w", err)
}

meta.awsSqsQueueMetricNames = []types.QueueAttributeName{}
meta.awsSqsQueueMetricNames = append(meta.awsSqsQueueMetricNames, types.QueueAttributeNameApproximateNumberOfMessages)
if meta.scaleOnInFlight {
if meta.ScaleOnInFlight {
meta.awsSqsQueueMetricNames = append(meta.awsSqsQueueMetricNames, types.QueueAttributeNameApproximateNumberOfMessagesNotVisible)
}
if meta.scaleOnDelayed {
if meta.ScaleOnDelayed {
meta.awsSqsQueueMetricNames = append(meta.awsSqsQueueMetricNames, types.QueueAttributeNameApproximateNumberOfMessagesDelayed)
}

if val, ok := config.TriggerMetadata["queueURL"]; ok && val != "" {
meta.queueURL = val
} else if val, ok := config.TriggerMetadata["queueURLFromEnv"]; ok && val != "" {
if val, ok := config.ResolvedEnv[val]; ok && val != "" {
meta.queueURL = val
} else {
return nil, fmt.Errorf("queueURLFromEnv `%s` env variable value is empty", config.TriggerMetadata["queueURLFromEnv"])
}
} else {
return nil, fmt.Errorf("no queueURL given")
}

queueURL, err := url.ParseRequestURI(meta.queueURL)
queueURL, err := url.ParseRequestURI(meta.QueueURL)
if err != nil {
// queueURL is not a valid URL, using it as queueName
meta.queueName = meta.queueURL
meta.queueName = meta.QueueURL
} else {
queueURLPath := queueURL.Path
queueURLPathParts := strings.Split(queueURLPath, "/")
Expand All @@ -168,16 +109,6 @@ func parseAwsSqsQueueMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo
meta.queueName = queueURLPathParts[2]
}

if val, ok := config.TriggerMetadata["awsRegion"]; ok && val != "" {
meta.awsRegion = val
} else {
return nil, fmt.Errorf("no awsRegion given")
}

if val, ok := config.TriggerMetadata["awsEndpoint"]; ok {
meta.awsEndpoint = val
}

auth, err := awsutils.GetAwsAuthorization(config.TriggerUniqueKey, config.PodIdentity, config.TriggerMetadata, config.AuthParams, config.ResolvedEnv)
if err != nil {
return nil, err
Expand All @@ -187,17 +118,17 @@ func parseAwsSqsQueueMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo

meta.triggerIndex = config.TriggerIndex

return &meta, nil
return meta, nil
}

func createSqsClient(ctx context.Context, metadata *awsSqsQueueMetadata) (*sqs.Client, error) {
cfg, err := awsutils.GetAwsConfig(ctx, metadata.awsRegion, metadata.awsAuthorization)
cfg, err := awsutils.GetAwsConfig(ctx, metadata.AwsRegion, metadata.awsAuthorization)
if err != nil {
return nil, err
}
return sqs.NewFromConfig(*cfg, func(options *sqs.Options) {
if metadata.awsEndpoint != "" {
options.BaseEndpoint = aws.String(metadata.awsEndpoint)
if metadata.AwsEndpoint != "" {
options.BaseEndpoint = aws.String(metadata.AwsEndpoint)
}
}), nil
}
Expand All @@ -212,7 +143,7 @@ func (s *awsSqsQueueScaler) GetMetricSpecForScaling(context.Context) []v2.Metric
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("aws-sqs-%s", s.metadata.queueName))),
},
Target: GetMetricTarget(s.metricType, s.metadata.targetQueueLength),
Target: GetMetricTarget(s.metricType, s.metadata.TargetQueueLength),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2.MetricSpec{metricSpec}
Expand All @@ -229,14 +160,14 @@ func (s *awsSqsQueueScaler) GetMetricsAndActivity(ctx context.Context, metricNam

metric := GenerateMetricInMili(metricName, float64(queuelen))

return []external_metrics.ExternalMetricValue{metric}, queuelen > s.metadata.activationTargetQueueLength, nil
return []external_metrics.ExternalMetricValue{metric}, queuelen > s.metadata.ActivationTargetQueueLength, nil
}

// Get SQS Queue Length
func (s *awsSqsQueueScaler) getAwsSqsQueueLength(ctx context.Context) (int64, error) {
input := &sqs.GetQueueAttributesInput{
AttributeNames: s.metadata.awsSqsQueueMetricNames,
QueueUrl: aws.String(s.metadata.queueURL),
QueueUrl: aws.String(s.metadata.QueueURL),
}

output, err := s.sqsWrapperClient.GetQueueAttributes(ctx, input)
Expand Down
22 changes: 11 additions & 11 deletions pkg/scalers/aws_sqs_queue_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ var testAWSSQSMetadata = []parseAWSSQSMetadataTestData{
"awsRegion": "eu-west-1"},
testAWSSQSAuthentication,
testAWSSQSEmptyResolvedEnv,
false,
"properly formed queue, invalid queueLength"},
true,
"invalid integer value for queueLength"},
{map[string]string{
"queueURL": testAWSSQSProperQueueURL,
"queueLength": "1",
Expand All @@ -162,8 +162,8 @@ var testAWSSQSMetadata = []parseAWSSQSMetadataTestData{
"awsRegion": "eu-west-1"},
testAWSSQSAuthentication,
testAWSSQSEmptyResolvedEnv,
false,
"properly formed queue, invalid activationQueueLength"},
true,
"invalid integer value for activationQueueLength"},
{map[string]string{
"queueURL": testAWSSQSProperQueueURL,
"queueLength": "1",
Expand Down Expand Up @@ -304,7 +304,7 @@ var testAWSSQSMetadata = []parseAWSSQSMetadataTestData{
map[string]string{
"QUEUE_URL": "",
},
true,
false,
"empty QUEUE_URL env value"},
}

Expand Down Expand Up @@ -392,7 +392,7 @@ var awsSQSGetMetricTestData = []*parseAWSSQSMetadataTestData{

func TestSQSParseMetadata(t *testing.T) {
for _, testData := range testAWSSQSMetadata {
_, err := parseAwsSqsQueueMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testData.resolvedEnv, AuthParams: testData.authParams}, logr.Discard())
_, err := parseAwsSqsQueueMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testData.resolvedEnv, AuthParams: testData.authParams})
if err != nil && !testData.isError {
t.Errorf("Expected success because %s got error, %s", testData.comment, err)
}
Expand All @@ -405,7 +405,7 @@ func TestSQSParseMetadata(t *testing.T) {
func TestAWSSQSGetMetricSpecForScaling(t *testing.T) {
for _, testData := range awsSQSMetricIdentifiers {
ctx := context.Background()
meta, err := parseAwsSqsQueueMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testData.metadataTestData.resolvedEnv, AuthParams: testData.metadataTestData.authParams, TriggerIndex: testData.triggerIndex}, logr.Discard())
meta, err := parseAwsSqsQueueMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testData.metadataTestData.resolvedEnv, AuthParams: testData.metadataTestData.authParams, TriggerIndex: testData.triggerIndex})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
Expand All @@ -421,24 +421,24 @@ func TestAWSSQSGetMetricSpecForScaling(t *testing.T) {

func TestAWSSQSScalerGetMetrics(t *testing.T) {
for index, testData := range awsSQSGetMetricTestData {
meta, err := parseAwsSqsQueueMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testData.resolvedEnv, AuthParams: testData.authParams, TriggerIndex: index}, logr.Discard())
meta, err := parseAwsSqsQueueMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testData.resolvedEnv, AuthParams: testData.authParams, TriggerIndex: index})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
scaler := awsSqsQueueScaler{"", meta, &mockSqs{}, logr.Discard()}

value, _, err := scaler.GetMetricsAndActivity(context.Background(), "MetricName")
switch meta.queueURL {
switch meta.QueueURL {
case testAWSSQSErrorQueueURL:
assert.Error(t, err, "expect error because of sqs api error")
case testAWSSQSBadDataQueueURL:
assert.Error(t, err, "expect error because of bad data return from sqs")
default:
expectedMessages := testAWSSQSApproximateNumberOfMessagesVisible
if meta.scaleOnInFlight {
if meta.ScaleOnInFlight {
expectedMessages += testAWSSQSApproximateNumberOfMessagesNotVisible
}
if meta.scaleOnDelayed {
if meta.ScaleOnDelayed {
expectedMessages += testAWSSQSApproximateNumberOfMessagesDelayed
}
assert.EqualValues(t, int64(expectedMessages), value[0].Value.Value())
Expand Down

0 comments on commit 5c58849

Please sign in to comment.