Skip to content

Commit

Permalink
fix bugs in update (ssexporter) and port (nodeexporter) (secretflow#227)
Browse files Browse the repository at this point in the history
* feat:cluster_monitor

利用envoy和ss实现集群间指标测量,可测试测量脚本所使用的系统资源。

* update 11.1

* delete idea

* 11.3

* update aggregation functions

* remove test

* 11.21

* 11.27

* 11.29

* Create README.md

* fix IP address in ListenAndServe

* merge metricexporter to mian

* add annotations and docs

* fix MetricUpdatePeriod

* Update kuscia_config_cn.md

* fix ss update

* kuscia-monitor auto start

* integrates grafana with templates

* import prom from images; separate start_monitor

* delete useless file

* start node_exporter in kuscia, add docs

* assign the grafana template

* rm test script, fixed docs

* unify ports into 9091

* Update kuscia_monitor.md

* remove useless variables

* separate ss\envoy from metricexporter

* fix update (ssexporter) and port (nodeexporter)

* enable exporter in the master

* code style
  • Loading branch information
Vancasola authored Jan 31, 2024
1 parent ee44335 commit cdac81b
Show file tree
Hide file tree
Showing 11 changed files with 105 additions and 231 deletions.
2 changes: 2 additions & 0 deletions cmd/kuscia/master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ func Run(ctx context.Context, configFile string, onlyControllers bool) error {
}()
wg.Wait()
modules.RunNodeExporter(runCtx, cancel, conf)
modules.RunSsExporter(runCtx, cancel, conf)
modules.RunMetricExporter(runCtx, cancel, conf)
utils.SetupPprof(conf.Debug, conf.DebugPort, false)
}
<-runCtx.Done()
Expand Down
10 changes: 5 additions & 5 deletions cmd/kuscia/modules/metricexporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,23 @@ import (
)

type metricExporterModule struct {
rootDir string
metricsUrls map[string]string
rootDir string
metricURLs map[string]string
}

