Skip to content

Commit

Permalink
shard controller: use committer
Browse files Browse the repository at this point in the history
Signed-off-by: Andy Goldstein <[email protected]>
  • Loading branch information
ncdc committed Mar 7, 2023
1 parent c9fbb59 commit 4c12b55
Showing 1 changed file with 30 additions and 49 deletions.
79 changes: 30 additions & 49 deletions pkg/reconciler/core/shard/shard_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,26 @@ package shard

import (
"context"
"encoding/json"
"errors"
"fmt"
"time"

jsonpatch "github.com/evanphx/json-patch"
kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache"
"github.com/kcp-dev/logicalcluster/v3"

"k8s.io/apimachinery/pkg/api/equality"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"github.com/kcp-dev/kcp/pkg/apis/core"
corev1alpha1 "github.com/kcp-dev/kcp/pkg/apis/core/v1alpha1"
kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/cluster"
corev1alpha1client "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/typed/core/v1alpha1"
corev1alpha1informers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions/core/v1alpha1"
corev1alpha1listers "github.com/kcp-dev/kcp/pkg/client/listers/core/v1alpha1"
"github.com/kcp-dev/kcp/pkg/logging"
"github.com/kcp-dev/kcp/pkg/reconciler/committer"
)

const (
Expand All @@ -55,10 +51,12 @@ func NewController(
queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName)

c := &Controller{
queue: queue,
kcpClient: rootKcpClient,
shardIndexer: shardInformer.Informer().GetIndexer(),
shardLister: shardInformer.Lister(),
queue: queue,
kcpClient: rootKcpClient,
commit: committer.NewCommitter[*Shard, Patcher, *ShardSpec, *ShardStatus](rootKcpClient.CoreV1alpha1().Shards()),
getShard: func(clusterName logicalcluster.Name, name string) (*corev1alpha1.Shard, error) {
return shardInformer.Cluster(clusterName).Lister().Get(name)
},
}

shardInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand All @@ -76,10 +74,17 @@ type Controller struct {

kcpClient kcpclientset.ClusterInterface

shardIndexer cache.Indexer
shardLister corev1alpha1listers.ShardClusterLister
getShard func(clusterName logicalcluster.Name, name string) (*corev1alpha1.Shard, error)
commit CommitFunc
}

type Shard = corev1alpha1.Shard
type ShardSpec = corev1alpha1.ShardSpec
type ShardStatus = corev1alpha1.ShardStatus
type Patcher = corev1alpha1client.ShardInterface
type Resource = committer.Resource[*ShardSpec, *ShardStatus]
type CommitFunc = func(ctx context.Context, original, updated *Resource) error

func (c *Controller) enqueue(obj interface{}) {
key, err := kcpcache.MetaClusterNamespaceKeyFunc(obj)
if err != nil {
Expand All @@ -101,7 +106,7 @@ func (c *Controller) Start(ctx context.Context, numThreads int) {
defer logger.Info("Shutting down controller")

for i := 0; i < numThreads; i++ {
go wait.Until(func() { c.startWorker(ctx) }, time.Second, ctx.Done())
go wait.UntilWithContext(ctx, c.startWorker, time.Second)
}

<-ctx.Done()
Expand Down Expand Up @@ -139,63 +144,39 @@ func (c *Controller) processNextWorkItem(ctx context.Context) bool {

func (c *Controller) process(ctx context.Context, key string) error {
logger := klog.FromContext(ctx)
clusterName, namespace, name, err := kcpcache.SplitMetaClusterNamespaceKey(key)
clusterName, _, name, err := kcpcache.SplitMetaClusterNamespaceKey(key)
if err != nil {
logger.Error(err, "invalid key")
return nil
}
if namespace != "" {
logger.Error(errors.New("namespace found in key for cluster-wide Shard object"), "invalid key")
return nil
}

obj, err := c.shardLister.Cluster(clusterName).Get(name)
obj, err := c.getShard(clusterName, name)
if err != nil {
if kerrors.IsNotFound(err) {
return nil // object deleted before we handled it
}
return err
}

previous := obj
obj = obj.DeepCopy()

logger = logging.WithObject(logger, obj)
ctx = klog.NewContext(ctx, logger)

var errs []error
if err := c.reconcile(ctx, obj); err != nil {
return err
errs = append(errs, err)
}

// If the object being reconciled changed as a result, update it.
if !equality.Semantic.DeepEqual(previous.Status, obj.Status) {
oldData, err := json.Marshal(corev1alpha1.Shard{
Status: previous.Status,
})
if err != nil {
return fmt.Errorf("failed to Marshal old data for workspace shard %s|%s/%s: %w", core.RootCluster, namespace, name, err)
}

newData, err := json.Marshal(corev1alpha1.Shard{
ObjectMeta: metav1.ObjectMeta{
UID: previous.UID,
ResourceVersion: previous.ResourceVersion,
}, // to ensure they appear in the patch as preconditions
Status: obj.Status,
})
if err != nil {
return fmt.Errorf("failed to Marshal new data for workspace shard %s|%s/%s: %w", core.RootCluster, namespace, name, err)
}

patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
return fmt.Errorf("failed to create patch for workspace shard %s|%s/%s: %w", core.RootCluster, namespace, name, err)
}
_, uerr := c.kcpClient.Cluster(core.RootCluster.Path()).CoreV1alpha1().Shards().Patch(ctx, obj.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
return uerr
oldResource := &Resource{ObjectMeta: previous.ObjectMeta, Spec: &previous.Spec, Status: &previous.Status}
newResource := &Resource{ObjectMeta: obj.ObjectMeta, Spec: &obj.Spec, Status: &obj.Status}
if err := c.commit(ctx, oldResource, newResource); err != nil {
errs = append(errs, err)
}

logger.V(6).Info("processed Shard")
return nil
return utilerrors.NewAggregate(errs)
}

func (c *Controller) reconcile(ctx context.Context, workspaceShard *corev1alpha1.Shard) error {
Expand Down

0 comments on commit 4c12b55

Please sign in to comment.