Skip to content

Commit

Permalink
[v2] Implement the scaledjob controler for v2 (kedacore#945)
Browse files Browse the repository at this point in the history
* [v2] Implement the scaledjob controler for v2

Signed-off-by: Tsuyoshi Ushio <[email protected]>

* Scale Logic of the Azure Storage Queue for not scaling with invisible queue

Signed-off-by: Tsuyoshi Ushio <[email protected]>

* Update pkg/controller/scaledjob/scaledjob_controller.go

Co-authored-by: Zbynek Roubalik <[email protected]>
Signed-off-by: Tsuyoshi Ushio <[email protected]>

* rollback the change for rabbit mq and fix the protocol on crd

Signed-off-by: Tsuyoshi Ushio <[email protected]>

* Add protocol for the scalejobs_crd

Signed-off-by: Tsuyoshi Ushio <[email protected]>

Co-authored-by: Zbynek Roubalik <[email protected]>
  • Loading branch information
2 people authored and Zbynek Roubalik committed Aug 6, 2020
1 parent dcaa94c commit 73d676d
Show file tree
Hide file tree
Showing 6 changed files with 228 additions and 67 deletions.
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1314,6 +1314,8 @@ k8s.io/apimachinery v0.0.0-20191121175448-79c2a76c473a/go.mod h1:b9qmWdKlLuU9EBh
k8s.io/apimachinery v0.18.0/go.mod h1:9SnR/e11v5IbyPCGbvJViimtJ0SwHG4nfZFjU77ftcA=
k8s.io/apimachinery v0.18.2 h1:44CmtbmkzVDAhCpRVSiP2R5PPrC2RtlIv/MoB8xpdRA=
k8s.io/apimachinery v0.18.2/go.mod h1:9SnR/e11v5IbyPCGbvJViimtJ0SwHG4nfZFjU77ftcA=
k8s.io/apimachinery v0.18.5 h1:Lh6tgsM9FMkC12K5T5QjRm7rDs6aQN5JHkA0JomULDM=
k8s.io/apimachinery v0.18.6 h1:RtFHnfGNfd1N0LeSrKCUznz5xtUP1elRGvHJbL3Ntag=
k8s.io/apiserver v0.18.2 h1:fwKxdTWwwYhxvtjo0UUfX+/fsitsNtfErPNegH2x9ic=
k8s.io/apiserver v0.18.2/go.mod h1:Xbh066NqrZO8cbsoenCwyDJ1OSi8Ag8I2lezeHxzwzw=
k8s.io/autoscaler v0.0.0-20190607113959-1b4f1855cb8e/go.mod h1:QEXezc9uKPT91dwqhSJq3GNI3B1HxFRQHiku9kmrsSA=
Expand Down Expand Up @@ -1357,6 +1359,7 @@ k8s.io/kubernetes v1.13.0/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk=
k8s.io/metrics v0.18.0/go.mod h1:8aYTW18koXqjLVKL7Ds05RPMX9ipJZI3mywYvBOxXd4=
k8s.io/metrics v0.18.2 h1:v4J7WKu/Zo/htSH3w//UWJZT9/CpUThXWYyUbQ/F/jY=
k8s.io/metrics v0.18.2/go.mod h1:qga8E7QfYNR9Q89cSCAjinC9pTZ7yv1XSVGUB0vJypg=
k8s.io/metrics v0.18.6 h1:IRMCn0KKNhbOSnxNZ+MhooRi8c67iIMjpGkKpm6oqOM=
k8s.io/utils v0.0.0-20190308190857-21c4ce38f2a7/go.mod h1:8k8uAuAQ0rXslZKaEWd0c3oVhZz7sSzSiPnVZayjIX0=
k8s.io/utils v0.0.0-20190801114015-581e00157fb1/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
k8s.io/utils v0.0.0-20191114184206-e782cd3c129f/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
Expand Down
189 changes: 132 additions & 57 deletions pkg/controller/scaledjob/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,27 @@ package scaledjob

import (
"context"
"fmt"

"github.com/go-logr/logr"
kedav1alpha1 "github.com/kedacore/keda/pkg/apis/keda/v1alpha1"

"github.com/kedacore/keda/pkg/scaling"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/api/errors"
//metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"

//"k8s.io/apimachinery/pkg/types"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"

//"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)
Expand All @@ -34,7 +42,10 @@ func Add(mgr manager.Manager) error {

// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) reconcile.Reconciler {
return &ReconcileScaledJob{client: mgr.GetClient(), scheme: mgr.GetScheme()}
return &ReconcileScaledJob{
client: mgr.GetClient(),
scheme: mgr.GetScheme(),
scaleHandler: scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme())}
}

// add adds a new Controller to mgr with r as the reconcile.Reconciler
Expand All @@ -46,17 +57,15 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
}

