Skip to content

Commit

Permalink
fix: 1. 修复监控节点数据被覆盖问题 2. 新增节点没有动态监控问题
Browse files Browse the repository at this point in the history
  • Loading branch information
maskwang committed Aug 27, 2020
1 parent 5e0d183 commit c95faf3
Showing 1 changed file with 65 additions and 52 deletions.
117 changes: 65 additions & 52 deletions monitor/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,71 +15,47 @@
package monitor

import (
"sync"
"net/http"
"time"
"strconv"
"sync"
"time"

"github.com/flike/kingshard/proxy/server"
"github.com/flike/kingshard/core/golog"
"github.com/flike/kingshard/proxy/server"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

type Prometheus struct {
addr string
svr *server.Server
data sync.Map
addr string
svr *server.Server
nodesData sync.Map
}

//新建prometheus实例
func NewPrometheus(addr string, svr *server.Server) (*Prometheus, error) {
prometheus := new(Prometheus)
prometheus.addr = addr
prometheus.svr = svr
prom := new(Prometheus)
prom.addr = addr
prom.svr = svr

golog.Info("prometheus", "Run", "Prometheus running", 0,
"address",
addr)

return prometheus, nil
return prom, nil
}

//启动prometheus的http监控
func (p *Prometheus) Run() {
// 开始每秒钟获取一次数据
go func() {
for {
data := p.svr.GetMonitorData()

for _, data := range data {
p.data.Store("idleConn", data["idleConn"])
p.data.Store("maxConn", data["maxConn"])
p.data.Store("cacheConns", data["cacheConns"])
p.data.Store("pushConnCount", data["pushConnCount"])
p.data.Store("popConnCount", data["popConnCount"])
}

time.Sleep(1 * time.Second)
}
}()

//设置标签及注册
data := p.svr.GetMonitorData()

label := make(map[string]string)

for addr, data := range data {
label["addr"] = addr
label["type"] = data["type"]

p.addGauge("idleConn", "the db idle connection", label)
p.addGauge("cacheConns", "the db cache connection", label)
p.addGauge("pushConnCount", "the db pushConnCount", label)
p.addGauge("popConnCount", "the db popConnCount", label)
p.addGauge("maxConn", "the max connection config", label)
for addr, item := range data {
p.addNodeData(addr, item)
}

go p.flush()

mux := http.NewServeMux()

mux.Handle("/metrics", promhttp.Handler())
Expand All @@ -90,28 +66,65 @@ func (p *Prometheus) Run() {
}
}

func (p *Prometheus) addGauge(name string, help string, label map[string]string) {
func (p *Prometheus) addNodeData(addr string, data map[string]string) {
var (
nodeData = make(map[string]prometheus.Gauge)

label = map[string]string{
"addr": addr,
"type": data["type"],
}
)

nodeData["idleConn"] = p.addGauge("idleConn", "the db idle connection", label)
nodeData["cacheConns"] = p.addGauge("cacheConns", "the db cache connection", label)
nodeData["pushConnCount"] = p.addGauge("pushConnCount", "the db pushConnCount", label)
nodeData["popConnCount"] = p.addGauge("popConnCount", "the db popConnCount", label)
nodeData["maxConn"] = p.addGauge("maxConn", "the max connection config", label)

p.nodesData.Store(addr, nodeData)
}

func (p *Prometheus) addGauge(name string, help string, label map[string]string) prometheus.Gauge {
gauge := prometheus.NewGauge(
prometheus.GaugeOpts{
Name: name,
Help: help,
Name: name,
Help: help,
ConstLabels: label,
},
)

prometheus.MustRegister(gauge)

go func() {
for {
pValueInterface, _ := p.data.Load(name)
pValueString, ok := pValueInterface.(string)

if ok {
floatValue, _ := strconv.ParseFloat(pValueString, 10)
gauge.Set(floatValue)
return gauge
}

func (p *Prometheus) flush() {
ticker := time.NewTicker(5 * time.Second)

for range ticker.C {
data := p.svr.GetMonitorData()

for addr, item := range data {
tmp, ok := p.nodesData.Load(addr)
if !ok {
p.addNodeData(addr, item)
continue
}

nodeData, ok := tmp.(map[string]prometheus.Gauge)
if !ok {
continue
}

time.Sleep(5 * time.Second)
for name, gauge := range nodeData {
if _, ok := item[name]; !ok {
continue
}

val, _ := strconv.ParseFloat(item[name], 10)
gauge.Set(val)
}
}
}()
}
}
}

0 comments on commit c95faf3

Please sign in to comment.