Skip to content

Commit

Permalink
add metric
Browse files Browse the repository at this point in the history
  • Loading branch information
liuping committed May 1, 2017
1 parent 538ea83 commit 00d8bd9
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 8 deletions.
163 changes: 161 additions & 2 deletions backend/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@ import (
"io"
"log"
"net/http"
"os"
"regexp"
"strings"
"sync"
"time"

"github.com/shell909090/influx-proxy/monitor"
)

var (
Expand Down Expand Up @@ -58,6 +62,7 @@ func TrimRight(p []byte, s []byte) (r []byte) {

type InfluxCluster struct {
lock sync.RWMutex
SoftTimeout int
Zone string
nexts string
query_executor Querier
Expand All @@ -67,6 +72,23 @@ type InfluxCluster struct {
bas []BackendAPI
backends map[string]BackendAPI
m2bs map[string][]BackendAPI // measurements to backends
stats *Statistics
counter *Statistics
ticker *time.Ticker
defaultTags map[string]string
}

// Statistics is a set of request and point counters for one reporting window.
// All fields are plain int64 values; InfluxCluster increments them while
// holding its lock and WriteStatistics publishes them as metric fields.
type Statistics struct {
	// Query requests seen, and how many of them were answered with an error.
	QueryRequests, QueryRequestsFail int64
	// Write requests seen, and how many of them failed.
	WriteRequests, WriteRequestsFail int64
	// Ping requests seen, and how many of them failed.
	PingRequests, PingRequestsFail int64
	// Individual rows handed to WriteRow, and how many could not be written.
	PointsWritten, PointsWrittenFail int64
	// Accumulated time spent in Write and Query, in nanoseconds.
	WriteRequestDuration, QueryRequestDuration int64
}

func NewInfluxCluster(cfgsrc *RedisConfigSource, nodecfg *NodeConfig) (ic *InfluxCluster) {
Expand All @@ -76,9 +98,25 @@ func NewInfluxCluster(cfgsrc *RedisConfigSource, nodecfg *NodeConfig) (ic *Influ
query_executor: &InfluxQLExecutor{},
cfgsrc: cfgsrc,
bas: make([]BackendAPI, 0),
stats: &Statistics{},
counter: &Statistics{},
ticker: time.NewTicker(10 * time.Second),
SoftTimeout: 60,
defaultTags: map[string]string{"addr": nodecfg.ListenAddr},
}
host, err := os.Hostname()
if err != nil {
log.Println(err)
}
ic.defaultTags["host"] = host
if nodecfg.Interval > 0 {
ic.ticker = time.NewTicker(time.Second * time.Duration(nodecfg.Interval))
}
if nodecfg.SoftTimeout > 0 {
ic.SoftTimeout = nodecfg.SoftTimeout
}

err := ic.ForbidQuery(ForbidCmds)
err = ic.ForbidQuery(ForbidCmds)
if err != nil {
panic(err)
return
Expand All @@ -88,9 +126,67 @@ func NewInfluxCluster(cfgsrc *RedisConfigSource, nodecfg *NodeConfig) (ic *Influ
panic(err)
return
}

go ic.statistics()
return
}

// statistics is the background reporting loop started by NewInfluxCluster.
//
// On every tick of ic.ticker it:
//  1. zeroes ic.counter (the set reported on the previous tick),
//  2. swaps ic.stats and ic.counter under ic.lock, so request handlers start
//     counting into the freshly zeroed set while the just-finished window
//     becomes ic.counter,
//  3. publishes ic.counter through WriteStatistics, logging any error.
//
// The loop blocks on the ticker channel between ticks. The previous version
// used a select with an empty default case, which never blocks and therefore
// kept one CPU core spinning permanently.
//
// TODO: there is still no way to stop this goroutine; the ticker channel is
// never closed, so the loop runs for the lifetime of the process.
func (ic *InfluxCluster) statistics() {
	for range ic.ticker.C {
		ic.Flush()

		ic.lock.Lock()
		ic.stats, ic.counter = ic.counter, ic.stats
		ic.lock.Unlock()

		if err := ic.WriteStatistics(); err != nil {
			log.Println(err)
		}
	}
}

// Flush resets every counter in ic.counter to zero so the struct can be reused
// for the next reporting window. It does not take ic.lock itself.
func (ic *InfluxCluster) Flush() {
	c := ic.counter

	c.QueryRequests, c.QueryRequestsFail = 0, 0
	c.WriteRequests, c.WriteRequestsFail = 0, 0
	c.PingRequests, c.PingRequestsFail = 0, 0
	c.PointsWritten, c.PointsWrittenFail = 0, 0
	c.WriteRequestDuration, c.QueryRequestDuration = 0, 0
}

// WriteStatistics publishes the counters held in ic.counter as one
// "influxdb.cluster" metric tagged with ic.defaultTags and stamped with the
// current time. The metric is converted to a line with ParseToLine, a trailing
// newline is appended, and the result is sent through ic.Write.
//
// It returns the error from ParseToLine or, failing that, from ic.Write.
func (ic *InfluxCluster) WriteStatistics() (err error) {
	snapshot := ic.counter

	fields := map[string]interface{}{
		"statQueryRequest":         snapshot.QueryRequests,
		"statQueryRequestFail":     snapshot.QueryRequestsFail,
		"statWriteRequest":         snapshot.WriteRequests,
		"statWriteRequestFail":     snapshot.WriteRequestsFail,
		"statPingRequest":          snapshot.PingRequests,
		"statPingRequestFail":      snapshot.PingRequestsFail,
		"statPointsWritten":        snapshot.PointsWritten,
		"statPointsWrittenFail":    snapshot.PointsWrittenFail,
		"statQueryRequestDuration": snapshot.QueryRequestDuration,
		"statWriteRequestDuration": snapshot.WriteRequestDuration,
	}

	metric := &monitor.Metric{
		Name:   "influxdb.cluster",
		Tags:   ic.defaultTags,
		Fields: fields,
		Time:   time.Now(),
	}

	line, err := metric.ParseToLine()
	if err != nil {
		return err
	}

	payload := []byte(line + "\n")
	return ic.Write(payload)
}

func (ic *InfluxCluster) ForbidQuery(s string) (err error) {
r, err := regexp.Compile(s)
if err != nil {
Expand Down Expand Up @@ -205,6 +301,9 @@ func (ic *InfluxCluster) LoadConfig() (err error) {
}

// Ping counts one ping request in ic.stats (under ic.lock) and reports the
// package-level VERSION. The returned error is always nil.
func (ic *InfluxCluster) Ping() (version string, err error) {
	ic.lock.Lock()
	ic.stats.PingRequests++
	ic.lock.Unlock()

	return VERSION, nil
}
Expand Down Expand Up @@ -249,19 +348,38 @@ func (ic *InfluxCluster) GetBackends(key string) (backends []BackendAPI, ok bool
}

func (ic *InfluxCluster) Query(w http.ResponseWriter, req *http.Request) (err error) {
ic.lock.Lock()
ic.stats.QueryRequests += 1
ic.lock.Unlock()
q := strings.TrimSpace(req.FormValue("q"))
defer func(start time.Time) {
offset := time.Since(start).Nanoseconds()
ic.lock.Lock()
ic.stats.QueryRequestDuration += offset
ic.lock.Unlock()
if time.Duration(offset) > time.Second*time.Duration(ic.SoftTimeout) {
log.Printf("the query '%s' spend long time,the client is %s\n", q, req.RemoteAddr)
}
}(time.Now())

switch req.Method {
case "GET", "POST":
default:
w.WriteHeader(400)
w.Write([]byte("illegal method"))
ic.lock.Lock()
ic.stats.QueryRequestsFail += 1
ic.lock.Unlock()
return
}

// TODO: all query in q?
q := strings.TrimSpace(req.FormValue("q"))
if q == "" {
w.WriteHeader(400)
w.Write([]byte("empty query"))
ic.lock.Lock()
ic.stats.QueryRequestsFail += 1
ic.lock.Unlock()
return
}

Expand All @@ -274,6 +392,9 @@ func (ic *InfluxCluster) Query(w http.ResponseWriter, req *http.Request) (err er
if err != nil {
w.WriteHeader(400)
w.Write([]byte("query forbidden"))
ic.lock.Lock()
ic.stats.QueryRequestsFail += 1
ic.lock.Unlock()
return
}

Expand All @@ -282,6 +403,9 @@ func (ic *InfluxCluster) Query(w http.ResponseWriter, req *http.Request) (err er
log.Printf("can't get measurement: %s\n", q)
w.WriteHeader(400)
w.Write([]byte("can't get measurement"))
ic.lock.Lock()
ic.stats.QueryRequestsFail += 1
ic.lock.Unlock()
return
}

Expand All @@ -290,6 +414,9 @@ func (ic *InfluxCluster) Query(w http.ResponseWriter, req *http.Request) (err er
log.Printf("unknown measurement: %s\n", key)
w.WriteHeader(400)
w.Write([]byte("unknown measurement"))
ic.lock.Lock()
ic.stats.QueryRequestsFail += 1
ic.lock.Unlock()
return
}

Expand Down Expand Up @@ -322,12 +449,20 @@ func (ic *InfluxCluster) Query(w http.ResponseWriter, req *http.Request) (err er
}
}

w.WriteHeader(400)
w.Write([]byte("query error"))
ic.lock.Lock()
ic.stats.QueryRequestsFail += 1
ic.lock.Unlock()
return
}

// Wrong in one row will not stop others.
// So don't try to return error, just print it.
func (ic *InfluxCluster) WriteRow(line []byte) {
ic.lock.Lock()
ic.stats.PointsWritten += 1
ic.lock.Unlock()
// maybe trim?
line = bytes.TrimRight(line, " \t\r\n")

Expand All @@ -339,12 +474,18 @@ func (ic *InfluxCluster) WriteRow(line []byte) {
key, err := ScanKey(line)
if err != nil {
log.Printf("scan key error: %s\n", err)
ic.lock.Lock()
ic.stats.PointsWrittenFail += 1
ic.lock.Unlock()
return
}

bs, ok := ic.GetBackends(key)
if !ok {
log.Printf("new measurement: %s\n", key)
ic.lock.Lock()
ic.stats.PointsWrittenFail += 1
ic.lock.Unlock()
// TODO: new measurement?
return
}
Expand All @@ -354,13 +495,25 @@ func (ic *InfluxCluster) WriteRow(line []byte) {
err = b.Write(line)
if err != nil {
log.Printf("cluster write fail: %s\n", key)
ic.lock.Lock()
ic.stats.PointsWrittenFail += 1
ic.lock.Unlock()
return
}
}
return
}

func (ic *InfluxCluster) Write(p []byte) (err error) {
ic.lock.Lock()
ic.stats.WriteRequests += 1
ic.lock.Unlock()
defer func(start time.Time) {
ic.lock.Lock()
ic.stats.WriteRequestDuration += time.Since(start).Nanoseconds()
ic.lock.Unlock()
}(time.Now())

buf := bytes.NewBuffer(p)

var line []byte
Expand All @@ -369,6 +522,9 @@ func (ic *InfluxCluster) Write(p []byte) (err error) {
switch err {
default:
log.Printf("error: %s\n", err)
ic.lock.Lock()
ic.stats.WriteRequestsFail += 1
ic.lock.Unlock()
return
case io.EOF, nil:
err = nil
Expand All @@ -388,6 +544,9 @@ func (ic *InfluxCluster) Write(p []byte) (err error) {
err = n.Write(p)
if err != nil {
log.Printf("error: %s\n", err)
ic.lock.Lock()
ic.stats.WriteRequestsFail += 1
ic.lock.Unlock()
}
}
}
Expand Down
10 changes: 6 additions & 4 deletions backend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@ func LoadStructFromMap(data map[string]string, o interface{}) (err error) {
}

// NodeConfig carries the node-level settings handed to NewInfluxCluster.
// NOTE(review): this span is a rendered diff, so the first four fields appear
// twice (before and after re-alignment); Interval and SoftTimeout are the
// fields this commit adds.
type NodeConfig struct {
ListenAddr string
DB string
Zone string
Nexts string
ListenAddr string
DB string
Zone string
Nexts string
// Interval is the statistics reporting period in seconds. NewInfluxCluster
// applies it only when it is > 0; otherwise a 10 second ticker is used.
Interval int
// SoftTimeout is the query duration, in seconds, above which Query logs the
// query as slow. NewInfluxCluster applies it only when it is > 0; otherwise
// the threshold is 60.
SoftTimeout int
}

type BackendConfig struct {
Expand Down
6 changes: 4 additions & 2 deletions config.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,17 @@
}

# this config overrides the default_node config
# listenaddr: proxy listen addr
# listenaddr: proxy listen addr
# db: proxy db; the client's db must be the same as this one
# zone: use for query
# nexts: the backend keys that will accept all data, separated by ','
# Per-node settings, keyed by node name.
# NOTE(review): this span is a rendered diff, so 'listenaddr' appears twice
# (old value '6666', new value ':6666').
NODES = {
'l1': {
'listenaddr': '6666',
'listenaddr': ':6666',
'db': 'test',
'zone': 'local',
# interval: presumably loaded into NodeConfig.Interval, the statistics
# reporting period in seconds (confirm against the config loader)
'interval':10,
# softtimeout: presumably loaded into NodeConfig.SoftTimeout, the slow-query
# log threshold in seconds (confirm against the config loader)
'softtimeout':60,
}
}

Expand Down
24 changes: 24 additions & 0 deletions monitor/metric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package monitor

import (
"time"

client "github.com/influxdata/influxdb/client/v2"
)

// Metric is one named data point with tags, fields and a timestamp.
// ParseToLine passes Name, Tags, Fields and Time, in that order, to
// client.NewPoint. The struct tags give the JSON key used for each field.
type Metric struct {
Name string `json:"name"`
Tags map[string]string `json:"tags"`
Fields map[string]interface{} `json:"fields"`
Time time.Time `json:"time"`
}

// ParseToLine builds a point from the metric with client.NewPoint and renders
// it with PrecisionString("ns"). If the point cannot be built, it returns an
// empty line together with the error from client.NewPoint.
func (m *Metric) ParseToLine() (line string, err error) {
	point, err := client.NewPoint(m.Name, m.Tags, m.Fields, m.Time)
	if err == nil {
		line = point.PrecisionString("ns")
	}
	return line, err
}

0 comments on commit 00d8bd9

Please sign in to comment.