Skip to content

Commit

Permalink
Update MQTT component and add example
Browse files Browse the repository at this point in the history
  • Loading branch information
balloob committed Aug 9, 2015
1 parent a2c6dbf commit c8b54d7
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 56 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ homeassistant/components/frontend/www_static/polymer/bower_components/*
config/custom_components/*
!config/custom_components/example.py
!config/custom_components/hello_world.py
!config/custom_components/mqtt_example.py

# Hide sublime text stuff
*.sublime-project
Expand Down
60 changes: 60 additions & 0 deletions config/custom_components/mqtt_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""
custom_components.mqtt_example
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Shows how to communicate with MQTT. Follows a topic on MQTT and updates the
state of an entity to the last message received on that topic.
Also offers a service 'set_state' that will publish a message on the topic that
will be passed via MQTT to our message received listener. Call the service with
example payload {"new_state": "some new state"}.
Configuration:
To use the mqtt_example component you will need to add the following to your
config/configuration.yaml
mqtt_example:
topic: home-assistant/mqtt_example
"""
import homeassistant.loader as loader

# The domain of your component. Should be equal to the name of your component
DOMAIN = "mqtt_example"

# List of component names (string) your component depends upon
DEPENDENCIES = ['mqtt']


CONF_TOPIC = 'topic'
DEFAULT_TOPIC = 'home-assistant/mqtt_example'


def setup(hass, config):
""" Setup our mqtt_example component. """
mqtt = loader.get_component('mqtt')
topic = config[DOMAIN].get('topic', DEFAULT_TOPIC)
entity_id = 'mqtt_example.last_message'

# Listen to a message on MQTT

def message_received(topic, payload, qos):
""" A new MQTT message has been received. """
hass.states.set(entity_id, payload)

mqtt.subscribe(hass, topic, message_received)

hass.states.set(entity_id, 'No messages')

# Service to publish a message on MQTT

def set_state_service(call):
""" Service to send a message. """
mqtt.publish(hass, topic, call.data.get('new_state'))

# Register our service with Home Assistant
hass.services.register(DOMAIN, 'set_state', set_state_service)

# return boolean to indicate that initialization was successful
return True
148 changes: 92 additions & 56 deletions homeassistant/components/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@
To use MQTT you will need to add something like the following to your
config/configuration.yaml.
mqtt:
broker: 127.0.0.1
Or, if you want more options:
mqtt:
broker: 127.0.0.1
port: 1883
topic: home-assistant
client_id: home-assistant-1
keepalive: 60
qos: 0
Variables:
Expand All @@ -28,18 +32,14 @@
*Optional
The network port to connect to. Default is 1883.
topic
client_id
*Optional
The MQTT topic to subscribe to. Default is home-assistant.
Client ID that Home Assistant will use. Has to be unique on the server.
Default is a random generated one.
keepalive
*Optional
The keep alive in seconds for this client, e.g. 60.
qos
*Optional
Quality of service level to use for the subscription.
0, 1, or 2, defaults to 0.
The keep alive in seconds for this client. Default is 60.
"""
import logging
import socket
Expand All @@ -55,7 +55,6 @@

MQTT_CLIENT = None

DEFAULT_TOPIC = 'home-assistant'
DEFAULT_PORT = 1883
DEFAULT_KEEPALIVE = 60
DEFAULT_QOS = 0
Expand All @@ -68,24 +67,36 @@

CONF_BROKER = 'broker'
CONF_PORT = 'port'
CONF_TOPIC = 'topic'
CONF_CLIENT_ID = 'client_id'
CONF_KEEPALIVE = 'keepalive'
CONF_QOS = 'qos'

ATTR_QOS = 'qos'
ATTR_TOPIC = 'topic'
ATTR_SUBTOPIC = 'subtopic'
ATTR_PAYLOAD = 'payload'


def publish(hass, payload, subtopic=None):
def publish(hass, topic, payload):
""" Send an MQTT message. """
data = {ATTR_PAYLOAD: payload}
if subtopic is not None:
data[ATTR_SUBTOPIC] = subtopic
data = {
ATTR_TOPIC: topic,
ATTR_PAYLOAD: payload,
}
hass.services.call(DOMAIN, SERVICE_PUBLISH, data)


def subscribe(hass, topic, callback, qos=0):
""" Subscribe to a topic. """
def mqtt_topic_subscriber(event):
""" Subscribes to a specific MQTT topic. """
if event.data[ATTR_TOPIC] == topic:
callback(topic, event.data[ATTR_PAYLOAD], event.data[ATTR_QOS])

hass.bus.listen(EVENT_MQTT_MESSAGE_RECEIVED, mqtt_topic_subscriber)

if topic not in MQTT_CLIENT.topics:
MQTT_CLIENT.subscribe(topic, qos)


def setup(hass, config):
""" Get the MQTT protocol service. """

Expand All @@ -96,13 +107,12 @@ def setup(hass, config):

broker = conf[CONF_BROKER]
port = util.convert(conf.get(CONF_PORT), int, DEFAULT_PORT)
topic = util.convert(conf.get(CONF_TOPIC), str, DEFAULT_TOPIC)
client_id = util.convert(conf.get(CONF_CLIENT_ID), str)
keepalive = util.convert(conf.get(CONF_KEEPALIVE), int, DEFAULT_KEEPALIVE)
qos = util.convert(conf.get(CONF_QOS), int, DEFAULT_QOS)

global MQTT_CLIENT
try:
MQTT_CLIENT = MQTT(hass, broker, port, keepalive, qos)
MQTT_CLIENT = MQTT(hass, broker, port, client_id, keepalive)
except socket.error:
_LOGGER.exception("Can't connect to the broker. "
"Please check your settings and the broker "
Expand All @@ -115,19 +125,16 @@ def stop_mqtt(event):

def start_mqtt(event):
""" Launch MQTT component when Home Assistant starts up. """
MQTT_CLIENT.subscribe('{}/#'.format(topic))
MQTT_CLIENT.start()
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, stop_mqtt)

def publish_service(call):
""" Handle MQTT publish service calls. """
msg_topic = call.data.get(ATTR_TOPIC)
payload = call.data.get(ATTR_PAYLOAD)
if payload is None:
if msg_topic is None or payload is None:
return
subtopic = call.data.get(ATTR_SUBTOPIC)
msg_topic = '{}/{}'.format(topic, subtopic) if subtopic else topic

MQTT_CLIENT.publish(msg_topic, payload=payload)
MQTT_CLIENT.publish(msg_topic, payload)

hass.bus.listen_once(EVENT_HOMEASSISTANT_START, start_mqtt)

Expand All @@ -138,49 +145,78 @@ def publish_service(call):

# This is based on one of the paho-mqtt examples:
# http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.python.git/tree/examples/sub-class.py
# pylint: disable=too-many-arguments, invalid-name
class MQTT(object):
""" Implements messaging service for MQTT. """
def __init__(self, hass, broker, port, keepalive, qos):
def __init__(self, hass, broker, port, client_id, keepalive):
import paho.mqtt.client as mqtt

self.hass = hass
self._qos = qos
self._progress = {}
self.topics = {}

self._mqttc = mqtt.Client()
self._mqttc.on_message = self.mqtt_on_message
self._mqttc.connect(broker, port, keepalive)

def mqtt_on_message(self, mqttc, obj, msg):
""" Message callback """
if '/' in msg.topic:
msg_topic, msg_subtopic = msg.topic.split('/', 1)
if client_id is None:
self._mqttc = mqtt.Client()
else:
msg_topic, msg_subtopic = msg.topic, ''

self.hass.bus.fire(EVENT_MQTT_MESSAGE_RECEIVED, {
ATTR_TOPIC: msg_topic,
ATTR_SUBTOPIC: msg_subtopic,
ATTR_QOS: msg.qos,
ATTR_PAYLOAD: msg.payload.decode('utf-8'),
})
self._mqttc = mqtt.Client(client_id)
self._mqttc.on_subscribe = self._mqtt_on_subscribe
self._mqttc.on_unsubscribe = self._mqtt_on_unsubscribe
self._mqttc.on_connect = self._mqtt_on_connect
self._mqttc.on_message = self._mqtt_on_message
self._mqttc.connect(broker, port, keepalive)

def subscribe(self, topic):
""" Subscribe to a topic. """
self._mqttc.subscribe(topic, qos=self._qos)
def publish(self, topic, payload):
""" Publish a MQTT message. """
self._mqttc.publish(topic, payload)

def unsubscribe(self, topic):
""" Unsubscribe from topic. """
self._mqttc.unsubscribe(topic)
result, mid = self._mqttc.unsubscribe(topic)
self._progress[mid] = topic

def start(self):
""" Run the MQTT client. """
self._mqttc.loop_start()

def stop(self):
""" Stop the MQTT client. """
self._mqttc.loop_stop()

def start(self):
""" Run the MQTT client. """
self._mqttc.loop_start()
def subscribe(self, topic, qos):
""" Subscribe to a topic. """
if topic in self.topics:
return
result, mid = self._mqttc.subscribe(topic, qos)
self._progress[mid] = topic
self.topics[topic] = None

def _mqtt_on_connect(self, mqttc, obj, flags, rc):
""" On connect, resubscribe to all topics we were subscribed to. """
old_topics = self.topics
self._progress = {}
self.topics = {}
for topic, qos in old_topics.items():
# qos is None if we were in process of subscribing
if qos is not None:
self._mqttc.subscribe(topic, qos)

def _mqtt_on_subscribe(self, mqttc, obj, mid, granted_qos):
""" Called when subscribe succesfull. """
topic = self._progress.pop(mid, None)
if topic is None:
return
self.topics[topic] = granted_qos

def publish(self, topic, payload):
""" Publish a MQTT message. """
self._mqttc.publish(topic, payload)
def _mqtt_on_unsubscribe(self, mqttc, obj, mid, granted_qos):
""" Called when subscribe succesfull. """
topic = self._progress.pop(mid, None)
if topic is None:
return
self.topics.pop(topic, None)

def _mqtt_on_message(self, mqttc, obj, msg):
""" Message callback """
self.hass.bus.fire(EVENT_MQTT_MESSAGE_RECEIVED, {
ATTR_TOPIC: msg.topic,
ATTR_QOS: msg.qos,
ATTR_PAYLOAD: msg.payload.decode('utf-8'),
})

0 comments on commit c8b54d7

Please sign in to comment.