Skip to content

Commit

Permalink
Merge pull request hybridgroup#650 from trevrosen/feature/644/extend-…
Browse files Browse the repository at this point in the history
…mqtt

Add some new MQTT adaptor functions with QOS
  • Loading branch information
deadprogram authored Jan 4, 2019
2 parents 209256b + 6d1e099 commit ad31c39
Showing 1 changed file with 34 additions and 6 deletions.
40 changes: 34 additions & 6 deletions platforms/mqtt/mqtt_adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ import (

paho "github.com/eclipse/paho.mqtt.golang"
multierror "github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
)

var (
// ErrNilClient is returned when a client action can't be taken because the struct has no client
ErrNilClient = errors.New("no MQTT client available")
)

// Message is a message received from the broker.
Expand Down Expand Up @@ -131,21 +137,43 @@ func (a *Adaptor) Finalize() (err error) {

// Publish a message under a specific topic
func (a *Adaptor) Publish(topic string, message []byte) bool {
if a.client == nil {
_, err := a.PublishWithQOS(topic, a.qos, message)
if err != nil {
return false
}
a.client.Publish(topic, byte(a.qos), false, message)

return true
}

// On subscribes to a topic, and then calls the message handler function when data is received
func (a *Adaptor) On(event string, f func(msg Message)) bool {
// PublishWithQOS allows per-publish QOS values to be set and returns a poken.Token
func (a *Adaptor) PublishWithQOS(topic string, qos int, message []byte) (paho.Token, error) {
if a.client == nil {
return false
return nil, ErrNilClient
}

token := a.client.Publish(topic, byte(qos), false, message)
return token, nil
}

// OnWithQOS allows per-subscribe QOS values to be set and returns a paho.Token
func (a *Adaptor) OnWithQOS(event string, qos int, f func(msg Message)) (paho.Token, error) {
if a.client == nil {
return nil, ErrNilClient
}
a.client.Subscribe(event, byte(a.qos), func(client paho.Client, msg paho.Message) {

token := a.client.Subscribe(event, byte(qos), func(client paho.Client, msg paho.Message) {
f(msg)
})

return token, nil
}

// On subscribes to a topic, and then calls the message handler function when data is received
func (a *Adaptor) On(event string, f func(msg Message)) bool {
_, err := a.OnWithQOS(event, a.qos, f)
if err != nil {
return false
}
return true
}

Expand Down

0 comments on commit ad31c39

Please sign in to comment.