Skip to content

Commit

Permalink
default interpreter aggregate status: set sub-test name and parallelize
Browse files Browse the repository at this point in the history
Signed-off-by: Amir Alavi <[email protected]>
  • Loading branch information
a7i committed Jan 9, 2023
1 parent 7b4c541 commit 05a98cc
Show file tree
Hide file tree
Showing 11 changed files with 442 additions and 19 deletions.
46 changes: 46 additions & 0 deletions pkg/resourceinterpreter/defaultinterpreter/aggregatestatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
Expand All @@ -31,6 +33,7 @@ func getAllDefaultAggregateStatusInterpreter() map[schema.GroupVersionKind]aggre
s[corev1.SchemeGroupVersion.WithKind(util.PodKind)] = aggregatePodStatus
s[corev1.SchemeGroupVersion.WithKind(util.PersistentVolumeKind)] = aggregatePersistentVolumeStatus
s[corev1.SchemeGroupVersion.WithKind(util.PersistentVolumeClaimKind)] = aggregatePersistentVolumeClaimStatus
s[policyv1.SchemeGroupVersion.WithKind(util.PodDisruptionBudgetKind)] = aggregatePodDisruptionBudgetStatus
return s
}

Expand Down Expand Up @@ -455,3 +458,46 @@ func aggregatePersistentVolumeClaimStatus(object *unstructured.Unstructured, agg
pvc.Status = *newStatus
return helper.ToUnstructured(pvc)
}

func aggregatePodDisruptionBudgetStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) {
pdb := &policyv1.PodDisruptionBudget{}
err := helper.ConvertToTypedObject(object, pdb)
if err != nil {
return nil, err
}

newStatus := &policyv1.PodDisruptionBudgetStatus{
DisruptedPods: make(map[string]metav1.Time),
}
for _, item := range aggregatedStatusItems {
if item.Status == nil {
continue
}

temp := &policyv1.PodDisruptionBudgetStatus{}
if err = json.Unmarshal(item.Status.Raw, temp); err != nil {
return nil, err
}
klog.V(3).Infof(
"Grab pdb(%s/%s) status from cluster(%s), desired healthy: %d, current healthy: %d, disrupted allowed: %d, expected: %d",
pdb.Namespace, pdb.Name, item.ClusterName,
temp.DesiredHealthy, temp.CurrentHealthy, temp.DisruptionsAllowed, temp.ExpectedPods,
)

newStatus.CurrentHealthy += temp.CurrentHealthy
newStatus.DesiredHealthy += temp.DesiredHealthy
newStatus.ExpectedPods += temp.ExpectedPods
newStatus.DisruptionsAllowed += temp.DisruptionsAllowed
for podName, evictionTime := range temp.DisruptedPods {
newStatus.DisruptedPods[item.ClusterName+"/"+podName] = evictionTime
}
}

if reflect.DeepEqual(pdb.Status, *newStatus) {
klog.V(3).Infof("ignore update pdb(%s/%s) status as up to date", pdb.Namespace, pdb.Name)
return object, nil
}

pdb.Status = *newStatus
return helper.ToUnstructured(pdb)
}
94 changes: 94 additions & 0 deletions pkg/resourceinterpreter/defaultinterpreter/aggregatestatus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

Expand Down Expand Up @@ -843,3 +844,96 @@ func TestAggregatePVStatus(t *testing.T) {
assert.Equal(t, tt.expectedObj, actualObj, tt.name)
}
}

