Skip to content

Commit

Permalink
Optimize BlueZ device watchers and condition callbacks to avoid linea…
Browse files Browse the repository at this point in the history
…r searches (hbldh#1400)
  • Loading branch information
bdraco authored Aug 28, 2023
1 parent cd0fda5 commit 256a5be
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 32 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Fixed
* Fixed possible crash in ``_stopped_handler()`` in WinRT backend. Fixes #1330.
* Reduced expensive logging in the BlueZ backend. Merged #1376.
* Fixed race condition with ``"InterfaceRemoved"`` when getting services in BlueZ backend.
* Optimize BlueZ backend device watchers and condition callbacks to avoid linear searches

`0.20.2`_ (2023-04-19)
======================
Expand Down
96 changes: 64 additions & 32 deletions bleak/backends/bluezdbus/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@
from .descriptor import BleakGATTDescriptorBlueZDBus
from .service import BleakGATTServiceBlueZDBus
from .signals import MatchRules, add_match
from .utils import assert_reply, get_dbus_authenticator
from .utils import (
assert_reply,
get_dbus_authenticator,
device_path_from_characteristic_path,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -67,12 +71,31 @@ class CallbackAndState(NamedTuple):
"""


DevicePropertiesChangedCallback = Callable[[], None]
DevicePropertiesChangedCallback = Callable[[Optional[Any]], None]
"""
A callback that is called when the properties of a device change in BlueZ.
Args:
arg0: The new property value.
"""


class DeviceConditionCallback(NamedTuple):
"""
Encapsulates a :data:`DevicePropertiesChangedCallback` and the property name being watched.
"""

callback: DevicePropertiesChangedCallback
"""
The callback.
"""

property_name: str
"""
The name of the property to watch.
"""


DeviceRemovedCallback = Callable[[str], None]
"""
A callback that is called when a device is removed from BlueZ.
Expand Down Expand Up @@ -173,8 +196,8 @@ def __init__(self):

self._advertisement_callbacks: List[CallbackAndState] = []
self._device_removed_callbacks: List[DeviceRemovedCallbackAndState] = []
self._device_watchers: Set[DeviceWatcher] = set()
self._condition_callbacks: Dict[str, Set[DevicePropertiesChangedCallback]] = {}
self._device_watchers: Dict[str, Set[DeviceWatcher]] = {}
self._condition_callbacks: Dict[str, Set[DeviceConditionCallback]] = {}
self._services_cache: Dict[str, BleakGATTServiceCollection] = {}

def _check_adapter(self, adapter_path: str) -> None:
Expand Down Expand Up @@ -571,7 +594,7 @@ def add_device_watcher(
device_path, on_connected_changed, on_characteristic_value_changed
)

self._device_watchers.add(watcher)
self._device_watchers.setdefault(device_path, set()).add(watcher)
return watcher

def remove_device_watcher(self, watcher: DeviceWatcher) -> None:
Expand All @@ -582,7 +605,10 @@ def remove_device_watcher(self, watcher: DeviceWatcher) -> None:
The device watcher token that was returned by
:meth:`add_device_watcher`.
"""
self._device_watchers.remove(watcher)
device_path = watcher.device_path
self._device_watchers[device_path].remove(watcher)
if not self._device_watchers[device_path]:
del self._device_watchers[device_path]

async def get_services(
self, device_path: str, use_cached: bool, requested_services: Optional[Set[str]]
Expand Down Expand Up @@ -804,21 +830,23 @@ async def _wait_condition(

event = asyncio.Event()

def callback():
if (
self._properties[device_path][defs.DEVICE_INTERFACE][property_name]
== property_value
):
def _wait_condition_callback(new_value: Optional[Any]) -> None:
"""Callback for when a property changes."""
if new_value == property_value:
event.set()

device_callbacks = self._condition_callbacks.setdefault(device_path, set())
condition_callbacks = self._condition_callbacks
device_callbacks = condition_callbacks.setdefault(device_path, set())
callback = DeviceConditionCallback(_wait_condition_callback, property_name)
device_callbacks.add(callback)

try:
# can be canceled
await event.wait()
finally:
device_callbacks.remove(callback)
if not device_callbacks:
del condition_callbacks[device_path]

def _parse_msg(self, message: Message):
"""
Expand Down Expand Up @@ -915,9 +943,9 @@ def _parse_msg(self, message: Message):
except KeyError:
pass
elif message.member == "PropertiesChanged":
assert message.path is not None

interface, changed, invalidated = message.body
message_path = message.path
assert message_path is not None

try:
self_interface = self._properties[message.path][interface]
Expand Down Expand Up @@ -946,36 +974,40 @@ def _parse_msg(self, message: Message):

if interface == defs.DEVICE_INTERFACE:
# handle advertisement watchers
device_path = message_path

self._run_advertisement_callbacks(
message.path, cast(Device1, self_interface), changed.keys()
device_path, cast(Device1, self_interface), changed.keys()
)

# handle device condition watchers
for condition_callback in self._condition_callbacks.get(
message.path, ()
):
condition_callback()
callbacks = self._condition_callbacks.get(device_path)
if callbacks:
for callback in callbacks:
name = callback.property_name
if name in changed:
callback.callback(self_interface.get(name))

# handle device connection change watchers

if "Connected" in changed:
for (
device_path,
on_connected_changed,
_,
) in self._device_watchers.copy():
# callbacks may remove the watcher, hence the copy() above
if message.path == device_path:
on_connected_changed(self_interface["Connected"])
new_connected = self_interface["Connected"]
watchers = self._device_watchers.get(device_path)
if watchers:
# callbacks may remove the watcher, hence the copy
for watcher in watchers.copy():
watcher.on_connected_changed(new_connected)

elif interface == defs.GATT_CHARACTERISTIC_INTERFACE:
# handle characteristic value change watchers

if "Value" in changed:
for device_path, _, on_value_changed in self._device_watchers:
if message.path.startswith(device_path):
on_value_changed(message.path, self_interface["Value"])
new_value = self_interface["Value"]
device_path = device_path_from_characteristic_path(message_path)
watchers = self._device_watchers.get(device_path)
if watchers:
for watcher in watchers:
watcher.on_characteristic_value_changed(
message_path, new_value
)

def _run_advertisement_callbacks(
self, device_path: str, device: Device1, changed: Iterable[str]
Expand Down
14 changes: 14 additions & 0 deletions bleak/backends/bluezdbus/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,20 @@ def bdaddr_from_device_path(device_path: str) -> str:
return ":".join(device_path[-17:].split("_"))


def device_path_from_characteristic_path(characteristic_path: str) -> str:
"""
Scrape the device path from a D-Bus characteristic path.
Args:
characteristic_path: The D-Bus object path of the characteristic.
Returns:
A D-Bus object path of the device.
"""
# /org/bluez/hci1/dev_FA_23_9D_AA_45_46/service000c/char000d
return characteristic_path[:37]


def get_dbus_authenticator():
uid = None
try:
Expand Down
23 changes: 23 additions & 0 deletions tests/bleak/backends/bluezdbus/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#!/usr/bin/env python

"""Tests for `bleak.backends.bluezdbus.utils` package."""

import pytest
import sys


@pytest.mark.skipif(
not sys.platform.startswith("linux"), reason="requires dbus-fast on Linux"
)
def test_device_path_from_characteristic_path():
"""Test device_path_from_characteristic_path."""
from bleak.backends.bluezdbus.utils import ( # pylint: disable=import-outside-toplevel
device_path_from_characteristic_path,
)

assert (
device_path_from_characteristic_path(
"/org/bluez/hci0/dev_11_22_33_44_55_66/service000c/char000d"
)
== "/org/bluez/hci0/dev_11_22_33_44_55_66"
)

0 comments on commit 256a5be

Please sign in to comment.