Skip to content

Commit

Permalink
add proxy mode
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhaus committed Aug 20, 2022
1 parent 8afc17e commit 6fb5e99
Show file tree
Hide file tree
Showing 10 changed files with 351 additions and 45 deletions.
27 changes: 24 additions & 3 deletions bumper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@
import os
import socket
import sys
from typing import Any

from bumper.db import revoke_expired_oauths, revoke_expired_tokens
from bumper.db import (
bot_reset_connectionStatus,
client_reset_connectionStatus,
revoke_expired_oauths,
revoke_expired_tokens,
)
from bumper.mqtt.helper_bot import HelperBot
from bumper.mqtt.server import MQTTServer
from bumper.util import get_logger, log_to_stdout
Expand Down Expand Up @@ -50,6 +54,7 @@ def strtobool(strbool: str | bool | None) -> bool:
use_auth = False
token_validity_seconds = 3600 # 1 hour
oauth_validity_days = 15
bumper_proxy_mode = strtobool(os.environ.get("BUMPER_PROXY_MODE")) or False

mqtt_server: MQTTServer
mqtt_helperbot: HelperBot
Expand All @@ -60,6 +65,7 @@ def strtobool(strbool: str | bool | None) -> bool:

bumperlog = get_logger("bumper")
logging.getLogger("asyncio").setLevel(logging.CRITICAL + 1) # Ignore this logger
proxymodelog = get_logger("proxymode")

web_server_https_port = os.environ.get("WEB_SERVER_HTTPS_PORT") or 443
mqtt_listen_port = 8883
Expand All @@ -71,6 +77,10 @@ def strtobool(strbool: str | bool | None) -> bool:


async def start() -> None:
# Reset xmpp/mqtt to false in database for bots and clients
bot_reset_connectionStatus()
client_reset_connectionStatus()

try:
loop = asyncio.get_event_loop()
except:
Expand Down Expand Up @@ -102,12 +112,16 @@ async def start() -> None:
return

bumperlog.info("Starting Bumper")

if bumper_proxy_mode:
bumperlog.info("Proxy Mode Enabled")

global mqtt_server
mqtt_server = MQTTServer(bumper_listen, mqtt_listen_port)
global mqtt_helperbot
mqtt_helperbot = HelperBot(bumper_listen, mqtt_listen_port)
global web_server
web_server = WebServer(web_server_bindings)
web_server = WebServer(web_server_bindings, bumper_proxy_mode)
global xmpp_server
xmpp_server = XMPPServer(bumper_listen, xmpp_listen_port)

Expand Down Expand Up @@ -194,12 +208,19 @@ def main(argv: None | list[str] = None) -> None:
help="announce address to bots on checkin",
)
parser.add_argument("--debug", action="store_true", help="enable debug logs")
parser.add_argument(
"--proxy-mode", action="store_true", help="enable proxy mode"
)

args = parser.parse_args(args=argv)

if args.debug:
bumper_debug = True

if args.proxy_mode:
global bumper_proxy_mode
bumper_proxy_mode = True

if args.listen:
bumper_listen = args.listen

Expand Down
14 changes: 14 additions & 0 deletions bumper/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,3 +415,17 @@ def client_set_xmpp(resource: str, xmpp: bool) -> None:
clients = db_get().table("clients")
Client = Query()
clients.upsert({"xmpp_connection": xmpp}, Client.resource == resource)


def bot_reset_connectionStatus() -> None:
bots = db_get().table("bots")
for bot in bots:
bot_set_mqtt(bot["did"], False)
bot_set_xmpp(bot["did"], False)


def client_reset_connectionStatus() -> None:
clients = db_get().table("clients")
for client in clients:
client_set_mqtt(client["resource"], False)
client_set_xmpp(client["resource"], False)
11 changes: 11 additions & 0 deletions bumper/dns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from aiohttp import AsyncResolver


def get_resolver_with_public_nameserver() -> AsyncResolver:
# requires aiodns
return AsyncResolver(nameservers=["1.1.1.1", "8.8.8.8"])


async def resolve(host: str) -> str:
hosts = await get_resolver_with_public_nameserver().resolve(host)
return hosts[0]["host"]
2 changes: 1 addition & 1 deletion bumper/mqtt/helper_bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def __init__(self, host: str, port: int, timeout: float = 60):
self._port = port
self._client_id = "helperbot@bumper/helperbot"
self._timeout = timeout
self._client = Client("helperbot@bumper/helperbot")
self._client = Client(self._client_id)

