# rc_driver.py (forked from hamuchiwa/AutoRCCar)
__author__ = 'zhengwang'

import sys
import threading
import socketserver

import cv2
import numpy as np

from model import NeuralNetwork
from rc_driver_helper import *
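# Overview: this script runs on the computer. It starts two TCP servers,
# one receiving the JPEG video stream from the Raspberry Pi camera and
# one receiving ultrasonic distance readings. Each decoded frame is fed
# to the trained neural network, whose prediction steers the RC car over
# the serial port opened by RCControl. (RCControl, ObjectDetection and
# DistanceToCamera come from rc_driver_helper; their sources are not
# shown here.)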
# distance data measured by ultrasonic sensor
sensor_data = None
class SensorDataHandler(socketserver.BaseRequestHandler):

    data = b" "

    def handle(self):
        global sensor_data
        while self.data:
            self.data = self.request.recv(1024)
            # guard against the empty read that signals a closed connection
            if not self.data:
                break
            sensor_data = round(float(self.data), 1)
            # print("{} sent:".format(self.client_address[0]))
            print(sensor_data)
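# Note: the handler above assumes the Pi-side client sends each distance
# reading as a short ASCII float (e.g. b"27.3") in its own packet; a
# reading split across two recv() calls would fail to parse.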
class VideoStreamHandler(socketserver.StreamRequestHandler):

    # h1: stop sign height, h2: traffic light height (both measured manually)
    h1 = 5.5  # cm
    h2 = 5.5  # cm

    # load trained neural network
    nn = NeuralNetwork()
    nn.load_model("saved_model/nn_model.xml")

    obj_detection = ObjectDetection()
    rc_car = RCControl("/dev/tty.usbmodem1421")

    # cascade classifiers
    stop_cascade = cv2.CascadeClassifier("cascade_xml/stop_sign.xml")
    light_cascade = cv2.CascadeClassifier("cascade_xml/traffic_light.xml")

    d_to_camera = DistanceToCamera()

    # hard-coded stopping thresholds: ultrasonic sensor 30 cm, stop sign and traffic light 25 cm
    d_sensor_thresh = 30
    d_stop_light_thresh = 25
    d_stop_sign = d_stop_light_thresh
    d_light = d_stop_light_thresh

    stop_start = 0  # start time when stopped at the stop sign
    stop_finish = 0
    stop_time = 0
    drive_time_after_stop = 0
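    # DistanceToCamera (from rc_driver_helper, source not shown) estimates
    # object distance monocularly from the detection coordinate returned by
    # ObjectDetection.detect() and the physical heights h1/h2. The 300 and
    # 100 passed to calculate() below look like x offsets for drawing the
    # distance labels on the frame; both readings of the helper's API are
    # assumptions, not verified here.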
    def handle(self):
        global sensor_data
        stream_bytes = b' '
        stop_flag = False
        stop_sign_active = True

        try:
            # stream video frames one by one
            while True:
                stream_bytes += self.rfile.read(1024)
                # JPEG frames are delimited by the SOI (0xFFD8) and EOI (0xFFD9) markers
                first = stream_bytes.find(b'\xff\xd8')
                last = stream_bytes.find(b'\xff\xd9')
                if first != -1 and last != -1:
                    jpg = stream_bytes[first:last + 2]
                    stream_bytes = stream_bytes[last + 2:]
                    gray = cv2.imdecode(np.frombuffer(jpg, dtype=np.uint8), cv2.IMREAD_GRAYSCALE)
                    image = cv2.imdecode(np.frombuffer(jpg, dtype=np.uint8), cv2.IMREAD_COLOR)

                    # lower half of the image
                    height, width = gray.shape
                    roi = gray[int(height / 2):height, :]

                    # object detection
                    v_param1 = self.obj_detection.detect(self.stop_cascade, gray, image)
                    v_param2 = self.obj_detection.detect(self.light_cascade, gray, image)

                    # distance measurement
                    if v_param1 > 0 or v_param2 > 0:
                        d1 = self.d_to_camera.calculate(v_param1, self.h1, 300, image)
                        d2 = self.d_to_camera.calculate(v_param2, self.h2, 100, image)
                        self.d_stop_sign = d1
                        self.d_light = d2

                    cv2.imshow('image', image)
                    # cv2.imshow('mlp_image', roi)

                    # reshape the ROI into a single row vector for the neural network
                    image_array = roi.reshape(1, int(height / 2) * width).astype(np.float32)

                    # neural network makes prediction
                    prediction = self.nn.predict(image_array)

                    # stop conditions
                    if sensor_data and int(sensor_data) < self.d_sensor_thresh:
                        print("Stop, obstacle in front")
                        self.rc_car.stop()
                        sensor_data = None

                    elif 0 < self.d_stop_sign < self.d_stop_light_thresh and stop_sign_active:
                        print("Stop sign ahead")
                        self.rc_car.stop()

                        # stop for 5 seconds
                        if stop_flag is False:
                            self.stop_start = cv2.getTickCount()
                            stop_flag = True
                        self.stop_finish = cv2.getTickCount()

                        # getTickCount()/getTickFrequency() give elapsed wall-clock seconds
                        self.stop_time = (self.stop_finish - self.stop_start) / cv2.getTickFrequency()
                        print("Stop time: %.2fs" % self.stop_time)

                        # 5 seconds later, continue driving
                        if self.stop_time > 5:
                            print("Waited for 5 seconds")
                            stop_flag = False
                            stop_sign_active = False

                    elif 0 < self.d_light < self.d_stop_light_thresh:
                        # print("Traffic light ahead")
                        if self.obj_detection.red_light:
                            print("Red light")
                            self.rc_car.stop()
                        elif self.obj_detection.green_light:
                            print("Green light")
                        elif self.obj_detection.yellow_light:
                            print("Yellow light flashing")

                        self.d_light = self.d_stop_light_thresh
                        self.obj_detection.red_light = False
                        self.obj_detection.green_light = False
                        self.obj_detection.yellow_light = False

                    else:
                        self.rc_car.steer(prediction)
                        self.stop_start = cv2.getTickCount()
                        self.d_stop_sign = self.d_stop_light_thresh

                        # re-arm stop-sign detection after 5 seconds of driving
                        if stop_sign_active is False:
                            self.drive_time_after_stop = (self.stop_start - self.stop_finish) / cv2.getTickFrequency()
                            if self.drive_time_after_stop > 5:
                                stop_sign_active = True

                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        print("car stopped")
                        self.rc_car.stop()
                        break
        finally:
            cv2.destroyAllWindows()
            sys.exit()
class Server(object):

    def __init__(self, host, port1, port2):
        self.host = host
        self.port1 = port1
        self.port2 = port2

    def video_stream(self, host, port):
        s = socketserver.TCPServer((host, port), VideoStreamHandler)
        s.serve_forever()

    def sensor_stream(self, host, port):
        s = socketserver.TCPServer((host, port), SensorDataHandler)
        s.serve_forever()
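    # start() runs the sensor stream in a daemon thread (so it exits with
    # the process) and keeps the video stream in the main thread, whose
    # handler owns the OpenCV windows and calls sys.exit() on shutdown.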
    def start(self):
        sensor_thread = threading.Thread(target=self.sensor_stream, args=(self.host, self.port2))
        sensor_thread.daemon = True
        sensor_thread.start()
        self.video_stream(self.host, self.port1)
if __name__ == '__main__':
    h, p1, p2 = "192.168.1.100", 8000, 8002
    ts = Server(h, p1, p2)
    ts.start()
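# Usage: update the host/port constants above to match your network,
# start this script on the computer first, then launch the Pi-side
# streaming clients (video on port1, sensor data on port2). A minimal
# sketch of a sensor-side test client, assuming the ASCII protocol
# described above (hypothetical values, for illustration only):
#
#   import socket, time
#   s = socket.create_connection(("192.168.1.100", 8002))
#   for reading in (40.0, 35.5, 28.1):  # fake ultrasonic readings in cm
#       s.sendall(str(reading).encode())
#       time.sleep(0.5)
#   s.close()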