diff --git a/python/uds.py b/python/uds.py index 41f4e98d62..ed0404a6f0 100644 --- a/python/uds.py +++ b/python/uds.py @@ -103,7 +103,8 @@ def _isotp_thread(panda, bus, tx_addr, tx_queue, rx_queue): filter_addr = (tx_addr & 0xFFFF0000) + (tx_addr<<8 & 0xFF00) + (tx_addr>>8 & 0xFF) else: raise ValueError("invalid tx_addr: {}".format(tx_addr)) - rx_frame = {"size": 0, "data": "", "sent": True} + rx_frame = {"size": 0, "data": "", "idx": 0, "done": True} + tx_frame = {"size": 0, "data": "", "idx": 0, "done": True} # clear tx buffer panda.can_clear(bus) @@ -121,34 +122,64 @@ def _isotp_thread(panda, bus, tx_addr, tx_queue, rx_queue): # single rx_frame rx_frame["size"] = rx_data[0] & 0xFF rx_frame["data"] = rx_data[1:1+rx_frame["size"]] - rx_frame["sent"] = True + rx_frame["idx"] = 0 + rx_frame["done"] = True rx_queue.put(rx_frame["data"]) elif rx_data[0] >> 4 == 0x1: # first rx_frame rx_frame["size"] = ((rx_data[0] & 0x0F) << 8) + rx_data[1] rx_frame["data"] = rx_data[2:] - rx_frame["sent"] = False + rx_frame["idx"] = 0 + rx_frame["done"] = False # send flow control message (send all bytes) - flow_ctl = "\x30\x00\x00".ljust(8, "\x00") - if (DEBUG): print("S: {} {}".format(hex(tx_addr), hexlify(flow_ctl))) - panda.can_send(tx_addr, flow_ctl, bus) + msg = "\x30\x00\x00".ljust(8, "\x00") + if (DEBUG): print("S: {} {}".format(hex(tx_addr), hexlify(msg))) + panda.can_send(tx_addr, msg, bus) elif rx_data[0] >> 4 == 0x2: - # consecutive rx_frame - assert rx_frame["sent"] == False, "no active frame" + # consecutive rx frame + assert rx_frame["done"] == False, "rx: no active frame" + # validate frame index + rx_frame["idx"] += 1 + assert rx_frame["idx"] & 0xF == rx_data[0] & 0xF, "rx: invalid consecutive frame index" rx_size = rx_frame["size"] - len(rx_frame["data"]) rx_frame["data"] += rx_data[1:1+min(rx_size, 7)] - if rx_size <= 7: - rx_frame["sent"] = True + if rx_frame["size"] == len(rx_frame["data"]): + rx_frame["done"] = True rx_queue.put(rx_frame["data"]) + elif rx_data[0] >> 4 == 0x3: + # flow control + assert tx_frame["done"] == False, "tx: no active frame" + # TODO: support non-zero block size and separate time + assert rx_data[0] == 0x30 and rx_data[2] == 0x00, "tx: flow-control requires: continue, no delay" + # first frame = 6 bytes, each consecutive frame = 7 bytes + start = 6 + tx_frame["idx"] * 7 + count = rx_data[1] + end = start + count * 7 if count > 0 else tx_frame["size"] + for i in range(start, end, 7): + tx_frame["idx"] += 1 + # consecutive tx frames + msg = (chr(0x20 | (tx_frame["idx"] & 0xF)) + tx_frame["data"][i:i+7]).ljust(8, "\x00") + if (DEBUG): print("S: {} {}".format(hex(tx_addr), hexlify(msg))) + panda.can_send(tx_addr, msg, bus) + tx_frame["done"] = True if not tx_queue.empty(): req = tx_queue.get(block=False) - # reset rx rx_frame - rx_frame = {"size": 0, "data": "", "sent": True} - # TODO: support sending more than 7 bytes - req = (chr(len(req)) + req).ljust(8, "\x00") - if (DEBUG): print("S: {} {}".format(hex(tx_addr), hexlify(req))) - panda.can_send(tx_addr, req, bus) + # reset rx and tx frames + rx_frame = {"size": 0, "data": "", "idx": 0, "done": True} + tx_frame = {"size": len(req), "data": req, "idx": 0, "done": False} + if tx_frame["size"] < 8: + # single frame + tx_frame["done"] = True + msg = (chr(tx_frame["size"]) + tx_frame["data"]).ljust(8, "\x00") + if (DEBUG): print("S: {} {}".format(hex(tx_addr), hexlify(msg))) + panda.can_send(tx_addr, msg, bus) + else: + # first rx_frame + tx_frame["done"] = False + msg = (struct.pack("!H", 0x1000 | tx_frame["size"]) + tx_frame["data"][:6]).ljust(8, "\x00") + if (DEBUG): print("S: {} {}".format(hex(tx_addr), hexlify(msg))) + panda.can_send(tx_addr, msg, bus) else: time.sleep(0.01) finally: @@ -667,10 +698,14 @@ def request_transfer_exit(address): can_reader_t.start() # examples + print("tester present ...") tester_present(tx_addr) + print("extended diagnostic session ...") + diagnostic_session_control(tx_addr, SESSION_TYPE.EXTENDED_DIAGNOSTIC) + print("application software id ...") app_id = read_data_by_identifier(tx_addr, DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_IDENTIFICATION) print(app_id) - # for i in range(0xF100, 0xFFFF): + # for i in range(0xF100, 0xF2FF): # try: # dat = read_data_by_identifier(tx_addr, i) # desc = ""