Skip to content

Commit

Permalink
feat: add redis metrics to application controller and api server (arg…
Browse files Browse the repository at this point in the history
…oproj#3500)

* add redis metrics to application controller and api server

* fix failed test
  • Loading branch information
Alexander Matyushentsev authored Apr 28, 2020
1 parent 14f1725 commit 842a3d1
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 14 deletions.
16 changes: 14 additions & 2 deletions cmd/argocd-application-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"time"

"github.com/argoproj/pkg/stats"
rediscache "github.com/go-redis/cache"
"github.com/go-redis/redis"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -50,6 +52,7 @@ func newCommand() *cobra.Command {
metricsPort int
kubectlParallelismLimit int64
cacheSrc func() (*appstatecache.Cache, error)
redisClient *redis.Client
)
var command = cobra.Command{
Use: cliName,
Expand Down Expand Up @@ -91,6 +94,14 @@ func newCommand() *cobra.Command {
metricsPort,
kubectlParallelismLimit)
errors.CheckError(err)
metricsServer := appController.GetMetricsServer()
redisClient.WrapProcess(func(oldProcess func(cmd redis.Cmder) error) func(cmd redis.Cmder) error {
return func(cmd redis.Cmder) error {
err := oldProcess(cmd)
metricsServer.IncRedisRequest(err != nil && err != rediscache.ErrCacheMiss)
return err
}
})

vers := common.GetVersion()
log.Infof("Application Controller (version: %s, built: %s) starting (namespace: %s)", vers.Version, vers.BuildDate, namespace)
Expand All @@ -116,8 +127,9 @@ func newCommand() *cobra.Command {
command.Flags().IntVar(&metricsPort, "metrics-port", common.DefaultPortArgoCDMetrics, "Start metrics server on given port")
command.Flags().IntVar(&selfHealTimeoutSeconds, "self-heal-timeout-seconds", 5, "Specifies timeout between application self heal attempts")
command.Flags().Int64Var(&kubectlParallelismLimit, "kubectl-parallelism-limit", 20, "Number of allowed concurrent kubectl fork/execs. Any value less the 1 means no limit.")

cacheSrc = appstatecache.AddCacheFlagsToCmd(&command)
cacheSrc = appstatecache.AddCacheFlagsToCmd(&command, func(client *redis.Client) {
redisClient = client
})
return &command
}

Expand Down
4 changes: 4 additions & 0 deletions controller/appcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ func NewApplicationController(
return &ctrl, nil
}

func (ctrl *ApplicationController) GetMetricsServer() *metrics.MetricsServer {
return ctrl.metricsServer
}

func (ctrl *ApplicationController) onKubectlRun(command string) (util.Closer, error) {
ctrl.metricsServer.IncKubectlExec(command)
if ctrl.kubectlSemaphore != nil {
Expand Down
16 changes: 16 additions & 0 deletions controller/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"net/http"
"os"
"strconv"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand All @@ -24,6 +25,7 @@ type MetricsServer struct {
kubectlExecPendingGauge *prometheus.GaugeVec
k8sRequestCounter *prometheus.CounterVec
clusterEventsCounter *prometheus.CounterVec
redisRequestCounter *prometheus.CounterVec
reconcileHistogram *prometheus.HistogramVec
registry *prometheus.Registry
}
Expand Down Expand Up @@ -108,6 +110,14 @@ var (
Name: "argocd_cluster_events_total",
Help: "Number of processes k8s resource events.",
}, append(descClusterDefaultLabels, "group", "kind"))

redisRequestCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "argocd_redis_request_total",
Help: "Number of kubernetes requests executed during application reconciliation.",
},
[]string{"initiator", "failed"},
)
)

// NewMetricsServer returns a new prometheus server which collects application metrics
Expand All @@ -128,6 +138,7 @@ func NewMetricsServer(addr string, appLister applister.ApplicationLister, health
registry.MustRegister(kubectlExecPendingGauge)
registry.MustRegister(reconcileHistogram)
registry.MustRegister(clusterEventsCounter)
registry.MustRegister(redisRequestCounter)

return &MetricsServer{
registry: registry,
Expand All @@ -141,6 +152,7 @@ func NewMetricsServer(addr string, appLister applister.ApplicationLister, health
kubectlExecPendingGauge: kubectlExecPendingGauge,
reconcileHistogram: reconcileHistogram,
clusterEventsCounter: clusterEventsCounter,
redisRequestCounter: redisRequestCounter,
}
}

Expand Down Expand Up @@ -189,6 +201,10 @@ func (m *MetricsServer) IncKubernetesRequest(app *argoappv1.Application, server,
).Inc()
}

