From 6fb5e99e0623cc42e67975cf12ced5be7a5d598b Mon Sep 17 00:00:00 2001 From: Robert Resch Date: Sat, 20 Aug 2022 12:22:31 +0200 Subject: [PATCH] add proxy mode --- bumper/__init__.py | 27 ++++++- bumper/db.py | 14 ++++ bumper/dns.py | 11 +++ bumper/mqtt/helper_bot.py | 2 +- bumper/mqtt/proxy.py | 89 +++++++++++++++++++++++ bumper/mqtt/server.py | 94 +++++++++++++++++++++++- bumper/web/server.py | 149 +++++++++++++++++++++++++++++--------- requirements.txt | 2 + tests/conftest.py | 2 +- tests/web/test_server.py | 6 +- 10 files changed, 351 insertions(+), 45 deletions(-) create mode 100644 bumper/dns.py create mode 100644 bumper/mqtt/proxy.py diff --git a/bumper/__init__.py b/bumper/__init__.py index ad3e3be..34ffc95 100644 --- a/bumper/__init__.py +++ b/bumper/__init__.py @@ -3,9 +3,13 @@ import os import socket import sys -from typing import Any -from bumper.db import revoke_expired_oauths, revoke_expired_tokens +from bumper.db import ( + bot_reset_connectionStatus, + client_reset_connectionStatus, + revoke_expired_oauths, + revoke_expired_tokens, +) from bumper.mqtt.helper_bot import HelperBot from bumper.mqtt.server import MQTTServer from bumper.util import get_logger, log_to_stdout @@ -50,6 +54,7 @@ def strtobool(strbool: str | bool | None) -> bool: use_auth = False token_validity_seconds = 3600 # 1 hour oauth_validity_days = 15 +bumper_proxy_mode = strtobool(os.environ.get("BUMPER_PROXY_MODE")) or False mqtt_server: MQTTServer mqtt_helperbot: HelperBot @@ -60,6 +65,7 @@ def strtobool(strbool: str | bool | None) -> bool: bumperlog = get_logger("bumper") logging.getLogger("asyncio").setLevel(logging.CRITICAL + 1) # Ignore this logger +proxymodelog = get_logger("proxymode") web_server_https_port = os.environ.get("WEB_SERVER_HTTPS_PORT") or 443 mqtt_listen_port = 8883 @@ -71,6 +77,10 @@ def strtobool(strbool: str | bool | None) -> bool: async def start() -> None: + # Reset xmpp/mqtt to false in database for bots and clients + bot_reset_connectionStatus() + client_reset_connectionStatus() + try: loop = asyncio.get_event_loop() except: @@ -102,12 +112,16 @@ async def start() -> None: return bumperlog.info("Starting Bumper") + + if bumper_proxy_mode: + bumperlog.info("Proxy Mode Enabled") + global mqtt_server mqtt_server = MQTTServer(bumper_listen, mqtt_listen_port) global mqtt_helperbot mqtt_helperbot = HelperBot(bumper_listen, mqtt_listen_port) global web_server - web_server = WebServer(web_server_bindings) + web_server = WebServer(web_server_bindings, bumper_proxy_mode) global xmpp_server xmpp_server = XMPPServer(bumper_listen, xmpp_listen_port) @@ -194,12 +208,19 @@ def main(argv: None | list[str] = None) -> None: help="announce address to bots on checkin", ) parser.add_argument("--debug", action="store_true", help="enable debug logs") + parser.add_argument( + "--proxy-mode", action="store_true", help="enable proxy mode" + ) args = parser.parse_args(args=argv) if args.debug: bumper_debug = True + if args.proxy_mode: + global bumper_proxy_mode + bumper_proxy_mode = True + if args.listen: bumper_listen = args.listen diff --git a/bumper/db.py b/bumper/db.py index b73e89f..cfbd006 100644 --- a/bumper/db.py +++ b/bumper/db.py @@ -415,3 +415,17 @@ def client_set_xmpp(resource: str, xmpp: bool) -> None: clients = db_get().table("clients") Client = Query() clients.upsert({"xmpp_connection": xmpp}, Client.resource == resource) + + +def bot_reset_connectionStatus() -> None: + bots = db_get().table("bots") + for bot in bots: + bot_set_mqtt(bot["did"], False) + bot_set_xmpp(bot["did"], False) + + +def client_reset_connectionStatus() -> None: + clients = db_get().table("clients") + for client in clients: + client_set_mqtt(client["resource"], False) + client_set_xmpp(client["resource"], False) diff --git a/bumper/dns.py b/bumper/dns.py new file mode 100644 index 0000000..e480de3 --- /dev/null +++ b/bumper/dns.py @@ -0,0 +1,11 @@ +from aiohttp import AsyncResolver + + +def get_resolver_with_public_nameserver() -> AsyncResolver: + # requires aiodns + return AsyncResolver(nameservers=["1.1.1.1", "8.8.8.8"]) + + +async def resolve(host: str) -> str: + hosts = await get_resolver_with_public_nameserver().resolve(host) + return hosts[0]["host"] diff --git a/bumper/mqtt/helper_bot.py b/bumper/mqtt/helper_bot.py index 4ce7060..4dd6ccb 100644 --- a/bumper/mqtt/helper_bot.py +++ b/bumper/mqtt/helper_bot.py @@ -46,7 +46,7 @@ def __init__(self, host: str, port: int, timeout: float = 60): self._port = port self._client_id = "helperbot@bumper/helperbot" self._timeout = timeout - self._client = Client("helperbot@bumper/helperbot") + self._client = Client(self._client_id) # pylint: disable=unused-argument async def _on_message( diff --git a/bumper/mqtt/proxy.py b/bumper/mqtt/proxy.py new file mode 100644 index 0000000..8b15711 --- /dev/null +++ b/bumper/mqtt/proxy.py @@ -0,0 +1,89 @@ +"""Mqtt proxy module.""" +import asyncio +import re +from typing import Any, MutableMapping + +from amqtt.client import MQTTClient +from amqtt.mqtt.constants import QOS_0, QOS_1, QOS_2 +from cachetools import TTLCache + +from ..util import get_logger + +_LOGGER = get_logger("proxymode") + +# iot/p2p/[command]]/[sender did]/[sender class]]/[sender resource] +# /[receiver did]/[receiver class]]/[receiver resource]/[q|p/[request id/j +# [q|p] q-> request p-> response + +TOPIC_P2P = re.compile( + "iot/p2p/(?P[^/]+)/(?P[^/]+)/(?P[^/]+)/(?P[^/]+)/" + "/(?P[^/]+)/(?P[^/]+)/(?P[^/]+)/(?P[^/]+)/" + "(?P[^/]+)/(?P[^/]+)" +) + + +class ProxyClient: + """Mqtt client, which proxies all messages to the ecovacs servers.""" + + def __init__( + self, + client_id: str, + host: str, + port: int = 443, + config: dict[str, Any] | None = None, + timeout: float = 180, + ): + self.request_mapper: MutableMapping[str, str] = TTLCache( + maxsize=timeout * 60, ttl=timeout * 1.1 + ) + self._client = MQTTClient(client_id=client_id, config=config) + self._host = host + self._port = port + + async def connect(self, username: str, password: str) -> None: + try: + await self._client.connect( + f"mqtts://{username}:{password}@{self._host}:{self._port}" + ) + except Exception: + _LOGGER.exception("An exception occurred during startup", exc_info=True) + raise + + asyncio.create_task(self._handle_messages()) + + async def _handle_messages(self) -> None: + while self._client.session.transitions.is_connected(): + try: + message = await self._client.deliver_message() + data = str(message.data.decode("utf-8")) + + _LOGGER.info( + f"MQTT Proxy Client - Message Received From Ecovacs - Topic: {message.topic} - Message: {data}" + ) + topic = message.topic + ttopic = topic.split("/") + if ttopic[1] == "p2p": + self.request_mapper[ttopic[10]] = ttopic[3] + ttopic[3] = "proxyhelper" + topic = "/".join(ttopic) + _LOGGER.info( + f"MQTT Proxy Client - Converted Topic From {message.topic} TO {topic}" + ) + + _LOGGER.info( + f"MQTT Proxy Client - Proxy Forward Message to Robot - Topic: {topic} - Message: {data}" + ) + await self._client.publish(topic, data.encode(), QOS_0) + except Exception: # pylint: disable=broad-except + _LOGGER.error( + "An error occurred during handling a message", exc_info=True + ) + + async def subscribe(self, topic: str, qos: QOS_0 | QOS_1 | QOS_2 = QOS_0) -> None: + await self._client.subscribe([(topic, qos)]) + + async def disconnect(self) -> None: + await self._client.disconnect() + + async def publish(self, topic: str, message: bytes, qos: int | None = None) -> None: + await self._client.publish(topic, message, qos) diff --git a/bumper/mqtt/server.py b/bumper/mqtt/server.py index 3e88255..a2614f0 100644 --- a/bumper/mqtt/server.py +++ b/bumper/mqtt/server.py @@ -1,15 +1,16 @@ """Server module.""" - import os from typing import Any import amqtt import pkg_resources from amqtt.broker import Broker, BrokerContext +from amqtt.mqtt.constants import QOS_0, QOS_1, QOS_2 from amqtt.session import IncomingApplicationMessage, Session from passlib.apps import custom_app_context as pwd_context import bumper +from bumper import dns from bumper.db import ( bot_add, bot_get, @@ -19,11 +20,13 @@ client_get, client_set_mqtt, ) +from bumper.mqtt.proxy import ProxyClient from bumper.util import get_logger mqttserverlog = get_logger("mqttserver") helperbotlog = get_logger("helperbot") boterrorlog = get_logger("boterror") +proxymodelog = get_logger("proxymode") class MQTTServer: @@ -122,6 +125,7 @@ class BumperMQTTServerPlugin: """MQTT Server plugin which handles the authentication.""" def __init__(self, context: BrokerContext) -> None: + self._proxy_clients: dict[str, ProxyClient] = {} self.context = context try: self.auth_config = self.context.config["auth"] @@ -163,6 +167,32 @@ async def authenticate(self, session: Session, **kwargs: dict[str, Any]) -> bool didsplit[0], tmpbotdetail[0], ) + + if bumper.bumper_proxy_mode: + mqtt_server = await dns.resolve("mq-ww.ecouser.net") + if mqtt_server: + proxymodelog.info( + f"MQTT Proxy Mode - Using server {mqtt_server}" + ) + else: + proxymodelog.error( + "MQTT Proxy Mode - No server found! Load defaults or " + "set mqtt_server in config_proxymode table!" + ) + proxymodelog.exception( + f"MQTT Proxy Mode - Exiting due to no MQTT Server configured!" + ) + exit(1) + + proxymodelog.info( + f"MQTT Proxy Mode - Proxy Bot to MQTT - Client_id: {client_id} - Username: {username}" + ) + proxy = ProxyClient( + client_id, mqtt_server, config={"check_hostname": False} + ) + self._proxy_clients[client_id] = proxy + await proxy.connect(username, password) + return True tmpclientdetail = str(didsplit[1]).split("/") @@ -242,6 +272,21 @@ def _read_password_file(self) -> dict[str, str]: return users + async def on_broker_client_subscribed( + self, client_id: str, topic: str, qos: QOS_0 | QOS_1 | QOS_2 + ) -> None: + if bumper.bumper_proxy_mode: + # if proxy mode, also subscribe on ecovacs server + if client_id in self._proxy_clients: + await self._proxy_clients[client_id].subscribe(topic, qos) + proxymodelog.info( + f"MQTT Proxy Mode - New MQTT Topic Subscription - Client: {client_id} - Topic: {topic}" + ) + else: + proxymodelog.warning( + f"MQTT Proxy Mode - No proxy client found! - Client: {client_id} - Topic: {topic}" + ) + async def on_broker_client_connected(self, client_id: str) -> None: """On client connected.""" self._set_client_connected(client_id, True) @@ -262,12 +307,13 @@ def _set_client_connected( # pylint: disable=no-self-use client_set_mqtt(client["resource"], connected) async def on_broker_message_received( # pylint: disable=no-self-use - self, message: IncomingApplicationMessage, **_: dict[str, Any] + self, message: IncomingApplicationMessage, client_id: str ) -> None: """On message received.""" topic = message.topic topic_split = str(topic).split("/") data_decoded = str(message.data.decode("utf-8")) + if topic_split[6] == "helperbot": # Response to command _log__helperbot_message("Received Response", topic, data_decoded) @@ -285,6 +331,50 @@ async def on_broker_message_received( # pylint: disable=no-self-use else: _log__helperbot_message("Received Message", topic, data_decoded) + if bumper.bumper_proxy_mode and client_id in self._proxy_clients: + if not topic_split[3] == "proxyhelper": + # if from proxyhelper, don't send back to ecovacs...yet + if topic_split[6] == "proxyhelper": + ttopic = message.topic.split("/") + ttopic[6] = self._proxy_clients[client_id].request_mapper.pop( + ttopic[10], "" + ) + if ttopic[6] == "": + proxymodelog.warning( + "MQTT Proxy Client - Request mapper is missing entry, " + f"probably request took to long... Client_id: {client_id}" + f" - Request_id: {ttopic[10]}" + ) + return + + ttopic_join = "/".join(ttopic) + proxymodelog.info( + f"MQTT Proxy Client - Bot Message Converted Topic From {message.topic} TO {ttopic_join} " + f"with message: {data_decoded}" + ) + else: + ttopic_join = message.topic + proxymodelog.info( + f"MQTT Proxy Client - Bot Message From {ttopic_join} with message: {data_decoded}" + ) + + try: + # Send back to ecovacs + proxymodelog.info( + "MQTT Proxy Client - Proxy Forward Message to Ecovacs - Topic:" + f" {ttopic_join} - Message: {data_decoded}" + ) + await self._proxy_clients[client_id].publish( + ttopic_join, data_decoded.encode(), message.qos + ) + except Exception: # pylint: disable=broad-except + proxymodelog.error( + "MQTT Proxy Client - Forwarding to Ecovacs - Exception", + exc_info=True, + ) + async def on_broker_client_disconnected(self, client_id: str) -> None: """On client disconnect.""" + if bumper.bumper_proxy_mode and client_id in self._proxy_clients: + await self._proxy_clients.pop(client_id).disconnect() self._set_client_connected(client_id, False) diff --git a/bumper/web/server.py b/bumper/web/server.py index cc231cb..c506d44 100644 --- a/bumper/web/server.py +++ b/bumper/web/server.py @@ -1,5 +1,4 @@ """Web server module.""" - import asyncio import dataclasses import json @@ -7,20 +6,18 @@ import os import ssl +import aiohttp import aiohttp_jinja2 import jinja2 from aiohttp import web from aiohttp.typedefs import Handler -from aiohttp.web_exceptions import ( - HTTPBadRequest, - HTTPInternalServerError, - HTTPNoContent, -) +from aiohttp.web_exceptions import HTTPInternalServerError, HTTPNoContent from aiohttp.web_request import Request from aiohttp.web_response import Response, StreamResponse import bumper from bumper.db import bot_get, bot_remove, client_get, client_remove, db_get +from bumper.dns import get_resolver_with_public_nameserver from bumper.util import get_logger from bumper.web.plugins import add_plugins @@ -41,6 +38,7 @@ def filter(self, record: logging.LogRecord) -> bool: confserverlog = get_logger("confserver") # Add logging filter above to aiohttp.access logging.getLogger("aiohttp.access").addFilter(_aiohttp_filter()) +proxymodelog = logging.getLogger("proxymode") @dataclasses.dataclass(frozen=True) @@ -57,7 +55,9 @@ class WebServer: _EXCLUDE_FROM_LOGGING = ["base", "remove-bot", "remove-client", "restart-service"] - def __init__(self, bindings: list[WebserverBinding] | WebserverBinding): + def __init__( + self, bindings: list[WebserverBinding] | WebserverBinding, proxy_mode: bool + ): self._runners: list[web.AppRunner] = [] if isinstance(bindings, WebserverBinding): @@ -75,32 +75,39 @@ def __init__(self, bindings: list[WebserverBinding] | WebserverBinding): os.path.join(bumper.bumper_dir, "bumper", "web", "templates") ), ) - self._add_routes() + self._add_routes(proxy_mode) self._app.freeze() # no modification allowed anymore - def _add_routes(self) -> None: + def _add_routes(self, proxy_mode: bool) -> None: self._app.add_routes( [ - web.get("", self._handle_base, name="base"), - web.get( - "/bot/remove/{did}", self._handle_remove_bot, name="remove-bot" - ), + web.get("/bot/remove/{did}", self._handle_remove_bot), web.get( "/client/remove/{resource}", self._handle_remove_client, - name="remove-client", ), web.get( "/restart_{service}", self._handle_restart_service, - name="restart-service", ), - web.post("/lookup.do", self._handle_lookup), - web.post("/newauth.do", self._handle_newauth), ] ) - add_plugins(self._app) + if proxy_mode: + self._app.add_routes( + [ + web.route("*", "/{path:.*}", self._handle_proxy), + ] + ) + else: + self._app.add_routes( + [ + web.get("", self._handle_base), + web.post("/lookup.do", self._handle_lookup), + web.post("/newauth.do", self._handle_newauth), + ] + ) + add_plugins(self._app) async def start(self) -> None: """Start server.""" @@ -189,22 +196,11 @@ async def _log_all_requests( } } try: - postbody = None if request.content_length: - if request.content_type == "application/x-www-form-urlencoded": - postbody = await request.post() - - elif request.content_type == "application/json": - try: - postbody = json.loads(await request.text()) - except Exception as e: - confserverlog.error(f"Request body not json: {e}") - raise HTTPBadRequest(reason="Body was not json") - + if request.content_type == "application/json": + to_log["request"]["body"] = await request.text() else: - postbody = await request.post() - - to_log["request"]["body"] = f"{postbody}" + to_log["request"]["body"] = f"{await request.post()}" response = await handler(request) if response is None: @@ -217,13 +213,17 @@ async def _log_all_requests( to_log["response"] = { "status": f"{response.status}", } + if ( - "application/octet-stream" not in response.content_type - and isinstance(response, Response) + isinstance(response, Response) and response.body + and ( + response.content_type.startswith("text") + or response.content_type == "application/json" + ) ): assert isinstance(response.body, bytes) - to_log["response"]["body"] = f"{json.loads(response.body)}" + to_log["response"]["body"] = response.body.decode("utf-8") confserverlog.debug(json.dumps(to_log)) @@ -365,3 +365,82 @@ async def _handle_newauth(self, request: Request) -> Response: confserverlog.exception(f"{e}") raise HTTPInternalServerError + + async def _handle_proxy(self, request: Request) -> Response: + try: + if request.raw_path == "/": + return await self._handle_base(request) + if request.raw_path == "/lookup.do": + return await self._handle_lookup(request) + # use bumper to handle lookup so bot gets Bumper IP and not Ecovacs + + async with aiohttp.ClientSession( + headers=request.headers, + connector=aiohttp.TCPConnector( + verify_ssl=False, resolver=get_resolver_with_public_nameserver() + ), + ) as session: + if request.content.total_bytes > 0: + read_body = await request.read() + proxymodelog.info( + f"HTTP Proxy Request to EcoVacs (body=true) (URL:{request.url}) - {read_body}" + ) + if request.content_type == "application/x-www-form-urlencoded": + # android apps use form + fdata = await request.post() + async with session.request( + request.method, request.url, data=fdata + ) as resp: + response = await resp.text() + proxymodelog.info( + f"HTTP Proxy Response from EcoVacs (URL: {request.url}) - (Status: {resp.status}) - {response}" + ) + else: + # handle json + jdata = read_body.decode("utf8") + jdata = json.loads(jdata) + async with session.request( + request.method, request.url, json=jdata + ) as resp: + response = await resp.text() + proxymodelog.info( + f"HTTP Proxy Response from EcoVacs (URL: {request.url}) - (Status: {resp.status}) - {response}" + ) + + else: + proxymodelog.info( + f"HTTP Proxy Request to EcoVacs (body=false) (URL:{request.url})" + ) + async with session.request(request.method, request.url) as resp: + if resp.content_type == "application/octet-stream": + response = await resp.read() + proxymodelog.info( + f"HTTP Proxy Response from EcoVacs (URL: {request.url}) - (Status: {resp.status}) - " + ) + else: + response = await resp.text() + proxymodelog.info( + f"HTTP Proxy Response from EcoVacs (URL: {request.url}) - (Status: {resp.status}) - {response}" + ) + + if resp.status == 200: + if resp.content_type == "application/json": + response = json.loads(response) + return web.json_response(response) + elif resp.content_type == "application/octet-stream": + return web.Response(body=response) + else: + return web.Response(text=response) + + else: + return web.Response(text=response) + + except asyncio.CancelledError: + proxymodelog.exception( + f"Request cancelled or timeout - {request.url} - {jdata}", exc_info=True + ) + + except Exception: + proxymodelog.exception("An exception occurred", exc_info=True) + + raise HTTPInternalServerError diff --git a/requirements.txt b/requirements.txt index 9c78c3a..e9b4061 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ +aiodns==3.0.0 aiohttp==3.8.1 aiohttp-jinja2==1.5 cachetools==5.2.0 @@ -5,3 +6,4 @@ git+https://github.com/Yakifo/amqtt@master#amqtt==11.0.0 gmqtt==0.6.11 Jinja2==3.1.2 tinydb==4.7.0 +websockets==10.3 diff --git a/tests/conftest.py b/tests/conftest.py index 30176be..0e6b9d9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -54,7 +54,7 @@ async def helper_bot(mqtt_server: MQTTServer): @pytest.fixture async def webserver_client(aiohttp_client): - webserver = bumper.WebServer(WebserverBinding(HOST, WEBSERVER_PORT, False)) + webserver = bumper.WebServer(WebserverBinding(HOST, WEBSERVER_PORT, False), False) client = await aiohttp_client(webserver._app) yield client diff --git a/tests/web/test_server.py b/tests/web/test_server.py index 943b056..47464c5 100644 --- a/tests/web/test_server.py +++ b/tests/web/test_server.py @@ -12,7 +12,7 @@ def create_webserver(): - return WebServer(WebserverBinding(HOST, WEBSERVER_PORT, False)) + return WebServer(WebserverBinding(HOST, WEBSERVER_PORT, False), False) def async_return(result): @@ -27,12 +27,12 @@ def remove_existing_db(): async def test_webserver_ssl(): - webserver = WebServer(WebserverBinding(HOST, WEBSERVER_PORT, True)) + webserver = WebServer(WebserverBinding(HOST, WEBSERVER_PORT, True), False) await webserver.start() async def test_webserver_no_ssl(): - webserver = WebServer(WebserverBinding(HOST, 11112, False)) + webserver = WebServer(WebserverBinding(HOST, 11112, False), False) await webserver.start()