forked from evcc-io/evcc
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathinfluxdb.go
138 lines (114 loc) Β· 2.8 KB
/
influxdb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package server
import (
"fmt"
"sync"
"time"
"github.com/evcc-io/evcc/core/loadpoint"
"github.com/evcc-io/evcc/util"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxlog "github.com/influxdata/influxdb-client-go/v2/log"
)
// InfluxConfig is the influx db configuration
type InfluxConfig struct {
URL string
Database string
Token string
Org string
User string
Password string
Interval time.Duration
}
// Influx is a influx publisher
type Influx struct {
sync.Mutex
log *util.Logger
client influxdb2.Client
org string
database string
}
// NewInfluxClient creates new publisher for influx
func NewInfluxClient(url, token, org, user, password, database string) *Influx {
log := util.NewLogger("influx")
// InfluxDB v1 compatibility
if token == "" && user != "" {
token = fmt.Sprintf("%s:%s", user, password)
}
options := influxdb2.DefaultOptions().SetPrecision(time.Second)
client := influxdb2.NewClientWithOptions(url, token, options)
// handle error logging in writer
influxlog.Log = nil
return &Influx{
log: log,
client: client,
org: org,
database: database,
}
}
// supportedType checks if type can be written as influx value
func (m *Influx) supportedType(p util.Param) bool {
if p.Val == nil {
return true
}
switch val := p.Val.(type) {
case float64:
return true
case [3]float64:
return true
case []float64:
return len(val) == 3
default:
return false
}
}
// Run Influx publisher
func (m *Influx) Run(loadPoints []loadpoint.API, in <-chan util.Param) {
writer := m.client.WriteAPI(m.org, m.database)
// log errors
go func() {
for err := range writer.Errors() {
m.log.ERROR.Println(err)
}
}()
// track active vehicle per loadpoint
vehicles := make(map[int]string)
// add points to batch for async writing
for param := range in {
// vehicle name
if param.LoadPoint != nil {
if name, ok := param.Val.(string); ok && param.Key == "vehicleTitle" {
vehicles[*param.LoadPoint] = name
continue
}
}
if !m.supportedType(param) {
continue
}
tags := map[string]string{}
if param.LoadPoint != nil {
tags["loadpoint"] = loadPoints[*param.LoadPoint].Name()
tags["vehicle"] = vehicles[*param.LoadPoint]
}
fields := map[string]interface{}{}
// array to slice
val := param.Val
if v, ok := val.([3]float64); ok {
val = v[:]
}
// add slice as phase values
if phases, ok := val.([]float64); ok {
var total float64
for i, v := range phases {
total += v
fields[fmt.Sprintf("l%d", i+1)] = v
}
// add total as "value"
val = total
}
fields["value"] = val
// write asynchronously
m.log.TRACE.Printf("write %s=%v (%v)", param.Key, param.Val, tags)
p := influxdb2.NewPoint(param.Key, tags, fields, time.Now())
writer.WritePoint(p)
}
m.client.Close()
}