# pylint: disable=unused-argument
async def _on_message(
Expand Down
89 changes: 89 additions & 0 deletions bumper/mqtt/proxy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
"""Mqtt proxy module."""
import asyncio
import re
from typing import Any, MutableMapping

from amqtt.client import MQTTClient
from amqtt.mqtt.constants import QOS_0, QOS_1, QOS_2
from cachetools import TTLCache

from ..util import get_logger

_LOGGER = get_logger("proxymode")

# iot/p2p/[command]]/[sender did]/[sender class]]/[sender resource]
# /[receiver did]/[receiver class]]/[receiver resource]/[q|p/[request id/j
# [q|p] q-> request p-> response

TOPIC_P2P = re.compile(
"iot/p2p/(?P<command>[^/]+)/(?P<sender_id>[^/]+)/(?P<sender_cls>[^/]+)/(?P<sender_resource>[^/]+)/"
"/(?P<receiver_id>[^/]+)/(?P<receiver_cls>[^/]+)/(?P<receiver_resource>[^/]+)/(?P<mode>[^/]+)/"
"(?P<request_id>[^/]+)/(?P<data_type>[^/]+)"
)


class ProxyClient:
"""Mqtt client, which proxies all messages to the ecovacs servers."""

def __init__(
self,
client_id: str,
host: str,
port: int = 443,
config: dict[str, Any] | None = None,
timeout: float = 180,
):
self.request_mapper: MutableMapping[str, str] = TTLCache(
maxsize=timeout * 60, ttl=timeout * 1.1
)
self._client = MQTTClient(client_id=client_id, config=config)
self._host = host
self._port = port

async def connect(self, username: str, password: str) -> None:
try:
await self._client.connect(
f"mqtts://{username}:{password}@{self._host}:{self._port}"
)
except Exception:
_LOGGER.exception("An exception occurred during startup", exc_info=True)
raise

asyncio.create_task(self._handle_messages())

async def _handle_messages(self) -> None:
while self._client.session.transitions.is_connected():
try:
message = await self._client.deliver_message()
data = str(message.data.decode("utf-8"))

_LOGGER.info(
f"MQTT Proxy Client - Message Received From Ecovacs - Topic: {message.topic} - Message: {data}"
)
topic = message.topic
ttopic = topic.split("/")
if ttopic[1] == "p2p":
self.request_mapper[ttopic[10]] = ttopic[3]
ttopic[3] = "proxyhelper"
topic = "/".join(ttopic)
_LOGGER.info(
f"MQTT Proxy Client - Converted Topic From {message.topic} TO {topic}"
)

_LOGGER.info(
f"MQTT Proxy Client - Proxy Forward Message to Robot - Topic: {topic} - Message: {data}"
)
await self._client.publish(topic, data.encode(), QOS_0)
except Exception: # pylint: disable=broad-except
_LOGGER.error(
"An error occurred during handling a message", exc_info=True
)

async def subscribe(self, topic: str, qos: QOS_0 | QOS_1 | QOS_2 = QOS_0) -> None:
await self._client.subscribe([(topic, qos)])

async def disconnect(self) -> None:
await self._client.disconnect()

async def publish(self, topic: str, message: bytes, qos: int | None = None) -> None:
await self._client.publish(topic, message, qos)
94 changes: 92 additions & 2 deletions bumper/mqtt/server.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
"""Server module."""

import os
from typing import Any

import amqtt
import pkg_resources
from amqtt.broker import Broker, BrokerContext
from amqtt.mqtt.constants import QOS_0, QOS_1, QOS_2
from amqtt.session import IncomingApplicationMessage, Session
from passlib.apps import custom_app_context as pwd_context

import bumper
from bumper import dns
from bumper.db import (
bot_add,
bot_get,
Expand All @@ -19,11 +20,13 @@
client_get,
client_set_mqtt,
)
from bumper.mqtt.proxy import ProxyClient
from bumper.util import get_logger

mqttserverlog = get_logger("mqttserver")
helperbotlog = get_logger("helperbot")
boterrorlog = get_logger("boterror")
proxymodelog = get_logger("proxymode")


class MQTTServer:
Expand Down Expand Up @@ -122,6 +125,7 @@ class BumperMQTTServerPlugin:
"""MQTT Server plugin which handles the authentication."""

def __init__(self, context: BrokerContext) -> None:
self._proxy_clients: dict[str, ProxyClient] = {}
self.context = context
try:
self.auth_config = self.context.config["auth"]
Expand Down Expand Up @@ -163,6 +167,32 @@ async def authenticate(self, session: Session, **kwargs: dict[str, Any]) -> bool
didsplit[0],
tmpbotdetail[0],
)

