Skip to content

Commit

Permalink
PMM-9946 Automatically add k8s cluster (percona#1125)
Browse files Browse the repository at this point in the history
* PMM-9946 Automatically add k8s cluster

* Check if PMM is inside k8s cluster
* Get config and convert it to kubeconfig
* Trigger register cluster call
  • Loading branch information
gen1us2k authored Oct 19, 2022
1 parent 5ffe9a2 commit 815adfa
Show file tree
Hide file tree
Showing 13 changed files with 271 additions and 45 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ require (
github.com/lib/pq v1.10.7
github.com/minio/minio-go/v7 v7.0.40
github.com/mwitkow/go-proto-validators v0.3.2
github.com/percona-platform/dbaas-api v0.0.0-20220627132007-e6e85c6e26dc
github.com/percona-platform/dbaas-api v0.0.0-20221019084503-3649d46509df
github.com/percona-platform/saas v0.0.0-20221014123257-4fa7a15ce672
github.com/percona/exporter_shared v0.7.3
github.com/percona/go-mysql v0.0.0-20210427141028-73d29c6da78c
Expand Down Expand Up @@ -107,6 +107,7 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt v3.2.1+incompatible // indirect
github.com/golang-jwt/jwt/v4 v4.4.1 // indirect
github.com/golang/mock v1.5.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/btree v1.1.2 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
Expand Down
7 changes: 4 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,9 @@ github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFU
github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/mock v1.4.4 h1:l75CXGRSwbaYNpl/Z2X1XIIAMSCquvXgpVZDhwEIJsc=
github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4=
github.com/golang/mock v1.5.0 h1:jlYHihg//f7RRwuPfptm04yp4s7O6Kw8EZiVYIGcH0g=
github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand Down Expand Up @@ -503,8 +504,8 @@ github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKf
github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE=
github.com/percona-lab/crypto v0.0.0-20220811043533-d164de3c7f08 h1:NprWeXddFZJSgtN8hmf/hhIgiZwB3GNaKnI88iAFgEc=
github.com/percona-lab/crypto v0.0.0-20220811043533-d164de3c7f08/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
github.com/percona-platform/dbaas-api v0.0.0-20220627132007-e6e85c6e26dc h1:0VUBIZufdLO2QBRUHqCngktp0/unEjZErvmPkhx7FDE=
github.com/percona-platform/dbaas-api v0.0.0-20220627132007-e6e85c6e26dc/go.mod h1:Z+cv//W6luzn9aC9DK/UfQURsABEVzoCi8dN6Tr01o4=
github.com/percona-platform/dbaas-api v0.0.0-20221019084503-3649d46509df h1:CodeejNzVTI7j8sHfYfaxRW0YkMif66FS0x6RK4RGcc=
github.com/percona-platform/dbaas-api v0.0.0-20221019084503-3649d46509df/go.mod h1:/jgle33awfHq1va/T6NnNS5wWAETSnl6wUZ1bew+CJ0=
github.com/percona-platform/saas v0.0.0-20221014123257-4fa7a15ce672 h1:dwot2L8nFLdnGcvdpK3AdoTykDM1Qr2odQJbq/KrZoM=
github.com/percona-platform/saas v0.0.0-20221014123257-4fa7a15ce672/go.mod h1:gFUwaFp6Ugu5qsBwiOVJYbDlzgZ77tmXdXGO7tG5xVI=
github.com/percona/exporter_shared v0.7.3 h1:TX4LisZ08jo8KIlekBTwPX13JlM6gOOKgmwg7QEf+r8=
Expand Down
48 changes: 26 additions & 22 deletions managed/cmd/pmm-managed/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ type gRPCServerDeps struct {
supervisord *supervisord.Service
config *config.Config
componentsService *managementdbaas.ComponentsService
dbaasInitializer *managementdbaas.Initializer
agentService *agents.AgentService
}

Expand Down Expand Up @@ -262,7 +263,9 @@ func runGRPCServer(ctx context.Context, deps *gRPCServerDeps) {
backupv1beta1.RegisterArtifactsServer(gRPCServer, managementbackup.NewArtifactsService(deps.db, deps.backupRemovalService))
backupv1beta1.RegisterRestoreHistoryServer(gRPCServer, managementbackup.NewRestoreHistoryService(deps.db))

dbaasv1beta1.RegisterKubernetesServer(gRPCServer, managementdbaas.NewKubernetesServer(deps.db, deps.dbaasClient, deps.grafanaClient, deps.versionServiceClient))
k8sServer := managementdbaas.NewKubernetesServer(deps.db, deps.dbaasClient, deps.versionServiceClient, deps.grafanaClient)
deps.dbaasInitializer.RegisterKubernetesServer(k8sServer)
dbaasv1beta1.RegisterKubernetesServer(gRPCServer, k8sServer)
dbaasv1beta1.RegisterDBClustersServer(gRPCServer, managementdbaas.NewDBClusterService(deps.db, deps.dbaasClient, deps.grafanaClient, deps.versionServiceClient))
dbaasv1beta1.RegisterPXCClustersServer(gRPCServer, managementdbaas.NewPXCClusterService(deps.db, deps.dbaasClient, deps.grafanaClient, deps.componentsService, deps.versionServiceClient.GetVersionServiceURL()))
dbaasv1beta1.RegisterPSMDBClustersServer(gRPCServer, managementdbaas.NewPSMDBClusterService(deps.db, deps.dbaasClient, deps.grafanaClient, deps.componentsService, deps.versionServiceClient.GetVersionServiceURL()))
Expand Down Expand Up @@ -807,6 +810,8 @@ func main() {

componentsService := managementdbaas.NewComponentsService(db, dbaasClient, versionService)

dbaasInitializer := managementdbaas.NewInitializer(db, dbaasClient)

serverParams := &server.Params{
DB: db,
VMDB: vmdb,
Expand All @@ -821,7 +826,7 @@ func main() {
GrafanaClient: grafanaClient,
VMAlertExternalRules: externalRules,
RulesService: rulesService,
DbaasClient: dbaasClient,
DBaaSInitializer: dbaasInitializer,
Emailer: emailer,
}

Expand Down Expand Up @@ -897,26 +902,6 @@ func main() {
l.Fatalf("Failed to get settings: %+v.", err)
}

if settings.DBaaS.Enabled {
err = supervisord.RestartSupervisedService("dbaas-controller")
if err != nil {
l.Errorf("Failed to restart dbaas-controller on startup: %v", err)
} else {
l.Debug("DBaaS is enabled - creating a DBaaS client.")
ctx, cancel := context.WithTimeout(ctx, time.Second*20)
err := dbaasClient.Connect(ctx)
cancel()
if err != nil {
l.Fatalf("Failed to connect to dbaas-controller API on %s: %v", *dbaasControllerAPIAddrF, err)
}
defer func() {
err := dbaasClient.Disconnect()
if err != nil {
l.Fatalf("Failed to disconnect from dbaas-controller API: %v", err)
}
}()
}
}
authServer := grafana.NewAuthServer(grafanaClient, awsInstanceChecker)

l.Info("Starting services...")
Expand Down Expand Up @@ -1010,6 +995,7 @@ func main() {
config: &cfg.Config,
defaultsFileParser: defaultsFileParser,
componentsService: componentsService,
dbaasInitializer: dbaasInitializer,
agentService: agentService,
})
}()
Expand All @@ -1034,6 +1020,24 @@ func main() {
defer wg.Done()
cleaner.Run(ctx, cleanInterval, cleanOlderThan)
}()
if settings.DBaaS.Enabled {
err = supervisord.RestartSupervisedService("dbaas-controller")
if err != nil {
l.Errorf("Failed to restart dbaas-controller on startup: %v", err)
} else {
l.Debug("DBaaS is enabled - creating a DBaaS client.")
err := dbaasInitializer.Enable(ctx)
if err != nil {
l.Fatalf("Failed initializing dbaas-controller %s: %v", *dbaasControllerAPIAddrF, err)
}
defer func() {
err := dbaasInitializer.Disable(context.Background())
if err != nil {
l.Fatalf("Failed disabling dbaas-controller API: %v", err)
}
}()
}
}

wg.Wait()
}
Expand Down
6 changes: 6 additions & 0 deletions managed/services/dbaas/dbaas_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,3 +244,9 @@ func (c *Client) StopMonitoring(ctx context.Context, in *controllerv1beta1.StopM
defer c.connM.RUnlock()
return c.kubernetesClient.StopMonitoring(ctx, in, opts...)
}

func (c *Client) GetKubeConfig(ctx context.Context, in *controllerv1beta1.GetKubeconfigRequest, opts ...grpc.CallOption) (*controllerv1beta1.GetKubeconfigResponse, error) {
c.connM.RLock()
defer c.connM.RUnlock()
return c.kubernetesClient.GetKubeconfig(ctx, in, opts...)
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestDBClusterService(t *testing.T) {

versionService := NewVersionServiceClient(versionServiceURL)

ks := NewKubernetesServer(db, dbaasClient, grafanaClient, versionService)
ks := NewKubernetesServer(db, dbaasClient, versionService, grafanaClient)
dbaasClient.On("CheckKubernetesClusterConnection", ctx, dbKubeconfigTest).Return(&controllerv1beta1.CheckKubernetesClusterConnectionResponse{
Operators: &controllerv1beta1.Operators{
PxcOperatorVersion: "",
Expand Down
152 changes: 152 additions & 0 deletions managed/services/management/dbaas/dbaas_initializer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright (C) 2017 Percona LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

package dbaas

import (
"context"
"sync"
"time"

dbaascontrollerv1beta1 "github.com/percona-platform/dbaas-api/gen/controller"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"gopkg.in/reform.v1"

dbaasv1beta1 "github.com/percona/pmm/api/managementpb/dbaas"
"github.com/percona/pmm/managed/models"
)

// Initializer initializes dbaas feature
type Initializer struct {
db *reform.DB
l *logrus.Entry

dbaasClient dbaasClient
kubernetesServer dbaasv1beta1.KubernetesServer

enabled bool
cancel func()
m sync.Mutex
}

const defaultClusterName = "default-pmm-cluster"

var errClusterExists = errors.New("cluster already exists")

// NewInitializer returns initialized Initializer structure
func NewInitializer(db *reform.DB, client dbaasClient) *Initializer {
l := logrus.WithField("component", "dbaas_initializer")
return &Initializer{
db: db,
l: l,
dbaasClient: client,
}
}

func (in *Initializer) RegisterKubernetesServer(k dbaasv1beta1.KubernetesServer) {
in.kubernetesServer = k
}

// Update updates current dbaas settings
func (in *Initializer) Update(ctx context.Context) error {
settings, err := models.GetSettings(in.db)
if err != nil {
in.l.Errorf("Failed to get settings: %+v.", err)
return err
}
if settings.DBaaS.Enabled {
return in.Enable(ctx)
}
return in.Disable(ctx)
}

// Enable enables dbaas feature and connects to dbaas-controller
func (in *Initializer) Enable(ctx context.Context) error {
in.m.Lock()
defer in.m.Unlock()
if in.enabled {
return nil
}
timeoutCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()
err := in.dbaasClient.Connect(timeoutCtx)
if err != nil {
return err
}
ctx, in.cancel = context.WithCancel(ctx)

in.enabled = true
return in.registerInCluster(ctx)
}

// registerIncluster automatically adds k8s cluster to dbaas when PMM is running inside k8s cluster
func (in *Initializer) registerInCluster(ctx context.Context) error {
kubeConfig, err := in.dbaasClient.GetKubeConfig(ctx, &dbaascontrollerv1beta1.GetKubeconfigRequest{})
if err == nil {
// If err is not equal to nil, dont' register cluster and fail silently
err := in.db.InTransaction(func(t *reform.TX) error {
cluster, err := models.FindKubernetesClusterByName(t.Querier, defaultClusterName)
if err != nil {
in.l.Errorf("failed finding cluster: %v", err)
return nil
}
if cluster != nil {
return errClusterExists
}
return nil
})
if err != nil {
if errors.Is(err, errClusterExists) {
return nil
}
return err
}
if len(kubeConfig.Kubeconfig) != 0 {
req := &dbaasv1beta1.RegisterKubernetesClusterRequest{
KubernetesClusterName: defaultClusterName,
KubeAuth: &dbaasv1beta1.KubeAuth{
Kubeconfig: kubeConfig.Kubeconfig,
},
}
_, err = in.kubernetesServer.RegisterKubernetesCluster(ctx, req)
if err != nil {
return err
}
in.l.Info("Cluster is successfully initialized")
}
} else {
in.l.Errorf("failed getting kubeconfig inside cluster: %v", err)
}
return nil
}

// Disable disconnects from dbaas-controller and disabled dbaas feature
func (in *Initializer) Disable(ctx context.Context) error {
in.m.Lock()
defer in.m.Unlock()
if !in.enabled { // Don't disable if already disabled
return nil
}
if in.cancel != nil {
in.cancel()
}
err := in.dbaasClient.Disconnect()
if err != nil {
return err
}
in.enabled = false
return nil
}
6 changes: 6 additions & 0 deletions managed/services/management/dbaas/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ import (
//go:generate ../../../../bin/mockery -name=componentsService -case=snake -inpkg -testonly

type dbaasClient interface {
// Connect connects the client to dbaas-controller API.
Connect(ctx context.Context) error
// Disconnect disconnects the client from dbaas-controller API.
Disconnect() error
// CheckKubernetesClusterConnection checks connection to Kubernetes cluster and returns statuses of the cluster and operators.
CheckKubernetesClusterConnection(ctx context.Context, kubeConfig string) (*controllerv1beta1.CheckKubernetesClusterConnectionResponse, error)
// ListPXCClusters returns a list of PXC clusters.
Expand Down Expand Up @@ -70,6 +74,8 @@ type dbaasClient interface {
StartMonitoring(ctx context.Context, in *controllerv1beta1.StartMonitoringRequest, opts ...grpc.CallOption) (*controllerv1beta1.StartMonitoringResponse, error)
// StopMonitoring removes victoria metrics operator from the cluster.
StopMonitoring(ctx context.Context, in *controllerv1beta1.StopMonitoringRequest, opts ...grpc.CallOption) (*controllerv1beta1.StopMonitoringResponse, error)
// GetKubeConfig gets inluster config and converts it to kubeConfig
GetKubeConfig(ctx context.Context, in *controllerv1beta1.GetKubeconfigRequest, opts ...grpc.CallOption) (*controllerv1beta1.GetKubeconfigResponse, error)
}

type versionService interface {
Expand Down
7 changes: 2 additions & 5 deletions managed/services/management/dbaas/kubernetes_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,14 @@ type kubernetesServer struct {
}

// NewKubernetesServer creates Kubernetes Server.
func NewKubernetesServer(db *reform.DB, dbaasClient dbaasClient, grafanaClient grafanaClient, versionService versionService) dbaasv1beta1.KubernetesServer {
func NewKubernetesServer(db *reform.DB, dbaasClient dbaasClient, versionService versionService, grafanaClient grafanaClient) dbaasv1beta1.KubernetesServer {
l := logrus.WithField("component", "kubernetes_server")
return &kubernetesServer{
l: l,
db: db,
dbaasClient: dbaasClient,
grafanaClient: grafanaClient,
versionService: versionService,
grafanaClient: grafanaClient,
}
}

Expand Down Expand Up @@ -274,7 +274,6 @@ func (k kubernetesServer) RegisterKubernetesCluster(ctx context.Context, req *db
k.l.Errorf("Replacing `aws` with `aim-authenticator` failed: %s", err)
return nil, status.Error(codes.Internal, "Internal server error")
}

var clusterInfo *dbaascontrollerv1beta1.CheckKubernetesClusterConnectionResponse
err = k.db.InTransaction(func(t *reform.TX) error {
var e error
Expand All @@ -292,7 +291,6 @@ func (k kubernetesServer) RegisterKubernetesCluster(ctx context.Context, req *db
if err != nil {
return nil, err
}

pmmVersion, err := goversion.NewVersion(pmmversion.PMMVersion)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
Expand All @@ -302,7 +300,6 @@ func (k kubernetesServer) RegisterKubernetesCluster(ctx context.Context, req *db
if err != nil {
return nil, err
}

if pxcOperatorVersion != nil && (clusterInfo.Operators == nil || clusterInfo.Operators.PxcOperatorVersion == "") {
_, err = k.dbaasClient.InstallPXCOperator(ctx, &dbaascontrollerv1beta1.InstallPXCOperatorRequest{
KubeAuth: &dbaascontrollerv1beta1.KubeAuth{
Expand Down
4 changes: 2 additions & 2 deletions managed/services/management/dbaas/kubernetes_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ func TestKubernetesServer(t *testing.T) {
dbaasClient.AssertExpectations(t)
require.NoError(t, sqlDB.Close())
}

ks = NewKubernetesServer(db, dbaasClient, grafanaClient, NewVersionServiceClient("https://check-dev.percona.com/versions/v1"))
versionService := NewVersionServiceClient("https://check-dev.percona.com/versions/v1")
ks = NewKubernetesServer(db, dbaasClient, versionService, grafanaClient)
return
}
t.Run("Basic", func(t *testing.T) {
Expand Down
Loading

0 comments on commit 815adfa

Please sign in to comment.