diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index 454881679..30c73a833 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -126,6 +126,8 @@ rules:
   - delete
   - get
+  - list
   - patch
+  - watch
 - apiGroups:
   - ""
   resources:
@@ -136,6 +138,8 @@ rules:
   - get
+  - list
   - patch
   - update
+  - watch
 - apiGroups:
   - ""
   resources:
@@ -146,6 +150,8 @@ rules:
   - get
+  - list
   - patch
   - update
+  - watch
 - apiGroups:
   - dscinitialization.opendatahub.io
   resources:
@@ -218,6 +224,8 @@ rules:
   - get
+  - list
   - patch
   - update
+  - watch
 - apiGroups:
   - networking.k8s.io
   resources:
@@ -264,6 +272,8 @@ rules:
   - get
+  - list
   - patch
   - update
+  - watch
 - apiGroups:
   - route.openshift.io
   resources:
@@ -275,6 +285,8 @@ rules:
   - get
+  - list
   - patch
   - update
+  - watch
 - apiGroups:
   - scheduling.k8s.io
   resources:
diff --git a/main.go b/main.go
index d78d82564..ddbd7ed86 100644
--- a/main.go
+++ b/main.go
@@ -36,10 +36,13 @@ import (
 	"golang.org/x/exp/slices"
 
 	corev1 "k8s.io/api/core/v1"
+	networkingv1 "k8s.io/api/networking/v1"
+	rbacv1 "k8s.io/api/rbac/v1"
 	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
 	apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -49,12 +52,14 @@ import (
 	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
 	_ "k8s.io/client-go/plugin/pkg/client/auth"
 	"k8s.io/client-go/rest"
-	"k8s.io/client-go/tools/cache"
+	clientcache "k8s.io/client-go/tools/cache"
 	retrywatch "k8s.io/client-go/tools/watch"
 	configv1alpha1 "k8s.io/component-base/config/v1alpha1"
 	"k8s.io/klog/v2"
 	"k8s.io/utils/ptr"
 	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/cache"
+	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/healthz"
 	"sigs.k8s.io/controller-runtime/pkg/log/zap"
 	metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
@@ -166,6 +171,12 @@ func main() {
 	kubeConfig.QPS = ptr.Deref(cfg.ClientConnection.QPS, rest.DefaultQPS)
 	setupLog.V(2).Info("REST client", "qps", kubeConfig.QPS, "burst", kubeConfig.Burst)
 
+	// A bare label key parses to an "exists" selector. It is applied below to the
+	// informer caches of the resource types the RayCluster controller creates, so
+	// cached reads and watches of those types only see labelled objects.
+	selector, err := labels.Parse(controllers.RayClusterNameLabel)
+	exitOnError(err, "unable to parse label selector")
+
 	mgr, err := ctrl.NewManager(kubeConfig, ctrl.Options{
 		Scheme: scheme,
 		Metrics: metricsserver.Options{
@@ -179,6 +190,31 @@ func main() {
 		LeaseDuration: &cfg.LeaderElection.LeaseDuration.Duration,
 		RetryPeriod:   &cfg.LeaderElection.RetryPeriod.Duration,
 		RenewDeadline: &cfg.LeaderElection.RenewDeadline.Duration,
+		Cache: cache.Options{
+			ByObject: map[client.Object]cache.ByObject{
+				&corev1.Secret{}: {
+					Label: selector,
+				},
+				&corev1.Service{}: {
+					Label: selector,
+				},
+				&corev1.ServiceAccount{}: {
+					Label: selector,
+				},
+				&networkingv1.Ingress{}: {
+					Label: selector,
+				},
+				&networkingv1.NetworkPolicy{}: {
+					Label: selector,
+				},
+				&rbacv1.ClusterRoleBinding{}: {
+					Label: selector,
+				},
+				&routev1.Route{}: {
+					Label: selector,
+				},
+			},
+		},
 	})
 	exitOnError(err, "unable to create manager")
 
@@ -451,7 +487,7 @@ func waitForAPI(ctx context.Context, mgr ctrl.Manager, apiName string, action fu
 
 	// Wait for the API to become available then invoke action
 	setupLog.Info(fmt.Sprintf("API %v not available, setting up retry watcher", apiName))
-	retryWatcher, err := retrywatch.NewRetryWatcher(crdList.ResourceVersion, &cache.ListWatch{
+	retryWatcher, err := retrywatch.NewRetryWatcher(crdList.ResourceVersion, &clientcache.ListWatch{
 		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
 			return crdClient.ApiextensionsV1().CustomResourceDefinitions().List(ctx, metav1.ListOptions{})
 		},
diff --git a/pkg/controllers/raycluster_controller.go b/pkg/controllers/raycluster_controller.go
index bf58426bc..869f78093 100644
--- a/pkg/controllers/raycluster_controller.go
+++ b/pkg/controllers/raycluster_controller.go
@@ -34,6 +34,7 @@ import (
 	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
 
 	corev1 "k8s.io/api/core/v1"
+	networkingv1 "k8s.io/api/networking/v1"
 	rbacv1 "k8s.io/api/rbac/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -48,6 +49,8 @@ import (
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
+	"sigs.k8s.io/controller-runtime/pkg/handler"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 
 	routev1 "github.com/openshift/api/route/v1"
 	routev1ac "github.com/openshift/client-go/route/applyconfigurations/route/v1"
@@ -78,6 +81,15 @@ const (
 
 	CAPrivateKeyKey = "ca.key"
 	CACertKey       = "ca.crt"
+
+	// RayClusterNameLabel is set by this package on the resources it creates
+	// for a RayCluster; its value is the name of that RayCluster.
+	RayClusterNameLabel = "ray.openshift.ai/cluster-name"
+
+	// RayClusterNamespaceLabel is set on the cluster-scoped OAuth
+	// ClusterRoleBinding; its value is the namespace of the RayCluster, so
+	// the binding can be mapped back to a namespaced reconcile request.
+	RayClusterNamespaceLabel = "ray.openshift.ai/cluster-namespace"
 )
 
 var (
@@ -88,12 +100,12 @@ var (
 // +kubebuilder:rbac:groups=ray.io,resources=rayclusters,verbs=get;list;watch;create;update;patch;delete
 // +kubebuilder:rbac:groups=ray.io,resources=rayclusters/status,verbs=get;update;patch
 // +kubebuilder:rbac:groups=ray.io,resources=rayclusters/finalizers,verbs=update
-// +kubebuilder:rbac:groups=route.openshift.io,resources=routes;routes/custom-host,verbs=get;create;update;patch;delete
-// +kubebuilder:rbac:groups=networking.k8s.io,resources=ingresses,verbs=get;create;update;patch;delete
-// +kubebuilder:rbac:groups=core,resources=secrets,verbs=get;create;patch;delete;get
-// +kubebuilder:rbac:groups=core,resources=services,verbs=get;create;update;patch;delete
-// +kubebuilder:rbac:groups=core,resources=serviceaccounts,verbs=get;create;update;patch;delete
-// +kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=clusterrolebindings,verbs=get;create;update;patch;delete
+// +kubebuilder:rbac:groups=route.openshift.io,resources=routes;routes/custom-host,verbs=get;list;watch;create;update;patch;delete
+// +kubebuilder:rbac:groups=networking.k8s.io,resources=ingresses,verbs=get;list;watch;create;update;patch;delete
+// +kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch;create;patch;delete
+// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
+// +kubebuilder:rbac:groups=core,resources=serviceaccounts,verbs=get;list;watch;create;update;patch;delete
+// +kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=clusterrolebindings,verbs=get;list;watch;create;update;patch;delete
 // +kubebuilder:rbac:groups=authentication.k8s.io,resources=tokenreviews,verbs=create;
 // +kubebuilder:rbac:groups=authorization.k8s.io,resources=subjectaccessreviews,verbs=create;
 // +kubebuilder:rbac:groups=dscinitialization.opendatahub.io,resources=dscinitializations,verbs=get;list;watch
@@ -301,7 +313,7 @@ func crbNameFromCluster(cluster *rayv1.RayCluster) string {
 func desiredOAuthClusterRoleBinding(cluster *rayv1.RayCluster) *rbacv1ac.ClusterRoleBindingApplyConfiguration {
 	return rbacv1ac.ClusterRoleBinding(
 		crbNameFromCluster(cluster)).
-		WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}).
+		WithLabels(map[string]string{RayClusterNameLabel: cluster.Name, RayClusterNamespaceLabel: cluster.Namespace}).
 		WithSubjects(
 			rbacv1ac.Subject().
 				WithKind("ServiceAccount").
@@ -322,7 +334,7 @@ func oauthServiceAccountNameFromCluster(cluster *rayv1.RayCluster) string {
 
 func desiredServiceAccount(cluster *rayv1.RayCluster) *corev1ac.ServiceAccountApplyConfiguration {
 	return corev1ac.ServiceAccount(oauthServiceAccountNameFromCluster(cluster), cluster.Namespace).
-		WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}).
+		WithLabels(map[string]string{RayClusterNameLabel: cluster.Name}).
 		WithAnnotations(map[string]string{
 			"serviceaccounts.openshift.io/oauth-redirectreference.first": "" +
 				`{"kind":"OAuthRedirectReference","apiVersion":"v1",` +
@@ -343,7 +355,7 @@ func rayClientNameFromCluster(cluster *rayv1.RayCluster) string {
 
 func desiredClusterRoute(cluster *rayv1.RayCluster) *routev1ac.RouteApplyConfiguration {
 	return routev1ac.Route(dashboardNameFromCluster(cluster), cluster.Namespace).
-		WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}).
+		WithLabels(map[string]string{RayClusterNameLabel: cluster.Name}).
 		WithSpec(routev1ac.RouteSpec().
 			WithTo(routev1ac.RouteTargetReference().WithKind("Service").WithName(oauthServiceNameFromCluster(cluster))).
 			WithPort(routev1ac.RoutePort().WithTargetPort(intstr.FromString((oAuthServicePortName)))).
@@ -367,7 +379,7 @@ func oauthServiceTLSSecretName(cluster *rayv1.RayCluster) string {
 
 func desiredOAuthService(cluster *rayv1.RayCluster) *corev1ac.ServiceApplyConfiguration {
 	return corev1ac.Service(oauthServiceNameFromCluster(cluster), cluster.Namespace).
-		WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}).
+		WithLabels(map[string]string{RayClusterNameLabel: cluster.Name}).
 		WithAnnotations(map[string]string{"service.beta.openshift.io/serving-cert-secret-name": oauthServiceTLSSecretName(cluster)}).
 		WithSpec(
 			corev1ac.ServiceSpec().
@@ -397,7 +409,7 @@ func desiredOAuthSecret(cluster *rayv1.RayCluster, cookieSalt string) *corev1ac.
 	cookieSecret := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
 
 	return corev1ac.Secret(oauthSecretNameFromCluster(cluster), cluster.Namespace).
-		WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}).
+		WithLabels(map[string]string{RayClusterNameLabel: cluster.Name}).
 		WithStringData(map[string]string{"cookie_secret": cookieSecret}).
 		WithOwnerReferences(
 			metav1ac.OwnerReference().WithUID(cluster.UID).WithName(cluster.Name).WithKind(cluster.Kind).WithAPIVersion(cluster.APIVersion),
@@ -410,7 +422,7 @@ func caSecretNameFromCluster(cluster *rayv1.RayCluster) string {
 
 func desiredCASecret(cluster *rayv1.RayCluster, key, cert []byte) *corev1ac.SecretApplyConfiguration {
 	return corev1ac.Secret(caSecretNameFromCluster(cluster), cluster.Namespace).
-		WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}).
+		WithLabels(map[string]string{RayClusterNameLabel: cluster.Name}).
 		WithData(map[string][]byte{
 			CAPrivateKeyKey: key,
 			CACertKey:       cert,
@@ -466,7 +478,7 @@ func generateCACertificate() ([]byte, []byte, error) {
 }
 func desiredWorkersNetworkPolicy(cluster *rayv1.RayCluster) *networkingv1ac.NetworkPolicyApplyConfiguration {
 	return networkingv1ac.NetworkPolicy(cluster.Name+"-workers", cluster.Namespace).
-		WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}).
+		WithLabels(map[string]string{RayClusterNameLabel: cluster.Name}).
 		WithSpec(networkingv1ac.NetworkPolicySpec().
 			WithPodSelector(metav1ac.LabelSelector().WithMatchLabels(map[string]string{"ray.io/cluster": cluster.Name, "ray.io/node-type": "worker"})).
 			WithIngress(
@@ -488,7 +500,7 @@ func desiredHeadNetworkPolicy(cluster *rayv1.RayCluster, cfg *config.KubeRayConf
 		allSecuredPorts = append(allSecuredPorts, networkingv1ac.NetworkPolicyPort().WithProtocol(corev1.ProtocolTCP).WithPort(intstr.FromInt(10001)))
 	}
 	return networkingv1ac.NetworkPolicy(cluster.Name+"-head", cluster.Namespace).
-		WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}).
+		WithLabels(map[string]string{RayClusterNameLabel: cluster.Name}).
 		WithSpec(networkingv1ac.NetworkPolicySpec().
 			WithPodSelector(metav1ac.LabelSelector().WithMatchLabels(map[string]string{"ray.io/cluster": cluster.Name, "ray.io/node-type": "head"})).
 			WithIngress(
@@ -551,5 +563,31 @@ func (r *RayClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
 	return ctrl.NewControllerManagedBy(mgr).
 		Named(controllerName).
 		For(&rayv1.RayCluster{}).
+		Owns(&corev1.ServiceAccount{}).
+		Owns(&corev1.Service{}).
+		Owns(&corev1.Secret{}).
+		// NOTE(review): watching Routes needs the route.openshift.io API to be
+		// served; confirm this controller is only set up on OpenShift clusters.
+		Owns(&routev1.Route{}).
+		Owns(&networkingv1.Ingress{}).
+		// ClusterRoleBindings are mapped back to their RayCluster through the
+		// name and namespace labels instead of an owner reference.
+		Watches(&rbacv1.ClusterRoleBinding{}, handler.EnqueueRequestsFromMapFunc(
+			func(_ context.Context, o client.Object) []reconcile.Request {
+				name, ok := o.GetLabels()[RayClusterNameLabel]
+				if !ok {
+					return nil
+				}
+				namespace, ok := o.GetLabels()[RayClusterNamespaceLabel]
+				if !ok {
+					return nil
+				}
+				return []reconcile.Request{{
+					NamespacedName: client.ObjectKey{
+						Name:      name,
+						Namespace: namespace,
+					}}}
+			}),
+		).
 		Complete(r)
 }
diff --git a/pkg/controllers/support.go b/pkg/controllers/support.go
index d208b52d3..c5cc96331 100644
--- a/pkg/controllers/support.go
+++ b/pkg/controllers/support.go
@@ -20,7 +20,7 @@ func serviceNameFromCluster(cluster *rayv1.RayCluster) string {
 
 func desiredRayClientRoute(cluster *rayv1.RayCluster) *routeapply.RouteApplyConfiguration {
 	return routeapply.Route(rayClientNameFromCluster(cluster), cluster.Namespace).
-		WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}).
+		WithLabels(map[string]string{RayClusterNameLabel: cluster.Name}).
 		WithSpec(routeapply.RouteSpec().
 			WithTo(routeapply.RouteTargetReference().WithKind("Service").WithName(serviceNameFromCluster(cluster)).WithWeight(100)).
 			WithPort(routeapply.RoutePort().WithTargetPort(intstr.FromString("client"))).
@@ -33,7 +33,7 @@ func desiredRayClientRoute(cluster *rayv1.RayCluster) *routeapply.RouteApplyConf
 
 func desiredRayClientIngress(cluster *rayv1.RayCluster, ingressHost string) *networkingv1ac.IngressApplyConfiguration {
 	return networkingv1ac.Ingress(rayClientNameFromCluster(cluster), cluster.Namespace).
-		WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}).
+		WithLabels(map[string]string{RayClusterNameLabel: cluster.Name}).
 		WithAnnotations(map[string]string{
 			"nginx.ingress.kubernetes.io/rewrite-target": "/",
 			"nginx.ingress.kubernetes.io/ssl-redirect":   "true",
@@ -68,7 +68,7 @@ func desiredRayClientIngress(cluster *rayv1.RayCluster, ingressHost string) *net
 
 func desiredClusterIngress(cluster *rayv1.RayCluster, ingressHost string) *networkingv1ac.IngressApplyConfiguration {
 	return networkingv1ac.Ingress(dashboardNameFromCluster(cluster), cluster.Namespace).
-		WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}).
+		WithLabels(map[string]string{RayClusterNameLabel: cluster.Name}).
 		WithOwnerReferences(v1.OwnerReference().
 			WithAPIVersion(cluster.APIVersion).
 			WithKind(cluster.Kind).