// Watch for changes to primary resource ScaledJob
err = c.Watch(&source.Kind{Type: &kedav1alpha1.ScaledJob{}}, &handler.EnqueueRequestForObject{})
if err != nil {
return err
}

// TODO(user): Modify this to be the types you create that are owned by the primary resource
// Watch for changes to secondary resource Pods and requeue the owner ScaledJob
// err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForOwner{
// IsController: true,
// OwnerType: &kedav1alpha1.ScaledJob{},
// })
err = c.Watch(&source.Kind{Type: &kedav1alpha1.ScaledJob{}},
&handler.EnqueueRequestForObject{},
predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
// Ignore updates to ScaledObject Status (in this case metadata.Generation does not change)
// so reconcile loop is not started on Status updates
return e.MetaOld.GetGeneration() != e.MetaNew.GetGeneration()
},
})
if err != nil {
return err
}
Expand All @@ -71,8 +80,9 @@ var _ reconcile.Reconciler = &ReconcileScaledJob{}
type ReconcileScaledJob struct {
// This client, initialized using mgr.Client() above, is a split client
// that reads objects from the cache and writes to the apiserver
client client.Client
scheme *runtime.Scheme
client client.Client
scheme *runtime.Scheme
scaleHandler scaling.ScaleHandler
}

