SpecSyncers for upstream and downstream
...

Signed-off-by: David Festal <[email protected]>
davidfestal committed Apr 6, 2023
1 parent b648f4a commit bd8cfe8
Showing 6 changed files with 220 additions and 126 deletions.
9 changes: 7 additions & 2 deletions pkg/syncer/spec/mutators/podspecable.go
@@ -39,6 +39,7 @@ import (

type ListSecretFunc func(clusterName logicalcluster.Name, namespace string) ([]runtime.Object, error)
type GetWorkspaceURLFunc func(obj *unstructured.Unstructured) (*url.URL, error)
type GetClusterDDSIFFunc func(clusterName logicalcluster.Name) (*ddsif.DiscoveringDynamicSharedInformerFactory, error)

type PodSpecableMutator struct {
getWorkspaceURL GetWorkspaceURLFunc
@@ -71,7 +72,7 @@ func (dm *PodSpecableMutator) GVRs() []schema.GroupVersionResource {
}
}

func NewPodspecableMutator(ddsifForUpstreamSyncer *ddsif.DiscoveringDynamicSharedInformerFactory, serviceLister listerscorev1.ServiceLister,
func NewPodspecableMutator(ddsifForUpstreamSyncer GetClusterDDSIFFunc, serviceLister listerscorev1.ServiceLister,
syncTargetClusterName logicalcluster.Name, syncTargetName string, syncTargetUID types.UID,
dnsNamespace string, upsyncPods bool) *PodSpecableMutator {
secretsGVR := corev1.SchemeGroupVersion.WithResource("secrets")
@@ -84,7 +85,11 @@ func NewPodspecableMutator(ddsifForUpstreamSyncer *ddsif.DiscoveringDynamicShare
return url.Parse(workspaceURL)
},
listSecrets: func(clusterName logicalcluster.Name, namespace string) ([]runtime.Object, error) {
informers, notSynced := ddsifForUpstreamSyncer.Informers()
ddsif, err := ddsifForUpstreamSyncer(clusterName)
if err != nil {
return nil, err
}
informers, notSynced := ddsif.Informers()
informer, ok := informers[secretsGVR]
if !ok {
if shared.ContainsGVR(notSynced, secretsGVR) {
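The mutator no longer captures a single upstream informer factory at construction time: callers now pass a GetClusterDDSIFFunc that resolves the factory per logical cluster. As a minimal sketch (not part of this commit; variables such as ddsifForUpstreamSyncer and serviceLister are assumed to be in scope), a single-shard caller can preserve the old behaviour with a constant closure:

	// Sketch only: always hand back the same factory, regardless of the logical cluster.
	getDDSIF := func(clusterName logicalcluster.Name) (*ddsif.DiscoveringDynamicSharedInformerFactory, error) {
		return ddsifForUpstreamSyncer, nil
	}
	mutator := mutators.NewPodspecableMutator(getDDSIF, serviceLister,
		syncTargetClusterName, syncTargetName, syncTargetUID, dnsNamespace, false)

The updated test in spec_process_test.go further down does exactly this.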
249 changes: 169 additions & 80 deletions pkg/syncer/spec/spec_controller.go
@@ -48,6 +48,7 @@ import (
syncerindexers "github.com/kcp-dev/kcp/pkg/syncer/indexers"
"github.com/kcp-dev/kcp/pkg/syncer/shared"
"github.com/kcp-dev/kcp/pkg/syncer/spec/dns"
"github.com/kcp-dev/kcp/pkg/syncer/synctarget"
workloadv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/workload/v1alpha1"
. "github.com/kcp-dev/kcp/tmc/pkg/logging"
)
@@ -69,8 +70,8 @@ type Controller struct {
mutators map[schema.GroupVersionResource]Mutator
dnsProcessor *dns.DNSProcessor

upstreamClient kcpdynamic.ClusterInterface
downstreamClient dynamic.Interface
getUpstreamClient func(clusterName logicalcluster.Name) (dynamic.Interface, error)
downstreamClient dynamic.Interface

getUpstreamLister func(clusterName logicalcluster.Name, gvr schema.GroupVersionResource) (cache.GenericLister, error)
getDownstreamLister func(gvr schema.GroupVersionResource) (cache.GenericLister, error)
@@ -89,7 +90,6 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluste
upstreamClient kcpdynamic.ClusterInterface, downstreamClient dynamic.Interface, downstreamKubeClient kubernetes.Interface,
ddsifForUpstreamSyncer *ddsif.DiscoveringDynamicSharedInformerFactory,
ddsifForDownstream *ddsif.GenericDiscoveringDynamicSharedInformerFactory[cache.SharedIndexInformer, cache.GenericLister, informers.GenericInformer],
addDownstreamEventHandler func(ddsif.GVREventHandler),
downstreamNSCleaner shared.Cleaner,
syncTargetUID types.UID,
dnsNamespace string,
@@ -99,7 +99,9 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluste
c := Controller{
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerName),

upstreamClient: upstreamClient,
getUpstreamClient: func(clusterName logicalcluster.Name) (dynamic.Interface, error) {
return upstreamClient.Cluster(clusterName.Path()), nil
},
downstreamClient: downstreamClient,
downstreamNSCleaner: downstreamNSCleaner,

@@ -139,7 +141,14 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluste
syncTargetKey: syncTargetKey,
advancedSchedulingEnabled: advancedSchedulingEnabled,

mutators: make(map[schema.GroupVersionResource]Mutator, 2),
mutators: make(map[schema.GroupVersionResource]Mutator, 2),
dnsProcessor: dnsProcessor,
}

for _, mutator := range mutators {
for _, gvr := range mutator.GVRs() {
c.mutators[gvr] = mutator
}
}

logger := logging.WithReconciler(syncerLogger, controllerName)
@@ -174,96 +183,176 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluste
},
)

addDownstreamEventHandler(
ddsif.GVREventHandlerFuncs{
DeleteFunc: func(gvr schema.GroupVersionResource, obj interface{}) {
if gvr == namespaceGVR {
return
}
if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = d.Obj
}
unstrObj, ok := obj.(*unstructured.Unstructured)
if !ok {
utilruntime.HandleError(fmt.Errorf("resource should be a *unstructured.Unstructured, but was %T", unstrObj))
return
return &c, nil
}

func NewSpecSyncerForDownstream(syncerLogger logr.Logger, syncTargetClusterName logicalcluster.Name, syncTargetName, syncTargetKey string,
advancedSchedulingEnabled bool,
getShardAccess synctarget.GetShardAccessFunc,
downstreamClient dynamic.Interface, downstreamKubeClient kubernetes.Interface,
ddsifForDownstream *ddsif.GenericDiscoveringDynamicSharedInformerFactory[cache.SharedIndexInformer, cache.GenericLister, informers.GenericInformer],
downstreamNSCleaner shared.Cleaner,
syncTargetUID types.UID,
dnsNamespace string,
dnsProcessor *dns.DNSProcessor,
dnsImage string,
mutators ...Mutator) (*Controller, error) {
c := Controller{
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerName),

getUpstreamClient: func(clusterName logicalcluster.Name) (dynamic.Interface, error) {
shardAccess, ok, err := getShardAccess(clusterName)
if err != nil {
return nil, err
}
if !ok {
return nil, fmt.Errorf("shard-related clients not found for cluster %q", clusterName)
}
return shardAccess.SyncerClient.Cluster(clusterName.Path()), nil
},
downstreamClient: downstreamClient,
downstreamNSCleaner: downstreamNSCleaner,

getDownstreamLister: func(gvr schema.GroupVersionResource) (cache.GenericLister, error) {
informers, notSynced := ddsifForDownstream.Informers()
informer, ok := informers[gvr]
if !ok {
if shared.ContainsGVR(notSynced, gvr) {
return nil, fmt.Errorf("informer for gvr %v not synced in the downstream informer factory", gvr)
}
if unstrObj.GetLabels()[workloadv1alpha1.ClusterResourceStateLabelPrefix+syncTargetKey] == string(workloadv1alpha1.ResourceStateUpsync) {
return
return nil, fmt.Errorf("gvr %v should be known in the downstream informer factory", gvr)
}
return informer.Lister(), nil
},
getUpstreamLister: func(clusterName logicalcluster.Name, gvr schema.GroupVersionResource) (cache.GenericLister, error) {
shardAccess, ok, err := getShardAccess(clusterName)
if err != nil {
return nil, err
}
if !ok {
return nil, fmt.Errorf("shard-related clients not found for cluster %q", clusterName)
}

informers, notSynced := shardAccess.SyncerDDSIF.Informers()
informer, ok := informers[gvr]
if !ok {
if shared.ContainsGVR(notSynced, gvr) {
return nil, fmt.Errorf("informer for gvr %v not synced in the upstream informer factory", gvr)
}
return nil, fmt.Errorf("gvr %v should be known in the upstream informer factory", gvr)
}
return informer.Lister().ByCluster(clusterName), nil
},

listDownstreamNamespacesByLocator: func(jsonLocator string) ([]*unstructured.Unstructured, error) {
nsInformer, err := ddsifForDownstream.ForResource(namespaceGVR)
if err != nil {
return nil, err
}
return indexers.ByIndex[*unstructured.Unstructured](nsInformer.Informer().GetIndexer(), syncerindexers.ByNamespaceLocatorIndexName, jsonLocator)
},
syncTargetName: syncTargetName,
syncTargetClusterName: syncTargetClusterName,
syncTargetUID: syncTargetUID,
syncTargetKey: syncTargetKey,
advancedSchedulingEnabled: advancedSchedulingEnabled,

key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
mutators: make(map[schema.GroupVersionResource]Mutator, 2),

dnsProcessor: dnsProcessor,
}

for _, mutator := range mutators {
for _, gvr := range mutator.GVRs() {
c.mutators[gvr] = mutator
}
}

logger := logging.WithReconciler(syncerLogger, controllerName)

namespaceGVR := corev1.SchemeGroupVersion.WithResource("namespaces")

ddsifForDownstream.AddEventHandler(ddsif.GVREventHandlerFuncs{
DeleteFunc: func(gvr schema.GroupVersionResource, obj interface{}) {
if gvr == namespaceGVR {
return
}
if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = d.Obj
}
unstrObj, ok := obj.(*unstructured.Unstructured)
if !ok {
utilruntime.HandleError(fmt.Errorf("resource should be a *unstructured.Unstructured, but was %T", unstrObj))
return
}
if unstrObj.GetLabels()[workloadv1alpha1.ClusterResourceStateLabelPrefix+syncTargetKey] == string(workloadv1alpha1.ResourceStateUpsync) {
return
}

key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("error getting key for type %T: %w", obj, err))
return
}
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
utilruntime.HandleError(fmt.Errorf("error splitting key %q: %w", key, err))
}
logger := logging.WithQueueKey(logger, key).WithValues("gvr", gvr, DownstreamNamespace, namespace, DownstreamName, name)
logger.V(3).Info("processing delete event")

var nsLocatorHolder *unstructured.Unstructured
// Handle namespaced resources
if namespace != "" {
// Use namespace lister
namespaceLister, err := c.getDownstreamLister(namespaceGVR)
if err != nil {
utilruntime.HandleError(fmt.Errorf("error getting key for type %T: %w", obj, err))
utilruntime.HandleError(err)
return
}
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
utilruntime.HandleError(fmt.Errorf("error splitting key %q: %w", key, err))
}
logger := logging.WithQueueKey(logger, key).WithValues("gvr", gvr, DownstreamNamespace, namespace, DownstreamName, name)
logger.V(3).Info("processing delete event")

var nsLocatorHolder *unstructured.Unstructured
// Handle namespaced resources
if namespace != "" {
// Use namespace lister
namespaceLister, err := c.getDownstreamLister(namespaceGVR)
if err != nil {
utilruntime.HandleError(err)
return
}

nsObj, err := namespaceLister.Get(namespace)
if apierrors.IsNotFound(err) {
return
}
if err != nil {
utilruntime.HandleError(err)
return
}
c.downstreamNSCleaner.PlanCleaning(namespace)
nsLocatorHolder, ok = nsObj.(*unstructured.Unstructured)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", nsObj))
return
}
} else {
// The nsLocatorHolder is in the resource itself for cluster-scoped resources.
nsLocatorHolder = unstrObj
}
logger = logging.WithObject(logger, nsLocatorHolder)

locator, ok := nsLocatorHolder.GetAnnotations()[shared.NamespaceLocatorAnnotation]
if !ok {
utilruntime.HandleError(fmt.Errorf("unable to find the locator annotation in resource %s", nsLocatorHolder.GetName()))
nsObj, err := namespaceLister.Get(namespace)
if apierrors.IsNotFound(err) {
return
}
nsLocator := &shared.NamespaceLocator{}
err = json.Unmarshal([]byte(locator), nsLocator)
if err != nil {
utilruntime.HandleError(err)
return
}
logger.V(4).Info("found", "NamespaceLocator", nsLocator)
m := &metav1.ObjectMeta{
Annotations: map[string]string{
logicalcluster.AnnotationKey: nsLocator.ClusterName.String(),
},
Namespace: nsLocator.Namespace,
Name: shared.GetUpstreamResourceName(gvr, name),
c.downstreamNSCleaner.PlanCleaning(namespace)
nsLocatorHolder, ok = nsObj.(*unstructured.Unstructured)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", nsObj))
return
}
c.AddToQueue(gvr, m, logger)
},
})

for _, mutator := range mutators {
for _, gvr := range mutator.GVRs() {
c.mutators[gvr] = mutator
}
}
} else {
// The nsLocatorHolder is in the resource itself for cluster-scoped resources.
nsLocatorHolder = unstrObj
}
logger = logging.WithObject(logger, nsLocatorHolder)

c.dnsProcessor = dnsProcessor
locator, ok := nsLocatorHolder.GetAnnotations()[shared.NamespaceLocatorAnnotation]
if !ok {
utilruntime.HandleError(fmt.Errorf("unable to find the locator annotation in resource %s", nsLocatorHolder.GetName()))
return
}
nsLocator := &shared.NamespaceLocator{}
err = json.Unmarshal([]byte(locator), nsLocator)
if err != nil {
utilruntime.HandleError(err)
return
}
logger.V(4).Info("found", "NamespaceLocator", nsLocator)
m := &metav1.ObjectMeta{
Annotations: map[string]string{
logicalcluster.AnnotationKey: nsLocator.ClusterName.String(),
},
Namespace: nsLocator.Namespace,
Name: shared.GetUpstreamResourceName(gvr, name),
}
c.AddToQueue(gvr, m, logger)
},
})

return &c, nil
}
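The new NewSpecSyncerForDownstream constructor mirrors NewSpecSyncer, but instead of taking an upstream cluster client and informer factory directly it resolves both per logical cluster through a synctarget.GetShardAccessFunc. A hedged wiring sketch (assuming the package is imported as spec; the surrounding variable names are placeholders, not taken from this commit):

	// Sketch only: constructing the downstream-facing spec syncer in a shard-aware syncer process.
	specSyncer, err := spec.NewSpecSyncerForDownstream(logger, syncTargetClusterName, syncTargetName, syncTargetKey,
		advancedSchedulingEnabled,
		getShardAccess, // a synctarget.GetShardAccessFunc yielding per-cluster clients and informer factories
		downstreamDynamicClient, downstreamKubeClient,
		ddsifForDownstream, downstreamNSCleaner, syncTargetUID,
		dnsNamespace, dnsProcessor, dnsImage,
		secretMutator, podspecableMutator)
	if err != nil {
		return err
	}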
12 changes: 10 additions & 2 deletions pkg/syncer/spec/spec_process.go
@@ -380,7 +380,11 @@ func (c *Controller) ensureSyncerFinalizer(ctx context.Context, gvr schema.Group

upstreamFinalizers = append(upstreamFinalizers, shared.SyncerFinalizerNamePrefix+c.syncTargetKey)
upstreamObjCopy.SetFinalizers(upstreamFinalizers)
if _, err := c.upstreamClient.Cluster(clusterName.Path()).Resource(gvr).Namespace(namespace).Update(ctx, upstreamObjCopy, metav1.UpdateOptions{}); err != nil {
upstreamClient, err := c.getUpstreamClient(clusterName)
if err != nil {
return false, err
}
if _, err := upstreamClient.Resource(gvr).Namespace(namespace).Update(ctx, upstreamObjCopy, metav1.UpdateOptions{}); err != nil {
logger.Error(err, "Failed adding finalizer on upstream resource")
return false, err
}
@@ -427,7 +431,11 @@ func (c *Controller) applyToDownstream(ctx context.Context, gvr schema.GroupVers
if apierrors.IsNotFound(err) {
// That's not an error.
// Just think about removing the finalizer from the KCP location-specific resource:
return shared.EnsureUpstreamFinalizerRemoved(ctx, gvr, upstreamSyncerLister, c.upstreamClient.Cluster(upstreamObjLogicalCluster.Path()), upstreamObj.GetNamespace(), c.syncTargetKey, upstreamObj.GetName())
upstreamClient, err := c.getUpstreamClient(upstreamObjLogicalCluster)
if err != nil {
return err
}
return shared.EnsureUpstreamFinalizerRemoved(ctx, gvr, upstreamSyncerLister, upstreamClient, upstreamObj.GetNamespace(), c.syncTargetKey, upstreamObj.GetName())
}
logger.Error(err, "Error deleting upstream resource from downstream")
return err
6 changes: 4 additions & 2 deletions pkg/syncer/spec/spec_process_test.go
@@ -1257,11 +1257,13 @@ func TestSpecSyncerProcess(t *testing.T) {

secretMutator := mutators.NewSecretMutator()
podspecableMutator := mutators.NewPodspecableMutator(
ddsifForUpstreamSyncer, toInformerFactory.Core().V1().Services().Lister(), tc.syncTargetClusterName, tc.syncTargetName, syncTargetUID, "kcp-01c0zzvlqsi7n", false)
func(clusterName logicalcluster.Name) (*ddsif.DiscoveringDynamicSharedInformerFactory, error) {
return ddsifForUpstreamSyncer, nil
}, toInformerFactory.Core().V1().Services().Lister(), tc.syncTargetClusterName, tc.syncTargetName, syncTargetUID, "kcp-01c0zzvlqsi7n", false)

dnsProcessor := dns.NewDNSProcessor(toKubeClient, toInformerFactory, tc.syncTargetName, syncTargetUID, "kcp-01c0zzvlqsi7n", "dnsimage")
controller, err := NewSpecSyncer(logger, kcpLogicalCluster, tc.syncTargetName, syncTargetKey, tc.advancedSchedulingEnabled,
fromClusterClient, toClient, toKubeClient, ddsifForUpstreamSyncer, ddsifForDownstream, func(gh ddsif.GVREventHandler) {}, mockedCleaner, syncTargetUID,
fromClusterClient, toClient, toKubeClient, ddsifForUpstreamSyncer, ddsifForDownstream, mockedCleaner, syncTargetUID,
"kcp-01c0zzvlqsi7n", dnsProcessor, "dnsimage", secretMutator, podspecableMutator)
require.NoError(t, err)
