Merge pull request karmada-io#2912 from jwcesign/fix-pp-rb
fix a corner case where re-scheduling is skipped when a cluster no longer fits.
karmada-bot authored Dec 13, 2022
2 parents 11cb257 + 950b614 commit f029394
Showing 5 changed files with 181 additions and 12 deletions.
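
The corner case being fixed: when a cluster's labels change so that it no longer satisfies a policy's affinity, the old scheduler only matched the affinity against the new cluster object (so the affected bindings were never requeued), and it returned early on the resulting "no cluster fit" error (so the stale placement was never cleaned up). The sketch below is an illustrative stand-in, not the upstream code: FitError and schedule here are simplified placeholders that show why treating the fit error as non-fatal lets the cleanup patch still run.

package main

import (
	"errors"
	"fmt"
)

// FitError is a simplified stand-in for the scheduler framework's
// "no cluster fit" error type referenced by this commit.
type FitError struct{ NumAllClusters int }

func (e *FitError) Error() string {
	return fmt.Sprintf("0 of %d clusters are available", e.NumAllClusters)
}

// schedule pretends every candidate cluster was filtered out,
// e.g. because its labels no longer match the policy's affinity.
func schedule() ([]string, error) {
	return nil, &FitError{NumAllClusters: 3}
}

func main() {
	targets, err := schedule()

	var fitErr *FitError
	if err != nil && !errors.As(err, &fitErr) {
		// A real scheduling failure: keep the previous placement and retry later.
		fmt.Println("abort:", err)
		return
	}

	// "No cluster fit" (or success) falls through: patch the binding with the
	// possibly empty target list so the workload is removed from clusters
	// that no longer fit.
	fmt.Println("patch placement to:", targets, "schedule err:", err)
}
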
1 change: 1 addition & 0 deletions pkg/scheduler/core/generic_scheduler.go
@@ -63,6 +63,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, placement *policyv1alph
return result, fmt.Errorf("failed to findClustersThatFit: %v", err)
}

+ // Short path for the case that no cluster fits.
if len(feasibleClusters) == 0 {
return result, &framework.FitError{
NumAllClusters: clusterInfoSnapshot.NumOfClusters(),
22 changes: 14 additions & 8 deletions pkg/scheduler/event_handler.go
@@ -229,13 +229,13 @@ func (s *Scheduler) updateCluster(oldObj, newObj interface{}) {
case !equality.Semantic.DeepEqual(oldCluster.Labels, newCluster.Labels):
fallthrough
case !equality.Semantic.DeepEqual(oldCluster.Spec, newCluster.Spec):
- s.enqueueAffectedPolicy(newCluster)
- s.enqueueAffectedClusterPolicy(newCluster)
+ s.enqueueAffectedPolicy(oldCluster, newCluster)
+ s.enqueueAffectedClusterPolicy(oldCluster, newCluster)
}
}

// enqueueAffectedPolicy finds all propagation policies related to the cluster and reschedules the RBs
- func (s *Scheduler) enqueueAffectedPolicy(newCluster *clusterv1alpha1.Cluster) {
+ func (s *Scheduler) enqueueAffectedPolicy(oldCluster, newCluster *clusterv1alpha1.Cluster) {
policies, _ := s.policyLister.List(labels.Everything())
for _, policy := range policies {
selector := labels.SelectorFromSet(labels.Set{
@@ -245,10 +245,13 @@ func (s *Scheduler) enqueueAffectedPolicy(newCluster *clusterv1alpha1.Cluster) {
affinity := policy.Spec.Placement.ClusterAffinity
switch {
case affinity == nil:
- // If no clusters specified, add it in queue
+ // If no clusters specified, add it to the queue
fallthrough
case util.ClusterMatches(newCluster, *affinity):
- // If specific cluster matches the affinity. add it in queue
+ // If the new cluster manifest matches the affinity, add it to the queue to trigger rescheduling
+ fallthrough
+ case util.ClusterMatches(oldCluster, *affinity):
+ // If the old cluster manifest matches the affinity, add it to the queue to trigger rescheduling
err := s.requeueResourceBindings(selector, metrics.ClusterChanged)
if err != nil {
klog.Errorf("Failed to requeue ResourceBinding, error: %v", err)
@@ -258,7 +261,7 @@ func (s *Scheduler) enqueueAffectedPolicy(newCluster *clusterv1alpha1.Cluster) {
}

// enqueueAffectedClusterPolicy finds all cluster propagation policies related to the cluster and reschedules the RBs/CRBs
- func (s *Scheduler) enqueueAffectedClusterPolicy(newCluster *clusterv1alpha1.Cluster) {
+ func (s *Scheduler) enqueueAffectedClusterPolicy(oldCluster, newCluster *clusterv1alpha1.Cluster) {
clusterPolicies, _ := s.clusterPolicyLister.List(labels.Everything())
for _, policy := range clusterPolicies {
selector := labels.SelectorFromSet(labels.Set{
@@ -267,10 +270,13 @@ func (s *Scheduler) enqueueAffectedClusterPolicy(newCluster *clusterv1alpha1.Clu
affinity := policy.Spec.Placement.ClusterAffinity
switch {
case affinity == nil:
- // If no clusters specified, add it in queue
+ // If no clusters specified, add it to the queue
fallthrough
case util.ClusterMatches(newCluster, *affinity):
- // If specific cluster matches the affinity. add it in queue
+ // If the new cluster manifest matches the affinity, add it to the queue to trigger rescheduling
+ fallthrough
+ case util.ClusterMatches(oldCluster, *affinity):
+ // If the old cluster manifest matches the affinity, add it to the queue to trigger rescheduling
err := s.requeueClusterResourceBindings(selector, metrics.ClusterChanged)
if err != nil {
klog.Errorf("Failed to requeue ClusterResourceBinding, error: %v", err)
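
Why the handler now tests the affinity against both the old and the new cluster object: if a label edit makes a cluster stop matching, only the old object still satisfies the selector, so checking just newCluster would skip the requeue and the now-unfit placement would never be rescheduled. A minimal sketch of that predicate, assuming the util.ClusterMatches signature and API packages used in the diff above; the helper itself is hypothetical and not part of this change:

package scheduler

import (
	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
	"github.com/karmada-io/karmada/pkg/util"
)

// clusterAffectsPolicy mirrors the switch above: a cluster update is relevant
// when the policy has no affinity at all, or when either the old or the new
// manifest matches it (covering both "starts to match" and "stops matching").
func clusterAffectsPolicy(oldCluster, newCluster *clusterv1alpha1.Cluster, affinity *policyv1alpha1.ClusterAffinity) bool {
	if affinity == nil {
		return true
	}
	return util.ClusterMatches(newCluster, *affinity) ||
		util.ClusterMatches(oldCluster, *affinity)
}
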
17 changes: 13 additions & 4 deletions pkg/scheduler/scheduler.go
@@ -3,6 +3,7 @@ package scheduler
import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
"time"
@@ -13,6 +14,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
+ utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
@@ -33,6 +35,7 @@ import (
worklister "github.com/karmada-io/karmada/pkg/generated/listers/work/v1alpha2"
schedulercache "github.com/karmada-io/karmada/pkg/scheduler/cache"
"github.com/karmada-io/karmada/pkg/scheduler/core"
"github.com/karmada-io/karmada/pkg/scheduler/framework"
frameworkplugins "github.com/karmada-io/karmada/pkg/scheduler/framework/plugins"
"github.com/karmada-io/karmada/pkg/scheduler/framework/runtime"
"github.com/karmada-io/karmada/pkg/scheduler/metrics"
@@ -476,13 +479,16 @@ func (s *Scheduler) scheduleResourceBinding(resourceBinding *workv1alpha2.Resour
}

scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &placement, &resourceBinding.Spec, &core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
- if err != nil {
+ var noClusterFit *framework.FitError
+ // In case no cluster fits, do not return immediately; continue to patch (clean up) the result.
+ if err != nil && !errors.As(err, &noClusterFit) {
klog.Errorf("Failed scheduling ResourceBinding %s/%s: %v", resourceBinding.Namespace, resourceBinding.Name, err)
return err
}

klog.V(4).Infof("ResourceBinding %s/%s scheduled to clusters %v", resourceBinding.Namespace, resourceBinding.Name, scheduleResult.SuggestedClusters)
- return s.patchScheduleResultForResourceBinding(resourceBinding, placementStr, scheduleResult.SuggestedClusters)
+ scheduleErr := s.patchScheduleResultForResourceBinding(resourceBinding, placementStr, scheduleResult.SuggestedClusters)
+ return utilerrors.NewAggregate([]error{err, scheduleErr})
}

func (s *Scheduler) patchScheduleResultForResourceBinding(oldBinding *workv1alpha2.ResourceBinding, placement string, scheduleResult []workv1alpha2.TargetCluster) error {
@@ -527,13 +533,16 @@ func (s *Scheduler) scheduleClusterResourceBinding(clusterResourceBinding *workv
}

scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &policy.Spec.Placement, &clusterResourceBinding.Spec, &core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
- if err != nil {
+ var noClusterFit *framework.FitError
+ // In case no cluster fits, do not return immediately; continue to patch (clean up) the result.
+ if err != nil && !errors.As(err, &noClusterFit) {
klog.V(2).Infof("Failed scheduling ClusterResourceBinding %s: %v", clusterResourceBinding.Name, err)
return err
}

klog.V(4).Infof("ClusterResourceBinding %s scheduled to clusters %v", clusterResourceBinding.Name, scheduleResult.SuggestedClusters)
- return s.patchScheduleResultForClusterResourceBinding(clusterResourceBinding, string(placement), scheduleResult.SuggestedClusters)
+ scheduleErr := s.patchScheduleResultForClusterResourceBinding(clusterResourceBinding, string(placement), scheduleResult.SuggestedClusters)
+ return utilerrors.NewAggregate([]error{err, scheduleErr})
}

func (s *Scheduler) patchScheduleResultForClusterResourceBinding(oldBinding *workv1alpha2.ClusterResourceBinding, placement string, scheduleResult []workv1alpha2.TargetCluster) error {
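
The schedule error is now aggregated with the patch error instead of being swallowed. Below is a small, self-contained illustration of how utilerrors.NewAggregate from k8s.io/apimachinery behaves (nil entries are dropped and an all-nil list yields nil), which is why the success path still returns nil while a fit error remains visible for requeueing and metrics; the error messages here are made up for the example.

package main

import (
	"errors"
	"fmt"

	utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

func main() {
	fitErr := errors.New("0/3 clusters are available")
	patchErr := errors.New("failed to patch ResourceBinding")

	// All nil: the aggregate is nil, so a successful schedule + patch still returns nil.
	fmt.Println(utilerrors.NewAggregate([]error{nil, nil})) // <nil>

	// Only the fit error: it is surfaced so the binding can be requeued and counted.
	fmt.Println(utilerrors.NewAggregate([]error{fitErr, nil}))

	// Both errors: the aggregate reports both.
	fmt.Println(utilerrors.NewAggregate([]error{fitErr, patchErr}))
}
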
44 changes: 44 additions & 0 deletions test/e2e/framework/cluster.go
@@ -230,6 +230,50 @@ func setClusterLabel(c client.Client, clusterName string) error {
return err
}

// UpdateClusterLabels updates cluster labels.
func UpdateClusterLabels(client karmada.Interface, clusterName string, labels map[string]string) {
gomega.Eventually(func() (bool, error) {
cluster, err := client.ClusterV1alpha1().Clusters().Get(context.TODO(), clusterName, metav1.GetOptions{})
if err != nil {
return false, err
}

if cluster.Labels == nil {
cluster.Labels = map[string]string{}
}
for key, value := range labels {
cluster.Labels[key] = value
}
_, err = client.ClusterV1alpha1().Clusters().Update(context.TODO(), cluster, metav1.UpdateOptions{})
if err != nil {
return false, err
}
return true, nil
}, pollTimeout, pollInterval).Should(gomega.Equal(true))
}

// DeleteClusterLabels deletes the given cluster labels if they exist.
func DeleteClusterLabels(client karmada.Interface, clusterName string, labels map[string]string) {
gomega.Eventually(func() (bool, error) {
cluster, err := client.ClusterV1alpha1().Clusters().Get(context.TODO(), clusterName, metav1.GetOptions{})
if err != nil {
return false, err
}

if cluster.Labels == nil {
return true, nil
}
for key := range labels {
delete(cluster.Labels, key)
}
_, err = client.ClusterV1alpha1().Clusters().Update(context.TODO(), cluster, metav1.UpdateOptions{})
if err != nil {
return false, err
}
return true, nil
}, pollTimeout, pollInterval).Should(gomega.Equal(true))
}

// GetClusterNamesFromClusters will get Clusters' names from Clusters Object.
func GetClusterNamesFromClusters(clusters []*clusterv1alpha1.Cluster) []string {
clusterNames := make([]string, 0, len(clusters))
109 changes: 109 additions & 0 deletions test/e2e/rescheduling_test.go
@@ -319,3 +319,112 @@ var _ = ginkgo.Describe("[cluster joined] reschedule testing", func() {
})
})
})

// reschedule testing while policy matches, triggered by label changes.
var _ = ginkgo.Describe("[cluster labels changed] reschedule testing while policy matches", func() {
var deployment *appsv1.Deployment
var targetMember string
var labelKey string
var policyNamespace string
var policyName string

ginkgo.BeforeEach(func() {
targetMember = framework.ClusterNames()[0]
policyNamespace = testNamespace
policyName = deploymentNamePrefix + rand.String(RandomStrLength)
labelKey = "cluster" + rand.String(RandomStrLength)

deployment = testhelper.NewDeployment(testNamespace, policyName)
framework.CreateDeployment(kubeClient, deployment)

labels := map[string]string{labelKey: "ok"}
framework.UpdateClusterLabels(karmadaClient, targetMember, labels)

ginkgo.DeferCleanup(func() {
framework.RemoveDeployment(kubeClient, deployment.Namespace, deployment.Name)
framework.DeleteClusterLabels(karmadaClient, targetMember, labels)
})
})

ginkgo.Context("Changes cluster labels to test reschedule while pp matches", func() {
var policy *policyv1alpha1.PropagationPolicy

ginkgo.BeforeEach(func() {
policy = testhelper.NewPropagationPolicy(policyNamespace, policyName, []policyv1alpha1.ResourceSelector{
{
APIVersion: deployment.APIVersion,
Kind: deployment.Kind,
Name: deployment.Name,
}}, policyv1alpha1.Placement{
ClusterAffinity: &policyv1alpha1.ClusterAffinity{
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{labelKey: "ok"},
},
},
})
})

ginkgo.BeforeEach(func() {
framework.CreatePropagationPolicy(karmadaClient, policy)

ginkgo.DeferCleanup(func() {
framework.RemovePropagationPolicy(karmadaClient, policy.Namespace, policy.Name)
})

framework.WaitDeploymentPresentOnClusterFitWith(targetMember, deployment.Namespace, deployment.Name,
func(deployment *appsv1.Deployment) bool { return true })
})

ginkgo.It("change labels to testing deployment reschedule", func() {
labelsUpdate := map[string]string{labelKey: "not_ok"}
framework.UpdateClusterLabels(karmadaClient, targetMember, labelsUpdate)
framework.WaitDeploymentDisappearOnCluster(targetMember, deployment.Namespace, deployment.Name)

labelsUpdate = map[string]string{labelKey: "ok"}
framework.UpdateClusterLabels(karmadaClient, targetMember, labelsUpdate)
framework.WaitDeploymentPresentOnClusterFitWith(targetMember, deployment.Namespace, deployment.Name,
func(deployment *appsv1.Deployment) bool { return true })
})
})

ginkgo.Context("Changes cluster labels to test reschedule while cpp matches", func() {
var policy *policyv1alpha1.ClusterPropagationPolicy

ginkgo.BeforeEach(func() {
policy = testhelper.NewClusterPropagationPolicy(policyName, []policyv1alpha1.ResourceSelector{
{
APIVersion: deployment.APIVersion,
Kind: deployment.Kind,
Name: deployment.Name,
}}, policyv1alpha1.Placement{
ClusterAffinity: &policyv1alpha1.ClusterAffinity{
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{labelKey: "ok"},
},
},
})
})

ginkgo.BeforeEach(func() {
framework.CreateClusterPropagationPolicy(karmadaClient, policy)

ginkgo.DeferCleanup(func() {
framework.RemoveClusterPropagationPolicy(karmadaClient, policy.Name)
})

framework.WaitDeploymentPresentOnClusterFitWith(targetMember, deployment.Namespace, deployment.Name,
func(deployment *appsv1.Deployment) bool { return true })
})

ginkgo.It("change labels to testing deployment reschedule", func() {
labelsUpdate := map[string]string{labelKey: "not_ok"}
framework.UpdateClusterLabels(karmadaClient, targetMember, labelsUpdate)
framework.WaitDeploymentDisappearOnCluster(targetMember, deployment.Namespace, deployment.Name)

labelsUpdate = map[string]string{labelKey: "ok"}
framework.UpdateClusterLabels(karmadaClient, targetMember, labelsUpdate)
framework.WaitDeploymentPresentOnClusterFitWith(targetMember, deployment.Namespace, deployment.Name,
func(deployment *appsv1.Deployment) bool { return true })
})
})
})
