forked from hbldh/bleak
-
Notifications
You must be signed in to change notification settings - Fork 0
/
async_callback_with_queue.py
69 lines (54 loc) · 2 KB
/
async_callback_with_queue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
"""
Async callbacks with a queue and external consumer
--------------------------------------------------
An example showing how async notification callbacks can be used to
send data received through notifications to some external consumer of
that data.
Created on 2021-02-25 by hbldh <[email protected]>
"""
import sys
import time
import platform
import asyncio
import logging
from bleak import BleakClient
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)
h = logging.StreamHandler(sys.stdout)
h.setLevel(logging.DEBUG)
log.addHandler(h)
ADDRESS = (
"24:71:89:cc:09:05"
if platform.system() != "Darwin"
else "B9EA5233-37EF-4DD6-87A8-2A875E821C46"
)
NOTIFICATION_UUID = f"0000{0xFFE1:x}-0000-1000-8000-00805f9b34fb"
async def run_ble_client(address: str, queue: asyncio.Queue):
async def callback_handler(sender, data):
await queue.put((time.time(), data))
async with BleakClient(address) as client:
log.info(f"Connected: {client.is_connected}")
await client.start_notify(NOTIFICATION_UUID, callback_handler)
await asyncio.sleep(10.0)
await client.stop_notify(NOTIFICATION_UUID)
# Send an "exit command to the consumer"
await queue.put((time.time(), None))
async def run_queue_consumer(queue: asyncio.Queue):
while True:
# Use await asyncio.wait_for(queue.get(), timeout=1.0) if you want a timeout for getting data.
epoch, data = await queue.get()
if data is None:
log.info(
"Got message from client about disconnection. Exiting consumer loop..."
)
break
else:
log.info(f"Received callback data via async queue at {epoch}: {data}")
async def main(address: str):
queue = asyncio.Queue()
client_task = run_ble_client(address, queue)
consumer_task = run_queue_consumer(queue)
await asyncio.gather(client_task, consumer_task)
log.info("Main method done.")
if __name__ == "__main__":
asyncio.run(main(ADDRESS))