Skip to content

Commit

Permalink
Readded old .NET discovery method to try in hbldh#95.
Browse files Browse the repository at this point in the history
  • Loading branch information
hbldh committed Aug 2, 2019
1 parent b16d7c3 commit 52155e1
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 4 deletions.
9 changes: 6 additions & 3 deletions bleak/backends/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ class BLEDevice(object):
a `discover` call.
- When using Windows backend, `details` attribute is a
`Windows.Devices.Bluetooth.Advertisement.BluetoothLEAdvertisement` object.
`Windows.Devices.Bluetooth.Advertisement.BluetoothLEAdvertisement` object, unless
it is created with the Windows.Devices.Enumeration discovery method, then is is a
`Windows.Devices.Enumeration.DeviceInformation`
- When using Linux backend, `details` attribute is a
dict with keys `path` which has the string path to the DBus device object and `props`
which houses the properties dictionary of the D-Bus Device.
- When using macOS backend, `details` attribute will be a CBPeripheral
object
- When using macOS backend, `details` attribute will be a CBPeripheral object
"""

def __init__(self, address, name, details=None, **kwargs):
Expand All @@ -35,6 +36,8 @@ def rssi(self):
rssi = self.details["props"].get("RSSI", 0) # Should not be set to 0...
elif hasattr(self.details, "RawSignalStrengthInDBm"):
rssi = self.details.RawSignalStrengthInDBm
elif hasattr(self.details, "Properties"):
rssi = {p.Key: p.Value for p in self.details.Properties}['System.Devices.Aep.SignalStrength']
else:
rssi = None
return int(rssi) if rssi is not None else None
Expand Down
121 changes: 120 additions & 1 deletion bleak/backends/dotnet/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from BleakBridge import Bridge

from System import Array, Byte
from Windows.Devices import Enumeration
from Windows.Devices.Bluetooth.Advertisement import BluetoothLEAdvertisementWatcher
from Windows.Storage.Streams import DataReader, IBuffer

Expand All @@ -27,7 +28,7 @@
async def discover(
timeout: float = 5.0, loop: AbstractEventLoop = None, **kwargs
) -> List[BLEDevice]:
"""Perform a Bluetooth LE Scan.
"""Perform a Bluetooth LE Scan using Windows.Devices.Bluetooth.Advertisement
Args:
timeout (float): Time to scan for.
Expand Down Expand Up @@ -111,3 +112,121 @@ def AdvertisementWatcher_Stopped(sender, e):
)

return found


async def discover_by_enumeration(
timeout: float = 5.0, loop: AbstractEventLoop = None, **kwargs
) -> List[BLEDevice]:
"""Perform a Bluetooth LE Scan using Windows.Devices.Enumeration
Args:
timeout (float): Time to scan for.
loop (Event Loop): The event loop to use.
Keyword Args:
string_output (bool): If set to false, ``discover`` returns .NET
device objects instead.
Returns:
List of strings or objects found.
"""
loop = loop if loop else asyncio.get_event_loop()

requested_properties = Array[str](
[
"System.Devices.Aep.DeviceAddress",
"System.Devices.Aep.IsConnected",
"System.Devices.Aep.Bluetooth.Le.IsConnectable",
"System.ItemNameDisplay",
"System.Devices.Aep.Manufacturer",
"System.Devices.Manufacturer",
"System.Devices.Aep.ModelName",
"System.Devices.ModelName",
"System.Devices.Aep.SignalStrength",
]
)
aqs_all_bluetooth_le_devices = '(System.Devices.Aep.ProtocolId:="' '{bb7bb05e-5972-42b5-94fc-76eaa7084d49}")'
watcher = Enumeration.DeviceInformation.CreateWatcher(
aqs_all_bluetooth_le_devices,
requested_properties,
Enumeration.DeviceInformationKind.AssociationEndpoint,
)

devices = {}

def _format_device_info(d):
try:
return "{0}: {1}".format(
d.Id.split("-")[-1], d.Name if d.Name else "Unknown"
)
except Exception:
return d.Id

def DeviceWatcher_Added(sender, dinfo):
if sender == watcher:

logger.debug("Added {0}.".format(_format_device_info(dinfo)))
if dinfo.Id not in devices:
devices[dinfo.Id] = dinfo

def DeviceWatcher_Updated(sender, dinfo_update):
if sender == watcher:
if dinfo_update.Id in devices:
logger.debug(
"Updated {0}.".format(_format_device_info(devices[dinfo_update.Id]))
)
devices[dinfo_update.Id].Update(dinfo_update)

def DeviceWatcher_Removed(sender, dinfo_update):
if sender == watcher:
logger.debug(
"Removed {0}.".format(_format_device_info(devices[dinfo_update.Id]))
)
if dinfo_update.Id in devices:
devices.pop(dinfo_update.Id)

def DeviceWatcher_EnumCompleted(sender, obj):
if sender == watcher:
logger.debug(
"{0} devices found. Enumeration completed. Watching for updates...".format(
len(devices)
)
)

def DeviceWatcher_Stopped(sender, obj):
if sender == watcher:
logger.debug(
"{0} devices found. Watcher status: {1}.".format(
len(devices), watcher.Status
)
)

watcher.Added += DeviceWatcher_Added
watcher.Updated += DeviceWatcher_Updated
watcher.Removed += DeviceWatcher_Removed
watcher.EnumerationCompleted += DeviceWatcher_EnumCompleted
watcher.Stopped += DeviceWatcher_Stopped

# Watcher works outside of the Python process.
watcher.Start()
await asyncio.sleep(timeout, loop=loop)
watcher.Stop()

try:
watcher.Added -= DeviceWatcher_Added
watcher.Updated -= DeviceWatcher_Updated
watcher.Removed -= DeviceWatcher_Removed
watcher.EnumerationCompleted -= DeviceWatcher_EnumCompleted
watcher.Stopped -= DeviceWatcher_Stopped
except Exception as e:
logger.debug("Could not remove event handlers: {0}...".format(e))

found = []
for d in devices.values():
properties = {p.Key: p.Value for p in d.Properties}
found.append(
BLEDevice(properties["System.Devices.Aep.DeviceAddress"], d.Name, d, uuids=[], manufacturer_data=b'')
)

return found

0 comments on commit 52155e1

Please sign in to comment.