forked from Mboukhal/RaspberryGate
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdevice.py
57 lines (44 loc) · 1.72 KB
/
device.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import ctypes
import evdev
import pyudev
import threading
import extractData as exd
from logs import log
def get_connected_usb_devices():
"""Get connected USB devices."""
devices = [evdev.InputDevice(device) for device in evdev.list_devices()]
usb_devices = [device for device in devices if 'usb' in device.phys.lower()]
return usb_devices
def wait_for_usb_events(connected=True):
"""Wait for USB ports updates."""
context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context)
monitor.filter_by(subsystem='usb')
condition = threading.Condition()
def device_event(observer, device):
if (connected and device.action == 'add') or (not connected and device.action == 'remove'):
with condition:
condition.notify()
observer = pyudev.MonitorObserver(monitor, device_event)
observer.start()
with condition:
condition.wait()
observer.stop()
def gracefully_terminate_threads(thread_list):
"""Gracefully terminate threads."""
for thread in thread_list:
thread.stop_requested = True
thread.join()
def reset_threads(old_thread_list, new_devices_list):
"""Update thread list with new devices."""
gracefully_terminate_threads(old_thread_list)
new_thread_list = []
for device in new_devices_list:
thread = threading.Thread(target=exd.collectId, args=(device,))
thread.start()
new_thread_list.append(thread)
return new_thread_list
# Usage
connected_usb_devices = get_connected_usb_devices()
threads = reset_threads(threads, connected_usb_devices)
wait_for_usb_events(connected=False) # Wait for USB disconnections