Skip to content

Commit

Permalink
refactor: refactor ScaledJob to be used with scale_handler (kedacore#…
Browse files Browse the repository at this point in the history
…4707)

Signed-off-by: Yoon Park <[email protected]>
  • Loading branch information
yoongon authored Jul 12, 2023
1 parent 5e658a4 commit 2a3d9f1
Show file tree
Hide file tree
Showing 10 changed files with 618 additions and 492 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ New deprecation(s):

### Other

- TODO ([#XXX](https://github.com/kedacore/keda/issue/XXX))
- **General**: Refactor ScaledJob related methods to be located at scale_handler ([#4781](https://github.com/kedacore/keda/issues/4781))

## v2.11.1

Expand Down
124 changes: 93 additions & 31 deletions apis/keda/v1alpha1/identifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,39 +14,37 @@ type testData struct {
soKind string
}

// tests enumerates identifier-generation cases: the expected identifier is
// always the lowercased "kind.namespace.name", regardless of the casing of
// the inputs, and a missing namespace yields an empty middle segment.
var tests = []testData{
	{
		name:               "all lowercase",
		expectedIdentifier: "scaledobject.namespace.name",
		soName:             "name",
		soNamespace:        "namespace",
		soKind:             "scaledobject",
	},
	{
		// Uppercase inputs are normalized to lowercase in the identifier.
		name:               "all uppercase",
		expectedIdentifier: "scaledobject.namespace.name",
		soName:             "NAME",
		soNamespace:        "NAMESPACE",
		soKind:             "SCALEDOBJECT",
	},
	{
		// NOTE(review): inputs here are identical to the "all lowercase"
		// case despite the name — presumably meant to use CamelCase inputs.
		name:               "camel case",
		expectedIdentifier: "scaledobject.namespace.name",
		soName:             "name",
		soNamespace:        "namespace",
		soKind:             "scaledobject",
	},
	{
		// Empty namespace produces two consecutive dots.
		name:               "missing namespace",
		expectedIdentifier: "scaledobject..name",
		soName:             "name",
		soKind:             "scaledobject",
	},
}

func TestGeneratedIdentifierForScaledObject(t *testing.T) {
tests := []testData{
{
name: "all lowercase",
expectedIdentifier: "scaledobject.namespace.name",
soName: "name",
soNamespace: "namespace",
soKind: "scaledobject",
},
{
name: "all uppercase",
expectedIdentifier: "scaledobject.namespace.name",
soName: "NAME",
soNamespace: "NAMESPACE",
soKind: "SCALEDOBJECT",
},
{
name: "camel case",
expectedIdentifier: "scaledobject.namespace.name",
soName: "name",
soNamespace: "namespace",
soKind: "scaledobject",
},
{
name: "missing namespace",
expectedIdentifier: "scaledobject..name",
soName: "name",
soKind: "scaledobject",
},
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
expectedIdentifier := test.expectedIdentifier
genericIdentifier := GenerateIdentifier(test.soKind, test.soNamespace, test.soName)
Expand Down Expand Up @@ -79,3 +77,67 @@ func TestGeneratedIdentifierForScaledObject(t *testing.T) {
})
}
}

// TestGeneratedIdentifierForScaledJob verifies that every code path that
// produces an identifier for a ScaledJob agrees on the same lowercased
// "scaledjob.namespace.name" value: the generic GenerateIdentifier helper,
// the ScaledJob.GenerateIdentifier method, and the WithTriggers duck type
// obtained via AsDuckWithTriggers.
func TestGeneratedIdentifierForScaledJob(t *testing.T) {
	tests := []testData{
		{
			name:               "all lowercase",
			expectedIdentifier: "scaledjob.namespace.name",
			soName:             "name",
			soNamespace:        "namespace",
			soKind:             "scaledjob",
		},
		{
			name:               "all uppercase",
			expectedIdentifier: "scaledjob.namespace.name",
			soName:             "NAME",
			soNamespace:        "NAMESPACE",
			soKind:             "SCALEDJOB",
		},
		{
			name:               "camel case",
			expectedIdentifier: "scaledjob.namespace.name",
			soName:             "name",
			soNamespace:        "namespace",
			soKind:             "scaledjob",
		},
		{
			name:               "missing namespace",
			expectedIdentifier: "scaledjob..name",
			soName:             "name",
			soKind:             "scaledjob",
		},
	}
	for _, test := range tests {
		// Shadow the range variable so each subtest closure captures its own
		// copy (required before Go 1.22; also keeps this loop consistent with
		// TestGeneratedIdentifierForScaledObject above).
		test := test
		t.Run(test.name, func(t *testing.T) {
			expectedIdentifier := test.expectedIdentifier
			genericIdentifier := GenerateIdentifier(test.soKind, test.soNamespace, test.soName)

			scaledJob := &ScaledJob{
				ObjectMeta: metav1.ObjectMeta{
					Name:      test.soName,
					Namespace: test.soNamespace,
				},
			}
			scaledJobIdentifier := scaledJob.GenerateIdentifier()

			withTriggers, err := AsDuckWithTriggers(scaledJob)
			if err != nil {
				t.Errorf("got error while converting to WithTriggers object: %s", err)
			}
			withTriggersIdentifier := withTriggers.GenerateIdentifier()

			// All three paths must yield the expected identifier.
			if expectedIdentifier != genericIdentifier {
				t.Errorf("genericIdentifier=%q doesn't equal the expectedIdentifier=%q", genericIdentifier, expectedIdentifier)
			}

			if expectedIdentifier != scaledJobIdentifier {
				t.Errorf("scaledJobIdentifier=%q doesn't equal the expectedIdentifier=%q", scaledJobIdentifier, expectedIdentifier)
			}

			if expectedIdentifier != withTriggersIdentifier {
				t.Errorf("withTriggersIdentifier=%q doesn't equal the expectedIdentifier=%q", withTriggersIdentifier, expectedIdentifier)
			}
		})
	}
}
4 changes: 4 additions & 0 deletions apis/keda/v1alpha1/scaledjob_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,7 @@ func (s ScaledJob) MinReplicaCount() int64 {
}
return defaultScaledJobMinReplicaCount
}

// GenerateIdentifier returns a unique identifier for this ScaledJob in the
// form "scaledjob.<namespace>.<name>", delegating normalization (the tests
// expect a lowercased result) to the package-level GenerateIdentifier helper.
func (s *ScaledJob) GenerateIdentifier() string {
	return GenerateIdentifier("ScaledJob", s.Namespace, s.Name)
}
2 changes: 1 addition & 1 deletion apis/keda/v1alpha1/withtriggers_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (t *WithTriggers) GenerateIdentifier() string {
return GenerateIdentifier(t.InternalKind, t.Namespace, t.Name)
}

// AsDuckWithTriggers tries to generates WithTriggers object for input object
// AsDuckWithTriggers tries to generate WithTriggers object for input object
// returns error if input object is unknown
func AsDuckWithTriggers(scalableObject interface{}) (*WithTriggers, error) {
switch obj := scalableObject.(type) {
Expand Down
167 changes: 0 additions & 167 deletions pkg/scaling/cache/scalers_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,14 @@ package cache
import (
"context"
"fmt"
"math"
"time"

v2 "k8s.io/api/autoscaling/v2"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/eventreason"
"github.com/kedacore/keda/v2/pkg/scalers"
)

Expand Down Expand Up @@ -141,68 +138,6 @@ func (c *ScalersCache) GetMetricsAndActivityForScaler(ctx context.Context, index
return metric, activity, time.Since(startTime).Milliseconds(), err
}

// TODO needs refactor - move ScaledJob related methods to scale_handler, the similar way ScaledObject methods are
// refactor logic
// IsScaledJobActive aggregates the metrics reported by every scaler in the
// cache and decides whether the ScaledJob should be considered active.
// Per-scaler results are combined according to
// Spec.ScalingStrategy.MultipleScalersCalculation ("min", "avg", "sum";
// any other value falls through to the "max" behavior), and every strategy
// only considers scalers that reported isActive.
// It returns (isActive, queueLength, maxValue) with the two numbers rounded
// up to the next integer and maxValue capped at MaxReplicaCount().
func (c *ScalersCache) IsScaledJobActive(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) {
	var queueLength float64
	var maxValue float64
	isActive := false

	logger := logf.Log.WithName("scalemetrics")
	scalersMetrics := c.getScaledJobMetrics(ctx, scaledJob)
	switch scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation {
	case "min":
		// Keep the smallest queue length among active scalers.
		// NOTE(review): queueLength == 0 doubles as "not set yet", so an
		// active scaler legitimately reporting 0 can be overwritten by a
		// later, larger value — confirm this is intended.
		for _, metrics := range scalersMetrics {
			if (queueLength == 0 || metrics.queueLength < queueLength) && metrics.isActive {
				queueLength = metrics.queueLength
				maxValue = metrics.maxValue
				isActive = metrics.isActive
			}
		}
	case "avg":
		// Average queue length and max value over active scalers only.
		queueLengthSum := float64(0)
		maxValueSum := float64(0)
		length := 0
		for _, metrics := range scalersMetrics {
			if metrics.isActive {
				queueLengthSum += metrics.queueLength
				maxValueSum += metrics.maxValue
				isActive = metrics.isActive
				length++
			}
		}
		if length != 0 {
			queueLength = queueLengthSum / float64(length)
			maxValue = maxValueSum / float64(length)
		}
	case "sum":
		// Sum queue lengths and max values across active scalers.
		for _, metrics := range scalersMetrics {
			if metrics.isActive {
				queueLength += metrics.queueLength
				maxValue += metrics.maxValue
				isActive = metrics.isActive
			}
		}
	default: // max
		// Keep the metrics of the active scaler with the largest queue length.
		for _, metrics := range scalersMetrics {
			if metrics.queueLength > queueLength && metrics.isActive {
				queueLength = metrics.queueLength
				maxValue = metrics.maxValue
				isActive = metrics.isActive
			}
		}
	}

	// A positive minimum replica count forces activity even when no scaler
	// reported active.
	if scaledJob.MinReplicaCount() > 0 {
		isActive = true
	}

	// Never report a max value above the configured replica ceiling.
	// NOTE(review): "ScaleJob" in the log message below looks like a typo
	// for "ScaledJob"; left unchanged here since it is a runtime string.
	maxValue = min(float64(scaledJob.MaxReplicaCount()), maxValue)
	logger.V(1).WithValues("ScaledJob", scaledJob.Name).Info("Checking if ScaleJob Scalers are active", "isActive", isActive, "maxValue", maxValue, "MultipleScalersCalculation", scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation)

	return isActive, ceilToInt64(queueLength), ceilToInt64(maxValue)
}

func (c *ScalersCache) refreshScaler(ctx context.Context, id int) (scalers.Scaler, error) {
if id < 0 || id >= len(c.Scalers) {
return nil, fmt.Errorf("scaler with id %d not found, len = %d, cache has been probably already invalidated", id, len(c.Scalers))
Expand All @@ -226,105 +161,3 @@ func (c *ScalersCache) refreshScaler(ctx context.Context, id int) (scalers.Scale

return ns, nil
}

// scalerMetrics is the per-scaler aggregation result produced by
// getScaledJobMetrics and consumed by IsScaledJobActive.
type scalerMetrics struct {
	queueLength float64 // sum of the scaler's reported metric values for its first metric spec
	maxValue    float64 // queueLength / target average value, capped at the job's MaxReplicaCount
	isActive    bool    // whether the scaler reported the trigger as active
}

// getScaledJobMetrics queries every scaler in the cache for the given
// ScaledJob and returns one scalerMetrics entry per usable scaler.
// Scalers that expose no external metric spec (e.g. cpu/memory resource
// scalers or misconfigured External scalers) are skipped. A failing scaler
// is refreshed once and retried; if it still fails, a warning event is
// recorded on the ScaledJob and the scaler is skipped for this pass.
func (c *ScalersCache) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) []scalerMetrics {
	// TODO this loop should be probably done similar way the ScaledObject loop is done
	var scalersMetrics []scalerMetrics
	for i, s := range c.Scalers {
		var queueLength float64
		var targetAverageValue float64
		isActive := false
		maxValue := float64(0)
		scalerType := fmt.Sprintf("%T:", s)

		scalerLogger := log.WithValues("ScaledJob", scaledJob.Name, "Scaler", scalerType)

		metricSpecs := s.Scaler.GetMetricSpecForScaling(ctx)

		// skip scaler that doesn't return any metric specs (usually External scaler with incorrect metadata)
		// or skip cpu/memory resource scaler
		if len(metricSpecs) < 1 || metricSpecs[0].External == nil {
			continue
		}

		// TODO here we should probably loop through all metrics in a Scaler
		// as it is done for ScaledObject
		metrics, isTriggerActive, err := s.Scaler.GetMetricsAndActivity(ctx, metricSpecs[0].External.Metric.Name)
		if err != nil {
			// One retry with a freshly built scaler before giving up.
			var ns scalers.Scaler
			ns, err = c.refreshScaler(ctx, i)
			if err == nil {
				metrics, isTriggerActive, err = ns.GetMetricsAndActivity(ctx, metricSpecs[0].External.Metric.Name)
			}
		}

		if err != nil {
			// Best-effort: record the failure and continue with the remaining scalers.
			scalerLogger.V(1).Info("Error getting scaler metrics and activity, but continue", "error", err)
			c.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
			continue
		}

		targetAverageValue = getTargetAverageValue(metricSpecs)

		// Sum the values reported for the first metric spec's metric name.
		var metricValue float64
		for _, m := range metrics {
			if m.MetricName == metricSpecs[0].External.Metric.Name {
				metricValue = m.Value.AsApproximateFloat64()
				queueLength += metricValue
			}
		}
		scalerLogger.V(1).Info("Scaler Metric value", "isTriggerActive", isTriggerActive, metricSpecs[0].External.Metric.Name, queueLength, "targetAverageValue", targetAverageValue)

		if isTriggerActive {
			isActive = true
		}

		// maxValue = queueLength normalized by the target, never above the
		// job's replica ceiling. Left at 0 when the target is 0 (avoids a
		// division by zero).
		if targetAverageValue != 0 {
			averageLength := queueLength / targetAverageValue
			maxValue = min(float64(scaledJob.MaxReplicaCount()), averageLength)
		}
		scalersMetrics = append(scalersMetrics, scalerMetrics{
			queueLength: queueLength,
			maxValue:    maxValue,
			isActive:    isActive,
		})
	}
	return scalersMetrics
}

// getTargetAverageValue returns the arithmetic mean of the external metrics'
// target AverageValue over all given specs. A spec without an External
// target or without an AverageValue contributes zero to the sum but still
// counts toward the divisor. Returns 0 for an empty spec list.
func getTargetAverageValue(metricSpecs []v2.MetricSpec) float64 {
	var targetAverageValue float64
	for _, metric := range metricSpecs {
		// Guard against specs with no External target: the caller only
		// checks metricSpecs[0].External, so later entries could otherwise
		// cause a nil-pointer dereference here.
		var metricValue float64
		if metric.External != nil && metric.External.Target.AverageValue != nil {
			metricValue = metric.External.Target.AverageValue.AsApproximateFloat64()
		}
		targetAverageValue += metricValue
	}
	count := float64(len(metricSpecs))
	if count != 0 {
		return targetAverageValue / count
	}
	return 0
}

// ceilToInt64 rounds x up to the nearest integer and returns it as an int64.
func ceilToInt64(x float64) int64 {
	rounded := math.Ceil(x)
	return int64(rounded)
}

// min returns the smaller of the two float64 arguments, preferring x when
// the values compare neither greater nor smaller (e.g. equal operands).
func min(x, y float64) float64 {
	if y < x {
		return y
	}
	return x
}
Loading

0 comments on commit 2a3d9f1

Please sign in to comment.