forked from commaai/openpilot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrealtime.py
94 lines (73 loc) · 2.54 KB
/
realtime.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
"""Utilities for reading real time clocks and keeping soft real time constraints."""
import gc
import os
import time
from collections import deque
from setproctitle import getproctitle
from openpilot.system.hardware import PC
# time step for each process
DT_CTRL = 0.01 # controlsd
DT_MDL = 0.05 # model
DT_HW = 0.5 # hardwared and manager
DT_DMON = 0.05 # driver monitoring
class Priority:
# CORE 2
# - modeld = 55
# - camerad = 54
CTRL_LOW = 51 # plannerd & radard
# CORE 3
# - pandad = 55
CTRL_HIGH = 53
def set_realtime_priority(level: int) -> None:
if not PC:
os.sched_setscheduler(0, os.SCHED_FIFO, os.sched_param(level))
def set_core_affinity(cores: list[int]) -> None:
if not PC:
os.sched_setaffinity(0, cores)
def config_realtime_process(cores: int | list[int], priority: int) -> None:
gc.disable()
set_realtime_priority(priority)
c = cores if isinstance(cores, list) else [cores, ]
set_core_affinity(c)
class Ratekeeper:
def __init__(self, rate: float, print_delay_threshold: float | None = 0.0) -> None:
"""Rate in Hz for ratekeeping. print_delay_threshold must be nonnegative."""
self._interval = 1. / rate
self._next_frame_time = time.monotonic() + self._interval
self._print_delay_threshold = print_delay_threshold
self._frame = 0
self._remaining = 0.0
self._process_name = getproctitle()
self._dts = deque([self._interval], maxlen=100)
self._last_monitor_time = time.monotonic()
@property
def frame(self) -> int:
return self._frame
@property
def remaining(self) -> float:
return self._remaining
@property
def lagging(self) -> bool:
avg_dt = sum(self._dts) / len(self._dts)
expected_dt = self._interval * (1 / 0.9)
return avg_dt > expected_dt
# Maintain loop rate by calling this at the end of each loop
def keep_time(self) -> bool:
lagged = self.monitor_time()
if self._remaining > 0:
time.sleep(self._remaining)
return lagged
# Monitors the cumulative lag, but does not enforce a rate
def monitor_time(self) -> bool:
prev = self._last_monitor_time
self._last_monitor_time = time.monotonic()
self._dts.append(self._last_monitor_time - prev)
lagged = False
remaining = self._next_frame_time - time.monotonic()
self._next_frame_time += self._interval
if self._print_delay_threshold is not None and remaining < -self._print_delay_threshold:
print(f"{self._process_name} lagging by {-remaining * 1000:.2f} ms")
lagged = True
self._frame += 1
self._remaining = remaining
return lagged