Skip to content

Commit

Permalink
add FromEnv suffix to metadata resolved from env (kedacore#1072)
Browse files Browse the repository at this point in the history
Signed-off-by: Ahmed ElSayed <[email protected]>

Co-authored-by: Tomek Urbaszek <[email protected]>

Co-authored-by: Tomek Urbaszek <[email protected]>
  • Loading branch information
ahmelsayed and turbaszek authored Sep 3, 2020
1 parent ccdfb8d commit 9a904cc
Show file tree
Hide file tree
Showing 22 changed files with 284 additions and 352 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
- Introduce a separate ScaledObject and ScaledJob([#653](https://github.com/kedacore/keda/issues/653))
- Remove `New()` and `Close()` from the interface of `service ExternalScaler` in `externalscaler.proto`.
- Removed deprecated brokerList for Kafka scaler ([#882](https://github.com/kedacore/keda/pull/882))
- All scalers metadata that is resolved from the scaleTarget environment have suffix `FromEnv` added. e.g: `connection` -> `connectionFromEnv`

### Other
- Update Operator SDK and k8s deps ([#1007](https://github.com/kedacore/keda/pull/1007),[#870](https://github.com/kedacore/keda/issues/870))
Expand Down
28 changes: 14 additions & 14 deletions pkg/scalers/aws_iam_authorization.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import "fmt"
const (
awsAccessKeyIDEnvVar = "AWS_ACCESS_KEY_ID"
awsSecretAccessKeyEnvVar = "AWS_SECRET_ACCESS_KEY"
awsSessionTokenEnvVar = "AWS_SESSION_TOKEN"
)

type awsAuthorizationMetadata struct {
Expand Down Expand Up @@ -34,23 +33,24 @@ func getAwsAuthorization(authParams, metadata, resolvedEnv map[string]string) (a
}
meta.awsSecretAccessKey = authParams["awsSecretAccessKey"]
} else {
var keyName string
if keyName = metadata["awsAccessKeyID"]; keyName == "" {
keyName = awsAccessKeyIDEnvVar
if metadata["awsAccessKeyID"] != "" {
meta.awsAccessKeyID = metadata["awsAccessKeyID"]
} else if metadata["awsAccessKeyIDFromEnv"] != "" {
meta.awsAccessKeyID = resolvedEnv[metadata["awsAccessKeyID"]]
}
if val, ok := resolvedEnv[keyName]; ok && val != "" {
meta.awsAccessKeyID = val
} else {
return meta, fmt.Errorf("'%s' doesn't exist in the deployment environment", keyName)

if len(meta.awsAccessKeyID) == 0 {
return meta, fmt.Errorf("awsAccessKeyID not found")
}

if keyName = metadata["awsSecretAccessKey"]; keyName == "" {
keyName = awsSecretAccessKeyEnvVar
if metadata["awsSecretAccessKey"] != "" {
meta.awsSecretAccessKey = metadata["awsSecretAccessKey"]
} else if metadata["awsSecretAccessKeyFromEnv"] != "" {
meta.awsSecretAccessKey = resolvedEnv[metadata["awsSecretAccessKeyFromEnv"]]
}
if val, ok := resolvedEnv[keyName]; ok && val != "" {
meta.awsSecretAccessKey = val
} else {
return meta, fmt.Errorf("'%s' doesn't exist in the deployment environment", keyName)

if len(meta.awsSecretAccessKey) == 0 {
return meta, fmt.Errorf("awsSecretAccessKey not found")
}
}
}
Expand Down
53 changes: 21 additions & 32 deletions pkg/scalers/azure_blob_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@ import (
)

const (
blobCountMetricName = "blobCount"
defaultTargetBlobCount = 5
defaultBlobDelimiter = "/"
defaultBlobPrefix = ""
defaultBlobConnectionSetting = "AzureWebJobsStorage"
blobCountMetricName = "blobCount"
defaultTargetBlobCount = 5
defaultBlobDelimiter = "/"
defaultBlobPrefix = ""
)

type azureBlobScaler struct {
Expand Down Expand Up @@ -74,44 +73,34 @@ func parseAzureBlobMetadata(metadata, resolvedEnv, authParams map[string]string,
return nil, "", fmt.Errorf("no blobContainerName given")
}

if val, ok := metadata["blobDelimiter"]; ok {
if val != "" {
meta.blobDelimiter = val
}
if val, ok := metadata["blobDelimiter"]; ok && val != "" {
meta.blobDelimiter = val
}

if val, ok := metadata["blobPrefix"]; ok {
if val != "" {
meta.blobPrefix = val + meta.blobDelimiter
}
if val, ok := metadata["blobPrefix"]; ok && val != "" {
meta.blobPrefix = val + meta.blobDelimiter
}

// before triggerAuthentication CRD, pod identity was configured using this property
if val, ok := metadata["useAAdPodIdentity"]; ok && podAuth == "" {
if val == "true" {
podAuth = "azure"
}
if val, ok := metadata["useAAdPodIdentity"]; ok && podAuth == "" && val == "true" {
podAuth = "azure"
}

// If the Use AAD Pod Identity is not present, or set to "none"
// then check for connection string
if podAuth == "" || podAuth == "none" {
// Azure Blob Scaler expects a "connection" parameter in the metadata
// of the scaler or in a TriggerAuthentication object
connection := authParams["connection"]
if connection != "" {
// Found the connection in a parameter from TriggerAuthentication
meta.connection = connection
} else {
connectionSetting := defaultBlobConnectionSetting
if val, ok := metadata["connection"]; ok && val != "" {
connectionSetting = val
}

if val, ok := resolvedEnv[connectionSetting]; ok {
meta.connection = val
} else {
return nil, "", fmt.Errorf("no connection setting given")
}
if authParams["connection"] != "" {
meta.connection = authParams["connection"]
} else if metadata["connection"] != "" {
meta.connection = metadata["connection"]
} else if metadata["connectionFromEnv"] != "" {
meta.connection = resolvedEnv[metadata["connectionFromEnv"]]
}

if len(meta.connection) == 0 {
return nil, "", fmt.Errorf("no connection setting given")
}
} else if podAuth == "azure" {
// If the Use AAD Pod Identity is present then check account name
Expand Down
44 changes: 22 additions & 22 deletions pkg/scalers/azure_eventhub_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@ import (
)

const (
defaultEventHubMessageThreshold = 64
eventHubMetricType = "External"
thresholdMetricName = "unprocessedEventThreshold"
defaultEventHubConsumerGroup = "$Default"
defaultEventHubConnectionSetting = "EventHub"
defaultStorageConnectionSetting = "AzureWebJobsStorage"
defaultBlobContainer = ""
defaultEventHubMessageThreshold = 64
eventHubMetricType = "External"
thresholdMetricName = "unprocessedEventThreshold"
defaultEventHubConsumerGroup = "$Default"
defaultBlobContainer = ""
)

var eventhubLog = logf.Log.WithName("azure_eventhub_scaler")
Expand All @@ -42,8 +40,8 @@ type EventHubMetadata struct {
}

// NewAzureEventHubScaler creates a new scaler for eventHub
func NewAzureEventHubScaler(resolvedEnv, metadata map[string]string) (Scaler, error) {
parsedMetadata, err := parseAzureEventHubMetadata(metadata, resolvedEnv)
func NewAzureEventHubScaler(resolvedEnv, metadata, authParams map[string]string) (Scaler, error) {
parsedMetadata, err := parseAzureEventHubMetadata(metadata, resolvedEnv, authParams)
if err != nil {
return nil, fmt.Errorf("unable to get eventhub metadata: %s", err)
}
Expand All @@ -60,7 +58,7 @@ func NewAzureEventHubScaler(resolvedEnv, metadata map[string]string) (Scaler, er
}

// parseAzureEventHubMetadata parses metadata
func parseAzureEventHubMetadata(metadata, resolvedEnv map[string]string) (*EventHubMetadata, error) {
func parseAzureEventHubMetadata(metadata, resolvedEnv, authParams map[string]string) (*EventHubMetadata, error) {
meta := EventHubMetadata{
eventHubInfo: azure.EventHubInfo{},
}
Expand All @@ -75,25 +73,27 @@ func parseAzureEventHubMetadata(metadata, resolvedEnv map[string]string) (*Event
meta.threshold = threshold
}

storageConnectionSetting := defaultStorageConnectionSetting
if val, ok := metadata["storageConnection"]; ok && val != "" {
storageConnectionSetting = val
if authParams["storageConnection"] != "" {
meta.eventHubInfo.StorageConnection = authParams["storageConnection"]
} else if metadata["storageConnection"] != "" {
meta.eventHubInfo.StorageConnection = metadata["storageConnection"]
} else if metadata["storageConnectionFromEnv"] != "" {
meta.eventHubInfo.StorageConnection = resolvedEnv[metadata["storageConnectionFromEnv"]]
}

if val, ok := resolvedEnv[storageConnectionSetting]; ok {
meta.eventHubInfo.StorageConnection = val
} else {
if len(meta.eventHubInfo.StorageConnection) == 0 {
return nil, fmt.Errorf("no storage connection string given")
}

eventHubConnectionSetting := defaultEventHubConnectionSetting
if val, ok := metadata["connection"]; ok && val != "" {
eventHubConnectionSetting = val
if authParams["connection"] != "" {
meta.eventHubInfo.EventHubConnection = authParams["connection"]
} else if metadata["connection"] != "" {
meta.eventHubInfo.EventHubConnection = metadata["connection"]
} else if metadata["connectionFromEnv"] != "" {
meta.eventHubInfo.EventHubConnection = resolvedEnv[metadata["connectionFromEnv"]]
}

if val, ok := resolvedEnv[eventHubConnectionSetting]; ok {
meta.eventHubInfo.EventHubConnection = val
} else {
if len(meta.eventHubInfo.EventHubConnection) == 0 {
return nil, fmt.Errorf("no event hub connection string given")
}

Expand Down
16 changes: 8 additions & 8 deletions pkg/scalers/azure_eventhub_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,17 @@ var sampleEventHubResolvedEnv = map[string]string{eventHubConnectionSetting: "no
var parseEventHubMetadataDataset = []parseEventHubMetadataTestData{
{map[string]string{}, true},
// properly formed event hub metadata
{map[string]string{"storageConnection": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connection": eventHubConnectionSetting, "unprocessedEventThreshold": "15"}, false},
{map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting, "unprocessedEventThreshold": "15"}, false},
// missing event hub connection setting
{map[string]string{"storageConnection": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15"}, true},
{map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15"}, true},
// missing storage connection setting
{map[string]string{"consumerGroup": eventHubConsumerGroup, "connection": eventHubConnectionSetting, "unprocessedEventThreshold": "15"}, true},
{map[string]string{"consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting, "unprocessedEventThreshold": "15"}, true},
// missing event hub consumer group - should replace with default
{map[string]string{"storageConnection": storageConnectionSetting, "connection": eventHubConnectionSetting, "unprocessedEventThreshold": "15"}, false},
{map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "connectionFromEnv": eventHubConnectionSetting, "unprocessedEventThreshold": "15"}, false},
// missing unprocessed event threshold - should replace with default
{map[string]string{"storageConnection": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connection": eventHubConnectionSetting}, false},
{map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting}, false},
// added blob container details
{map[string]string{"storageConnection": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connection": eventHubConnectionSetting, "blobContainer": testContainerName}, false},
{map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting, "blobContainer": testContainerName}, false},
}

var eventHubMetricIdentifiers = []eventHubMetricIdentifier{
Expand All @@ -72,7 +72,7 @@ var testEventHubScaler = AzureEventHubScaler{
func TestParseEventHubMetadata(t *testing.T) {
// Test first with valid resolved environment
for _, testData := range parseEventHubMetadataDataset {
_, err := parseAzureEventHubMetadata(testData.metadata, sampleEventHubResolvedEnv)
_, err := parseAzureEventHubMetadata(testData.metadata, sampleEventHubResolvedEnv, map[string]string{})

if err != nil && !testData.isError {
t.Errorf("Expected success but got error: %s", err)
Expand Down Expand Up @@ -419,7 +419,7 @@ func DeleteContainerInStorage(ctx context.Context, endpoint *url.URL, credential

func TestEventHubGetMetricSpecForScaling(t *testing.T) {
for _, testData := range eventHubMetricIdentifiers {
meta, err := parseAzureEventHubMetadata(testData.metadataTestData.metadata, sampleEventHubResolvedEnv)
meta, err := parseAzureEventHubMetadata(testData.metadataTestData.metadata, sampleEventHubResolvedEnv, map[string]string{})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
Expand Down
52 changes: 22 additions & 30 deletions pkg/scalers/azure_monitor_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@ import (
)

const (
azureMonitorMetricName = "metricName"
targetValueName = "targetValue"
defaultClientIDSetting = ""
defaultClientPasswordSetting = ""
azureMonitorMetricName = "metricName"
targetValueName = "targetValue"
)

type azureMonitorScaler struct {
Expand Down Expand Up @@ -119,34 +117,28 @@ func parseAzureMonitorMetadata(metadata, resolvedEnv, authParams map[string]stri
}

if podIdentity == "" || podIdentity == "none" {
if val, ok := authParams["activeDirectoryClientId"]; ok && val != "" {
meta.azureMonitorInfo.ClientID = val
} else {
clientIDSetting := defaultClientIDSetting
if val, ok := metadata["activeDirectoryClientId"]; ok && val != "" {
clientIDSetting = val
}

if val, ok := resolvedEnv[clientIDSetting]; ok {
meta.azureMonitorInfo.ClientID = val
} else {
return nil, fmt.Errorf("no activeDirectoryClientId given")
}
if authParams["activeDirectoryClientId"] != "" {
meta.azureMonitorInfo.ClientID = authParams["activeDirectoryClientId"]
} else if metadata["activeDirectoryClientId"] != "" {
meta.azureMonitorInfo.ClientID = metadata["activeDirectoryClientId"]
} else if metadata["activeDirectoryClientIdFromEnv"] != "" {
meta.azureMonitorInfo.ClientID = resolvedEnv[metadata["activeDirectoryClientIdFromEnv"]]
}

if val, ok := authParams["activeDirectoryClientPassword"]; ok && val != "" {
meta.azureMonitorInfo.ClientPassword = val
} else {
clientPasswordSetting := defaultClientPasswordSetting
if val, ok := metadata["activeDirectoryClientPassword"]; ok && val != "" {
clientPasswordSetting = val
}

if val, ok := resolvedEnv[clientPasswordSetting]; ok {
meta.azureMonitorInfo.ClientPassword = val
} else {
return nil, fmt.Errorf("no activeDirectoryClientPassword given")
}
if len(meta.azureMonitorInfo.ClientID) == 0 {
return nil, fmt.Errorf("no activeDirectoryClientId given")
}

if authParams["activeDirectoryClientPassword"] != "" {
meta.azureMonitorInfo.ClientPassword = authParams["activeDirectoryClientPassword"]
} else if metadata["activeDirectoryClientPassword"] != "" {
meta.azureMonitorInfo.ClientPassword = metadata["activeDirectoryClientPassword"]
} else if metadata["activeDirectoryClientPasswordFromEnv"] != "" {
meta.azureMonitorInfo.ClientPassword = resolvedEnv[metadata["activeDirectoryClientPasswordFromEnv"]]
}

if len(meta.azureMonitorInfo.ClientPassword) == 0 {
return nil, fmt.Errorf("no activeDirectoryClientPassword given")
}
} else if podIdentity != "azure" {
return nil, fmt.Errorf("Azure Monitor doesn't support pod identity %s", podIdentity)
Expand Down
25 changes: 10 additions & 15 deletions pkg/scalers/azure_queue_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ const (
queueLengthMetricName = "queueLength"
defaultTargetQueueLength = 5
externalMetricType = "External"
defaultConnectionSetting = "AzureWebJobsStorage"
)

type azureQueueScaler struct {
Expand Down Expand Up @@ -80,21 +79,17 @@ func parseAzureQueueMetadata(metadata, resolvedEnv, authParams map[string]string
if podAuth == "" || podAuth == "none" {
// Azure Queue Scaler expects a "connection" parameter in the metadata
// of the scaler or in a TriggerAuthentication object
connection := authParams["connection"]
if connection != "" {
if authParams["connection"] != "" {
// Found the connection in a parameter from TriggerAuthentication
meta.connection = connection
} else {
connectionSetting := defaultConnectionSetting
if val, ok := metadata["connection"]; ok && val != "" {
connectionSetting = val
}

if val, ok := resolvedEnv[connectionSetting]; ok {
meta.connection = val
} else {
return nil, "", fmt.Errorf("no connection setting given")
}
meta.connection = authParams["connection"]
} else if metadata["connection"] != "" {
meta.connection = metadata["connection"]
} else if metadata["connectionFromEnv"] != "" {
meta.connection = resolvedEnv[metadata["connectionFromEnv"]]
}

if len(meta.connection) == 0 {
return nil, "", fmt.Errorf("no connection setting given")
}
} else if podAuth == "azure" {
// If the Use AAD Pod Identity is present then check account name
Expand Down
16 changes: 7 additions & 9 deletions pkg/scalers/azure_servicebus_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,17 +101,15 @@ func parseAzureServiceBusMetadata(resolvedEnv, metadata, authParams map[string]s

if podIdentity == "" || podIdentity == "none" {
// get servicebus connection string
if val, ok := authParams["connection"]; ok {
meta.connection = val
} else if val, ok := metadata["connection"]; ok {
connectionSetting := val

if val, ok := resolvedEnv[connectionSetting]; ok {
meta.connection = val
}
if authParams["connection"] != "" {
meta.connection = authParams["connection"]
} else if metadata["connection"] != "" {
meta.connection = metadata["connection"]
} else if metadata["connectionFromEnv"] != "" {
meta.connection = resolvedEnv[metadata["connectionFromEnv"]]
}

if meta.connection == "" {
if len(meta.connection) == 0 {
return nil, fmt.Errorf("no connection setting given")
}
} else if podIdentity == "azure" {
Expand Down
Loading

0 comments on commit 9a904cc

Please sign in to comment.