Skip to content

Commit

Permalink
Add support for writing to InfluxDB (evcc-io#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
andig authored Mar 18, 2020
1 parent 3aec820 commit 9b6f5ae
Show file tree
Hide file tree
Showing 6 changed files with 242 additions and 12 deletions.
9 changes: 9 additions & 0 deletions cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type config struct {
Log string
Interval time.Duration
Mqtt mqttConfig
Influx influxConfig
Menu []server.MenuConfig
Pushover messagingConfig
Meters []meterConfig
Expand All @@ -31,6 +32,14 @@ type mqttConfig struct {
Password string
}

type influxConfig struct {
URL string
Database string
User string
Password string
Interval time.Duration
}

type providerConfig struct {
Type string
Topic string
Expand Down
48 changes: 43 additions & 5 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,25 @@ func checkVersion() {
}
}

var teeIsChained bool // controles piping of first channel in teed chain

func tee(in chan core.Param) (chan core.Param, <-chan core.Param) {
gen := make(chan core.Param)
tee := make(chan core.Param)

go func(teeIsChained bool) {
for i := range gen {
if teeIsChained {
in <- i
}
tee <- i
}
}(teeIsChained)

teeIsChained = true
return gen, tee
}

func run(cmd *cobra.Command, args []string) {
level, _ := cmd.PersistentFlags().GetString("log")
configureLogging(level)
Expand Down Expand Up @@ -170,19 +189,38 @@ func run(cmd *cobra.Command, args []string) {
loadPoints := loadConfig(conf, notificationChan)
go notificationHub.Run(notificationChan)

// start broadcasting values
valueChan := make(chan core.Param)
triggerChan := make(chan struct{})

// setup influx
if viper.Get("influx") != nil {
influx := server.NewInfluxClient(
conf.Influx.URL,
conf.Influx.Database,
conf.Influx.Interval,
conf.Influx.User,
conf.Influx.Password,
)

var teeChan <-chan core.Param
valueChan, teeChan = tee(valueChan)

go influx.Run(teeChan)
}

// create webserver
socketHub := server.NewSocketHub()
httpd := server.NewHttpd(uri, conf.Menu, loadPoints[0], socketHub)

// start broadcasting values
uiChan := make(chan core.Param)
triggerChan := make(chan struct{})
go socketHub.Run(uiChan, triggerChan)
var teeChan <-chan core.Param
valueChan, teeChan = tee(valueChan)
go socketHub.Run(teeChan, triggerChan)

// start all loadpoints
for _, lp := range loadPoints {
lp.Dump()
lp.Prepare(uiChan, notificationChan)
lp.Prepare(valueChan, notificationChan)
go lp.Run(conf.Interval)
}

Expand Down
11 changes: 6 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ require (
github.com/asaskevich/EventBus v0.0.0-20180315140547-d46933a94f05
github.com/benbjohnson/clock v1.0.0
github.com/eclipse/paho.mqtt.golang v1.2.0
github.com/fsnotify/fsnotify v1.4.8 // indirect
github.com/golang/mock v1.4.1
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/golang/mock v1.4.3
github.com/google/go-github v17.0.0+incompatible // indirect
github.com/google/go-querystring v1.0.0 // indirect
github.com/gorilla/handlers v1.4.2
Expand All @@ -17,6 +17,7 @@ require (
github.com/grid-x/modbus v0.0.0-20200108122021-57d05a9f1e1a
github.com/grid-x/serial v0.0.0-20191104121038-e24bc9bf6f08 // indirect
github.com/hashicorp/go-version v1.2.0 // indirect
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
github.com/mjibson/esc v0.2.0
github.com/pelletier/go-toml v1.6.0 // indirect
Expand All @@ -29,9 +30,9 @@ require (
github.com/spf13/viper v1.6.2
github.com/tcnksm/go-latest v0.0.0-20170313132115-e3007ae9052e
golang.org/x/net v0.0.0-20200301022130-244492dfa37a // indirect
golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527 // indirect
golang.org/x/tools v0.0.0-20200311090712-aafaee8bce8c
gopkg.in/ini.v1 v1.54.0 // indirect
golang.org/x/sys v0.0.0-20200317113312-5766fd39f98d // indirect
golang.org/x/tools v0.0.0-20200318150045-ba25ddc85566
gopkg.in/ini.v1 v1.55.0 // indirect
gopkg.in/yaml.v2 v2.2.8 // indirect
)

Expand Down
13 changes: 13 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.8 h1:vmjGRBE6skJRlqYIcSsh9B823pS/MqsvxevqbMnBPbI=
github.com/fsnotify/fsnotify v1.4.8/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
Expand All @@ -65,6 +67,8 @@ github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfb
github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
github.com/golang/mock v1.4.1 h1:ocYkMQY5RrXTYgXl7ICpV0IXwlEQGwKIsery4gyXa1U=
github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/mock v1.4.3 h1:GV+pQPG/EUUbkh47niozDcADz6go/dUwhVzdUQHIVRw=
github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand Down Expand Up @@ -126,6 +130,8 @@ github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2p
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d h1:/WZQPMZNsjZ7IlCpsLGdQBINg5bxKQ1K1sh6awxLtkA=
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
Expand Down Expand Up @@ -229,6 +235,7 @@ github.com/tcnksm/go-latest v0.0.0-20170313132115-e3007ae9052e h1:IWllFTiDjjLIf2
github.com/tcnksm/go-latest v0.0.0-20170313132115-e3007ae9052e/go.mod h1:d7u6HkTYKSv5m6MCKkOQlHwaShTMl3HjqSGW3XtVhXM=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
Expand Down Expand Up @@ -305,6 +312,8 @@ golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527 h1:uYVVQ9WP/Ds2ROhcaGPeIdVq0RIXVLwsHlnvJ+cT1So=
golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200317113312-5766fd39f98d h1:62ap6LNOjDU6uGmKXHJbSfciMoV+FeI1sRXx/pLDL44=
golang.org/x/sys v0.0.0-20200317113312-5766fd39f98d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down Expand Up @@ -334,6 +343,8 @@ golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200311090712-aafaee8bce8c h1:9WR4YuzLDuQMqEmLQrG0DiMmE2/HvX1dlrujzjmNVFg=
golang.org/x/tools v0.0.0-20200311090712-aafaee8bce8c/go.mod h1:o4KQGtdN14AW+yjsvvwRTJJuXz8XRtIHtEnmAXLyFUw=
golang.org/x/tools v0.0.0-20200318150045-ba25ddc85566 h1:OXjomkWHhzUx4+HldlJ2TsMxJdWgEo5CTtspD1wdhdk=
golang.org/x/tools v0.0.0-20200318150045-ba25ddc85566/go.mod h1:Sl4aGygMT6LrqrWclx+PTx3U+LnKx/seiNR+3G19Ar8=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
Expand Down Expand Up @@ -368,6 +379,8 @@ gopkg.in/ini.v1 v1.51.0 h1:AQvPpx3LzTDM0AjnIRlVFwFFGC+npRopjZxLJj6gdno=
gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.54.0 h1:oM5ElzbIi7gwLnNbPX2M25ED1vSAK3B6dex50eS/6Fs=
gopkg.in/ini.v1 v1.54.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.55.0 h1:E8yzL5unfpW3M6fz/eB7Cb5MQAYSZ7GKo4Qth+N2sgQ=
gopkg.in/ini.v1 v1.55.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
Expand Down
169 changes: 169 additions & 0 deletions server/influxdb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package server

import (
"sync"
"time"

"github.com/andig/evcc/api"
"github.com/andig/evcc/core"
influxdb "github.com/influxdata/influxdb1-client/v2"
)

const (
influxWriteTimeout = 10 * time.Second
influxWriteInterval = 30 * time.Second
precision = "s"
)

// Influx is a influx publisher
type Influx struct {
sync.Mutex
log *api.Logger
client influxdb.Client
points []*influxdb.Point
pointsConf influxdb.BatchPointsConfig
interval time.Duration
measurement string
}

// NewInfluxClient creates new publisher for influx
func NewInfluxClient(
url string,
database string,
interval time.Duration,
user string,
password string,
) *Influx {
log := api.NewLogger("iflx")

if database == "" {
log.FATAL.Fatal("missing database")
}
if interval == 0 {
interval = influxWriteInterval
}

client, err := influxdb.NewHTTPClient(influxdb.HTTPConfig{
Addr: url,
Username: user,
Password: password,
Timeout: influxWriteTimeout,
})
if err != nil {
log.FATAL.Fatalf("error creating client: %v", err)
}

// check connection
go func(client influxdb.Client) {
if _, _, err := client.Ping(influxWriteTimeout); err != nil {
log.FATAL.Fatalf("%v", err)
}
}(client)

return &Influx{
log: log,
client: client,
interval: interval,
pointsConf: influxdb.BatchPointsConfig{
Database: database,
Precision: precision,
},
}
}

// writeBatchPoints asynchronously writes the collected points
func (m *Influx) writeBatchPoints() {
m.Lock()

// get current batch
if len(m.points) == 0 {
m.Unlock()
return
}

// create new batch
batch, err := influxdb.NewBatchPoints(m.pointsConf)
if err != nil {
m.log.ERROR.Print(err)
m.Unlock()
return
}

// replace current batch
points := m.points
m.points = nil
m.Unlock()

// write batch
batch.AddPoints(points)
m.log.TRACE.Printf("writing %d point(s)", len(points))

if err := m.client.Write(batch); err != nil {
m.log.ERROR.Print(err)

// put points back at beginning of next batch
m.Lock()
m.points = append(points, m.points...)
m.Unlock()
}
}

// asyncWriter periodically calls writeBatchPoints
func (m *Influx) asyncWriter(exit <-chan struct{}) <-chan struct{} {
done := make(chan struct{}) // signal writer stopped

// async batch writer
go func() {
ticker := time.NewTicker(m.interval)
for {
select {
case <-ticker.C:
m.writeBatchPoints()
case <-exit:
ticker.Stop()
m.writeBatchPoints()
close(done)
return
}
}
}()

return done
}

// Run Influx publisher
func (m *Influx) Run(in <-chan core.Param) {
// run async writer
exit := make(chan struct{}) // exit signals to stop writer
done := m.asyncWriter(exit) // done signals writer stopped

// add points to batch for async writing
for param := range in {
if _, ok := param.Val.(float64); !ok {
continue
}

p, err := influxdb.NewPoint(
param.Key,
map[string]string{},
map[string]interface{}{
"value": param.Val,
},
time.Now(),
)
if err != nil {
m.log.ERROR.Printf("failed creating point: %v", err)
continue
}

m.Lock()
m.points = append(m.points, p)
m.Unlock()
}

// close write loop
exit <- struct{}{}
<-done

m.client.Close()
}
4 changes: 2 additions & 2 deletions server/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

const (
// Time allowed to write a message to the peer
writeTimeout = 10 * time.Second
socketWriteTimeout = 10 * time.Second
)

var upgrader = websocket.Upgrader{
Expand Down Expand Up @@ -39,7 +39,7 @@ func (c *SocketClient) writePump() {

for {
msg := <-c.send
if err := c.conn.SetWriteDeadline(time.Now().Add(writeTimeout)); err != nil {
if err := c.conn.SetWriteDeadline(time.Now().Add(socketWriteTimeout)); err != nil {
return
}
if err := c.conn.WriteMessage(websocket.TextMessage, msg); err != nil {
Expand Down

0 comments on commit 9b6f5ae

Please sign in to comment.