Skip to content

Commit

Permalink
add core logic to support access token in postgres scaler (kedacore#5589
Browse files Browse the repository at this point in the history
)

* add core logic to support access token in postgres scaler

Signed-off-by: Ferdinand de Baecque <[email protected]>

* minor fix

Signed-off-by: Ferdinand de Baecque <[email protected]>

* run make build to fmt code

Signed-off-by: Ferdinand de Baecque <[email protected]>

* make regexp password pattern global

Signed-off-by: Ferdinand de Baecque <[email protected]>

* adapt to use placeholder for regexp

Signed-off-by: Ferdinand de Baecque <[email protected]>

* add missing authPodIdentity variable

Signed-off-by: Ferdinand de Baecque <[email protected]>

* lint code using gci write... command

Signed-off-by: Ferdinand de Baecque <[email protected]>

* lint import + add 2 unite tests

Signed-off-by: Ferdinand de Baecque <[email protected]>

* lint with make fmt

Signed-off-by: Ferdinand de Baecque <[email protected]>

* remove podIdentityAzure references (but keep AzureWorkload ones)

Signed-off-by: Ferdinand de Baecque <[email protected]>

* replace switch by if statements + fix error when comparing + close connection before recreating it

Signed-off-by: Ferdinand de Baecque <[email protected]>

* generate a new token if the current one has expired + add log info statement

Signed-off-by: Ferdinand de Baecque <[email protected]>

* minor change + add entry in CHANGELOG.md

Signed-off-by: Ferdinand de Baecque <[email protected]>

* Add first draft of an e2e test

Signed-off-by: Ferdinand de Baecque <[email protected]>

* Add comment and change package name

Signed-off-by: Ferdinand de Baecque <[email protected]>

* fix golanci lint

Signed-off-by: Ferdinand de Baecque <[email protected]>

* use identity 1 in e2e tests

Signed-off-by: Ferdinand de Baecque <[email protected]>

* fix e2e tests after testing it + change .env file

Signed-off-by: Ferdinand de Baecque <[email protected]>

* go fmt

Signed-off-by: Ferdinand de Baecque <[email protected]>

* remove entries in .env file

Signed-off-by: Ferdinand de Baecque <[email protected]>

* Add Postgres env variables

Signed-off-by: Ferdinand de Baecque <[email protected]>

* remove useless variables

Signed-off-by: Ferdinand de Baecque <[email protected]>

* Update e2e test to reset all the task using a query

Signed-off-by: Jorge Turrado <[email protected]>

* missing changes after rebase

Signed-off-by: Jorge Turrado <[email protected]>

* fix typo in the query

Signed-off-by: Jorge Turrado Ferrero <[email protected]>

* remove the load

Signed-off-by: Jorge Turrado <[email protected]>

* fix style

Signed-off-by: Jorge Turrado <[email protected]>

---------

Signed-off-by: Ferdinand de Baecque <[email protected]>
Signed-off-by: Jorge Turrado <[email protected]>
Signed-off-by: Jorge Turrado Ferrero <[email protected]>
Signed-off-by: Jorge Turrado <[email protected]>
Co-authored-by: Jorge Turrado <[email protected]>
Co-authored-by: Jorge Turrado Ferrero <[email protected]>
  • Loading branch information
