forked from luxonis/depthai-experiments
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
66 lines (55 loc) · 2.08 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import depthai as dai
import threading
import contextlib
import cv2
import time
# This can be customized to pass multiple parameters
def getPipeline(stereo):
    """Build a DepthAI pipeline that streams the colour camera preview.

    Args:
        stereo: When truthy the preview is 600x300 (the demo uses a larger
            RGB preview for OAK-D); otherwise it is 300x300.

    Returns:
        The configured ``dai.Pipeline``; the camera preview is linked to an
        XLinkOut node whose stream name is "rgb".
    """
    pipeline = dai.Pipeline()

    # Source node: colour camera.
    color_cam = pipeline.create(dai.node.ColorCamera)

    # Only the preview width depends on the device type; the height is fixed.
    preview_width = 600 if stereo else 300
    color_cam.setPreviewSize(preview_width, 300)

    color_cam.setBoardSocket(dai.CameraBoardSocket.RGB)
    color_cam.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
    color_cam.setInterleaved(False)

    # Output node: the preview frames leave the device on the "rgb" stream.
    rgb_out = pipeline.create(dai.node.XLinkOut)
    rgb_out.setStreamName("rgb")
    color_cam.preview.link(rgb_out.input)

    return pipeline
def worker(dev_info, stack, dic):
    """Open one device, start its pipeline and register its "rgb" queue.

    Args:
        dev_info: Device descriptor handed to ``dai.Device``.
        stack: Context stack whose ``enter_context`` receives the opened
            device, so the device is released when the stack unwinds.
        dic: Mapping that gets the device's output queue stored under the
            key ``"rgb-" + <device MXID>``.
    """
    device = stack.enter_context(
        dai.Device(dai.OpenVINO.Version.VERSION_2021_4, dev_info, False)
    )

    # Note: currently on POE, DeviceInfo.getMxId() and Device.getMxId() are different!
    print("=== Connected to " + dev_info.getMxId())

    # Query the device first, then report what was found.
    device_mxid = device.getMxId()
    connected_cameras = device.getConnectedCameras()
    link_speed = device.getUsbSpeed()

    print(" >>> MXID:", device_mxid)
    camera_names = [camera.name for camera in connected_cameras]
    print(" >>> Cameras:", *camera_names)
    print(" >>> USB speed:", link_speed.name)

    # Exactly three connected cameras selects the wider preview in getPipeline.
    has_three_cameras = len(connected_cameras) == 3
    device.startPipeline(getPipeline(has_three_cameras))

    queue_key = "rgb-" + device_mxid
    dic[queue_key] = device.getOutputQueue(name="rgb")
# Discover the devices to connect to.
device_infos = dai.Device.getAllAvailableDevices()
print(f'Found {len(device_infos)} devices')

# The ExitStack owns every device opened by the workers, so all of them are
# closed together when this block exits.
with contextlib.ExitStack() as stack:
    queues = {}
    threads = []
    # One thread per device; each worker adds its output queue to `queues`.
    for dev in device_infos:
        thread = threading.Thread(target=worker, args=(dev, stack, queues))
        thread.start()
        threads.append(thread)

    for t in threads:
        t.join()  # Wait for all threads to finish

    # With no queues, no window is ever shown, so the 'q' key below could
    # never be received and the display loop would spin forever. Skip it.
    if not queues:
        print('No device streams available, nothing to display')

    # `queues` is not modified past this point, so this is an endless loop
    # whenever at least one stream exists; it ends when 'q' is pressed.
    while queues:
        for name, queue in queues.items():
            # Only fetch when a frame is waiting so one idle device does not
            # stall the others.
            if queue.has():
                cv2.imshow(name, queue.get().getCvFrame())
        if cv2.waitKey(1) == ord('q'):
            break

print('Devices closed')