forked from commaai/openpilot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdbc.py
executable file
·192 lines (153 loc) · 5.75 KB
/
dbc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
import re
import bitstring
from binascii import hexlify
from collections import namedtuple
def int_or_float(s):
# return number, trying to maintain int format
try:
return int(s)
except ValueError:
return float(s)
DBCSignal = namedtuple(
"DBCSignal", ["name", "start_bit", "size", "is_little_endian", "is_signed",
"factor", "offset", "tmin", "tmax", "units"])
class dbc(object):
def __init__(self, fn):
with open(fn) as f:
self.txt = f.read().split("\n")
self._warned_addresses = set()
# regexps from https://github.com/ebroecker/canmatrix/blob/master/canmatrix/importdbc.py
bo_regexp = re.compile("^BO\_ (\w+) (\w+) *: (\w+) (\w+)")
sg_regexp = re.compile("^SG\_ (\w+) : (\d+)\|(\d+)@(\d+)([\+|\-]) \(([0-9.+\-eE]+),([0-9.+\-eE]+)\) \[([0-9.+\-eE]+)\|([0-9.+\-eE]+)\] \"(.*)\" (.*)")
sgm_regexp = re.compile("^SG\_ (\w+) (\w+) *: (\d+)\|(\d+)@(\d+)([\+|\-]) \(([0-9.+\-eE]+),([0-9.+\-eE]+)\) \[([0-9.+\-eE]+)\|([0-9.+\-eE]+)\] \"(.*)\" (.*)")
# A dictionary which maps message ids to tuples ((name, size), signals).
# name is the ASCII name of the message.
# size is the size of the message in bytes.
# signals is a list signals contained in the message.
# signals is a list of DBCSignal in order of increasing start_bit.
self.msgs = {}
self.bits = []
for i in range(0, 64, 8):
for j in range(7, -1, -1):
self.bits.append(i+j)
for l in self.txt:
l = l.strip()
if l.startswith("BO_ "):
# new group
dat = bo_regexp.match(l)
if dat is None:
print "bad BO", l
name = dat.group(2)
size = int(dat.group(3))
ids = int(dat.group(1), 0) # could be hex
self.msgs[ids] = ((name, size), [])
if l.startswith("SG_ "):
# new signal
dat = sg_regexp.match(l)
go = 0
if dat is None:
dat = sgm_regexp.match(l)
go = 1
if dat is None:
print "bad SG", l
sgname = dat.group(1)
start_bit = int(dat.group(go+2))
signal_size = int(dat.group(go+3))
is_little_endian = int(dat.group(go+4))==1
is_signed = dat.group(go+5)=='-'
factor = int_or_float(dat.group(go+6))
offset = int_or_float(dat.group(go+7))
tmin = int_or_float(dat.group(go+8))
tmax = int_or_float(dat.group(go+9))
units = dat.group(go+10)
self.msgs[ids][1].append(
DBCSignal(sgname, start_bit, signal_size, is_little_endian,
is_signed, factor, offset, tmin, tmax, units))
for msg in self.msgs.viewvalues():
msg[1].sort(key=lambda x: x.start_bit)
def encode(self, msg_id, dd):
"""Encode a CAN message using the dbc.
Inputs:
msg_id: The message ID.
dd: A dictionary mapping signal name to signal data.
"""
# TODO: Stop using bitstring, which is super slow.
msg_def = self.msgs[msg_id]
size = msg_def[0][1]
bsf = bitstring.Bits(hex="00"*size)
for s in msg_def[1]:
ival = dd.get(s.name)
if ival is not None:
ival = (ival / s.factor) - s.offset
ival = int(round(ival))
# should pack this
if s.is_little_endian:
ss = s.start_bit
else:
ss = self.bits.index(s.start_bit)
if s.is_signed:
tbs = bitstring.Bits(int=ival, length=s.size)
else:
tbs = bitstring.Bits(uint=ival, length=s.size)
lpad = bitstring.Bits(bin="0b"+"0"*ss)
rpad = bitstring.Bits(bin="0b"+"0"*(8*size-(ss+s.size)))
tbs = lpad+tbs+rpad
bsf |= tbs
return bsf.tobytes()
def decode(self, x, arr=None, debug=False):
"""Decode a CAN message using the dbc.
Inputs:
x: A collection with elements (address, time, data), where address is
the CAN address, time is the bus time, and data is the CAN data as a
hex string.
arr: Optional list of signals which should be decoded and returned.
debug: True to print debugging statements.
Returns:
A tuple (name, data), where name is the name of the CAN message and data
is the decoded result. If arr is None, data is a dict of properties.
Otherwise data is a list of the same length as arr.
Returns (None, None) if the message could not be decoded.
"""
def swap_order(d, wsz=4, gsz=2 ):
return "".join(["".join([m[i:i+gsz] for i in range(wsz-gsz,-gsz,-gsz)]) for m in [d[i:i+wsz] for i in range(0,len(d),wsz)]])
if arr is None:
out = {}
else:
out = [None]*len(arr)
msg = self.msgs.get(x[0])
if msg is None:
if x[0] not in self._warned_addresses:
print("WARNING: Unknown message address {}".format(x[0]))
self._warned_addresses.add(x[0])
return None, None
name = msg[0][0]
if debug:
print name
blen = 8*len(x[2])
for s in msg[1]:
if arr is not None and s[0] not in arr:
continue
# big or little endian?
# see http://vi-firmware.openxcplatform.com/en/master/config/bit-numbering.html
if s[3] is False:
ss = self.bits.index(s[1])
x2_int = int(hexlify(x[2]), 16)
data_bit_pos = (blen - (ss + s[2]))
else:
x2_int = int(swap_order(hexlify(x[2]), 16, 2), 16)
ss = s[1]
data_bit_pos = ss
if data_bit_pos < 0:
continue
ival = (x2_int >> data_bit_pos) & ((1 << (s[2])) - 1)
if s[4] and (ival & (1<<(s[2]-1))): # signed
ival -= (1<<s[2])
# control the offset
ival = (ival * s[5]) + s[6]
#if debug:
# print "%40s %2d %2d %7.2f %s" % (s[0], s[1], s[2], ival, s[-1])
if arr is None:
out[s[0]] = ival
else:
out[arr.index(s[0])] = ival
return name, out