Skip to content

Commit

Permalink
fix unlimited retries when namespace deleted after cm changed (istio#…
Browse files Browse the repository at this point in the history
…35150)

* fix unlimited retries when namespace deleted immediately after istio-ca-root-cert changed (istio#35149)

Signed-off-by: pangshaoqiang <[email protected]>

* Using workqueue to handle events in NamespaceController

* some fix
  • Loading branch information
pangsq authored Sep 28, 2021
1 parent f11ecbf commit e06118d
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 64 deletions.
124 changes: 62 additions & 62 deletions pilot/pkg/serviceregistry/kube/controller/namespacecontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@ import (
"time"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
listerv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

"istio.io/istio/pkg/kube"
"istio.io/istio/pkg/kube/inject"
"istio.io/istio/pkg/queue"
"istio.io/istio/security/pkg/k8s"
)

Expand All @@ -48,7 +49,7 @@ type NamespaceController struct {
getData func() map[string]string
client corev1.CoreV1Interface

queue queue.Instance
queue workqueue.RateLimitingInterface
namespacesInformer cache.SharedInformer
configMapInformer cache.SharedInformer
namespaceLister listerv1.NamespaceLister
Expand All @@ -60,7 +61,7 @@ func NewNamespaceController(data func() map[string]string, kubeClient kube.Clien
c := &NamespaceController{
getData: data,
client: kubeClient.CoreV1(),
queue: queue.NewQueue(time.Second),
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
}

c.configMapInformer = kubeClient.KubeInformer().Core().V1().ConfigMaps().Informer()
Expand All @@ -70,58 +71,19 @@ func NewNamespaceController(data func() map[string]string, kubeClient kube.Clien

c.configMapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(_, obj interface{}) {
cm, err := convertToConfigMap(obj)
if err != nil {
log.Errorf("failed to convert to configmap: %v", err)
return
}
// This is a change to a configmap we don't watch, ignore it
if cm.Name != CACertNamespaceConfigMap {
return
}
c.queue.Push(func() error {
return c.configMapChange(cm)
})
c.configMapChange(obj)
},
DeleteFunc: func(obj interface{}) {
cm, err := convertToConfigMap(obj)
if err != nil {
log.Errorf("failed to convert to configmap: %v", err)
return
}
// This is a change to a configmap we don't watch, ignore it
if cm.Name != CACertNamespaceConfigMap {
return
}
c.queue.Push(func() error {
ns, err := c.namespaceLister.Get(cm.Namespace)
if err != nil {
// namespace is deleted before
if apierrors.IsNotFound(err) {
return nil
}
return err
}
// If the namespace is terminating, we may get into a loop of trying to re-add the configmap back
// We should make sure the namespace still exists
if ns.Status.Phase != v1.NamespaceTerminating {
return c.insertDataForNamespace(cm.Namespace)
}
return nil
})
c.configMapChange(obj)
},
})

c.namespacesInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
c.queue.Push(func() error {
return c.namespaceChange(obj.(*v1.Namespace))
})
c.namespaceChange(obj.(*v1.Namespace))
},
UpdateFunc: func(_, obj interface{}) {
c.queue.Push(func() error {
return c.namespaceChange(obj.(*v1.Namespace))
})
c.namespaceChange(obj.(*v1.Namespace))
},
})

Expand All @@ -130,11 +92,41 @@ func NewNamespaceController(data func() map[string]string, kubeClient kube.Clien

// Run starts the NamespaceController until a value is sent to stopCh.
func (nc *NamespaceController) Run(stopCh <-chan struct{}) {
defer nc.queue.ShutDown()

if !cache.WaitForCacheSync(stopCh, nc.namespacesInformer.HasSynced, nc.configMapInformer.HasSynced) {
log.Error("Failed to sync namespace controller cache")
return
}
log.Infof("Namespace controller started")
go nc.queue.Run(stopCh)

go wait.Until(nc.runWorker, time.Second, stopCh)

<-stopCh
}

func (nc *NamespaceController) runWorker() {
for nc.processNextWorkItem() {
}
}

// processNextWorkItem deals with one key off the queue. It returns false when
// it's time to quit.
func (nc *NamespaceController) processNextWorkItem() bool {
key, quit := nc.queue.Get()
if quit {
return false
}
defer nc.queue.Done(key)

if err := nc.insertDataForNamespace(key.(string)); err != nil {
utilruntime.HandleError(fmt.Errorf("insertDataForNamespace %q failed: %v", key, err))
nc.queue.AddRateLimited(key)
return true
}

nc.queue.Forget(key)
return true
}

// insertDataForNamespace will add data into the configmap for the specified namespace
Expand All @@ -151,26 +143,34 @@ func (nc *NamespaceController) insertDataForNamespace(ns string) error {

// On namespace change, update the config map.
// If terminating, this will be skipped
func (nc *NamespaceController) namespaceChange(ns *v1.Namespace) error {
// skip special kubernetes system namespaces
for _, namespace := range inject.IgnoredNamespaces {
if ns.Name == namespace {
return nil
}
func (nc *NamespaceController) namespaceChange(ns *v1.Namespace) {
if ns.Status.Phase != v1.NamespaceTerminating {
nc.syncNamespace(ns.Name)
}
}

if ns.Status.Phase != v1.NamespaceTerminating {
return nc.insertDataForNamespace(ns.Name)
// On configMap change(update or delete), try to create or update the config map.
func (nc *NamespaceController) configMapChange(obj interface{}) {
cm, err := convertToConfigMap(obj)
if err != nil {
log.Errorf("failed to convert to configmap: %v", err)
return
}
// This is a change to a configmap we don't watch, ignore it
if cm.Name != CACertNamespaceConfigMap {
return
}
return nil
nc.syncNamespace(cm.Namespace)
}

// When a config map is changed, merge the data into the configmap
func (nc *NamespaceController) configMapChange(cm *v1.ConfigMap) error {
if err := k8s.UpdateDataInConfigMap(nc.client, cm.DeepCopy(), nc.getData()); err != nil {
return fmt.Errorf("error when inserting CA cert to configmap %v: %v", cm.Name, err)
func (nc *NamespaceController) syncNamespace(ns string) {
// skip special kubernetes system namespaces
for _, namespace := range inject.IgnoredNamespaces {
if ns == namespace {
return
}
}
return nil
nc.queue.Add(ns)
}

func convertToConfigMap(obj interface{}) (*v1.ConfigMap, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestNamespaceController(t *testing.T) {
close(stop)
})
client.RunAndWait(stop)
nc.Run(stop)
go nc.Run(stop)

createNamespace(t, client, "foo", nil)
expectConfigMap(t, nc.configmapLister, "foo", testdata)
Expand Down
3 changes: 2 additions & 1 deletion security/pkg/k8s/configutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ func InsertDataToConfigMap(client corev1.ConfigMapsGetter, lister listerv1.Confi
}
if _, err = client.ConfigMaps(meta.Namespace).Create(context.TODO(), configmap, metav1.CreateOptions{}); err != nil {
// Namespace may be deleted between now... and our previous check. Just skip this, we cannot create into deleted ns
if errors.IsNotFound(err) {
// And don't retry a create if the namespace is terminating
if errors.IsNotFound(err) || errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
return nil
}
return fmt.Errorf("error when creating configmap %v: %v", meta.Name, err)
Expand Down

0 comments on commit e06118d

Please sign in to comment.