if bumper.bumper_proxy_mode:
mqtt_server = await dns.resolve("mq-ww.ecouser.net")
if mqtt_server:
proxymodelog.info(
f"MQTT Proxy Mode - Using server {mqtt_server}"
)
else:
proxymodelog.error(
"MQTT Proxy Mode - No server found! Load defaults or "
"set mqtt_server in config_proxymode table!"
)
proxymodelog.exception(
f"MQTT Proxy Mode - Exiting due to no MQTT Server configured!"
)
exit(1)

proxymodelog.info(
f"MQTT Proxy Mode - Proxy Bot to MQTT - Client_id: {client_id} - Username: {username}"
)
proxy = ProxyClient(
client_id, mqtt_server, config={"check_hostname": False}
)
self._proxy_clients[client_id] = proxy
await proxy.connect(username, password)

return True

tmpclientdetail = str(didsplit[1]).split("/")
Expand Down Expand Up @@ -242,6 +272,21 @@ def _read_password_file(self) -> dict[str, str]:

return users

async def on_broker_client_subscribed(
self, client_id: str, topic: str, qos: QOS_0 | QOS_1 | QOS_2
) -> None:
if bumper.bumper_proxy_mode:
# if proxy mode, also subscribe on ecovacs server
if client_id in self._proxy_clients:
await self._proxy_clients[client_id].subscribe(topic, qos)
proxymodelog.info(
f"MQTT Proxy Mode - New MQTT Topic Subscription - Client: {client_id} - Topic: {topic}"
)
else:
proxymodelog.warning(
f"MQTT Proxy Mode - No proxy client found! - Client: {client_id} - Topic: {topic}"
)

async def on_broker_client_connected(self, client_id: str) -> None:
"""On client connected."""
self._set_client_connected(client_id, True)
Expand All @@ -262,12 +307,13 @@ def _set_client_connected( # pylint: disable=no-self-use
client_set_mqtt(client["resource"], connected)

async def on_broker_message_received( # pylint: disable=no-self-use
self, message: IncomingApplicationMessage, **_: dict[str, Any]
self, message: IncomingApplicationMessage, client_id: str
) -> None:
"""On message received."""
topic = message.topic
topic_split = str(topic).split("/")
data_decoded = str(message.data.decode("utf-8"))

if topic_split[6] == "helperbot":
# Response to command
_log__helperbot_message("Received Response", topic, data_decoded)
Expand All @@ -285,6 +331,50 @@ async def on_broker_message_received( # pylint: disable=no-self-use
else:
_log__helperbot_message("Received Message", topic, data_decoded)

if bumper.bumper_proxy_mode and client_id in self._proxy_clients:
if not topic_split[3] == "proxyhelper":
# if from proxyhelper, don't send back to ecovacs...yet
if topic_split[6] == "proxyhelper":
ttopic = message.topic.split("/")
ttopic[6] = self._proxy_clients[client_id].request_mapper.pop(
ttopic[10], ""
)
if ttopic[6] == "":
proxymodelog.warning(
"MQTT Proxy Client - Request mapper is missing entry, "
f"probably request took to long... Client_id: {client_id}"
f" - Request_id: {ttopic[10]}"
)
return

ttopic_join = "/".join(ttopic)
proxymodelog.info(
f"MQTT Proxy Client - Bot Message Converted Topic From {message.topic} TO {ttopic_join} "
f"with message: {data_decoded}"
)
else:
ttopic_join = message.topic
proxymodelog.info(
f"MQTT Proxy Client - Bot Message From {ttopic_join} with message: {data_decoded}"
)

try:
# Send back to ecovacs
proxymodelog.info(
"MQTT Proxy Client - Proxy Forward Message to Ecovacs - Topic:"
f" {ttopic_join} - Message: {data_decoded}"
)
await self._proxy_clients[client_id].publish(
ttopic_join, data_decoded.encode(), message.qos
)
except Exception: # pylint: disable=broad-except
proxymodelog.error(
"MQTT Proxy Client - Forwarding to Ecovacs - Exception",
exc_info=True,
)

async def on_broker_client_disconnected(self, client_id: str) -> None:
"""On client disconnect."""
if bumper.bumper_proxy_mode and client_id in self._proxy_clients:
await self._proxy_clients.pop(client_id).disconnect()
self._set_client_connected(client_id, False)
Loading

0 comments on commit 6fb5e99

Please sign in to comment.