forked from ideamark/desk-emoji
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconnect.py
170 lines (143 loc) · 5.86 KB
/
connect.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
import inquirer
import platform
import serial
import serial.tools.list_ports
import asyncio
import threading
import time
from bleak import BleakClient, BleakScanner
from common import *
class SerialClient(object):
def __init__(self):
self.port = ''
self.ser = None
self.connected = False
def __unique_ports(self, ports):
port_list = []
for port in ports:
if port not in port_list:
port_list.append(port)
return port_list
def list_ports(self):
ports = serial.tools.list_ports.comports()
matching_ports = [port.device for port in ports if platform.system() == 'Windows' or "serial" in port.device.lower()]
non_matching_ports = [port.device for port in ports if port.device not in matching_ports]
return self.__unique_ports(matching_ports + non_matching_ports)
def select_port(self):
ports = self.list_ports()
if len(ports) == 1:
return ports[0]
questions = [
inquirer.List('port',
message="Select a port",
choices=ports,
carousel=True)
]
answers = inquirer.prompt(questions)
self.port = answers['port']
return self.port
def connect(self, port="", baud_rate=115200):
try:
port = port if port else self.port
self.ser = serial.Serial(port, baud_rate, timeout=1)
logger.info(f"Connected to {port} at {baud_rate} baud rate.")
self.connected = True
return True
except Exception as e:
error(e, f"Connect to {port} Failed")
self.connected = False
return False
def disconnect(self):
if self.ser and self.ser.is_open:
try:
self.ser.close()
logger.info(f"Disconnected from {self.port}.")
except Exception as e:
error(e, "Failed to disconnect the serial port.")
else:
logger.info("Serial port is already closed or was not connected.")
self.connected = False
self.ser = None
def read(self, port):
while True:
if port.in_waiting:
data = port.read(port.in_waiting)
result = data.decode('utf-8', errors='ignore')
logger.debug("\nReceived:", result.strip('\n').strip())
def send(self, msg):
try:
encode_msg = msg.encode('utf-8')
self.ser.write(encode_msg)
logger.debug(f"Sent: {msg}")
start_time = time.time()
received_msg = ""
while True:
if self.ser.in_waiting > 0:
received_msg += self.ser.read(self.ser.in_waiting).decode('utf-8')
if time.time() - start_time > 10: return
if received_msg and (msg in received_msg):
logger.debug(f"Received: {received_msg}")
return received_msg
time.sleep(0.1)
except Exception as e:
error(e, "Serial port send message Failed!")
class BaseBluetoothClient(object):
def __init__(self, device_name="", service_uuid="", characteristic_uuid=""):
self.device_name = device_name
self.service_uuid = service_uuid
self.characteristic_uuid = characteristic_uuid
self.client = None
self.connected = False
async def list_devices(self):
logger.info("Scanning devices...")
device_list = []
devices = await BleakScanner.discover()
for device in devices:
if device.name == self.device_name:
device_list.append(device.address)
return device_list
async def connect(self, device_address):
self.client = BleakClient(device_address)
try:
await self.client.connect()
logger.info(f"Connected to {self.device_name} at {device_address}")
self.connected = True
return True
except Exception as e:
logger.error(f"Failed to connect to {self.device_name}: {e}")
self.connected = False
return False
async def disconnect(self):
if self.client and self.client.is_connected:
await self.client.disconnect()
self.connected = False
logger.info(f"Disconnected from {self.device_name}")
async def send(self, data):
if self.client and self.client.is_connected:
try:
await self.client.write_gatt_char(self.characteristic_uuid, data.encode('utf-8'))
logger.info(f"Sent to {self.device_name}: {data}")
except Exception as e:
logger.error(f"Failed to send data: {e}")
else:
logger.info("Not connected to any device.")
class BluetoothClient(BaseBluetoothClient):
def __init__(self, device_name="Desk-Emoji",
service_uuid="4db9a22d-6db4-d9fe-4d93-38e350abdc3c",
characteristic_uuid="ff1cdaef-0105-e4fb-7be2-018500c2e927"):
super().__init__(device_name, service_uuid, characteristic_uuid)
self.loop_thread = threading.Thread(target=self._run_event_loop)
self.loop_thread.daemon = True
self.loop = asyncio.new_event_loop()
self.loop_thread.start()
def _run_event_loop(self):
asyncio.set_event_loop(self.loop)
self.loop.run_forever()
def list_devices(self):
return asyncio.run_coroutine_threadsafe(super().list_devices(), self.loop).result()
def connect(self, device_address):
return asyncio.run_coroutine_threadsafe(super().connect(device_address), self.loop).result()
def disconnect(self):
asyncio.run_coroutine_threadsafe(super().disconnect(), self.loop).result()
def send(self, data):
asyncio.run_coroutine_threadsafe(super().send(data), self.loop).result()