3 people authored Jul 29, 2024
1 parent d4fcc84 commit c740bf0
Show file tree
Hide file tree
Showing 6 changed files with 427 additions and 71 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ Here is an overview of all new **experimental** features:
- **IBM MQ Scaler**: Add TLS support for IBM MQ scaler ([#5974](https://github.com/kedacore/keda/issues/5974))
- **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689))
- **MYSQL Scaler**: Add support to fetch username from env ([#5883](https://github.com/kedacore/keda/issues/5883))
- **Postgres Scaler**: Add support for access token authentication to an Azure Postgres Flexible Server ([#5823](https://github.com/kedacore/keda/issues/5823))

### Fixes

Expand Down
13 changes: 6 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1930,16 +1930,15 @@ github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brv
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48=
github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
github.com/hashicorp/go-hclog v1.6.2 h1:NOtoftovWkDheyUM/8JW3QMiXyxJK3uHRK7wV04nD2I=
github.com/hashicorp/go-hclog v1.6.2/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M=
github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k=
github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M=
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/go-retryablehttp v0.7.5 h1:bJj+Pj19UZMIweq/iie+1u5YCdGrnxCT9yvm0e+Nd5M=
github.com/hashicorp/go-retryablehttp v0.7.5/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8=
github.com/hashicorp/go-retryablehttp v0.7.7 h1:C8hUCYzor8PIfXHa4UrZkU4VvK8o9ISHxT2Q8+VepXU=
github.com/hashicorp/go-retryablehttp v0.7.7/go.mod h1:pkQpWZeYWskR+D1tR2O5OcBFOxfA7DoAO6xtkuQnHTk=
github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
github.com/hashicorp/go-rootcerts v1.0.2 h1:jzhAVGtqPKbwpyCPELlgNWhE1znq+qwJtW5Oi2viEzc=
github.com/hashicorp/go-rootcerts v1.0.2/go.mod h1:pqUvnprVnM5bf7AOirdbb01K4ccR319Vf4pU3K5EGc8=
Expand Down Expand Up @@ -2023,6 +2022,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/jstemmer/go-junit-report/v2 v2.1.0 h1:X3+hPYlSczH9IMIpSC9CQSZA0L+BipYafciZUWHEmsc=
github.com/jstemmer/go-junit-report/v2 v2.1.0/go.mod h1:mgHVr7VUo5Tn8OLVr1cKnLuEy0M92wdRntM99h7RkgQ=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
Expand Down Expand Up @@ -2074,7 +2075,6 @@ github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVc
github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.11/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
Expand Down Expand Up @@ -2284,7 +2284,6 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
Expand Down
210 changes: 151 additions & 59 deletions pkg/scalers/postgresql_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,41 @@ import (
"context"
"database/sql"
"fmt"
"regexp"
"strconv"
"strings"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/go-logr/logr"
// PostreSQL drive required for this scaler
_ "github.com/jackc/pgx/v5/stdlib"
_ "github.com/jackc/pgx/v5/stdlib" // PostreSQL drive required for this scaler
v2 "k8s.io/api/autoscaling/v2"
"k8s.io/metrics/pkg/apis/external_metrics"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/scalers/azure"
"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

const (
// Azure AD resource ID for Azure Database for PostgreSQL is https://ossrdbms-aad.database.windows.net
// https://learn.microsoft.com/en-us/azure/postgresql/single-server/how-to-connect-with-managed-identity
azureDatabasePostgresResource = "https://ossrdbms-aad.database.windows.net/.default"
)

var (
passwordConnPattern = regexp.MustCompile(`%PASSWORD%`)
)

type postgreSQLScaler struct {
metricType v2.MetricTargetType
metadata *postgreSQLMetadata
connection *sql.DB
logger logr.Logger
metricType v2.MetricTargetType
metadata *postgreSQLMetadata
connection *sql.DB
podIdentity kedav1alpha1.AuthPodIdentity
logger logr.Logger
}

type postgreSQLMetadata struct {
Expand All @@ -30,120 +47,167 @@ type postgreSQLMetadata struct {
connection string
query string
triggerIndex int
azureAuthContext azureAuthContext
}

type azureAuthContext struct {
cred *azidentity.ChainedTokenCredential
token *azcore.AccessToken
}

// NewPostgreSQLScaler creates a new postgreSQL scaler
func NewPostgreSQLScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
func NewPostgreSQLScaler(ctx context.Context, config *scalersconfig.ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
return nil, fmt.Errorf("error getting scaler metric type: %w", err)
}

logger := InitializeLogger(config, "postgresql_scaler")

meta, err := parsePostgreSQLMetadata(config)
meta, podIdentity, err := parsePostgreSQLMetadata(logger, config)
if err != nil {
return nil, fmt.Errorf("error parsing postgreSQL metadata: %w", err)
}

conn, err := getConnection(meta, logger)
conn, err := getConnection(ctx, meta, podIdentity, logger)
if err != nil {
return nil, fmt.Errorf("error establishing postgreSQL connection: %w", err)
}
return &postgreSQLScaler{
metricType: metricType,
metadata: meta,
connection: conn,
logger: logger,
metricType: metricType,
metadata: meta,
connection: conn,
podIdentity: podIdentity,
logger: logger,
}, nil
}

func parsePostgreSQLMetadata(config *scalersconfig.ScalerConfig) (*postgreSQLMetadata, error) {
func parsePostgreSQLMetadata(logger logr.Logger, config *scalersconfig.ScalerConfig) (*postgreSQLMetadata, kedav1alpha1.AuthPodIdentity, error) {
meta := postgreSQLMetadata{}

authPodIdentity := kedav1alpha1.AuthPodIdentity{}

if val, ok := config.TriggerMetadata["query"]; ok {
meta.query = val
} else {
return nil, fmt.Errorf("no query given")
return nil, authPodIdentity, fmt.Errorf("no query given")
}

if val, ok := config.TriggerMetadata["targetQueryValue"]; ok {
targetQueryValue, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("queryValue parsing error %w", err)
return nil, authPodIdentity, fmt.Errorf("queryValue parsing error %w", err)
}
meta.targetQueryValue = targetQueryValue
} else {
if config.AsMetricSource {
meta.targetQueryValue = 0
} else {
return nil, fmt.Errorf("no targetQueryValue given")
return nil, authPodIdentity, fmt.Errorf("no targetQueryValue given")
}
}

meta.activationTargetQueryValue = 0
if val, ok := config.TriggerMetadata["activationTargetQueryValue"]; ok {
activationTargetQueryValue, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("activationTargetQueryValue parsing error %w", err)
return nil, authPodIdentity, fmt.Errorf("activationTargetQueryValue parsing error %w", err)
}
meta.activationTargetQueryValue = activationTargetQueryValue
}

switch {
case config.AuthParams["connection"] != "":
meta.connection = config.AuthParams["connection"]
case config.TriggerMetadata["connectionFromEnv"] != "":
meta.connection = config.ResolvedEnv[config.TriggerMetadata["connectionFromEnv"]]
default:
host, err := GetFromAuthOrMeta(config, "host")
if err != nil {
return nil, err
}
switch config.PodIdentity.Provider {
case "", kedav1alpha1.PodIdentityProviderNone:
switch {
case config.AuthParams["connection"] != "":
meta.connection = config.AuthParams["connection"]
case config.TriggerMetadata["connectionFromEnv"] != "":
meta.connection = config.ResolvedEnv[config.TriggerMetadata["connectionFromEnv"]]
default:
params, err := buildConnArray(config)
if err != nil {
return nil, authPodIdentity, fmt.Errorf("failed to parse fields related to the connection")
}

port, err := GetFromAuthOrMeta(config, "port")
if err != nil {
return nil, err
var password string
if config.AuthParams["password"] != "" {
password = config.AuthParams["password"]
} else if config.TriggerMetadata["passwordFromEnv"] != "" {
password = config.ResolvedEnv[config.TriggerMetadata["passwordFromEnv"]]
}
params = append(params, "password="+escapePostgreConnectionParameter(password))
meta.connection = strings.Join(params, " ")
}

userName, err := GetFromAuthOrMeta(config, "userName")
case kedav1alpha1.PodIdentityProviderAzureWorkload:
params, err := buildConnArray(config)
if err != nil {
return nil, err
return nil, authPodIdentity, fmt.Errorf("failed to parse fields related to the connection")
}

dbName, err := GetFromAuthOrMeta(config, "dbName")
cred, err := azure.NewChainedCredential(logger, config.PodIdentity)
if err != nil {
return nil, err
return nil, authPodIdentity, err
}
meta.azureAuthContext.cred = cred
authPodIdentity = kedav1alpha1.AuthPodIdentity{Provider: config.PodIdentity.Provider}

sslmode, err := GetFromAuthOrMeta(config, "sslmode")
if err != nil {
return nil, err
}

var password string
if config.AuthParams["password"] != "" {
password = config.AuthParams["password"]
} else if config.TriggerMetadata["passwordFromEnv"] != "" {
password = config.ResolvedEnv[config.TriggerMetadata["passwordFromEnv"]]
}

// Build connection str
var params []string
params = append(params, "host="+escapePostgreConnectionParameter(host))
params = append(params, "port="+escapePostgreConnectionParameter(port))
params = append(params, "user="+escapePostgreConnectionParameter(userName))
params = append(params, "dbname="+escapePostgreConnectionParameter(dbName))
params = append(params, "sslmode="+escapePostgreConnectionParameter(sslmode))
params = append(params, "password="+escapePostgreConnectionParameter(password))
params = append(params, "%PASSWORD%")
meta.connection = strings.Join(params, " ")
}
meta.triggerIndex = config.TriggerIndex
return &meta, nil

return &meta, authPodIdentity, nil
}

func buildConnArray(config *scalersconfig.ScalerConfig) ([]string, error) {
var params []string

host, err := GetFromAuthOrMeta(config, "host")
if err != nil {
return nil, err
}

port, err := GetFromAuthOrMeta(config, "port")
if err != nil {
return nil, err
}

userName, err := GetFromAuthOrMeta(config, "userName")
if err != nil {
return nil, err
}

dbName, err := GetFromAuthOrMeta(config, "dbName")
if err != nil {
return nil, err
}

sslmode, err := GetFromAuthOrMeta(config, "sslmode")
if err != nil {
return nil, err
}
params = append(params, "host="+escapePostgreConnectionParameter(host))
params = append(params, "port="+escapePostgreConnectionParameter(port))
params = append(params, "user="+escapePostgreConnectionParameter(userName))
params = append(params, "dbname="+escapePostgreConnectionParameter(dbName))
params = append(params, "sslmode="+escapePostgreConnectionParameter(sslmode))

return params, nil
}

func getConnection(meta *postgreSQLMetadata, logger logr.Logger) (*sql.DB, error) {
db, err := sql.Open("pgx", meta.connection)
func getConnection(ctx context.Context, meta *postgreSQLMetadata, podIdentity kedav1alpha1.AuthPodIdentity, logger logr.Logger) (*sql.DB, error) {
connectionString := meta.connection

if podIdentity.Provider == kedav1alpha1.PodIdentityProviderAzureWorkload {
accessToken, err := getAzureAccessToken(ctx, meta, azureDatabasePostgresResource)
if err != nil {
return nil, err
}
newPasswordField := "password=" + escapePostgreConnectionParameter(accessToken)
connectionString = passwordConnPattern.ReplaceAllString(meta.connection, newPasswordField)
}

db, err := sql.Open("pgx", connectionString)
if err != nil {
logger.Error(err, fmt.Sprintf("Found error opening postgreSQL: %s", err))
return nil, err
Expand All @@ -168,6 +232,19 @@ func (s *postgreSQLScaler) Close(context.Context) error {

func (s *postgreSQLScaler) getActiveNumber(ctx context.Context) (float64, error) {
var id float64

if s.podIdentity.Provider == kedav1alpha1.PodIdentityProviderAzureWorkload {
if s.metadata.azureAuthContext.token.ExpiresOn.Before(time.Now()) {
s.logger.Info("The Azure Access Token expired, retrieving a new Azure Access Token and instantiating a new Postgres connection object.")
s.connection.Close()
newConnection, err := getConnection(ctx, s.metadata, s.podIdentity, s.logger)
if err != nil {
return 0, fmt.Errorf("error establishing postgreSQL connection: %w", err)
}
s.connection = newConnection
}
}

err := s.connection.QueryRowContext(ctx, s.metadata.query).Scan(&id)
if err != nil {
s.logger.Error(err, fmt.Sprintf("could not query postgreSQL: %s", err))
Expand Down Expand Up @@ -210,3 +287,18 @@ func escapePostgreConnectionParameter(str string) string {
str = strings.ReplaceAll(str, "'", "\\'")
return fmt.Sprintf("'%s'", str)
}

func getAzureAccessToken(ctx context.Context, metadata *postgreSQLMetadata, scope string) (string, error) {
accessToken, err := metadata.azureAuthContext.cred.GetToken(ctx, policy.TokenRequestOptions{
Scopes: []string{
scope,
},
})
if err != nil {
return "", err
}

metadata.azureAuthContext.token = &accessToken

return metadata.azureAuthContext.token.Token, nil
}
Loading

0 comments on commit c740bf0

Please sign in to comment.