Skip to content

Commit

Permalink
feat: customize pod informer to reduce memory usage
Browse files Browse the repository at this point in the history
  • Loading branch information
JackZxj committed May 10, 2023
1 parent 5c37ebe commit 27916af
Show file tree
Hide file tree
Showing 7 changed files with 478 additions and 0 deletions.
7 changes: 7 additions & 0 deletions cmd/controller-manager/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ type Options struct {
NSAutoPropExcludeRegexp string
CreateCRDsForFTCs bool
ClusterJoinTimeout time.Duration

MaxPodListers int64
EnablePodPruning bool
}

func NewOptions() *Options {
Expand Down Expand Up @@ -109,6 +112,10 @@ func (o *Options) AddFlags(flags *pflag.FlagSet, allControllers []string, disabl
"The maximum amount of time to wait for a new cluster to join the federation before timing out.",
)

flags.Int64Var(&o.MaxPodListers, "max-pod-listers", 0, "The maximum number of concurrent pod listing requests to member clusters. "+
"A non-positive number means unlimited, but may increase the instantaneous memory usage.")
flags.BoolVar(&o.EnablePodPruning, "enable-pod-pruning", false, "Enable pod pruning for pod informer. "+
"Enabling this can reduce memory usage of the pod informer, but will disable pod propagation.")
o.addKlogFlags(flags)
}

Expand Down
2 changes: 2 additions & 0 deletions cmd/controller-manager/app/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ func createControllerContext(opts *options.Options) (*controllercontext.Context,
fedInformerFactory.Core().V1alpha1().FederatedClusters(),
common.DefaultFedSystemNamespace,
restConfig,
opts.MaxPodListers,
opts.EnablePodPruning,
)

return &controllercontext.Context{
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.1
golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2
golang.org/x/sync v0.1.0
k8s.io/api v0.26.0
k8s.io/apiextensions-apiserver v0.26.0
k8s.io/apimachinery v0.26.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,8 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
11 changes: 11 additions & 0 deletions pkg/controllers/util/federatedclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync"
"time"

"golang.org/x/sync/semaphore"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -63,6 +64,9 @@ type federatedClientFactory struct {
dynamicClientsetCache map[string]dynamicclient.Interface
kubeInformerCache map[string]kubeinformer.SharedInformerFactory
dynamicInformerCache map[string]dynamicinformer.DynamicSharedInformerFactory

availablePodListers *semaphore.Weighted
enablePodPruning bool
}

func NewFederatedClientsetFactory(
Expand All @@ -71,6 +75,8 @@ func NewFederatedClientsetFactory(
informer fedcorev1a1informers.FederatedClusterInformer,
fedSystemNamespace string,
baseRestConfig *rest.Config,
maxPodListers int64,
enablePodPruning bool,
) FederatedClientFactory {
factory := &federatedClientFactory{
mu: sync.RWMutex{},
Expand All @@ -86,6 +92,10 @@ func NewFederatedClientsetFactory(
dynamicClientsetCache: map[string]dynamicclient.Interface{},
kubeInformerCache: map[string]kubeinformer.SharedInformerFactory{},
dynamicInformerCache: map[string]dynamicinformer.DynamicSharedInformerFactory{},
enablePodPruning: enablePodPruning,
}
if maxPodListers > 0 {
factory.availablePodListers = semaphore.NewWeighted(maxPodListers)
}
factory.handle, _ = informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
Expand Down Expand Up @@ -310,6 +320,7 @@ func (f *federatedClientFactory) processQueueItem(ctx context.Context) {
f.dynamicClientsetCache[name] = dynamicClientset
f.kubeInformerCache[name] = kubeInformerFactory
f.dynamicInformerCache[name] = dynamicInformerFactory
addPodInformer(ctx, kubeInformerFactory, kubeClientset, f.availablePodListers, f.enablePodPruning)
f.mu.Unlock()
}

Expand Down
135 changes: 135 additions & 0 deletions pkg/controllers/util/federatedclient/podinformer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
Copyright 2023 The KubeAdmiral Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package federatedclient

import (
"context"
"time"

"golang.org/x/sync/semaphore"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
kubeclient "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)

func addPodInformer(ctx context.Context,
informer informers.SharedInformerFactory,
client kubeclient.Interface,
podListerSemaphore *semaphore.Weighted,
enablePodPruning bool) {
informer.InformerFor(&corev1.Pod{}, func(k kubeclient.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
podListerWatcher(ctx, client, podListerSemaphore, enablePodPruning),
&corev1.Pod{},
resyncPeriod,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
})
}

func podListerWatcher(ctx context.Context,
client kubeclient.Interface,
semaphore *semaphore.Weighted,
enablePodPruning bool) cache.ListerWatcher {
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if semaphore != nil {
if err := semaphore.Acquire(ctx, 1); err != nil {
return nil, err
}
defer semaphore.Release(1)
}
pods, err := client.CoreV1().Pods(corev1.NamespaceAll).List(ctx, options)
if err != nil {
return nil, err
}
if enablePodPruning {
for i := range pods.Items {
prunePod(&pods.Items[i])
}
}
return pods, nil
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
watcher, err := client.CoreV1().Pods(corev1.NamespaceAll).Watch(ctx, options)
if err != nil {
return nil, err
}
if !enablePodPruning {
return watcher, nil
}

// It's easy for a consumer to add buffering via an extra
// goroutine/channel, but impossible for them to remove it,
// so nonbuffered is better. -- from watch.NewStreamWatcher
proxyCh := make(chan watch.Event)
proxyWatcher := watch.NewProxyWatcher(proxyCh)
go func() {
defer watcher.Stop()
// Closing proxyCh will notify the reflector to stop the current
// watching cycle and then restart the list and watch.
defer close(proxyCh)
for {
select {
case <-proxyWatcher.StopChan():
return
case event, ok := <-watcher.ResultChan():
if !ok {
// the watcher has been closed, stop the proxy
return
}
if pod, ok := event.Object.(*corev1.Pod); ok {
prunePod(pod)
}
proxyCh <- event
}
}
}()
return proxyWatcher, nil
},
}
}

func prunePod(pod *corev1.Pod) {
containers := make([]corev1.Container, len(pod.Spec.Containers))
initContainers := make([]corev1.Container, len(pod.Spec.InitContainers))
for i := range pod.Spec.Containers {
containers[i] = corev1.Container{Resources: pod.Spec.Containers[i].Resources}
}
for i := range pod.Spec.InitContainers {
initContainers[i] = corev1.Container{Resources: pod.Spec.InitContainers[i].Resources}
}
*pod = corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
Generation: pod.Generation,
ResourceVersion: pod.ResourceVersion,
UID: pod.UID,
},
Spec: corev1.PodSpec{
NodeName: pod.Spec.NodeName,
Overhead: pod.Spec.Overhead,
Containers: containers,
InitContainers: initContainers,
},
}
}
Loading

0 comments on commit 27916af

Please sign in to comment.