func TestAggregatedPodDisruptionBudgetStatus(t *testing.T) {
currPdbObj, _ := helper.ToUnstructured(&policyv1.PodDisruptionBudget{
Status: policyv1.PodDisruptionBudgetStatus{
CurrentHealthy: 1,
DesiredHealthy: 1,
DisruptionsAllowed: 1,
ExpectedPods: 1,
},
})

expectedPdbObj, _ := helper.ToUnstructured(&policyv1.PodDisruptionBudget{
Status: policyv1.PodDisruptionBudgetStatus{
CurrentHealthy: 2,
DesiredHealthy: 2,
DisruptionsAllowed: 2,
ExpectedPods: 2,
},
})

healthyStatusRaw, _ := helper.BuildStatusRawExtension(map[string]interface{}{
"currentHealthy": 1,
"desiredHealthy": 1,
"disruptionsAllowed": 1,
"expectedPods": 1,
})

evictionTime := metav1.Now()

unHealthyStatusRaw, _ := helper.BuildStatusRawExtension(map[string]interface{}{
"currentHealthy": 0,
"desiredHealthy": 1,
"disruptionsAllowed": 0,
"expectedPods": 1,
"disruptedPods": map[string]metav1.Time{
"pod-1234": evictionTime,
},
})

expectedUnhealthyPdbObj, _ := helper.ToUnstructured(&policyv1.PodDisruptionBudget{
Status: policyv1.PodDisruptionBudgetStatus{
CurrentHealthy: 0,
DesiredHealthy: 2,
DisruptionsAllowed: 0,
ExpectedPods: 2,
DisruptedPods: map[string]metav1.Time{
"member1/pod-1234": evictionTime,
"member2/pod-1234": evictionTime,
},
},
})

aggregateStatusItems := []workv1alpha2.AggregatedStatusItem{
{ClusterName: "member1", Status: healthyStatusRaw, Applied: true},
{ClusterName: "member2", Status: healthyStatusRaw, Applied: true},
}

unhealthyAggregateStatusItems := []workv1alpha2.AggregatedStatusItem{
{ClusterName: "member1", Status: unHealthyStatusRaw, Applied: true},
{ClusterName: "member2", Status: unHealthyStatusRaw, Applied: true},
}

for _, tt := range []struct {
name string
curObj *unstructured.Unstructured
aggregatedStatusItems []workv1alpha2.AggregatedStatusItem
expectedObj *unstructured.Unstructured
}{
{
name: "update pdb status",
curObj: currPdbObj,
aggregatedStatusItems: aggregateStatusItems,
expectedObj: expectedPdbObj,
},
{
name: "update pdb status with disrupted pods",
curObj: currPdbObj,
aggregatedStatusItems: unhealthyAggregateStatusItems,
expectedObj: expectedUnhealthyPdbObj,
},
{
name: "ignore update pdb status as up to date",
curObj: expectedPdbObj,
aggregatedStatusItems: aggregateStatusItems,
expectedObj: expectedPdbObj,
},
} {
t.Run(tt.name, func(t *testing.T) {
actualObj, _ := aggregatePodDisruptionBudgetStatus(tt.curObj, tt.aggregatedStatusItems)
assert.Equal(t, tt.expectedObj, actualObj)
})
}
}
12 changes: 12 additions & 0 deletions pkg/resourceinterpreter/defaultinterpreter/healthy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"

Expand All @@ -22,6 +23,7 @@ func getAllDefaultHealthInterpreter() map[schema.GroupVersionKind]healthInterpre
s[corev1.SchemeGroupVersion.WithKind(util.ServiceKind)] = interpretServiceHealth
s[networkingv1.SchemeGroupVersion.WithKind(util.IngressKind)] = interpretIngressHealth
s[corev1.SchemeGroupVersion.WithKind(util.PersistentVolumeClaimKind)] = interpretPersistentVolumeClaimHealth
s[policyv1.SchemeGroupVersion.WithKind(util.PodDisruptionBudgetKind)] = interpretPodDisruptionBudgetHealth
return s
}

Expand Down Expand Up @@ -144,3 +146,13 @@ func interpretPersistentVolumeClaimHealth(object *unstructured.Unstructured) (bo

return pvc.Status.Phase == corev1.ClaimBound, nil
}

func interpretPodDisruptionBudgetHealth(object *unstructured.Unstructured) (bool, error) {
pdb := &policyv1.PodDisruptionBudget{}
err := helper.ConvertToTypedObject(object, pdb)
if err != nil {
return false, err
}

return pdb.Status.CurrentHealthy >= pdb.Status.DesiredHealthy, nil
}
53 changes: 53 additions & 0 deletions pkg/resourceinterpreter/defaultinterpreter/healthy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,3 +568,56 @@ func Test_interpretPersistentVolumeClaimHealth(t *testing.T) {
})
}
}

func Test_interpretPodDisruptionBudgetHealth(t *testing.T) {
tests := []struct {
name string
object *unstructured.Unstructured
want bool
}{
{
name: "podDisruptionBudget healthy",
object: &unstructured.Unstructured{
Object: map[string]interface{}{
"status": map[string]interface{}{
"desiredHealthy": 2,
"currentHealthy": 3,
},
},
},
want: true,
},
{
name: "podDisruptionBudget healthy when desired healthy equals to current healthy pods",
object: &unstructured.Unstructured{
Object: map[string]interface{}{
"status": map[string]interface{}{
"desiredHealthy": 2,
"currentHealthy": 2,
},
},
},
want: true,
},
{
name: "podDisruptionBudget does not allow further disruption when number of healthy pods is less than desired",
object: &unstructured.Unstructured{
Object: map[string]interface{}{
"status": map[string]interface{}{
"desiredHealthy": 2,
"currentHealthy": 1,
},
},
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, _ := interpretPodDisruptionBudgetHealth(tt.object)
if got != tt.want {
t.Errorf("interpretPodDisruptionBudgetHealth() = %v, want %v", got, tt.want)
}
})
}
}
31 changes: 31 additions & 0 deletions pkg/resourceinterpreter/defaultinterpreter/reflectstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -26,6 +27,7 @@ func getAllDefaultReflectStatusInterpreter() map[schema.GroupVersionKind]reflect
s[batchv1.SchemeGroupVersion.WithKind(util.JobKind)] = reflectJobStatus
s[appsv1.SchemeGroupVersion.WithKind(util.DaemonSetKind)] = reflectDaemonSetStatus
s[appsv1.SchemeGroupVersion.WithKind(util.StatefulSetKind)] = reflectStatefulSetStatus
s[policyv1.SchemeGroupVersion.WithKind(util.PodDisruptionBudgetKind)] = reflectPodDisruptionBudgetStatus
return s
}

