Skip to content

Commit

Permalink
detector watch pp fields changed other than resource selector
Browse files Browse the repository at this point in the history
Signed-off-by: Poor12 <[email protected]>
  • Loading branch information
Poor12 committed Feb 9, 2023
1 parent 8a69413 commit 9ae3e50
Show file tree
Hide file tree
Showing 5 changed files with 300 additions and 35 deletions.
69 changes: 64 additions & 5 deletions pkg/detector/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func (d *ResourceDetector) LookForMatchedClusterPolicy(object *unstructured.Unst
// ApplyPolicy starts propagate the object referenced by object key according to PropagationPolicy.
func (d *ResourceDetector) ApplyPolicy(object *unstructured.Unstructured, objectKey keys.ClusterWideKey, policy *policyv1alpha1.PropagationPolicy) (err error) {
start := time.Now()
klog.Infof("Applying policy(%s%s) for object: %s", policy.Namespace, policy.Name, objectKey)
klog.Infof("Applying policy(%s/%s) for object: %s", policy.Namespace, policy.Name, objectKey)
var operationResult controllerutil.OperationResult
defer func() {
metrics.ObserveApplyPolicyAttemptAndLatency(object, policy.ObjectMeta, err, start)
Expand Down Expand Up @@ -756,7 +756,7 @@ func (d *ResourceDetector) OnPropagationPolicyUpdate(oldObj, newObj interface{})
return
}

klog.V(2).Infof("Update PropagationPolicy(%s) resourceSelectors", key)
klog.V(2).Infof("Update PropagationPolicy(%s)", key)
d.policyReconcileWorker.Add(key)
}

Expand Down Expand Up @@ -820,7 +820,7 @@ func (d *ResourceDetector) OnClusterPropagationPolicyUpdate(oldObj, newObj inter
return
}

klog.V(2).Infof("Update ClusterPropagationPolicy(%s) resourceSelectors", key)
klog.V(2).Infof("Update ClusterPropagationPolicy(%s)", key)
d.clusterPolicyReconcileWorker.Add(key)
}

