Skip to content

Commit

Permalink
add_metric
Browse files Browse the repository at this point in the history
  • Loading branch information
pingliu committed May 9, 2017
1 parent 00d8bd9 commit 39dfda5
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 71 deletions.
87 changes: 23 additions & 64 deletions backend/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (
"regexp"
"strings"
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/shell909090/influx-proxy/monitor"
)
Expand Down Expand Up @@ -62,7 +64,6 @@ func TrimRight(p []byte, s []byte) (r []byte) {

type InfluxCluster struct {
lock sync.RWMutex
SoftTimeout int
Zone string
nexts string
query_executor Querier
Expand Down Expand Up @@ -101,7 +102,6 @@ func NewInfluxCluster(cfgsrc *RedisConfigSource, nodecfg *NodeConfig) (ic *Influ
stats: &Statistics{},
counter: &Statistics{},
ticker: time.NewTicker(10 * time.Second),
SoftTimeout: 60,
defaultTags: map[string]string{"addr": nodecfg.ListenAddr},
}
host, err := os.Hostname()
Expand All @@ -112,9 +112,6 @@ func NewInfluxCluster(cfgsrc *RedisConfigSource, nodecfg *NodeConfig) (ic *Influ
if nodecfg.Interval > 0 {
ic.ticker = time.NewTicker(time.Second * time.Duration(nodecfg.Interval))
}
if nodecfg.SoftTimeout > 0 {
ic.SoftTimeout = nodecfg.SoftTimeout
}

err = ic.ForbidQuery(ForbidCmds)
if err != nil {
Expand All @@ -127,6 +124,7 @@ func NewInfluxCluster(cfgsrc *RedisConfigSource, nodecfg *NodeConfig) (ic *Influ
return
}

// feature
go ic.statistics()
return
}
Expand All @@ -137,9 +135,8 @@ func (ic *InfluxCluster) statistics() {
select {
case <-ic.ticker.C:
ic.Flush()
ic.lock.Lock()
ic.stats, ic.counter = ic.counter, ic.stats
ic.lock.Unlock()
ic.counter = (*Statistics)(atomic.SwapPointer((*unsafe.Pointer)(unsafe.Pointer(&ic.stats)),
unsafe.Pointer(ic.counter)))
err := ic.WriteStatistics()
if err != nil {
log.Println(err)
Expand Down Expand Up @@ -301,9 +298,7 @@ func (ic *InfluxCluster) LoadConfig() (err error) {
}

func (ic *InfluxCluster) Ping() (version string, err error) {
ic.lock.Lock()
ic.stats.PingRequests += 1
ic.lock.Unlock()
atomic.AddInt64(&ic.stats.PingRequests, 1)
version = VERSION
return
}
Expand Down Expand Up @@ -348,38 +343,26 @@ func (ic *InfluxCluster) GetBackends(key string) (backends []BackendAPI, ok bool
}

func (ic *InfluxCluster) Query(w http.ResponseWriter, req *http.Request) (err error) {
ic.lock.Lock()
ic.stats.QueryRequests += 1
ic.lock.Unlock()
q := strings.TrimSpace(req.FormValue("q"))
atomic.AddInt64(&ic.stats.QueryRequests, 1)
defer func(start time.Time) {
offset := time.Since(start).Nanoseconds()
ic.lock.Lock()
ic.stats.QueryRequestDuration += offset
ic.lock.Unlock()
if time.Duration(offset) > time.Second*time.Duration(ic.SoftTimeout) {
log.Printf("the query '%s' spend long time,the client is %s\n", q, req.RemoteAddr)
}
atomic.AddInt64(&ic.stats.QueryRequestDuration, time.Since(start).Nanoseconds())
}(time.Now())

switch req.Method {
case "GET", "POST":
default:
w.WriteHeader(400)
w.Write([]byte("illegal method"))
ic.lock.Lock()
ic.stats.QueryRequestsFail += 1
ic.lock.Unlock()
atomic.AddInt64(&ic.stats.QueryRequestsFail, 1)
return
}

// TODO: all query in q?
q := strings.TrimSpace(req.FormValue("q"))
if q == "" {
w.WriteHeader(400)
w.Write([]byte("empty query"))
ic.lock.Lock()
ic.stats.QueryRequestsFail += 1
ic.lock.Unlock()
atomic.AddInt64(&ic.stats.QueryRequestsFail, 1)
return
}

Expand All @@ -392,9 +375,7 @@ func (ic *InfluxCluster) Query(w http.ResponseWriter, req *http.Request) (err er
if err != nil {
w.WriteHeader(400)
w.Write([]byte("query forbidden"))
ic.lock.Lock()
ic.stats.QueryRequestsFail += 1
ic.lock.Unlock()
atomic.AddInt64(&ic.stats.QueryRequestsFail, 1)
return
}

Expand All @@ -403,9 +384,7 @@ func (ic *InfluxCluster) Query(w http.ResponseWriter, req *http.Request) (err er
log.Printf("can't get measurement: %s\n", q)
w.WriteHeader(400)
w.Write([]byte("can't get measurement"))
ic.lock.Lock()
ic.stats.QueryRequestsFail += 1
ic.lock.Unlock()
atomic.AddInt64(&ic.stats.QueryRequestsFail, 1)
return
}

Expand All @@ -414,9 +393,7 @@ func (ic *InfluxCluster) Query(w http.ResponseWriter, req *http.Request) (err er
log.Printf("unknown measurement: %s\n", key)
w.WriteHeader(400)
w.Write([]byte("unknown measurement"))
ic.lock.Lock()
ic.stats.QueryRequestsFail += 1
ic.lock.Unlock()
atomic.AddInt64(&ic.stats.QueryRequestsFail, 1)
return
}

Expand Down Expand Up @@ -451,18 +428,14 @@ func (ic *InfluxCluster) Query(w http.ResponseWriter, req *http.Request) (err er

w.WriteHeader(400)
w.Write([]byte("query error"))
ic.lock.Lock()
ic.stats.QueryRequestsFail += 1
ic.lock.Unlock()
atomic.AddInt64(&ic.stats.QueryRequestsFail, 1)
return
}

// Wrong in one row will not stop others.
// So don't try to return error, just print it.
func (ic *InfluxCluster) WriteRow(line []byte) {
ic.lock.Lock()
ic.stats.PointsWritten += 1
ic.lock.Unlock()
atomic.AddInt64(&ic.stats.PointsWritten, 1)
// maybe trim?
line = bytes.TrimRight(line, " \t\r\n")

Expand All @@ -474,18 +447,14 @@ func (ic *InfluxCluster) WriteRow(line []byte) {
key, err := ScanKey(line)
if err != nil {
log.Printf("scan key error: %s\n", err)
ic.lock.Lock()
ic.stats.PointsWrittenFail += 1
ic.lock.Unlock()
atomic.AddInt64(&ic.stats.PointsWrittenFail, 1)
return
}

bs, ok := ic.GetBackends(key)
if !ok {
log.Printf("new measurement: %s\n", key)
ic.lock.Lock()
ic.stats.PointsWrittenFail += 1
ic.lock.Unlock()
atomic.AddInt64(&ic.stats.PointsWrittenFail, 1)
// TODO: new measurement?
return
}
Expand All @@ -495,23 +464,17 @@ func (ic *InfluxCluster) WriteRow(line []byte) {
err = b.Write(line)
if err != nil {
log.Printf("cluster write fail: %s\n", key)
ic.lock.Lock()
ic.stats.PointsWrittenFail += 1
ic.lock.Unlock()
atomic.AddInt64(&ic.stats.PointsWrittenFail, 1)
return
}
}
return
}

func (ic *InfluxCluster) Write(p []byte) (err error) {
ic.lock.Lock()
ic.stats.WriteRequests += 1
ic.lock.Unlock()
atomic.AddInt64(&ic.stats.WriteRequests, 1)
defer func(start time.Time) {
ic.lock.Lock()
ic.stats.WriteRequestDuration += time.Since(start).Nanoseconds()
ic.lock.Unlock()
atomic.AddInt64(&ic.stats.WriteRequestDuration, time.Since(start).Nanoseconds())
}(time.Now())

buf := bytes.NewBuffer(p)
Expand All @@ -522,9 +485,7 @@ func (ic *InfluxCluster) Write(p []byte) (err error) {
switch err {
default:
log.Printf("error: %s\n", err)
ic.lock.Lock()
ic.stats.WriteRequestsFail += 1
ic.lock.Unlock()
atomic.AddInt64(&ic.stats.WriteRequestsFail, 1)
return
case io.EOF, nil:
err = nil
Expand All @@ -544,9 +505,7 @@ func (ic *InfluxCluster) Write(p []byte) (err error) {
err = n.Write(p)
if err != nil {
log.Printf("error: %s\n", err)
ic.lock.Lock()
ic.stats.WriteRequestsFail += 1
ic.lock.Unlock()
atomic.AddInt64(&ic.stats.WriteRequestsFail, 1)
}
}
}
Expand Down
11 changes: 5 additions & 6 deletions backend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,11 @@ func LoadStructFromMap(data map[string]string, o interface{}) (err error) {
}

type NodeConfig struct {
ListenAddr string
DB string
Zone string
Nexts string
Interval int
SoftTimeout int
ListenAddr string
DB string
Zone string
Nexts string
Interval int
}

type BackendConfig struct {
Expand Down
1 change: 0 additions & 1 deletion config.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
'db': 'test',
'zone': 'local',
'interval':10,
'softtimeout':60,
}
}

Expand Down

0 comments on commit 39dfda5

Please sign in to comment.