Skip to content

Commit

Permalink
Mqtt: fix handler not returning when initial value not received (evcc…
Browse files Browse the repository at this point in the history
  • Loading branch information
andig authored Dec 10, 2021
1 parent 6dcd0aa commit 94e2a28
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 113 deletions.
133 changes: 20 additions & 113 deletions provider/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package provider

import (
"fmt"
"math"
"regexp"
"strconv"
"time"

"github.com/evcc-io/evcc/provider/mqtt"
"github.com/evcc-io/evcc/util"
"github.com/evcc-io/evcc/util/jq"
"github.com/evcc-io/evcc/util/request"
"github.com/itchyny/gojq"
)

Expand Down Expand Up @@ -115,63 +113,54 @@ func (m *Mqtt) WithJq(jq string) (*Mqtt, error) {

var _ FloatProvider = (*Mqtt)(nil)

// FloatGetter creates handler for float64 from MQTT topic that returns cached value
func (m *Mqtt) FloatGetter() func() (float64, error) {
// newReceiver creates a msgHandler and subscribes it to the topic.
// receiver will ensure actual data guarded by `timeout` and return error
// if initial value is not received within `timeout` or max. 10s if timeout is not given.
func (m *Mqtt) newReceiver() *msgHandler {
wait := m.timeout
if wait == 0 {
wait = request.Timeout
}

h := &msgHandler{
topic: m.topic,
scale: m.scale,
mux: util.NewWaiter(m.timeout, func() { m.log.DEBUG.Printf("%s wait for initial value", m.topic) }),
mux: util.NewWaiter(wait, func() { m.log.DEBUG.Printf("%s wait for initial value", m.topic) }),
re: m.re,
jq: m.jq,
}

m.client.Listen(m.topic, h.receive)
return h
}

// FloatGetter creates handler for float64 from MQTT topic that returns cached value
func (m *Mqtt) FloatGetter() func() (float64, error) {
h := m.newReceiver()
return h.floatGetter
}

var _ IntProvider = (*Mqtt)(nil)

// IntGetter creates handler for int64 from MQTT topic that returns cached value
func (m *Mqtt) IntGetter() func() (int64, error) {
h := &msgHandler{
topic: m.topic,
scale: m.scale,
mux: util.NewWaiter(m.timeout, func() { m.log.DEBUG.Printf("%s wait for initial value", m.topic) }),
re: m.re,
jq: m.jq,
}

m.client.Listen(m.topic, h.receive)
h := m.newReceiver()
return h.intGetter
}

var _ StringProvider = (*Mqtt)(nil)

// StringGetter creates handler for string from MQTT topic that returns cached value
func (m *Mqtt) StringGetter() func() (string, error) {
h := &msgHandler{
topic: m.topic,
mux: util.NewWaiter(m.timeout, func() { m.log.DEBUG.Printf("%s wait for initial value", m.topic) }),
re: m.re,
jq: m.jq,
}

m.client.Listen(m.topic, h.receive)
h := m.newReceiver()
return h.stringGetter
}

var _ BoolProvider = (*Mqtt)(nil)

// BoolGetter creates handler for string from MQTT topic that returns cached value
func (m *Mqtt) BoolGetter() func() (bool, error) {
h := &msgHandler{
topic: m.topic,
mux: util.NewWaiter(m.timeout, func() { m.log.DEBUG.Printf("%s wait for initial value", m.topic) }),
re: m.re,
jq: m.jq,
}

m.client.Listen(m.topic, h.receive)
h := m.newReceiver()
return h.boolGetter
}

Expand Down Expand Up @@ -216,85 +205,3 @@ func (m *Mqtt) StringSetter(param string) func(string) error {
return m.client.Publish(m.topic, false, payload)
}
}

type msgHandler struct {
mux *util.Waiter
scale float64
topic string
payload string
re *regexp.Regexp
jq *gojq.Query
}

func (h *msgHandler) receive(payload string) {
h.mux.Lock()
defer h.mux.Unlock()

h.payload = payload
h.mux.Update()
}

func (h *msgHandler) hasValue() (string, error) {
elapsed := h.mux.LockWithTimeout()
defer h.mux.Unlock()

if elapsed > 0 {
return "", fmt.Errorf("%s outdated: %v", h.topic, elapsed.Truncate(time.Second))
}

var err error
payload := h.payload

if h.re != nil {
m := h.re.FindStringSubmatch(payload)
if len(m) > 1 {
payload = m[1] // first submatch
}
}

if h.jq != nil {
var val interface{}
if val, err = jq.Query(h.jq, []byte(payload)); err == nil {
payload = fmt.Sprintf("%v", val)
}
}

return payload, err
}

func (h *msgHandler) floatGetter() (float64, error) {
v, err := h.hasValue()
if err != nil {
return 0, err
}

f, err := strconv.ParseFloat(v, 64)
if err != nil {
return 0, fmt.Errorf("%s invalid: '%s'", h.topic, v)
}

return f * h.scale, nil
}

func (h *msgHandler) intGetter() (int64, error) {
f, err := h.floatGetter()
return int64(math.Round(f)), err
}

func (h *msgHandler) stringGetter() (string, error) {
v, err := h.hasValue()
if err != nil {
return "", err
}

return string(v), nil
}

func (h *msgHandler) boolGetter() (bool, error) {
v, err := h.hasValue()
if err != nil {
return false, err
}

return util.Truish(v), nil
}
96 changes: 96 additions & 0 deletions provider/mqtt_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package provider

import (
"fmt"
"math"
"regexp"
"strconv"
"time"

"github.com/evcc-io/evcc/util"
"github.com/evcc-io/evcc/util/jq"
"github.com/itchyny/gojq"
)

type msgHandler struct {
mux *util.Waiter
scale float64
topic string
payload string
re *regexp.Regexp
jq *gojq.Query
}

func (h *msgHandler) receive(payload string) {
h.mux.Lock()
defer h.mux.Unlock()

h.payload = payload
h.mux.Update()
}

// hasValue returned the received and processed payload as string
func (h *msgHandler) hasValue() (string, error) {
elapsed := h.mux.LockWithTimeout()
defer h.mux.Unlock()

if elapsed > 0 {
return "", fmt.Errorf("%s outdated: %v", h.topic, elapsed.Truncate(time.Second))
}

var err error
payload := h.payload

if h.re != nil {
m := h.re.FindStringSubmatch(payload)
if len(m) > 1 {
payload = m[1] // first submatch
}
}

if h.jq != nil {
var val interface{}
if val, err = jq.Query(h.jq, []byte(payload)); err == nil {
payload = fmt.Sprintf("%v", val)
}
}

return payload, err
}

func (h *msgHandler) floatGetter() (float64, error) {
v, err := h.hasValue()
if err != nil {
return 0, err
}

f, err := strconv.ParseFloat(v, 64)
if err != nil {
return 0, fmt.Errorf("%s invalid: '%s'", h.topic, v)
}

return f * h.scale, nil
}

func (h *msgHandler) intGetter() (int64, error) {
f, err := h.floatGetter()
return int64(math.Round(f)), err
}

func (h *msgHandler) stringGetter() (string, error) {
v, err := h.hasValue()
if err != nil {
return "", err
}

return string(v), nil
}

func (h *msgHandler) boolGetter() (bool, error) {
v, err := h.hasValue()
if err != nil {
return false, err
}

return util.Truish(v), nil
}

0 comments on commit 94e2a28

Please sign in to comment.