func NewMetricExporter(i *Dependencies) Module {
return &metricExporterModule{
rootDir: i.RootDir,
metricsUrls: map[string]string{
metricURLs: map[string]string{
"node-exporter": "http://localhost:9100/metrics",
"envoy": envoyexporter.GetEnvoyMetricUrl(),
"envoy": envoyexporter.GetEnvoyMetricURL(),
"ss": "http://localhost:9092/ssmetrics",
},
}
}

func (exporter *metricExporterModule) Run(ctx context.Context) error {
metricexporter.MetricExporter(ctx, exporter.metricsUrls)
metricexporter.MetricExporter(ctx, exporter.metricURLs)
return nil
}

Expand Down
16 changes: 7 additions & 9 deletions cmd/kuscia/modules/nodeexporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,21 @@ import (
)

type nodeExporterModule struct {
runMode pkgcom.RunModeType
rootDir string
runMode pkgcom.RunModeType
rootDir string
exportPort string
}

func NewNodeExporter(i *Dependencies) Module {
return &nodeExporterModule{
runMode: i.RunMode,
rootDir: i.RootDir,
runMode: i.RunMode,
rootDir: i.RootDir,
exportPort: ":9100",
}
}

func (exporter *nodeExporterModule) Run(ctx context.Context) error {
var args []string
if exporter.runMode == "master" {
args = append(args, "--web.listen-address")
args = append(args, ":9091")
}
sp := supervisor.NewSupervisor("node_exporter", nil, -1)
return sp.Run(ctx, func(ctx context.Context) supervisor.Cmd {
cmd := exec.CommandContext(ctx, filepath.Join(exporter.rootDir, "bin/node_exporter"), args...)
Expand Down Expand Up @@ -92,7 +90,7 @@ func (exporter *nodeExporterModule) WaitReady(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
case <-tickerReady.C:
if nil == exporter.readyz("http://127.0.0.1:9100") {
if nil == exporter.readyz("http://127.0.0.1"+exporter.exportPort) {
return nil
}
case <-ticker.C:
Expand Down
31 changes: 7 additions & 24 deletions pkg/metricexporter/envoyexporter/envoyexporter.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,5 @@
package envoyexporter

import (
"github.com/secretflow/kuscia/pkg/ssexporter/parse"
"strings"
)

func ConvertClusterMetrics(metrics []string) []string {
var clusterMetrics []string
clusterAddresses := parse.GetClusterAddress()
for clusterName := range clusterAddresses {
for _, metric := range metrics {
str := "cluster." + clusterName + "." + strings.ToLower(metric)
clusterMetrics = append(clusterMetrics, str)
}
}
return clusterMetrics
}

func GetEnvoyMetrics() []string {
var metrics []string
metrics = append(metrics, "upstream_cx_rx_bytes_total",
Expand All @@ -28,16 +11,16 @@ func GetEnvoyMetrics() []string {
"upstream_cx_connect_fail",
"upstream_cx_connect_timeout",
"upstream_rq_timeout")
return ConvertClusterMetrics(metrics)
return metrics
}

func GetEnvoyMetricUrl() string {
baseUrl := "http://localhost:10000/stats?format=prometheus&&filter="
func GetEnvoyMetricURL() string {
baseURL := "http://localhost:10000/stats/prometheus?filter="
metrics := GetEnvoyMetrics()
filter_regex := "("
filterRegex := "("
for _, metric := range metrics {
filter_regex += metric + "|"
filterRegex += metric + "|"
}
filter_regex = filter_regex[0:len(filter_regex)-1] + ")"
return baseUrl + filter_regex
filterRegex = filterRegex[0:len(filterRegex)-1] + ")"
return baseURL + filterRegex
}
4 changes: 2 additions & 2 deletions pkg/ssexporter/parse/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ func GetLocalDomainName() string {
return namespace
}

// GetIpFromDomain get a list of IP addresses from a local domain name
func GetIpFromDomain(localDomainName string) []string {
// GetIPFromDomain get a list of IP addresses from a local domain name
func GetIPFromDomain(localDomainName string) []string {
ipAddresses, err := net.LookupIP(localDomainName)
var ipAddr []string
if err != nil {
Expand Down
98 changes: 49 additions & 49 deletions pkg/ssexporter/promexporter/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ package promexporter
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/secretflow/kuscia/pkg/utils/nlog"
"strings"
)

func ProduceCounter(namespace string, name string, help string, labels map[string]string) prometheus.Counter {
func produceCounter(namespace string, name string, help string, labels map[string]string) prometheus.Counter {
return prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Expand All @@ -17,7 +18,7 @@ func ProduceCounter(namespace string, name string, help string, labels map[strin
})
}

func ProduceGauge(namespace string, name string, help string, labels map[string]string) prometheus.Gauge {
func produceGauge(namespace string, name string, help string, labels map[string]string) prometheus.Gauge {
return prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Expand All @@ -27,7 +28,7 @@ func ProduceGauge(namespace string, name string, help string, labels map[string]
})
}

func ProduceHistogram(namespace string, name string, help string, labels map[string]string) prometheus.Histogram {
func produceHistogram(namespace string, name string, help string, labels map[string]string) prometheus.Histogram {
return prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: namespace,
Expand All @@ -36,7 +37,7 @@ func ProduceHistogram(namespace string, name string, help string, labels map[str
ConstLabels: labels,
})
}
func ProduceSummary(namespace string, name string, help string, labels map[string]string) prometheus.Summary {
func produceSummary(namespace string, name string, help string, labels map[string]string) prometheus.Summary {
return prometheus.NewSummary(
prometheus.SummaryOpts{
Namespace: namespace,
Expand All @@ -51,32 +52,39 @@ var gauges = make(map[string]prometheus.Gauge)
var histograms = make(map[string]prometheus.Histogram)
var summaries = make(map[string]prometheus.Summary)

func ProduceMetric(reg *prometheus.Registry,
metricType string, nameSpace string, name string, help string, labels map[string]string) *prometheus.Registry {
var metricId string
metricId = labels["local_domain"] + "__" + labels["remote_domain"] + "__" + name + "__" + labels["aggregation_function"]
func produceMetric(reg *prometheus.Registry,
metricID string, metricType string) *prometheus.Registry {
splitedMetric := strings.Split(metricID, "__")
labels := make(map[string]string)
labels["type"] = "ss"
labels["remote_domain"] = splitedMetric[len(splitedMetric)-3]
name := splitedMetric[len(splitedMetric)-2]
labels["aggregation_function"] = splitedMetric[len(splitedMetric)-1]
help := name + " aggregated by " + labels["aggregation_function"] + " from ss"
nameSpace := "ss"
if metricType == "Counter" {
counters[metricId] = ProduceCounter(nameSpace, name, help, labels)
reg.MustRegister(counters[metricId])
counters[metricID] = produceCounter(nameSpace, name, help, labels)
reg.MustRegister(counters[metricID])
} else if metricType == "Gauge" {
gauges[metricId] = ProduceGauge(nameSpace, name, help, labels)
reg.MustRegister(gauges[metricId])
gauges[metricID] = produceGauge(nameSpace, name, help, labels)
reg.MustRegister(gauges[metricID])
} else if metricType == "Histogram" {
histograms[metricId] = ProduceHistogram(nameSpace, name, help, labels)
reg.MustRegister(histograms[metricId])
histograms[metricID] = produceHistogram(nameSpace, name, help, labels)
reg.MustRegister(histograms[metricID])
} else if metricType == "Summary" {
summaries[metricId] = ProduceSummary(nameSpace, name, help, labels)
reg.MustRegister(summaries[metricId])
summaries[metricID] = produceSummary(nameSpace, name, help, labels)
reg.MustRegister(summaries[metricID])
}
return reg
}
func Formalize(metric string) string {
func formalize(metric string) string {
metric = strings.Replace(metric, "-", "_", -1)
metric = strings.Replace(metric, ".", "__", -1)
metric = strings.ToLower(metric)
return metric
}
func ProduceMetrics(localDomainName string,
func ProduceMetrics(
localDomainName string,
clusterAddresses map[string][]string,
netMetrics []string,
MetricTypes map[string]string,
Expand All @@ -86,49 +94,41 @@ func ProduceMetrics(localDomainName string,
collectors.NewGoCollector(),
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
)
for clusterName := range clusterAddresses {
destinationAddress := clusterAddresses[clusterName]
clusterName = Formalize(clusterName)
labels := make(map[string]string)
labels["type"] = "ss"
labels["local_domain"] = localDomainName
for _, dstAddr := range destinationAddress {
labels["remote_domain"] = Formalize(strings.Split(dstAddr, ":")[0])
for _, metric := range netMetrics {
metric = Formalize(metric)
labels["aggregation_function"] = aggregationMetrics[metric]
reg = ProduceMetric(reg, MetricTypes[metric], "ss", metric, metric+"with aggregation function "+aggregationMetrics[metric], labels)
}
}

}
return reg
}

func UpdateMetrics(clusterResults map[string]float64, MetricTypes map[string]string) {
func UpdateMetrics(reg *prometheus.Registry,
clusterResults map[string]float64, MetricTypes map[string]string) {
for metric, val := range clusterResults {
metricId := Formalize(metric)
metricID := formalize(metric)
splitedMetric := strings.Split(metric, ".")
var metricTypeId string
metricTypeId = splitedMetric[len(splitedMetric)-1]
metricType, ok := MetricTypes[metricTypeId]
if !ok {
metricTypeId = splitedMetric[len(splitedMetric)-2] + "__" + splitedMetric[len(splitedMetric)-1]
metricType, ok = MetricTypes[metricTypeId]
}
var metricTypeID string
metricTypeID = splitedMetric[len(splitedMetric)-2]
metricType, ok := MetricTypes[metricTypeID]
if !ok {
metricTypeId = splitedMetric[len(splitedMetric)-2]
metricType, ok = MetricTypes[metricTypeId]
nlog.Error("Fail to get metric types", ok)
}
switch metricType {
case "Counter":
counters[metricId].Add(val)
if _, ok := counters[metricID]; !ok {
produceMetric(reg, metricID, metricType)
}
counters[metricID].Add(val)
case "Gauge":
gauges[metricId].Set(val)
if _, ok := gauges[metricID]; !ok {
produceMetric(reg, metricID, metricType)
}
gauges[metricID].Set(val)
case "Histogram":
histograms[metricId].Observe(val)
if _, ok := histograms[metricID]; !ok {
produceMetric(reg, metricID, metricType)
}
histograms[metricID].Observe(val)
case "Summary":
summaries[metricId].Observe(val)
if _, ok := summaries[metricID]; !ok {
produceMetric(reg, metricID, metricType)
}
summaries[metricID].Observe(val)
}
}
}
Loading

0 comments on commit cdac81b

Please sign in to comment.