From 5c58849f2eb38409d60c418e81664c970127dbb5 Mon Sep 17 00:00:00 2001 From: Omer Aplatony Date: Wed, 27 Nov 2024 23:36:44 +0200 Subject: [PATCH] refactor aws sqs queue scaler configuration (#6358) Signed-off-by: Omer Aplatony --- pkg/scalers/aws_sqs_queue_scaler.go | 115 +++++------------------ pkg/scalers/aws_sqs_queue_scaler_test.go | 22 ++--- 2 files changed, 34 insertions(+), 103 deletions(-) diff --git a/pkg/scalers/aws_sqs_queue_scaler.go b/pkg/scalers/aws_sqs_queue_scaler.go index 1de9bf7f285..6f1b6b5d0ee 100644 --- a/pkg/scalers/aws_sqs_queue_scaler.go +++ b/pkg/scalers/aws_sqs_queue_scaler.go @@ -19,14 +19,6 @@ import ( kedautil "github.com/kedacore/keda/v2/pkg/util" ) -const ( - defaultTargetQueueLength = 5 - targetQueueLengthDefault = 5 - activationTargetQueueLengthDefault = 0 - defaultScaleOnInFlight = true - defaultScaleOnDelayed = false -) - type awsSqsQueueScaler struct { metricType v2.MetricTargetType metadata *awsSqsQueueMetadata @@ -35,16 +27,16 @@ type awsSqsQueueScaler struct { } type awsSqsQueueMetadata struct { - targetQueueLength int64 - activationTargetQueueLength int64 - queueURL string + TargetQueueLength int64 `keda:"name=queueLength, order=triggerMetadata, default=5"` + ActivationTargetQueueLength int64 `keda:"name=activationQueueLength, order=triggerMetadata, default=0"` + QueueURL string `keda:"name=queueURL;queueURLFromEnv, order=triggerMetadata;resolvedEnv"` queueName string - awsRegion string - awsEndpoint string + AwsRegion string `keda:"name=awsRegion, order=triggerMetadata"` + AwsEndpoint string `keda:"name=awsEndpoint, order=triggerMetadata, optional"` awsAuthorization awsutils.AuthorizationMetadata triggerIndex int - scaleOnInFlight bool - scaleOnDelayed bool + ScaleOnInFlight bool `keda:"name=scaleOnInFlight, order=triggerMetadata, default=true"` + ScaleOnDelayed bool `keda:"name=scaleOnDelayed, order=triggerMetadata, default=false"` awsSqsQueueMetricNames []types.QueueAttributeName } @@ -57,7 +49,7 @@ func NewAwsSqsQueueScaler(ctx context.Context, config *scalersconfig.ScalerConfi logger := InitializeLogger(config, "aws_sqs_queue_scaler") - meta, err := parseAwsSqsQueueMetadata(config, logger) + meta, err := parseAwsSqsQueueMetadata(config) if err != nil { return nil, fmt.Errorf("error parsing SQS queue metadata: %w", err) } @@ -87,77 +79,26 @@ func (w sqsWrapperClient) GetQueueAttributes(ctx context.Context, params *sqs.Ge return w.sqsClient.GetQueueAttributes(ctx, params, optFns...) } -func parseAwsSqsQueueMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) (*awsSqsQueueMetadata, error) { - meta := awsSqsQueueMetadata{} - meta.targetQueueLength = defaultTargetQueueLength - meta.scaleOnInFlight = defaultScaleOnInFlight - meta.scaleOnDelayed = defaultScaleOnDelayed - - if val, ok := config.TriggerMetadata["queueLength"]; ok && val != "" { - queueLength, err := strconv.ParseInt(val, 10, 64) - if err != nil { - meta.targetQueueLength = targetQueueLengthDefault - logger.Error(err, "Error parsing SQS queue metadata queueLength, using default %n", targetQueueLengthDefault) - } else { - meta.targetQueueLength = queueLength - } - } - - if val, ok := config.TriggerMetadata["activationQueueLength"]; ok && val != "" { - activationQueueLength, err := strconv.ParseInt(val, 10, 64) - if err != nil { - meta.activationTargetQueueLength = activationTargetQueueLengthDefault - logger.Error(err, "Error parsing SQS queue metadata activationQueueLength, using default %n", activationTargetQueueLengthDefault) - } else { - meta.activationTargetQueueLength = activationQueueLength - } - } - - if val, ok := config.TriggerMetadata["scaleOnDelayed"]; ok && val != "" { - scaleOnDelayed, err := strconv.ParseBool(val) - if err != nil { - meta.scaleOnDelayed = defaultScaleOnDelayed - logger.Error(err, "Error parsing SQS queue metadata scaleOnDelayed, using default %n", defaultScaleOnDelayed) - } else { - meta.scaleOnDelayed = scaleOnDelayed - } - } +func parseAwsSqsQueueMetadata(config *scalersconfig.ScalerConfig) (*awsSqsQueueMetadata, error) { + meta := &awsSqsQueueMetadata{} - if val, ok := config.TriggerMetadata["scaleOnInFlight"]; ok && val != "" { - scaleOnInFlight, err := strconv.ParseBool(val) - if err != nil { - meta.scaleOnInFlight = defaultScaleOnInFlight - logger.Error(err, "Error parsing SQS queue metadata scaleOnInFlight, using default %n", defaultScaleOnInFlight) - } else { - meta.scaleOnInFlight = scaleOnInFlight - } + if err := config.TypedConfig(meta); err != nil { + return nil, fmt.Errorf("error parsing SQS queue metadata: %w", err) } meta.awsSqsQueueMetricNames = []types.QueueAttributeName{} meta.awsSqsQueueMetricNames = append(meta.awsSqsQueueMetricNames, types.QueueAttributeNameApproximateNumberOfMessages) - if meta.scaleOnInFlight { + if meta.ScaleOnInFlight { meta.awsSqsQueueMetricNames = append(meta.awsSqsQueueMetricNames, types.QueueAttributeNameApproximateNumberOfMessagesNotVisible) } - if meta.scaleOnDelayed { + if meta.ScaleOnDelayed { meta.awsSqsQueueMetricNames = append(meta.awsSqsQueueMetricNames, types.QueueAttributeNameApproximateNumberOfMessagesDelayed) } - if val, ok := config.TriggerMetadata["queueURL"]; ok && val != "" { - meta.queueURL = val - } else if val, ok := config.TriggerMetadata["queueURLFromEnv"]; ok && val != "" { - if val, ok := config.ResolvedEnv[val]; ok && val != "" { - meta.queueURL = val - } else { - return nil, fmt.Errorf("queueURLFromEnv `%s` env variable value is empty", config.TriggerMetadata["queueURLFromEnv"]) - } - } else { - return nil, fmt.Errorf("no queueURL given") - } - - queueURL, err := url.ParseRequestURI(meta.queueURL) + queueURL, err := url.ParseRequestURI(meta.QueueURL) if err != nil { // queueURL is not a valid URL, using it as queueName - meta.queueName = meta.queueURL + meta.queueName = meta.QueueURL } else { queueURLPath := queueURL.Path queueURLPathParts := strings.Split(queueURLPath, "/") @@ -168,16 +109,6 @@ func parseAwsSqsQueueMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo meta.queueName = queueURLPathParts[2] } - if val, ok := config.TriggerMetadata["awsRegion"]; ok && val != "" { - meta.awsRegion = val - } else { - return nil, fmt.Errorf("no awsRegion given") - } - - if val, ok := config.TriggerMetadata["awsEndpoint"]; ok { - meta.awsEndpoint = val - } - auth, err := awsutils.GetAwsAuthorization(config.TriggerUniqueKey, config.PodIdentity, config.TriggerMetadata, config.AuthParams, config.ResolvedEnv) if err != nil { return nil, err @@ -187,17 +118,17 @@ func parseAwsSqsQueueMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo meta.triggerIndex = config.TriggerIndex - return &meta, nil + return meta, nil } func createSqsClient(ctx context.Context, metadata *awsSqsQueueMetadata) (*sqs.Client, error) { - cfg, err := awsutils.GetAwsConfig(ctx, metadata.awsRegion, metadata.awsAuthorization) + cfg, err := awsutils.GetAwsConfig(ctx, metadata.AwsRegion, metadata.awsAuthorization) if err != nil { return nil, err } return sqs.NewFromConfig(*cfg, func(options *sqs.Options) { - if metadata.awsEndpoint != "" { - options.BaseEndpoint = aws.String(metadata.awsEndpoint) + if metadata.AwsEndpoint != "" { + options.BaseEndpoint = aws.String(metadata.AwsEndpoint) } }), nil } @@ -212,7 +143,7 @@ func (s *awsSqsQueueScaler) GetMetricSpecForScaling(context.Context) []v2.Metric Metric: v2.MetricIdentifier{ Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("aws-sqs-%s", s.metadata.queueName))), }, - Target: GetMetricTarget(s.metricType, s.metadata.targetQueueLength), + Target: GetMetricTarget(s.metricType, s.metadata.TargetQueueLength), } metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType} return []v2.MetricSpec{metricSpec} @@ -229,14 +160,14 @@ func (s *awsSqsQueueScaler) GetMetricsAndActivity(ctx context.Context, metricNam metric := GenerateMetricInMili(metricName, float64(queuelen)) - return []external_metrics.ExternalMetricValue{metric}, queuelen > s.metadata.activationTargetQueueLength, nil + return []external_metrics.ExternalMetricValue{metric}, queuelen > s.metadata.ActivationTargetQueueLength, nil } // Get SQS Queue Length func (s *awsSqsQueueScaler) getAwsSqsQueueLength(ctx context.Context) (int64, error) { input := &sqs.GetQueueAttributesInput{ AttributeNames: s.metadata.awsSqsQueueMetricNames, - QueueUrl: aws.String(s.metadata.queueURL), + QueueUrl: aws.String(s.metadata.QueueURL), } output, err := s.sqsWrapperClient.GetQueueAttributes(ctx, input) diff --git a/pkg/scalers/aws_sqs_queue_scaler_test.go b/pkg/scalers/aws_sqs_queue_scaler_test.go index 6e0b065ab07..fb5db401ecc 100644 --- a/pkg/scalers/aws_sqs_queue_scaler_test.go +++ b/pkg/scalers/aws_sqs_queue_scaler_test.go @@ -144,8 +144,8 @@ var testAWSSQSMetadata = []parseAWSSQSMetadataTestData{ "awsRegion": "eu-west-1"}, testAWSSQSAuthentication, testAWSSQSEmptyResolvedEnv, - false, - "properly formed queue, invalid queueLength"}, + true, + "invalid integer value for queueLength"}, {map[string]string{ "queueURL": testAWSSQSProperQueueURL, "queueLength": "1", @@ -162,8 +162,8 @@ var testAWSSQSMetadata = []parseAWSSQSMetadataTestData{ "awsRegion": "eu-west-1"}, testAWSSQSAuthentication, testAWSSQSEmptyResolvedEnv, - false, - "properly formed queue, invalid activationQueueLength"}, + true, + "invalid integer value for activationQueueLength"}, {map[string]string{ "queueURL": testAWSSQSProperQueueURL, "queueLength": "1", @@ -304,7 +304,7 @@ var testAWSSQSMetadata = []parseAWSSQSMetadataTestData{ map[string]string{ "QUEUE_URL": "", }, - true, + false, "empty QUEUE_URL env value"}, } @@ -392,7 +392,7 @@ var awsSQSGetMetricTestData = []*parseAWSSQSMetadataTestData{ func TestSQSParseMetadata(t *testing.T) { for _, testData := range testAWSSQSMetadata { - _, err := parseAwsSqsQueueMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testData.resolvedEnv, AuthParams: testData.authParams}, logr.Discard()) + _, err := parseAwsSqsQueueMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testData.resolvedEnv, AuthParams: testData.authParams}) if err != nil && !testData.isError { t.Errorf("Expected success because %s got error, %s", testData.comment, err) } @@ -405,7 +405,7 @@ func TestSQSParseMetadata(t *testing.T) { func TestAWSSQSGetMetricSpecForScaling(t *testing.T) { for _, testData := range awsSQSMetricIdentifiers { ctx := context.Background() - meta, err := parseAwsSqsQueueMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testData.metadataTestData.resolvedEnv, AuthParams: testData.metadataTestData.authParams, TriggerIndex: testData.triggerIndex}, logr.Discard()) + meta, err := parseAwsSqsQueueMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testData.metadataTestData.resolvedEnv, AuthParams: testData.metadataTestData.authParams, TriggerIndex: testData.triggerIndex}) if err != nil { t.Fatal("Could not parse metadata:", err) } @@ -421,24 +421,24 @@ func TestAWSSQSGetMetricSpecForScaling(t *testing.T) { func TestAWSSQSScalerGetMetrics(t *testing.T) { for index, testData := range awsSQSGetMetricTestData { - meta, err := parseAwsSqsQueueMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testData.resolvedEnv, AuthParams: testData.authParams, TriggerIndex: index}, logr.Discard()) + meta, err := parseAwsSqsQueueMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testData.resolvedEnv, AuthParams: testData.authParams, TriggerIndex: index}) if err != nil { t.Fatal("Could not parse metadata:", err) } scaler := awsSqsQueueScaler{"", meta, &mockSqs{}, logr.Discard()} value, _, err := scaler.GetMetricsAndActivity(context.Background(), "MetricName") - switch meta.queueURL { + switch meta.QueueURL { case testAWSSQSErrorQueueURL: assert.Error(t, err, "expect error because of sqs api error") case testAWSSQSBadDataQueueURL: assert.Error(t, err, "expect error because of bad data return from sqs") default: expectedMessages := testAWSSQSApproximateNumberOfMessagesVisible - if meta.scaleOnInFlight { + if meta.ScaleOnInFlight { expectedMessages += testAWSSQSApproximateNumberOfMessagesNotVisible } - if meta.scaleOnDelayed { + if meta.ScaleOnDelayed { expectedMessages += testAWSSQSApproximateNumberOfMessagesDelayed } assert.EqualValues(t, int64(expectedMessages), value[0].Value.Value())