Skip to content

Commit

Permalink
espat: implement MQTT subscribe functionality via blocking select/cha…
Browse files Browse the repository at this point in the history
…nnels.

       also refactor response processing for greater speed and efficiency.

Signed-off-by: Ron Evans <[email protected]>
  • Loading branch information
deadprogram authored and conejoninja committed Nov 25, 2019
1 parent 2e606b0 commit 7dcbfbe
Show file tree
Hide file tree
Showing 14 changed files with 851 additions and 247 deletions.
150 changes: 55 additions & 95 deletions espat/espat.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package espat // import "tinygo.org/x/drivers/espat"

import (
"errors"
"machine"
"strconv"
"strings"
Expand Down Expand Up @@ -54,11 +55,11 @@ func (d *Device) Connected() bool {
d.Execute(Test)

// handle response here, should include "OK"
r := d.Response(100)
if strings.Contains(string(r), "OK") {
return true
_, err := d.Response(100)
if err != nil {
return false
}
return false
return true
}

// Write raw bytes to the UART.
Expand All @@ -72,7 +73,7 @@ func (d *Device) Read(b []byte) (n int, err error) {
}

// how long in milliseconds to pause after sending AT commands
const pause = 100
const pause = 300

// Execute sends an AT command to the ESP8266/ESP32.
func (d Device) Execute(cmd string) error {
Expand All @@ -97,7 +98,11 @@ func (d Device) Set(cmd, params string) error {
// Version returns the ESP8266/ESP32 firmware version info.
func (d Device) Version() []byte {
d.Execute(Version)
return d.Response(100)
r, err := d.Response(100)
if err != nil {
return []byte("unknown")
}
return r
}

// Echo sets the ESP8266/ESP32 echo setting.
Expand All @@ -122,7 +127,7 @@ func (d Device) Reset() {
// ReadSocket returns the data that has already been read in from the responses.
func (d *Device) ReadSocket(b []byte) (n int, err error) {
// make sure no data in buffer
d.Response(100)
d.Response(300)

count := len(b)
if len(b) >= len(d.socketdata) {
Expand All @@ -142,118 +147,73 @@ func (d *Device) ReadSocket(b []byte) (n int, err error) {

// Response gets the next response bytes from the ESP8266/ESP32.
// The call will retry for up to timeout milliseconds before returning nothing.
func (d *Device) Response(timeout int) []byte {
var i int
pause := 10 // pause to wait for 10 ms
func (d *Device) Response(timeout int) ([]byte, error) {
// read data
var size int
var start, end int
pause := 100 // pause to wait for 100 ms
retries := timeout / pause

header := make([]byte, 2)
for {
for d.bus.Buffered() > 0 {
// get the first 2 bytes
header[0], _ = d.bus.ReadByte()
header[1], _ = d.bus.ReadByte()

if d.isLeadingCRLF(header) {
// skip it
header[0], _ = d.bus.ReadByte()
header[1], _ = d.bus.ReadByte()
size = d.bus.Buffered()

if size > 0 {
end += size
d.bus.Read(d.response[start:end])

// if "+IPD" then read socket data
if strings.Contains(string(d.response[:end]), "+IPD") {
// handle socket data
return nil, d.parseIPD(end)
}

if d.isIPD(header) {
// is socket data packet
d.parseIPD()
} else {
// no, so put into response
d.response[i] = header[0]
i++
d.response[i] = header[1]
i++
// if "OK" then the command worked
if strings.Contains(string(d.response[:end]), "OK") {
return d.response[start:end], nil
}

// read the rest of normal command response
for d.bus.Buffered() > 0 {
data, err := d.bus.ReadByte()
if err != nil {
return nil
}
d.response[i] = data
i++
// if "Error" then the command failed
if strings.Contains(string(d.response[:end]), "ERROR") {
return d.response[start:end], errors.New("response error:" + string(d.response[start:end]))
}

// if anything else, then keep reading data in?
start = end
}

// wait longer?
retries--
if retries == 0 {
break
return nil, errors.New("response timeout error:" + string(d.response[start:end]))
}

// pause to make sure is no more data to be read
time.Sleep(time.Duration(pause) * time.Millisecond)
}
return d.response[:i]
}

func (d *Device) isLeadingCRLF(b []byte) bool {
if len(b) < 2 {
return false
}
if b[0] == 13 && b[1] == 10 {
return true
}
return false
}
func (d *Device) parseIPD(end int) error {
// find the "+IPD," to get length
s := strings.Index(string(d.response[:end]), "+IPD,")

func (d *Device) isIPD(b []byte) bool {
if len(b) < 2 {
return false
}
if b[0] == '+' && b[1] == 'I' {
return true
}
return false
}
// find the ":"
e := strings.Index(string(d.response[:end]), ":")

func (d *Device) parseIPD() bool {
data, _ := d.bus.ReadByte()
if data != 'P' {
// error
return false
}
data, _ = d.bus.ReadByte()
if data != 'D' {
// error
return false
}
data, _ = d.bus.ReadByte()
if data != ',' {
// error
return false
}
// find the data length
val := string(d.response[s+5 : e])

// get the expected data length
// skip remaining header up to the ":"
buf := []byte{}
data, _ = d.bus.ReadByte()
for data != ':' {
// put into the buffer with int value here
buf = append(buf, data)

// read next value
data, _ = d.bus.ReadByte()
}

val := string(buf)
count, err := strconv.Atoi(val)
// TODO: verify count
_, err := strconv.Atoi(val)
if err != nil {
// not expected data here. what to do?
return false
return err
}

// load up the socket data
// only read the expected amount of data
for m := 0; m < count; m++ {
data, _ = d.bus.ReadByte()
d.socketdata = append(d.socketdata, data)
}
d.socketdata = append(d.socketdata, d.response[e+1:end]...)
return nil
}

return true
// IsSocketDataAvailable returns of there is socket data available
func (d *Device) IsSocketDataAvailable() bool {
return len(d.socketdata) > 0 || d.bus.Buffered() > 0
}
Loading

0 comments on commit 7dcbfbe

Please sign in to comment.