forked from evcc-io/evcc
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathinfluxdb.go
160 lines (128 loc) Β· 3.54 KB
/
influxdb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package server
import (
"crypto/tls"
"fmt"
"reflect"
"strconv"
"strings"
"sync"
"time"
"github.com/benbjohnson/clock"
"github.com/evcc-io/evcc/core/site"
"github.com/evcc-io/evcc/util"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api/write"
influxlog "github.com/influxdata/influxdb-client-go/v2/log"
)
// Influx is a influx publisher
type Influx struct {
sync.Mutex
log *util.Logger
clock clock.Clock
client influxdb2.Client
org string
database string
}
// NewInfluxClient creates new publisher for influx
func NewInfluxClient(url, token, org, user, password, database string, insecure bool) *Influx {
log := util.NewLogger("influx")
// InfluxDB v1 compatibility
if token == "" && user != "" {
token = fmt.Sprintf("%s:%s", user, password)
}
options := influxdb2.DefaultOptions()
options.SetTLSConfig(&tls.Config{InsecureSkipVerify: insecure})
options.SetPrecision(time.Second)
client := influxdb2.NewClientWithOptions(url, token, options)
// handle error logging in writer
influxlog.Log = nil
return &Influx{
log: log,
clock: clock.New(),
client: client,
org: org,
database: database,
}
}
// pointWriter is the minimal interface for influxdb2 api.Writer
type pointWriter interface {
WritePoint(point *write.Point)
}
// writePoint asynchronously writes a point to influx
func (m *Influx) writePoint(writer pointWriter, key string, fields map[string]any, tags map[string]string) {
m.log.TRACE.Printf("write %s=%v (%v)", key, fields, tags)
writer.WritePoint(influxdb2.NewPoint(key, tags, fields, m.clock.Now()))
}
// writeComplexPoint asynchronously writes a point to influx
func (m *Influx) writeComplexPoint(writer pointWriter, param util.Param, tags map[string]string) {
fields := make(map[string]any)
switch val := param.Val.(type) {
case string:
return
case int, int64, float64:
fields["value"] = param.Val
case []float64:
if len(val) != 3 {
return
}
// add array as phase values
for i, v := range val {
fields[fmt.Sprintf("l%d", i+1)] = v
}
case [3]float64:
// add array as phase values
for i, v := range val {
fields[fmt.Sprintf("l%d", i+1)] = v
}
default:
// allow writing nil values
if param.Val == nil {
fields["value"] = nil
break
}
// slice of structs
if typ := reflect.TypeOf(param.Val); typ.Kind() == reflect.Slice && typ.Elem().Kind() == reflect.Struct {
val := reflect.ValueOf(param.Val)
// loop slice
for i := 0; i < val.Len(); i++ {
val := val.Index(i)
typ := val.Type()
// loop struct
for j := 0; j < typ.NumField(); j++ {
n := typ.Field(j).Name
v := val.Field(j).Interface()
key := param.Key + strings.ToUpper(n[:1]) + n[1:]
fields["value"] = v
tags["id"] = strconv.Itoa(i + 1)
m.writePoint(writer, key, fields, tags)
}
}
}
return
}
m.writePoint(writer, param.Key, fields, tags)
}
// Run Influx publisher
func (m *Influx) Run(site site.API, in <-chan util.Param) {
writer := m.client.WriteAPI(m.org, m.database)
// log errors
go func() {
for err := range writer.Errors() {
// log async as we're part of the logging loop
go m.log.ERROR.Println(err)
}
}()
// add points to batch for async writing
for param := range in {
tags := make(map[string]string)
if param.Loadpoint != nil {
lp := site.Loadpoints()[*param.Loadpoint]
tags["loadpoint"] = lp.Title()
if v := lp.GetVehicle(); v != nil {
tags["vehicle"] = v.Title()
}
}
m.writeComplexPoint(writer, param, tags)
}
m.client.Close()
}