From 27916af9809ec0a81d50401a5f1bec48cb23fa4f Mon Sep 17 00:00:00 2001 From: "zhangxinjie.next" Date: Thu, 27 Apr 2023 20:11:42 +0800 Subject: [PATCH] feat: customize pod informer to reduce memory usage --- cmd/controller-manager/app/options/options.go | 7 + cmd/controller-manager/app/util.go | 2 + go.mod | 1 + go.sum | 2 + .../util/federatedclient/client.go | 11 + .../util/federatedclient/podinformer.go | 135 ++++++++ .../util/federatedclient/podinformer_test.go | 320 ++++++++++++++++++ 7 files changed, 478 insertions(+) create mode 100644 pkg/controllers/util/federatedclient/podinformer.go create mode 100644 pkg/controllers/util/federatedclient/podinformer_test.go diff --git a/cmd/controller-manager/app/options/options.go b/cmd/controller-manager/app/options/options.go index d1b1db83..692bae10 100644 --- a/cmd/controller-manager/app/options/options.go +++ b/cmd/controller-manager/app/options/options.go @@ -53,6 +53,9 @@ type Options struct { NSAutoPropExcludeRegexp string CreateCRDsForFTCs bool ClusterJoinTimeout time.Duration + + MaxPodListers int64 + EnablePodPruning bool } func NewOptions() *Options { @@ -109,6 +112,10 @@ func (o *Options) AddFlags(flags *pflag.FlagSet, allControllers []string, disabl "The maximum amount of time to wait for a new cluster to join the federation before timing out.", ) + flags.Int64Var(&o.MaxPodListers, "max-pod-listers", 0, "The maximum number of concurrent pod listing requests to member clusters. "+ + "A non-positive number means unlimited, but may increase the instantaneous memory usage.") + flags.BoolVar(&o.EnablePodPruning, "enable-pod-pruning", false, "Enable pod pruning for pod informer. "+ + "Enabling this can reduce memory usage of the pod informer, but will disable pod propagation.") o.addKlogFlags(flags) } diff --git a/cmd/controller-manager/app/util.go b/cmd/controller-manager/app/util.go index 1f6f9d28..6e05948c 100644 --- a/cmd/controller-manager/app/util.go +++ b/cmd/controller-manager/app/util.go @@ -116,6 +116,8 @@ func createControllerContext(opts *options.Options) (*controllercontext.Context, fedInformerFactory.Core().V1alpha1().FederatedClusters(), common.DefaultFedSystemNamespace, restConfig, + opts.MaxPodListers, + opts.EnablePodPruning, ) return &controllercontext.Context{ diff --git a/go.mod b/go.mod index 370143b7..2c530ef0 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.1 golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2 + golang.org/x/sync v0.1.0 k8s.io/api v0.26.0 k8s.io/apiextensions-apiserver v0.26.0 k8s.io/apimachinery v0.26.0 diff --git a/go.sum b/go.sum index 8fa9a353..4b0a2842 100644 --- a/go.sum +++ b/go.sum @@ -323,6 +323,8 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys 
v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/pkg/controllers/util/federatedclient/client.go b/pkg/controllers/util/federatedclient/client.go index 1cfdd01a..b8f45949 100644 --- a/pkg/controllers/util/federatedclient/client.go +++ b/pkg/controllers/util/federatedclient/client.go @@ -23,6 +23,7 @@ import ( "sync" "time" + "golang.org/x/sync/semaphore" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -63,6 +64,9 @@ type federatedClientFactory struct { dynamicClientsetCache map[string]dynamicclient.Interface kubeInformerCache map[string]kubeinformer.SharedInformerFactory dynamicInformerCache map[string]dynamicinformer.DynamicSharedInformerFactory + + availablePodListers *semaphore.Weighted + enablePodPruning bool } func NewFederatedClientsetFactory( @@ -71,6 +75,8 @@ func NewFederatedClientsetFactory( informer fedcorev1a1informers.FederatedClusterInformer, fedSystemNamespace string, baseRestConfig *rest.Config, + maxPodListers int64, + enablePodPruning bool, ) FederatedClientFactory { factory := &federatedClientFactory{ mu: sync.RWMutex{}, @@ -86,6 +92,10 @@ func NewFederatedClientsetFactory( dynamicClientsetCache: map[string]dynamicclient.Interface{}, kubeInformerCache: map[string]kubeinformer.SharedInformerFactory{}, dynamicInformerCache: map[string]dynamicinformer.DynamicSharedInformerFactory{}, + enablePodPruning: enablePodPruning, + } + if maxPodListers > 0 { + factory.availablePodListers = semaphore.NewWeighted(maxPodListers) } factory.handle, _ = informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -310,6 +320,7 @@ func (f *federatedClientFactory) processQueueItem(ctx context.Context) { f.dynamicClientsetCache[name] = dynamicClientset f.kubeInformerCache[name] = kubeInformerFactory f.dynamicInformerCache[name] = dynamicInformerFactory + addPodInformer(ctx, kubeInformerFactory, kubeClientset, f.availablePodListers, f.enablePodPruning) f.mu.Unlock() } diff --git a/pkg/controllers/util/federatedclient/podinformer.go b/pkg/controllers/util/federatedclient/podinformer.go new file mode 100644 index 00000000..5cc6b898 --- /dev/null +++ b/pkg/controllers/util/federatedclient/podinformer.go @@ -0,0 +1,135 @@ +/* +Copyright 2023 The KubeAdmiral Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package federatedclient + +import ( + "context" + "time" + + "golang.org/x/sync/semaphore" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/informers" + kubeclient "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" +) + +func addPodInformer(ctx context.Context, + informer informers.SharedInformerFactory, + client kubeclient.Interface, + podListerSemaphore *semaphore.Weighted, + enablePodPruning bool) { + informer.InformerFor(&corev1.Pod{}, func(k kubeclient.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + podListerWatcher(ctx, client, podListerSemaphore, enablePodPruning), + &corev1.Pod{}, + resyncPeriod, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + ) + }) +} + +func podListerWatcher(ctx context.Context, + client kubeclient.Interface, + semaphore *semaphore.Weighted, + enablePodPruning bool) cache.ListerWatcher { + return &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + if semaphore != nil { + if err := semaphore.Acquire(ctx, 1); err != nil { + return nil, err + } + defer semaphore.Release(1) + } + pods, err := client.CoreV1().Pods(corev1.NamespaceAll).List(ctx, options) + if err != nil { + return nil, err + } + if enablePodPruning { + for i := range pods.Items { + prunePod(&pods.Items[i]) + } + } + return pods, nil + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + watcher, err := client.CoreV1().Pods(corev1.NamespaceAll).Watch(ctx, options) + if err != nil { + return nil, err + } + if !enablePodPruning { + return watcher, nil + } + + // It's easy for a consumer to add buffering via an extra + // goroutine/channel, but impossible for them to remove it, + // so nonbuffered is better. -- from watch.NewStreamWatcher + proxyCh := make(chan watch.Event) + proxyWatcher := watch.NewProxyWatcher(proxyCh) + go func() { + defer watcher.Stop() + // Closing proxyCh will notify the reflector to stop the current + // watching cycle and then restart the list and watch. 
+ defer close(proxyCh) + for { + select { + case <-proxyWatcher.StopChan(): + return + case event, ok := <-watcher.ResultChan(): + if !ok { + // the watcher has been closed, stop the proxy + return + } + if pod, ok := event.Object.(*corev1.Pod); ok { + prunePod(pod) + } + proxyCh <- event + } + } + }() + return proxyWatcher, nil + }, + } +} + +func prunePod(pod *corev1.Pod) { + containers := make([]corev1.Container, len(pod.Spec.Containers)) + initContainers := make([]corev1.Container, len(pod.Spec.InitContainers)) + for i := range pod.Spec.Containers { + containers[i] = corev1.Container{Resources: pod.Spec.Containers[i].Resources} + } + for i := range pod.Spec.InitContainers { + initContainers[i] = corev1.Container{Resources: pod.Spec.InitContainers[i].Resources} + } + *pod = corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: pod.Name, + Namespace: pod.Namespace, + Generation: pod.Generation, + ResourceVersion: pod.ResourceVersion, + UID: pod.UID, + }, + Spec: corev1.PodSpec{ + NodeName: pod.Spec.NodeName, + Overhead: pod.Spec.Overhead, + Containers: containers, + InitContainers: initContainers, + }, + } +} diff --git a/pkg/controllers/util/federatedclient/podinformer_test.go b/pkg/controllers/util/federatedclient/podinformer_test.go new file mode 100644 index 00000000..98173076 --- /dev/null +++ b/pkg/controllers/util/federatedclient/podinformer_test.go @@ -0,0 +1,320 @@ +/* +Copyright 2023 The KubeAdmiral Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package federatedclient + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + "golang.org/x/sync/semaphore" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/rand" + kubeinformer "k8s.io/client-go/informers" + kubeclient "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + fakecorev1 "k8s.io/client-go/kubernetes/typed/core/v1/fake" + "k8s.io/client-go/tools/cache" + + "github.com/kubewharf/kubeadmiral/pkg/controllers/util" +) + +type fakePodsWithConcurrencyLimit struct { + corev1client.PodInterface + concurrentLists *atomic.Int64 +} + +func (c *fakePodsWithConcurrencyLimit) List(ctx context.Context, opts metav1.ListOptions) (result *corev1.PodList, err error) { + cur := c.concurrentLists.Add(-1) + defer c.concurrentLists.Add(1) + if cur < 0 { + panic("limit exceeded") + } + return c.PodInterface.List(ctx, opts) +} + +type fakeCoreV1WithConcurrencyLimit struct { + fakecorev1.FakeCoreV1 + concurrentLists *atomic.Int64 +} + +func (c *fakeCoreV1WithConcurrencyLimit) Pods(namespace string) corev1client.PodInterface { + return &fakePodsWithConcurrencyLimit{c.FakeCoreV1.Pods(namespace), c.concurrentLists} +} + +var _ kubeclient.Interface = &fakeClientsetWithConcurrencyLimit{} + +type fakeClientsetWithConcurrencyLimit struct { + *fake.Clientset + concurrentLists *atomic.Int64 +} + +func (c *fakeClientsetWithConcurrencyLimit) CoreV1() corev1client.CoreV1Interface { + return &fakeCoreV1WithConcurrencyLimit{fakecorev1.FakeCoreV1{Fake: &c.Clientset.Fake}, c.concurrentLists} +} + +func newFakeClientset(concurrentLists int64, objects ...runtime.Object) *fakeClientsetWithConcurrencyLimit { + client := fake.NewSimpleClientset(objects...) + var limit atomic.Int64 + limit.Add(concurrentLists) + return &fakeClientsetWithConcurrencyLimit{ + Clientset: client, + concurrentLists: &limit, + } +} + +func setupFakeClient(concurrentLists int64, numPods int) (pods []runtime.Object, client kubeclient.Interface) { + pods = make([]runtime.Object, numPods) + for i := 0; i < numPods; i++ { + ns := fmt.Sprintf("test-ns-%d", i%10) + name := fmt.Sprintf("test-name-%d", i) + pods[i] = newPod(ns, name) + } + return pods, newFakeClientset(concurrentLists, pods...) 
+} + +func newPod(ns, name string) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{"foo": "bar"}, + Namespace: ns, + }, + Spec: corev1.PodSpec{ + Overhead: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1m"), + }, + InitContainers: []corev1.Container{ + { + Name: "initcontainer", + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1m"), + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1m"), + }, + }, + }, + }, + Containers: []corev1.Container{ + { + Name: "container", + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("10m"), + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("10m"), + }, + }, + }, + }, + }, + } +} + +func Test_podInformer(t *testing.T) { + tests := []struct { + name string + clusters int + podNum int + availablePodListers int64 + enablePodPruning bool + }{ + { + name: "4 clusters, prune pod, limit 2", + clusters: 4, + podNum: 3000, + availablePodListers: 2, + enablePodPruning: true, + }, + { + name: "4 clusters, prune pod, no limit", + clusters: 4, + podNum: 3000, + availablePodListers: 0, + enablePodPruning: true, + }, + { + name: "4 clusters, regular pod, limit 2", + clusters: 4, + podNum: 3000, + availablePodListers: 2, + enablePodPruning: false, + }, + { + name: "4 clusters, regular pod, no limit", + clusters: 4, + podNum: 3000, + availablePodListers: 0, + enablePodPruning: false, + }, + } + + type memberCluster struct { + name string + client kubeclient.Interface + informer kubeinformer.SharedInformerFactory + enablePruning bool + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var sem *semaphore.Weighted + if tt.availablePodListers > 0 { + sem = semaphore.NewWeighted(tt.availablePodListers) + } + ctx := context.Background() + clusters := make([]memberCluster, tt.clusters) + for i := 0; i < tt.clusters; i++ { + var client kubeclient.Interface + if tt.availablePodListers == 0 { + _, client = setupFakeClient(int64(tt.clusters), tt.podNum) + } else { + _, client = setupFakeClient(tt.availablePodListers, tt.podNum) + } + informer := kubeinformer.NewSharedInformerFactory(client, util.NoResyncPeriod) + clusters[i] = memberCluster{ + name: fmt.Sprintf("member-cluster-%d", i), + client: client, + informer: informer, + enablePruning: tt.enablePodPruning, + } + } + + for i := range clusters { + addPodInformer(ctx, clusters[i].informer, clusters[i].client, sem, clusters[i].enablePruning) + clusters[i].informer.Start(ctx.Done()) + } + var wg sync.WaitGroup + wg.Add(tt.clusters) + for i := range clusters { + go func(i int) { + defer wg.Done() + + podLister := clusters[i].informer.Core().V1().Pods().Lister() + podsSynced := clusters[i].informer.Core().V1().Pods().Informer().HasSynced + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + // test lister + if !cache.WaitForNamedCacheSync(clusters[i].name, ctx.Done(), podsSynced) { + t.Errorf("%s should be synced, but it was not\n", clusters[i].name) + return + } + pods, err := podLister.List(labels.Everything()) + if err != nil { + t.Errorf("%s: unexpected error when listing pod: %v\n", clusters[i].name, err) + return + } + if len(pods) != tt.podNum { + t.Errorf("%s: Expected %d pods, got %d pods", clusters[i].name, tt.podNum, len(pods)) + return + } + + // test watcher + watchPodName := fmt.Sprintf("watch-pod-%d", 
rand.Intn(tt.podNum))
+					watchPodNamespace := "watch"
+					watchPod := newPod(watchPodNamespace, watchPodName)
+					handlerFn := func(obj interface{}, ch chan struct{}) error {
+						pod, ok := obj.(*corev1.Pod)
+						if !ok {
+							return fmt.Errorf("expected to handle pod, but got: %v", obj)
+						}
+						if pod.Name == watchPodName {
+							close(ch)
+						}
+						if clusters[i].enablePruning && pod.Labels != nil {
+							return fmt.Errorf("expected no labels for pod, but got labels: %v", pod.Labels)
+						}
+						if !clusters[i].enablePruning && pod.Labels == nil {
+							return fmt.Errorf("expected to get labels from pod, but got nil")
+						}
+						return nil
+					}
+					addCh, updateCh, deleteCh := make(chan struct{}), make(chan struct{}), make(chan struct{})
+					clusters[i].informer.Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+						AddFunc: func(obj interface{}) {
+							if err := handlerFn(obj, addCh); err != nil {
+								t.Errorf("unexpected err: %v", err)
+							}
+						},
+						UpdateFunc: func(oldObj, newObj interface{}) {
+							if err := handlerFn(newObj, updateCh); err != nil {
+								t.Errorf("unexpected err: %v", err)
+							}
+						},
+						DeleteFunc: func(obj interface{}) {
+							if err := handlerFn(obj, deleteCh); err != nil {
+								t.Errorf("unexpected err: %v", err)
+							}
+						},
+					})
+
+					if watchPod, err = clusters[i].client.CoreV1().Pods(watchPodNamespace).Create(
+						ctx, watchPod, metav1.CreateOptions{}); err != nil {
+						t.Errorf("unexpected error when creating pod: %v", err)
+						return
+					}
+					select {
+					case <-addCh:
+					case <-time.After(time.Second):
+						t.Errorf("timeout for adding pod")
+						return
+					}
+
+					watchPod.Labels = map[string]string{watchPodName: fmt.Sprintf("foo-%d", rand.Intn(tt.podNum))}
+					if watchPod, err = clusters[i].client.CoreV1().Pods(watchPodNamespace).Update(
+						ctx, watchPod, metav1.UpdateOptions{}); err != nil {
+						t.Errorf("unexpected error when updating pod: %v", err)
+						return
+					}
+					if !clusters[i].enablePruning {
+						select {
+						case <-updateCh:
+						case <-time.After(time.Second):
+							t.Errorf("timeout for updating pod")
+							return
+						}
+					}
+
+					if err = clusters[i].client.CoreV1().Pods(watchPodNamespace).Delete(
+						ctx, watchPod.Name, metav1.DeleteOptions{}); err != nil {
+						t.Errorf("unexpected error when deleting pod: %v", err)
+						return
+					}
+					select {
+					case <-deleteCh:
+					case <-time.After(time.Second):
+						t.Errorf("timeout for deleting pod")
+						return
+					}
+				}(i)
+			}
+			wg.Wait()
+		})
+	}
+}
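
Below is a minimal usage sketch (not part of the patch) showing how addPodInformer and the shared pod-lister semaphore introduced above could be wired up for a single member cluster. It assumes the snippet lives in the federatedclient package; the kubeconfig path, the example function name, and the limit of 2 (mirroring --max-pod-listers=2 together with --enable-pod-pruning) are illustrative assumptions.

// Illustrative sketch only, assuming it is placed in package federatedclient.
package federatedclient

import (
	"context"

	"golang.org/x/sync/semaphore"
	"k8s.io/apimachinery/pkg/labels"
	kubeinformer "k8s.io/client-go/informers"
	kubeclient "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

// examplePrunedPodInformer is a hypothetical helper, not part of this patch.
func examplePrunedPodInformer(ctx context.Context) error {
	// Assumed kubeconfig path; KubeAdmiral builds member rest.Configs from
	// federated cluster secrets instead.
	restConfig, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
	if err != nil {
		return err
	}
	client := kubeclient.NewForConfigOrDie(restConfig)

	// A single weighted semaphore shared by every pod informer caps the number
	// of concurrent pod list calls at 2 (mirrors --max-pod-listers=2), and
	// pruning is enabled (mirrors --enable-pod-pruning).
	sem := semaphore.NewWeighted(2)
	factory := kubeinformer.NewSharedInformerFactory(client, 0)
	addPodInformer(ctx, factory, client, sem, true)

	factory.Start(ctx.Done())
	factory.WaitForCacheSync(ctx.Done())

	// Cached pods are pruned: only name/namespace/UID/generation/resourceVersion
	// metadata plus node name, overhead, and container resources survive, so
	// fields such as Labels are nil here.
	pods, err := factory.Core().V1().Pods().Lister().List(labels.Everything())
	if err != nil {
		return err
	}
	_ = pods
	return nil
}

With pruning enabled, consumers of this informer only see the fields preserved by prunePod (resource accounting and identity), which is why the flag help text notes that pod propagation is disabled in this mode.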