func (m *MetricsServer) IncRedisRequest(failed bool) {
m.redisRequestCounter.WithLabelValues("argocd-application-controller", strconv.FormatBool(failed)).Inc()
}

// IncReconcile increments the reconcile counter for an application
func (m *MetricsServer) IncReconcile(app *argoappv1.Application, duration time.Duration) {
m.reconcileHistogram.WithLabelValues(app.Namespace, app.Spec.Destination.Server).Observe(duration.Seconds())
Expand Down
49 changes: 49 additions & 0 deletions server/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package metrics

import (
"fmt"
"net/http"
"strconv"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

type MetricsServer struct {
*http.Server
redisRequestCounter *prometheus.CounterVec
}

var (
redisRequestCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "argocd_redis_request_total",
Help: "Number of kubernetes requests executed during application reconciliation.",
},
[]string{"initiator", "failed"},
)
)

// NewMetricsServer returns a new prometheus server which collects api server metrics
func NewMetricsServer(port int) *MetricsServer {
mux := http.NewServeMux()
registry := prometheus.NewRegistry()
mux.Handle("/metrics", promhttp.HandlerFor(prometheus.Gatherers{
registry,
prometheus.DefaultGatherer,
}, promhttp.HandlerOpts{}))

registry.MustRegister(redisRequestCounter)

return &MetricsServer{
Server: &http.Server{
Addr: fmt.Sprintf("0.0.0.0:%d", port),
Handler: mux,
},
redisRequestCounter: redisRequestCounter,
}
}

func (m *MetricsServer) IncRedisRequest(failed bool) {
m.redisRequestCounter.WithLabelValues("argocd-server", strconv.FormatBool(failed)).Inc()
}
25 changes: 13 additions & 12 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"time"

"github.com/dgrijalva/jwt-go"
rediscache "github.com/go-redis/cache"
"github.com/go-redis/redis"
golang_proto "github.com/golang/protobuf/proto"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
Expand All @@ -25,7 +26,6 @@ import (
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/improbable-eng/grpc-web/go/grpcweb"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
"github.com/soheilhy/cmux"
netCtx "golang.org/x/net/context"
Expand Down Expand Up @@ -67,6 +67,7 @@ import (
servercache "github.com/argoproj/argo-cd/server/cache"
"github.com/argoproj/argo-cd/server/certificate"
"github.com/argoproj/argo-cd/server/cluster"
"github.com/argoproj/argo-cd/server/metrics"
"github.com/argoproj/argo-cd/server/project"
"github.com/argoproj/argo-cd/server/rbacpolicy"
"github.com/argoproj/argo-cd/server/repocreds"
Expand Down Expand Up @@ -246,7 +247,17 @@ func (a *ArgoCDServer) Run(ctx context.Context, port int, metricsPort int) {
httpsS.Handler = withRootPath(httpsS.Handler, a.RootPath)
}
}
metricsServ := newAPIServerMetricsServer(metricsPort)

metricsServ := metrics.NewMetricsServer(metricsPort)
if a.RedisClient != nil {
a.RedisClient.WrapProcess(func(oldProcess func(cmd redis.Cmder) error) func(cmd redis.Cmder) error {
return func(cmd redis.Cmder) error {
err := oldProcess(cmd)
metricsServ.IncRedisRequest(err != nil && err != rediscache.ErrCacheMiss)
return err
}
})
}

// Start listener
var conn net.Listener
Expand Down Expand Up @@ -724,16 +735,6 @@ func indexFilePath(srcPath string, baseHRef string) (string, error) {
return filePath, nil
}

// newAPIServerMetricsServer returns HTTP server which serves prometheus metrics on gRPC requests
func newAPIServerMetricsServer(port int) *http.Server {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
return &http.Server{
Addr: fmt.Sprintf("0.0.0.0:%d", port),
Handler: mux,
}
}

func fileExists(filename string) bool {
info, err := os.Stat(filename)
if os.IsNotExist(err) {
Expand Down

0 comments on commit 842a3d1

Please sign in to comment.