Skip to content

async-ify io.USB #479

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 30, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 51 additions & 25 deletions pylabrobot/io/usb.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import asyncio
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import TYPE_CHECKING, List, Optional

Expand Down Expand Up @@ -84,6 +86,8 @@ def __init__(
self.read_endpoint: Optional[usb.core.Endpoint] = None
self.write_endpoint: Optional[usb.core.Endpoint] = None

self._executor: Optional[ThreadPoolExecutor] = None

# unique id in the logs
self._unique_id = f"[{hex(self._id_vendor)}:{hex(self._id_product)}][{self._serial_number or ''}][{self._device_address or ''}]"

Expand All @@ -102,7 +106,15 @@ async def write(self, data: bytes, timeout: Optional[float] = None):
timeout = self.write_timeout

# write command to endpoint
self.dev.write(self.write_endpoint, data, timeout=timeout)
loop = asyncio.get_running_loop()
write_endpoint = self.write_endpoint
dev = self.dev
if self._executor is None or dev is None or write_endpoint is None:
raise RuntimeError("Call setup() first.")
loop.run_in_executor(
self._executor,
lambda: dev.write(write_endpoint, data, timeout=timeout),
)
logger.log(LOG_LEVEL_IO, "%s write: %s", self._unique_id, data)
capturer.record(
USBCommand(device_id=self._unique_id, action="write", data=data.decode("unicode_escape"))
Expand Down Expand Up @@ -144,31 +156,39 @@ async def read(self, timeout: Optional[int] = None) -> bytes:
if timeout is None:
timeout = self.read_timeout

# Attempt to read packets until timeout, or when we identify the right id.
timeout_time = time.time() + timeout

while time.time() < timeout_time:
# read response from endpoint, and keep reading until the packet is smaller than the max
# packet size: if the packet is that size, it means that there may be more data to read.
resp = bytearray()
last_packet: Optional[bytearray] = None
while True: # read while we have data, and while the last packet is the max size.
last_packet = self._read_packet()
if last_packet is not None:
resp += last_packet
if last_packet is None or len(last_packet) != self.read_endpoint.wMaxPacketSize:
break

if len(resp) == 0:
continue

logger.log(LOG_LEVEL_IO, "%s read: %s", self._unique_id, resp)
capturer.record(
USBCommand(device_id=self._unique_id, action="read", data=resp.decode("unicode_escape"))
)
return resp
def read_or_timeout():
# Attempt to read packets until timeout, or when we identify the right id.
timeout_time = time.time() + timeout

while time.time() < timeout_time:
# read response from endpoint, and keep reading until the packet is smaller than the max
# packet size: if the packet is that size, it means that there may be more data to read.
resp = bytearray()
last_packet: Optional[bytearray] = None
while True: # read while we have data, and while the last packet is the max size.
last_packet = self._read_packet()
if last_packet is not None:
resp += last_packet
if self.read_endpoint is None:
raise RuntimeError("Read endpoint is None. Call setup() first.")
if last_packet is None or len(last_packet) != self.read_endpoint.wMaxPacketSize:
break

if len(resp) == 0:
continue

raise TimeoutError("Timeout while reading.")
logger.log(LOG_LEVEL_IO, "%s read: %s", self._unique_id, resp)
capturer.record(
USBCommand(device_id=self._unique_id, action="read", data=resp.decode("unicode_escape"))
)
return resp

raise TimeoutError("Timeout while reading.")

loop = asyncio.get_running_loop()
if self._executor is None or self.dev is None:
raise RuntimeError("Call setup() first.")
return await loop.run_in_executor(self._executor, read_or_timeout)

def get_available_devices(self) -> List["usb.core.Device"]:
"""Get a list of available devices that match the specified vendor and product IDs, and serial
Expand Down Expand Up @@ -265,6 +285,8 @@ async def setup(self):
while self._read_packet() is not None:
pass

self._executor = ThreadPoolExecutor(max_workers=1)

async def stop(self):
"""Close the USB connection to the machine."""

Expand All @@ -274,6 +296,10 @@ async def stop(self):
usb.util.dispose_resources(self.dev)
self.dev = None

if self._executor is not None:
self._executor.shutdown(wait=True)
self._executor = None

def serialize(self) -> dict:
"""Serialize the backend to a dictionary."""

Expand Down