Skip to content

Commit

Permalink
Make waiter always expect initial value even if timeout is zero (evcc…
Browse files Browse the repository at this point in the history
  • Loading branch information
andig authored Dec 12, 2021
1 parent 96a0475 commit cd43f09
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 40 deletions.
8 changes: 1 addition & 7 deletions provider/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/evcc-io/evcc/provider/mqtt"
"github.com/evcc-io/evcc/util"
"github.com/evcc-io/evcc/util/request"
"github.com/itchyny/gojq"
)

Expand Down Expand Up @@ -117,15 +116,10 @@ var _ FloatProvider = (*Mqtt)(nil)
// receiver will ensure actual data guarded by `timeout` and return error
// if initial value is not received within `timeout` or max. 10s if timeout is not given.
func (m *Mqtt) newReceiver() *msgHandler {
wait := m.timeout
if wait == 0 {
wait = request.Timeout
}

h := &msgHandler{
topic: m.topic,
scale: m.scale,
mux: util.NewWaiter(wait, func() { m.log.DEBUG.Printf("%s wait for initial value", m.topic) }),
mux: util.NewWaiter(m.timeout, func() { m.log.DEBUG.Printf("%s wait for initial value", m.topic) }),
re: m.re,
jq: m.jq,
}
Expand Down
6 changes: 3 additions & 3 deletions provider/mqtt_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ func (h *msgHandler) receive(payload string) {

// hasValue returned the received and processed payload as string
func (h *msgHandler) hasValue() (string, error) {
elapsed := h.mux.LockWithTimeout()
h.mux.Lock()
defer h.mux.Unlock()

if elapsed > 0 {
return "", fmt.Errorf("%s outdated: %v", h.topic, elapsed.Truncate(time.Second))
if late := h.mux.Overdue(); late > 0 {
return "", fmt.Errorf("%s outdated: %v", h.topic, late.Truncate(time.Second))
}

var err error
Expand Down
6 changes: 3 additions & 3 deletions provider/sma/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ func (d *Device) Values() (map[sunny.ValueID]interface{}, error) {
// ensure update loop was started
d.StartUpdateLoop()

elapsed := d.mux.LockWithTimeout()
d.mux.Lock()
defer d.mux.Unlock()

if elapsed > 0 {
return nil, fmt.Errorf("update timeout: %v", elapsed.Truncate(time.Second))
if late := d.mux.Overdue(); late > 0 {
return nil, fmt.Errorf("update timeout: %v", late.Truncate(time.Second))
}

// return a copy of the map to avoid race conditions
Expand Down
6 changes: 3 additions & 3 deletions provider/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,11 @@ func (p *Socket) listen() {
}

func (p *Socket) hasValue() (interface{}, error) {
elapsed := p.mux.LockWithTimeout()
p.mux.Lock()
defer p.mux.Unlock()

if elapsed > 0 {
return nil, fmt.Errorf("outdated: %v", elapsed.Truncate(time.Second))
if late := p.mux.Overdue(); late > 0 {
return nil, fmt.Errorf("outdated: %v", late.Truncate(time.Second))
}

return p.val, nil
Expand Down
48 changes: 24 additions & 24 deletions util/waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,59 +5,59 @@ import (
"time"
)

const waitTimeout = 50 * time.Millisecond // polling interval when waiting for initial value
var waitInitialTimeout = 10 * time.Second

// Waiter provides monitoring of receive timeouts and reception of initial value
type Waiter struct {
sync.Mutex
log func()
once sync.Once
cond *sync.Cond
updated time.Time
timeout time.Duration
}

// NewWaiter creates new waiter
func NewWaiter(timeout time.Duration, logInitialWait func()) *Waiter {
return &Waiter{
p := &Waiter{
log: logInitialWait,
timeout: timeout,
}
p.cond = sync.NewCond(p)
return p
}

// Update is called when client has received data. Update resets the timeout counter.
// It is client responsibility to ensure that the waiter is not locked when Update is called.
func (p *Waiter) Update() {
p.updated = time.Now()
p.cond.Broadcast()
}

// waitForInitialValue blocks until Update has been called at least once.
// It assumes lock has been obtained before and returns with lock active.
func (p *Waiter) waitForInitialValue() {
// Overdue waits for initial update and returns the duration since the last update
// in excess of timeout. Waiter MUST be locked when calling Overdue.
func (p *Waiter) Overdue() time.Duration {
if p.updated.IsZero() {
p.log()

// wait for initial update
waitStarted := time.Now()
for p.updated.IsZero() {
p.Unlock()
time.Sleep(waitTimeout)
p.Lock()
c := make(chan struct{})

// abort initial wait with error
if p.timeout != 0 && time.Since(waitStarted) > p.timeout {
p.updated = waitStarted
return
go func() {
defer close(c)
for p.updated.IsZero() {
p.cond.Wait()
}
}()

select {
case <-c:
// initial value received, lock established
case <-time.After(waitInitialTimeout):
p.Update() // unblock the sync.Cond
<-c // wait for goroutine, re-establish lock
p.updated = time.Time{} // reset updated to initial value missing
return waitInitialTimeout
}
}
}

// LockWithTimeout waits for initial value and checks if update timeout has elapsed
func (p *Waiter) LockWithTimeout() time.Duration {
p.Lock()

// waiting assumes lock acquired and returns with lock
p.once.Do(p.waitForInitialValue)

if elapsed := time.Since(p.updated); p.timeout != 0 && elapsed > p.timeout {
return elapsed
Expand Down
76 changes: 76 additions & 0 deletions util/waiter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package util

import (
"os"
"testing"
"time"
)

const testTimeout = 100 * time.Millisecond

func TestMain(t *testing.M) {
waitInitialTimeout = 2 * testTimeout
os.Exit(t.Run())
}

func TestWaiterInitialUpdateInTime(t *testing.T) {
for _, timeout := range []time.Duration{0, testTimeout} {
w := NewWaiter(timeout, func() {})

go func() {
time.Sleep(testTimeout / 2)
w.Update()
}()

w.Lock()
defer w.Unlock()

if elapsed := w.Overdue(); elapsed != 0 {
t.Errorf("expected %v, got %v", 0, elapsed)
}
}
}

func TestWaiterInitialUpdateNotReceived(t *testing.T) {
for _, timeout := range []time.Duration{0, testTimeout} {
w := NewWaiter(timeout, func() {})

w.Lock()
defer w.Unlock()

if elapsed := w.Overdue(); elapsed != waitInitialTimeout {
t.Errorf("expected %v, got %v", waitInitialTimeout, elapsed)
}
}
}

func TestWaiterUpdateInTime(t *testing.T) {
w := NewWaiter(testTimeout, func() {})
w.Update()

go func() {
time.Sleep(testTimeout / 2)
w.Update()
}()

w.Lock()
defer w.Unlock()

if elapsed := w.Overdue(); elapsed != 0 {
t.Errorf("expected %v, got %v", 0, elapsed)
}
}

func TestWaiterUpdateNotReceived(t *testing.T) {
w := NewWaiter(testTimeout, func() {})
w.Update()

time.Sleep(2 * testTimeout)

w.Lock()
defer w.Unlock()

if elapsed := w.Overdue(); elapsed < 2*testTimeout {
t.Errorf("expected >%v, got %v", 2*testTimeout, elapsed)
}
}

0 comments on commit cd43f09

Please sign in to comment.