-
Notifications
You must be signed in to change notification settings - Fork 3
/
vehicle.py
141 lines (115 loc) · 4.08 KB
/
vehicle.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sun Jun 25 10:44:24 2017
@author: wroscoe
"""
import time
from threading import Thread
from .memory import Memory
from .log import get_logger
logger = get_logger(__name__)
class Vehicle:
def __init__(self, mem=None):
if not mem:
mem = Memory()
self.mem = mem
self.parts = []
self.on = True
self.threads = []
def add(self, part, inputs=[], outputs=[],
threaded=False, run_condition=None):
"""
Method to add a part to the vehicle drive loop.
Parameters
----------
inputs : list
Channel names to get from memory.
outputs : list
Channel names to save to memory.
threaded : boolean
If a part should be run in a separate thread.
run_condition: boolean
If a part should be run at all.
"""
p = part
logger.info('Adding part {}.'.format(p.__class__.__name__))
entry = dict()
entry['part'] = p
entry['inputs'] = inputs
entry['outputs'] = outputs
entry['run_condition'] = run_condition
if threaded:
t = Thread(target=part.update, args=())
t.daemon = True
entry['thread'] = t
self.parts.append(entry)
def start(self, rate_hz=10, max_loop_count=None):
"""
Start vehicle's main drive loop.
This is the main thread of the vehicle. It starts all the new
threads for the threaded parts then starts an infinit loop
that runs each part and updates the memory.
Parameters
----------
rate_hz : int
The max frequency that the drive loop should run. The actual
frequency may be less than this if there are many blocking parts.
max_loop_count : int
Maxiumum number of loops the drive loop should execute. This is
used for testing the all the parts of the vehicle work.
"""
try:
self.on = True
for entry in self.parts:
if entry.get('thread'):
# start the update thread
entry.get('thread').start()
# wait until the parts warm up.
logger.info('Starting vehicle...')
time.sleep(1)
loop_count = 0
while self.on:
start_time = time.time()
loop_count += 1
self.update_parts()
# stop drive loop if loop_count exceeds max_loopcount
if max_loop_count and loop_count > max_loop_count:
self.on = False
sleep_time = 1.0 / rate_hz - (time.time() - start_time)
if sleep_time > 0.0:
time.sleep(sleep_time)
except KeyboardInterrupt:
pass
finally:
self.stop()
def update_parts(self):
"""
loop over all parts
"""
for entry in self.parts:
# don't run if there is a run condition that is False
run = True
if entry.get('run_condition'):
run_condition = entry.get('run_condition')
run = self.mem.get([run_condition])[0]
# print('run_condition', entry['part'], entry.get('run_condition'), run)
if run:
p = entry['part']
# get inputs from memory
inputs = self.mem.get(entry['inputs'])
# run the part
if entry.get('thread'):
outputs = p.run_threaded(*inputs)
else:
outputs = p.run(*inputs)
# save the output to memory
if outputs is not None:
self.mem.put(entry['outputs'], outputs)
def stop(self):
logger.info('Shutting down vehicle and its parts...')
for entry in self.parts:
try:
entry['part'].shutdown()
except Exception as e:
logger.debug(e)