# exporter.py -- forked from kpetremann/mqtt-exporter
#!/usr/bin/env python3
"""MQTT exporter."""
import json
import logging
import os
import re
import signal
import sys

import paho.mqtt.client as mqtt
from prometheus_client import Gauge, Counter, start_http_server
# Log verbosity is taken from LOG_LEVEL.  The value is upper-cased because
# logging only recognises upper-case level names: "debug" would otherwise
# raise ValueError before the exporter even starts.
logging.basicConfig(level=os.environ.get("LOG_LEVEL", "INFO").upper())
LOG = logging.getLogger("mqtt-exporter")

# Prefix prepended to the name of every exported metric.
PREFIX = os.environ.get("PROMETHEUS_PREFIX", "mqtt_")

# global variables
# Gauges created so far, keyed by their full (prefixed) metric name.
prom_metrics = {}  # pylint: disable=C0103
# Name of the label that carries the MQTT topic on every metric.
topic_label = os.environ.get("TOPIC_LABEL", "topic")

# Counter of received messages, labelled by topic like the gauges.
prom_msg_counter = Counter(
    f"{PREFIX}message_counter", "Counter of received messages", [topic_label]
)
LOG.info("creating counter for message count")
def subscribe(client, userdata, flags, connection_result):  # pylint: disable=W0613
    """Subscribe to mqtt events (callback).

    Installed as the client's ``on_connect`` handler, so it runs on every
    (re)connection.

    Keyword arguments:
    client -- the MQTT client that just connected
    userdata -- unused
    flags -- unused
    connection_result -- result code of the connection attempt; any non-zero
        value is treated as a refused connection

    The topic filter is read from the MQTT_TOPIC environment variable and
    defaults to "zigbee2mqtt/#".
    """
    if connection_result != 0:
        # Subscribing on a refused connection cannot succeed; make the
        # failure visible instead of silently doing nothing useful.
        LOG.error(
            "MQTT connection refused (result code %s), not subscribing",
            connection_result,
        )
        return

    client.subscribe(os.environ.get("MQTT_TOPIC", "zigbee2mqtt/#"))
# Any character outside this set is not allowed in a Prometheus metric name.
_INVALID_METRIC_CHARS = re.compile(r"[^a-zA-Z0-9_:]")


def _sanitize_metric_name(metric):
    """Return *metric* with every character invalid in a metric name replaced by "_"."""
    return _INVALID_METRIC_CHARS.sub("_", metric)


def expose_metrics(client, userdata, msg):  # pylint: disable=W0613
    """Expose metrics to prometheus when a message has been published (callback).

    The payload must be a JSON object.  Every entry whose value converts to
    ``float`` is exported as a gauge named ``PREFIX + key`` and labelled with
    the topic ("/" replaced by "_").  Entries that are not numeric are skipped.
    The message counter is incremented once per message that carried a JSON
    object.

    Keyword arguments:
    client -- unused
    userdata -- unused
    msg -- received message; its ``topic`` and ``payload`` attributes are read
    """
    topic = msg.topic.replace("/", "_")

    try:
        payload = json.loads(msg.payload)
    except ValueError:
        # Covers json.JSONDecodeError as well as UnicodeDecodeError (payload
        # bytes that are not valid text); both are ValueError subclasses.
        LOG.debug('failed to parse as JSON: "%s"', msg.payload)
        return

    # we expect a dict from zigbee metrics in MQTT
    if not isinstance(payload, dict):
        LOG.debug('unexpected payload format: "%s"', payload)
        return

    for metric, value in payload.items():
        # we only expose numeric values
        try:
            metric_value = float(value)
        except (ValueError, TypeError):
            LOG.debug("Failed to convert %s: %s", metric, value)
            continue

        # Payload keys are arbitrary text; make them safe to use as a name.
        prom_metric_name = f"{PREFIX}{_sanitize_metric_name(metric)}"

        # create metric if does not exist
        gauge = prom_metrics.get(prom_metric_name)
        if gauge is None:
            try:
                gauge = Gauge(
                    prom_metric_name,
                    "metric generated from MQTT message.",
                    [topic_label],
                )
            except ValueError as err:
                # A name that is still rejected (e.g. clashes with an already
                # registered metric) must not abort the whole callback.
                LOG.warning(
                    "cannot create prometheus metric %s: %s", prom_metric_name, err
                )
                continue
            prom_metrics[prom_metric_name] = gauge
            LOG.info("creating prometheus metric: %s ", prom_metric_name)

        # expose the metric to prometheus
        gauge.labels(**{topic_label: topic}).set(metric_value)
        LOG.debug("new value for %s: %s", prom_metric_name, metric_value)

    # Now inc a counter for the message count
    prom_msg_counter.labels(**{topic_label: topic}).inc()
def main():
    """Start the exporter.

    Installs SIGTERM/SIGINT handlers, reads the configuration from the
    environment, starts the prometheus HTTP server, then connects to the MQTT
    broker and processes messages until stopped.

    Environment variables (with defaults): MQTT_ADDRESS ("127.0.0.1"),
    MQTT_PORT (1883), MQTT_KEEPALIVE (60), MQTT_USERNAME, MQTT_PASSWORD,
    PROMETHEUS_PORT (9000).

    Raises ValueError when one of the numeric variables is not an integer.
    """
    client = mqtt.Client()

    def stop_request(signum, frame):
        """Stop handler for SIGTERM and SIGINT.

        Keyword arguments:
        signum -- signal number
        frame -- None or a frame object. Represents execution frames
        """
        LOG.warning("Stopping MQTT exporter")
        LOG.debug("SIGNAL: %s, FRAME: %s", signum, frame)
        client.disconnect()
        sys.exit(0)

    signal.signal(signal.SIGTERM, stop_request)
    signal.signal(signal.SIGINT, stop_request)

    # get parameters from environment
    # Environment values are always strings, so every numeric setting is
    # converted here rather than handed to the libraries as text.
    mqtt_address = os.environ.get("MQTT_ADDRESS", "127.0.0.1")
    mqtt_port = int(os.environ.get("MQTT_PORT", 1883))
    mqtt_keepalive = int(os.environ.get("MQTT_KEEPALIVE", 60))
    mqtt_username = os.environ.get("MQTT_USERNAME")
    mqtt_password = os.environ.get("MQTT_PASSWORD")
    prometheus_port = int(os.environ.get("PROMETHEUS_PORT", 9000))

    # start prometheus server
    start_http_server(prometheus_port)

    # define mqtt client
    client.on_connect = subscribe
    client.on_message = expose_metrics

    # start the connection and the loop
    # Credentials are only sent when both username and password are set.
    if mqtt_username and mqtt_password:
        client.username_pw_set(mqtt_username, mqtt_password)
    client.connect(mqtt_address, mqtt_port, mqtt_keepalive)
    client.loop_forever()
# Start the exporter only when this file is executed as a script, so that
# importing the module does not connect to the broker.
if __name__ == "__main__":
    main()