// Reconcile reads that state of the cluster for a ScaledJob object and makes changes based on the state read
Expand All @@ -84,11 +94,10 @@ type ReconcileScaledJob struct {
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
func (r *ReconcileScaledJob) Reconcile(request reconcile.Request) (reconcile.Result, error) {
reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
reqLogger.Info("Reconciling ScaledJob")

// Fetch the ScaledJob instance
instance := &kedav1alpha1.ScaledJob{}
err := r.client.Get(context.TODO(), request.NamespacedName, instance)
scaledJob := &kedav1alpha1.ScaledJob{}
err := r.client.Get(context.TODO(), request.NamespacedName, scaledJob)
if err != nil {
if errors.IsNotFound(err) {
// Request object not found, could have been deleted after reconcile request.
Expand All @@ -97,48 +106,114 @@ func (r *ReconcileScaledJob) Reconcile(request reconcile.Request) (reconcile.Res
return reconcile.Result{}, nil
}
// Error reading the object - requeue the request.
reqLogger.Error(err, "Failed to get ScaleJob")
return reconcile.Result{}, err
}

reqLogger.Info("Reconciling ScaledJob is NOT IMPLEMENTED yet")
reqLogger.Info("Reconciling ScaledJob")

isScaledJobMarkedToBeDeleted := scaledJob.GetDeletionTimestamp() != nil
if isScaledJobMarkedToBeDeleted {
if contains(scaledJob.GetFinalizers(), scaledJobFinalizer) {
// Run finalization logic for scaledJobFinalizer. If the
// finalization logic fails, don't remove the finalizer so
// that we can retry during the next reconciliation.
if err := r.finalizeScaledJob(reqLogger, scaledJob); err != nil {
return reconcile.Result{}, err
}

// Remove scaledJobFinalizer. Once all finalizers have been
// removed, the object will be deleted.
scaledJob.SetFinalizers(remove(scaledJob.GetFinalizers(), scaledJobFinalizer))
err := r.client.Update(context.TODO(), scaledJob)
if err != nil {
return reconcile.Result{}, err
}
}
return reconcile.Result{}, nil
}

if !contains(scaledJob.GetFinalizers(), scaledJobFinalizer) {
if err := r.addFinalizer(reqLogger, scaledJob); err != nil {
return reconcile.Result{}, err
}
}

var errMsg string
if scaledJob.Spec.JobTargetRef != nil {
reqLogger.Info("Detected ScaleType = Job")
conditions := scaledJob.Status.Conditions.DeepCopy()
msg, err := r.reconcileScaledJob(reqLogger, scaledJob)
if err != nil {
reqLogger.Error(err, msg)
conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledJobCheckFailed", msg)
conditions.SetActiveCondition(metav1.ConditionUnknown, "UnknownState", "ScaledJob check failed")
} else {
reqLogger.V(1).Info(msg)
conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledJobReady", msg)
}

return reconcile.Result{}, err
}

return reconcile.Result{}, nil
errMsg = "scaledJob.Spec.JobTargetRef is not set"
err = fmt.Errorf(errMsg)
reqLogger.Error(err, "scaledJob.Spec.JobTargetRef not found")
return reconcile.Result{}, err
}

// FIXME use ScaledJob
// reconcileJobType implemets reconciler logic for K8s Jobs based ScaleObject
// func (r *ReconcileScaledObject) reconcileJobType(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) ( error) {
// // scaledObject.Spec.ScaleType = kedav1alpha1.ScaleTypeJob

// // Delete Jobs owned by the previous version of the ScaledObject
// opts := []client.ListOption{
// client.InNamespace(scaledObject.GetNamespace()),
// client.MatchingLabels(map[string]string{"scaledobject": scaledObject.GetName()}),
// }
// jobs := &batchv1.JobList{}
// err := r.client.List(context.TODO(), jobs, opts...)
// if err != nil {
// logger.Error(err, "Cannot get list of Jobs owned by this ScaledObject")
// return err
// }

// if jobs.Size() > 0 {
// logger.Info("Deleting jobs owned by the previous version of the ScaledObject", "Number of jobs to delete", jobs.Size())
// }
// for _, job := range jobs.Items {
// err = r.client.Delete(context.TODO(), &job, client.PropagationPolicy(metav1.DeletePropagationBackground))
// if err != nil {
// logger.Error(err, "Not able to delete job", "Job", job.Name)
// return err
// }
// }

// // ScaledObject was created or modified - let's start a new ScaleLoop
// err = r.startScaleLoop(logger, scaledObject)
// if err != nil {
// logger.Error(err, "Failed to start a new ScaleLoop")
// return err
// }

// return nil
// }
func (r *ReconcileScaledJob) reconcileScaledJob(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) (string, error) {

msg, err := r.deletePreviousVersionScaleJobs(logger, scaledJob)
if err != nil {
return msg, err
}

// scaledJob was created or modified - let's start a new ScaleLoop
err = r.requestScaleLoop(logger, scaledJob)
if err != nil {
return "Failed to start a new scale loop with scaling logic", err
} else {
logger.Info("Initializing Scaling logic according to ScaledObject Specification")
}

return "ScaledJob is defined correctly and is ready to scaling", nil
}

// Delete Jobs owned by the previous version of the scaledJob
func (r *ReconcileScaledJob) deletePreviousVersionScaleJobs(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) (string, error) {
opts := []client.ListOption{
client.InNamespace(scaledJob.GetNamespace()),
client.MatchingLabels(map[string]string{"scaledJob": scaledJob.GetName()}),
}
jobs := &batchv1.JobList{}
err := r.client.List(context.TODO(), jobs, opts...)
if err != nil {
return "Cannot get list of Jobs owned by this scaledJob", err
}

if jobs.Size() > 0 {
logger.Info("Deleting jobs owned by the previous version of the scaledJob", "Number of jobs to delete", jobs.Size())
}
for _, job := range jobs.Items {
err = r.client.Delete(context.TODO(), &job, client.PropagationPolicy(metav1.DeletePropagationBackground))
if err != nil {
return "Not able to delete job: " + job.Name, err
}
}

return fmt.Sprintf("Deleted jobs owned by the previous version of the scaleJob: %d jobs deleted", jobs.Size()), nil
}

// requestScaleLoop request ScaleLoop handler for the respective ScaledJob
func (r *ReconcileScaledJob) requestScaleLoop(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error {

logger.V(1).Info("Starting a new ScaleLoop")

if err := r.scaleHandler.HandleScalableObject(scaledJob); err != nil {
return err
}

return nil
}
51 changes: 51 additions & 0 deletions pkg/controller/scaledjob/scaledjob_finalizer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package scaledjob

import (
"context"

"github.com/go-logr/logr"
kedav1alpha1 "github.com/kedacore/keda/pkg/apis/keda/v1alpha1"
)

const (
scaledJobFinalizer = "finalizer.keda.sh"
)

// finalizescaledJob is stopping ScaleLoop for the respective ScaleJob
func (r *ReconcileScaledJob) finalizeScaledJob(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error {
// TODO implement finalize logic for ScaledJob
logger.Info("Successfully finalized ScaledJob")
return nil
}

// addFinalizer adds finalizer to the scaledJob
func (r *ReconcileScaledJob) addFinalizer(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error {
logger.Info("Adding Finalizer for the ScaledJob")
scaledJob.SetFinalizers(append(scaledJob.GetFinalizers(), scaledJobFinalizer))

// Update CR
err := r.client.Update(context.TODO(), scaledJob)
if err != nil {
logger.Error(err, "Failed to update ScaledJob with finalizer")
return err
}
return nil
}

func contains(list []string, s string) bool {
for _, v := range list {
if v == s {
return true
}
}
return false
}

func remove(list []string, s string) []string {
for i, v := range list {
if v == s {
list = append(list[:i], list[i+1:]...)
}
}
return list
}
24 changes: 23 additions & 1 deletion pkg/scalers/azure/azure_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package azure

import (
"context"

"github.com/Azure/azure-storage-queue-go/azqueue"
)

Expand All @@ -26,5 +27,26 @@ func GetAzureQueueLength(ctx context.Context, podIdentity string, connectionStri
return -1, err
}

return props.ApproximateMessagesCount(), nil
visibleMessageCount, err := getVisibleCount(&queueURL, 32)
if err != nil {
return -1, err
}
approximateMessageCount := props.ApproximateMessagesCount()

if visibleMessageCount == 32 {
return approximateMessageCount, nil
} else {
return visibleMessageCount, nil
}
}

func getVisibleCount(queueURL *azqueue.QueueURL, maxCount int32) (int32, error) {
messagesURL := queueURL.NewMessagesURL()
ctx := context.Background()
queue, err := messagesURL.Peek(ctx, maxCount)
if err != nil {
return 0, err
}
num := queue.NumMessages()
return num, nil
}
14 changes: 7 additions & 7 deletions pkg/scaling/executor/scale_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1al
}

func (e *scaleExecutor) createJobs(scaledJob *kedav1alpha1.ScaledJob, scaleTo int64, maxScale int64) {
// scaledObject.Spec.JobTargetRef.Template.GenerateName = scaledObject.GetName() + "-"
// if scaledObject.Spec.JobTargetRef.Template.Labels == nil {
// scaledObject.Spec.JobTargetRef.Template.Labels = map[string]string{}
// }
// scaledObject.Spec.JobTargetRef.Template.Labels["scaledobject"] = scaledObject.GetName()
scaledJob.Spec.JobTargetRef.Template.GenerateName = scaledJob.GetName() + "-"
if scaledJob.Spec.JobTargetRef.Template.Labels == nil {
scaledJob.Spec.JobTargetRef.Template.Labels = map[string]string{}
}
scaledJob.Spec.JobTargetRef.Template.Labels["scaledjob"] = scaledJob.GetName()

e.logger.Info("Creating jobs", "Effective number of max jobs", maxScale)

Expand All @@ -65,10 +65,10 @@ func (e *scaleExecutor) createJobs(scaledJob *kedav1alpha1.ScaledJob, scaleTo in
"app.kubernetes.io/version": version.Version,
"app.kubernetes.io/part-of": scaledJob.GetName(),
"app.kubernetes.io/managed-by": "keda-operator",
"scaledobject": scaledJob.GetName(),
"scaledjob": scaledJob.GetName(),
},
},
//Spec: *scaledObject.Spec.JobTargetRef.DeepCopy(),
Spec: *scaledJob.Spec.JobTargetRef.DeepCopy(),
}

// Job doesn't allow RestartPolicyAlways, it seems like this value is set by the client as a default one,
Expand Down
Loading

0 comments on commit 73d676d

Please sign in to comment.