Expand Down Expand Up @@ -189,6 +191,35 @@ func reflectStatefulSetStatus(object *unstructured.Unstructured) (*runtime.RawEx
return helper.BuildStatusRawExtension(grabStatus)
}

func reflectPodDisruptionBudgetStatus(object *unstructured.Unstructured) (*runtime.RawExtension, error) {
statusMap, exist, err := unstructured.NestedMap(object.Object, "status")
if err != nil {
klog.Errorf("Failed to get status field from %s(%s/%s), error: %v",
object.GetKind(), object.GetNamespace(), object.GetName(), err)
return nil, err
}
if !exist {
klog.Errorf("Failed to grab status from %s(%s/%s) which should have status field.",
object.GetKind(), object.GetNamespace(), object.GetName())
return nil, nil
}

pdbStatus := &policyv1.PodDisruptionBudgetStatus{}
err = helper.ConvertToTypedObject(statusMap, pdbStatus)
if err != nil {
return nil, fmt.Errorf("failed to convert PodDisruptionBudget from map[string]interface{}: %v", err)
}

grabStatus := policyv1.PodDisruptionBudgetStatus{
DisruptionsAllowed: pdbStatus.DisruptionsAllowed,
ExpectedPods: pdbStatus.ExpectedPods,
DesiredHealthy: pdbStatus.DesiredHealthy,
CurrentHealthy: pdbStatus.CurrentHealthy,
DisruptedPods: pdbStatus.DisruptedPods,
}
return helper.BuildStatusRawExtension(grabStatus)
}

func reflectWholeStatus(object *unstructured.Unstructured) (*runtime.RawExtension, error) {
statusMap, exist, err := unstructured.NestedMap(object.Object, "status")
if err != nil {
Expand Down
64 changes: 64 additions & 0 deletions pkg/resourceinterpreter/defaultinterpreter/reflectstatus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"reflect"
"testing"

policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"

Expand Down Expand Up @@ -72,3 +73,66 @@ func Test_getEntireStatus(t *testing.T) {
})
}
}

func Test_reflectPodDisruptionBudgetStatus(t *testing.T) {
currStatus := policyv1.PodDisruptionBudgetStatus{
CurrentHealthy: 1,
DesiredHealthy: 1,
DisruptionsAllowed: 1,
ExpectedPods: 1,
}
currStatusUnstructured, _ := helper.ToUnstructured(&policyv1.PodDisruptionBudget{Status: currStatus})
wantRawExtension, _ := helper.BuildStatusRawExtension(&currStatus)
type args struct {
object *unstructured.Unstructured
}
tests := []struct {
name string
args args
want *runtime.RawExtension
wantErr bool
}{
{
"object doesn't have status",
args{
&unstructured.Unstructured{
Object: map[string]interface{}{},
},
},
nil,
false,
},
{
"object have wrong format status",
args{
&unstructured.Unstructured{
Object: map[string]interface{}{
"status": "a string",
},
},
},
nil,
true,
},
{
"object have correct format status",
args{
currStatusUnstructured,
},
wantRawExtension,
false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := reflectPodDisruptionBudgetStatus(tt.args.object)
if (err != nil) != tt.wantErr {
t.Errorf("reflectPodDisruptionBudgetStatus() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("reflectPodDisruptionBudgetStatus() got = %v, want %v", got, tt.want)
}
})
}
}
4 changes: 3 additions & 1 deletion pkg/util/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,10 @@ const (
PersistentVolumeClaimKind = "PersistentVolumeClaim"
// PersistentVolumeKind indicates the target resource is a persistentvolume
PersistentVolumeKind = "PersistentVolume"
// HorizontalPodAutoscalerKind indicated the target resource is a horizontalpodautoscaler
// HorizontalPodAutoscalerKind indicates the target resource is a horizontalpodautoscaler
HorizontalPodAutoscalerKind = "HorizontalPodAutoscaler"
// PodDisruptionBudgetKind indicates the target resource is a poddisruptionbudget
PodDisruptionBudgetKind = "PodDisruptionBudget"

// ServiceExportKind indicates the target resource is a serviceexport crd
ServiceExportKind = "ServiceExport"
Expand Down
Loading

0 comments on commit 05a98cc

Please sign in to comment.