Expand Down Expand Up @@ -969,11 +969,32 @@ func (d *ResourceDetector) HandleClusterPropagationPolicyDeletion(policyName str
// And then check if object in waiting list matches the policy, if yes remove the object
// from waiting list and throw the object to it's reconcile queue. If not, do nothing.
func (d *ResourceDetector) HandlePropagationPolicyCreationOrUpdate(policy *policyv1alpha1.PropagationPolicy) error {
err := d.cleanUnmatchedResourceBindings(policy.Namespace, policy.Name, policy.Spec.ResourceSelectors)
err := d.cleanPPUnmatchedResourceBindings(policy.Namespace, policy.Name, policy.Spec.ResourceSelectors)
if err != nil {
return err
}

// When updating fields other than ResourceSelector, should first find the corresponding ResourceBinding
// and add the bound object to the processor's queue for reconciliation to make sure that
// PropagationPolicy's updates can be synchronized to ResourceBinding.
resourceBindings, err := d.listPPDerivedRB(policy.Namespace, policy.Name)
if err != nil {
return err
}
for _, rb := range resourceBindings.Items {
gv, err := schema.ParseGroupVersion(rb.Spec.Resource.APIVersion)
if err != nil {
return err
}
d.Processor.Add(keys.ClusterWideKey{
Name: rb.Spec.Resource.Name,
Namespace: rb.Spec.Resource.Namespace,
Kind: rb.Spec.Resource.Kind,
Group: gv.Group,
Version: gv.Version,
})
}

matchedKeys := d.GetMatching(policy.Spec.ResourceSelectors)
klog.Infof("Matched %d resources by policy(%s/%s)", len(matchedKeys), policy.Namespace, policy.Name)

Expand All @@ -1000,7 +1021,7 @@ func (d *ResourceDetector) HandlePropagationPolicyCreationOrUpdate(policy *polic
// And then check if object in waiting list matches the policy, if yes remove the object
// from waiting list and throw the object to it's reconcile queue. If not, do nothing.
func (d *ResourceDetector) HandleClusterPropagationPolicyCreationOrUpdate(policy *policyv1alpha1.ClusterPropagationPolicy) error {
err := d.cleanUnmatchedResourceBindings("", policy.Name, policy.Spec.ResourceSelectors)
err := d.cleanCPPUnmatchedResourceBindings(policy.Name, policy.Spec.ResourceSelectors)
if err != nil {
return err
}
Expand All @@ -1010,6 +1031,44 @@ func (d *ResourceDetector) HandleClusterPropagationPolicyCreationOrUpdate(policy
return err
}

// When updating fields other than ResourceSelector, should first find the corresponding ResourceBinding/ClusterResourceBinding
// and add the bound object to the processor's queue for reconciliation to make sure that
// ClusterPropagationPolicy's updates can be synchronized to ResourceBinding/ClusterResourceBinding.
resourceBindings, err := d.listCPPDerivedRB(policy.Name)
if err != nil {
return err
}
clusterResourceBindings, err := d.listCPPDerivedCRB(policy.Name)
if err != nil {
return err
}
for _, rb := range resourceBindings.Items {
gv, err := schema.ParseGroupVersion(rb.Spec.Resource.APIVersion)
if err != nil {
return err
}
d.Processor.Add(keys.ClusterWideKey{
Name: rb.Spec.Resource.Name,
Namespace: rb.Spec.Resource.Namespace,
Kind: rb.Spec.Resource.Kind,
Group: gv.Group,
Version: gv.Version,
})
}
for _, crb := range clusterResourceBindings.Items {
gv, err := schema.ParseGroupVersion(crb.Spec.Resource.APIVersion)
if err != nil {
return err
}
d.Processor.Add(keys.ClusterWideKey{
Name: crb.Spec.Resource.Name,
Namespace: crb.Spec.Resource.Namespace,
Kind: crb.Spec.Resource.Kind,
Group: gv.Group,
Version: gv.Version,
})
}

matchedKeys := d.GetMatching(policy.Spec.ResourceSelectors)
klog.Infof("Matched %d resources by policy(%s)", len(matchedKeys), policy.Name)

Expand Down
110 changes: 80 additions & 30 deletions pkg/detector/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,30 +152,41 @@ func (d *ResourceDetector) getAndApplyClusterPolicy(object *unstructured.Unstruc
return d.ApplyClusterPolicy(object, objectKey, matchedClusterPropagationPolicy)
}

func (d *ResourceDetector) cleanUnmatchedResourceBindings(policyNamespace, policyName string, selectors []policyv1alpha1.ResourceSelector) error {
var ls labels.Set
var removeLabels []string
if len(policyNamespace) == 0 {
ls = labels.Set{policyv1alpha1.ClusterPropagationPolicyLabel: policyName}
removeLabels = []string{policyv1alpha1.ClusterPropagationPolicyLabel}
} else {
ls = labels.Set{
policyv1alpha1.PropagationPolicyNamespaceLabel: policyNamespace,
policyv1alpha1.PropagationPolicyNameLabel: policyName,
}
removeLabels = []string{
policyv1alpha1.PropagationPolicyNamespaceLabel,
policyv1alpha1.PropagationPolicyNameLabel,
}
func (d *ResourceDetector) cleanPPUnmatchedResourceBindings(policyNamespace, policyName string, selectors []policyv1alpha1.ResourceSelector) error {
bindings, err := d.listPPDerivedRB(policyNamespace, policyName)
if err != nil {
return err
}

bindings := &workv1alpha2.ResourceBindingList{}
listOpt := &client.ListOptions{LabelSelector: labels.SelectorFromSet(ls)}
err := d.Client.List(context.TODO(), bindings, listOpt)
removeLabels := []string{
policyv1alpha1.PropagationPolicyNamespaceLabel,
policyv1alpha1.PropagationPolicyNameLabel,
}
return d.removeResourceBindingsLabels(bindings, selectors, removeLabels)
}

func (d *ResourceDetector) cleanCPPUnmatchedResourceBindings(policyName string, selectors []policyv1alpha1.ResourceSelector) error {
bindings, err := d.listCPPDerivedRB(policyName)
if err != nil {
klog.Errorf("Failed to list ResourceBinding with policy(%s/%s), error: %v", policyNamespace, policyName, err)
return err
}

removeLabels := []string{
policyv1alpha1.ClusterPropagationPolicyLabel,
}
return d.removeResourceBindingsLabels(bindings, selectors, removeLabels)
}

func (d *ResourceDetector) cleanUnmatchedClusterResourceBinding(policyName string, selectors []policyv1alpha1.ResourceSelector) error {
bindings, err := d.listCPPDerivedCRB(policyName)
if err != nil {
return err
}

return d.removeClusterResourceBindingsLabels(bindings, selectors)
}

func (d *ResourceDetector) removeResourceBindingsLabels(bindings *workv1alpha2.ResourceBindingList, selectors []policyv1alpha1.ResourceSelector, removeLabels []string) error {
var errs []error
for _, binding := range bindings.Items {
removed, err := d.removeResourceLabelsIfNotMatch(binding.Spec.Resource, selectors, removeLabels...)
Expand All @@ -202,20 +213,11 @@ func (d *ResourceDetector) cleanUnmatchedResourceBindings(policyNamespace, polic
if len(errs) > 0 {
return errors.NewAggregate(errs)
}

return nil
}

func (d *ResourceDetector) cleanUnmatchedClusterResourceBinding(policyName string, selectors []policyv1alpha1.ResourceSelector) error {
bindings := &workv1alpha2.ClusterResourceBindingList{}
listOpt := &client.ListOptions{
LabelSelector: labels.SelectorFromSet(labels.Set{
policyv1alpha1.ClusterPropagationPolicyLabel: policyName,
})}
err := d.Client.List(context.TODO(), bindings, listOpt)
if err != nil {
klog.Errorf("Failed to list ClusterResourceBinding with policy(%s), error: %v", policyName, err)
}

func (d *ResourceDetector) removeClusterResourceBindingsLabels(bindings *workv1alpha2.ClusterResourceBindingList, selectors []policyv1alpha1.ResourceSelector) error {
var errs []error
for _, binding := range bindings.Items {
removed, err := d.removeResourceLabelsIfNotMatch(binding.Spec.Resource, selectors, []string{policyv1alpha1.ClusterPropagationPolicyLabel}...)
Expand Down Expand Up @@ -270,3 +272,51 @@ func (d *ResourceDetector) removeResourceLabelsIfNotMatch(objectReference workv1
}
return true, nil
}

func (d *ResourceDetector) listPPDerivedRB(policyNamespace, policyName string) (*workv1alpha2.ResourceBindingList, error) {
bindings := &workv1alpha2.ResourceBindingList{}
listOpt := &client.ListOptions{
Namespace: policyNamespace,
LabelSelector: labels.SelectorFromSet(labels.Set{
policyv1alpha1.PropagationPolicyNamespaceLabel: policyNamespace,
policyv1alpha1.PropagationPolicyNameLabel: policyName,
}),
}
err := d.Client.List(context.TODO(), bindings, listOpt)
if err != nil {
klog.Errorf("Failed to list ResourceBinding with policy(%s/%s), error: %v", policyNamespace, policyName, err)
return nil, err
}

return bindings, nil
}

func (d *ResourceDetector) listCPPDerivedRB(policyName string) (*workv1alpha2.ResourceBindingList, error) {
bindings := &workv1alpha2.ResourceBindingList{}
listOpt := &client.ListOptions{
LabelSelector: labels.SelectorFromSet(labels.Set{
policyv1alpha1.ClusterPropagationPolicyLabel: policyName,
})}
err := d.Client.List(context.TODO(), bindings, listOpt)
if err != nil {
klog.Errorf("Failed to list ResourceBinding with policy(%s), error: %v", policyName, err)
return nil, err
}

return bindings, nil
}

func (d *ResourceDetector) listCPPDerivedCRB(policyName string) (*workv1alpha2.ClusterResourceBindingList, error) {
bindings := &workv1alpha2.ClusterResourceBindingList{}
listOpt := &client.ListOptions{
LabelSelector: labels.SelectorFromSet(labels.Set{
policyv1alpha1.ClusterPropagationPolicyLabel: policyName,
})}
err := d.Client.List(context.TODO(), bindings, listOpt)
if err != nil {
klog.Errorf("Failed to list ClusterResourceBinding with policy(%s), error: %v", policyName, err)
return nil, err
}

return bindings, nil
}
73 changes: 73 additions & 0 deletions test/e2e/clusterpropagationpolicy_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
package e2e

import (
"context"
"fmt"
"time"

"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
rbacv1 "k8s.io/api/rbac/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -321,6 +324,76 @@ var _ = ginkgo.Describe("[AdvancedClusterPropagation] propagation testing", func
})
})
})

ginkgo.Context("Edit ClusterPropagationPolicy PropagateDeps", func() {

ginkgo.When("namespace scope resource", func() {
var policy *policyv1alpha1.ClusterPropagationPolicy
var deployment *appsv1.Deployment
var targetMember string

ginkgo.BeforeEach(func() {
targetMember = framework.ClusterNames()[0]
policyName := deploymentNamePrefix + rand.String(RandomStrLength)

deployment = testhelper.NewDeployment(testNamespace, policyName+"01")

policy = testhelper.NewClusterPropagationPolicy(policyName, []policyv1alpha1.ResourceSelector{
{
APIVersion: deployment.APIVersion,
Kind: deployment.Kind,
Name: deployment.Name,
}}, policyv1alpha1.Placement{
ClusterAffinity: &policyv1alpha1.ClusterAffinity{
ClusterNames: []string{targetMember},
},
})
})

ginkgo.BeforeEach(func() {
framework.CreateClusterPropagationPolicy(karmadaClient, policy)
framework.CreateDeployment(kubeClient, deployment)
ginkgo.DeferCleanup(func() {
framework.RemoveClusterPropagationPolicy(karmadaClient, policy.Name)
framework.RemoveDeployment(kubeClient, deployment.Namespace, deployment.Name)
})

gomega.Eventually(func() bool {
bindings, err := karmadaClient.WorkV1alpha2().ResourceBindings(testNamespace).List(context.TODO(), metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(labels.Set{
policyv1alpha1.ClusterPropagationPolicyLabel: policy.Name,
}).String(),
})
if err != nil {
return false
}
return len(bindings.Items) != 0
}, pollTimeout, pollInterval).Should(gomega.Equal(true))
})

ginkgo.It("update policy propagateDeps", func() {
patch := []map[string]interface{}{
{
"op": "replace",
"path": "/spec/propagateDeps",
"value": true,
},
}
framework.PatchClusterPropagationPolicy(karmadaClient, policy.Name, patch, types.JSONPatchType)
gomega.Eventually(func() bool {
bindings, err := karmadaClient.WorkV1alpha2().ResourceBindings(testNamespace).List(context.TODO(), metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(labels.Set{
policyv1alpha1.ClusterPropagationPolicyLabel: policy.Name,
}).String(),
})
if err != nil {
return false
}
return bindings.Items[0].Spec.PropagateDeps == true
}, pollTimeout, pollInterval).Should(gomega.Equal(true))
})
})
})
})

// ExplicitPriority more than one CPP matches the object, we should select the one with the highest explicit priority, if the
Expand Down
13 changes: 13 additions & 0 deletions test/e2e/framework/clusterpropagationpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package framework

import (
"context"
"encoding/json"
"fmt"

"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
karmada "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
Expand All @@ -28,6 +30,17 @@ func RemoveClusterPropagationPolicy(client karmada.Interface, name string) {
})
}

// PatchClusterPropagationPolicy patch ClusterPropagationPolicy with karmada client.
func PatchClusterPropagationPolicy(client karmada.Interface, name string, patch []map[string]interface{}, patchType types.PatchType) {
ginkgo.By(fmt.Sprintf("Patching ClusterPropagationPolicy(%s)", name), func() {
patchBytes, err := json.Marshal(patch)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())

_, err = client.PolicyV1alpha1().ClusterPropagationPolicies().Patch(context.TODO(), name, patchType, patchBytes, metav1.PatchOptions{})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
})
}

// UpdateClusterPropagationPolicy update ClusterPropagationPolicy resourceSelectors with karmada client.
func UpdateClusterPropagationPolicy(client karmada.Interface, name string, resourceSelectors []policyv1alpha1.ResourceSelector) {
ginkgo.By(fmt.Sprintf("Updating ClusterPropagationPolicy(%s)", name), func() {
Expand Down
Loading

0 comments on commit 9ae3e50

Please sign in to comment.