Skip to content

Commit

Permalink
libusb-c: interrupt support, enable more tests
Browse files Browse the repository at this point in the history
Signed-off-by: John McMaster <[email protected]>
  • Loading branch information
JohnDMcMaster committed Apr 24, 2022
1 parent a65bd7d commit 4c4edfb
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 37 deletions.
7 changes: 3 additions & 4 deletions test/test_misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ def run_printers_json(fn, argsj):
usbrply.printers.run("libusb-py",
usbrply.parsers.pcap2json(fn, argsj=argsj),
argsj=argsj)
usbrply.printers.run("libusb-c",
usbrply.parsers.pcap2json(fn, argsj=argsj),
argsj=argsj)
return j


Expand Down Expand Up @@ -143,17 +146,13 @@ def test_cprinter_lin_wrapped(self):
usbrply.printers.run("libusb-c", filtered, argsj=self.argsj)

def test_cprinter_win(self):
# FIXME: need bulk support
return
usbrply.printers.run(
"libusb-c",
usbrply.parsers.pcap2json("test/data/win_misc.pcapng",
argsj=self.argsj),
argsj=self.argsj)

def test_cprinter_win_wrapped(self):
# FIXME: need bulk support
return
self.argsj["wrapper"] = True
parsed = usbrply.parsers.pcap2json("test/data/win_misc.pcapng",
argsj=self.argsj)
Expand Down
75 changes: 49 additions & 26 deletions usbrply/cprinter.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ def print_macros(self):
#define VALIDATE_READ(expected, actual, actual_size, msg) \
validate_read(expected, sizeof(expected), actual, actual_size, msg)
#define RET_ON_ERR(_x) do { usbret = _x; if (usbret < 0) { return usbret; } } while(0)
#define RET_ON_XFER_ERR(_x, _nexpect) do { \
usbret = _x; \
if (usbret < 0) { \
return usbret; \
} \
if (bytes_transferred != (_nexpect)) { \
return -1; \
} \
} while(0)
''',
file=printer.print_file)

Expand Down Expand Up @@ -109,11 +118,13 @@ def print_wrapper_header(self):
uint8_t buff[16384];
(void)buff;
int timeout = 0;
unsigned bytes_transferred = 0;
''',
file=printer.print_file)

def header(self):
comment("Generated by usbrply")
comment("***WARNING: libusb-c is alpha level quality***")
comment("cmd: %s" % (' '.join(sys.argv), ))
indented("")

Expand Down Expand Up @@ -177,11 +188,13 @@ def footer(self):
}
int main(int argc, char **argv) {
struct libusb_context *usb_ctx = NULL;
libusb_device_handle *devh= NULL;
uint16_t vid = ''' + "0x%04X" % self.vid + ''';
uint16_t pid = ''' + "0x%04X" % self.pid + ''';
struct libusb_context *usb_ctx = NULL;
libusb_device_handle *devh= NULL;
int err;
int bytes_transferred;
(void)bytes_transferred;
int ret = libusb_init(&usb_ctx);
if (ret < 0 || usb_ctx == NULL) {
Expand Down Expand Up @@ -262,39 +275,49 @@ def parse_data(self, d):
packet_numbering,
))
elif d["type"] == "controlWrite":
data = bytes2AnonArray(binascii.unhexlify(
data_str = bytes2AnonArray(binascii.unhexlify(
d["data"])) if d["data"] else "NULL"
indented(
"RET_ON_ERR(libusb_control_transfer(devh, 0x%02X, 0x%02X, 0x%04X, 0x%04X, %s, 0x%04X, timeout));"
% (d["bRequestType"], d["bRequest"], d["wValue"], d["wIndex"],
data, len(d["data"])))
data_str, len(d["data"])))

elif d["type"] == "bulkRead":
assert 0, "fixme"
data_str = "\"\""
indented("buff = bulkRead(0x%02X, 0x%04X)" % (d["endp"], d["len"]))
indented("validate_read(%s, %u, buff, err, \"%s\")" %
(bytes2AnonArray(binascii.unhexlify(
d["data"])), len(d["data"]), packet_numbering))
indented(
"RET_ON_ERR(libusb_bulk_transfer(devh, 0x%02X, buff, 0x%04X, &bytes_transferred, timeout));"
% (d["endp"], len(d["data"])))
if len(d["data"]):
indented(
"RET_ON_ERR(validate_read(%s, %u, buff, bytes_transferred, \"%s\"));"
% (
bytes2AnonArray(binascii.unhexlify(d["data"])),
len(d["data"]),
packet_numbering,
))
elif d["type"] == "bulkWrite":
assert 0, "fixme"
# Note that its the submit from earlier, not the ack that we care about
data_str = bytes2AnonArray(binascii.unhexlify(d["data"]))
# def bulkWrite(self, endpoint, data, timeout=0):
indented("bulkWrite(0x%02X, %s)" % (d["endp"], data_str))

data_str = bytes2AnonArray(binascii.unhexlify(
d["data"])) if d["data"] else "NULL"
indented(
"RET_ON_XFER_ERR(libusb_bulk_transfer(devh, 0x%02X, %s, 0x%04X, &bytes_transferred, timeout), 0x%04X);"
% (d["endp"], data_str, len(d["data"]), len(d["data"])))
elif d["type"] == "interruptIn":
assert 0, "fixme"
data_str = "\"\""
indented("buff = interruptRead(0x%02X, 0x%04X)" %
(d["endp"], d["len"]))
indented("validate_read(%s, buff, \"%s\")" % (bytes2AnonArray(
binascii.unhexlify(d["data"])), packet_numbering))

indented(
"RET_ON_ERR(libusb_interrupt_transfer(devh, 0x%02X, buff, 0x%04X, &bytes_transferred, timeout));"
% (d["endp"], len(d["data"])))
if len(d["data"]):
indented(
"RET_ON_ERR(validate_read(%s, %u, buff, bytes_transferred, \"%s\"));"
% (
bytes2AnonArray(binascii.unhexlify(d["data"])),
len(d["data"]),
packet_numbering,
))
elif d["type"] == "interruptOut":
assert 0, "fixme"
data_str = bytes2AnonArray(binascii.unhexlify(d["data"]))
indented("interruptWrite(0x%02X, %s)" % (d["endp"], data_str))
data_str = bytes2AnonArray(binascii.unhexlify(
d["data"])) if d["data"] else "NULL"
indented(
"RET_ON_XFER_ERR(libusb_interrupt_transfer(devh, 0x%02X, %s, 0x%04X, &bytes_transferred, timeout), 0x%04X);"
% (d["endp"], data_str, len(d["data"]), len(d["data"])))
elif d["type"] == "irpInfo":
comment("IRP_INFO(): func %s" %
(d["submit"]["urb"]["usb_func_str"], ))
Expand Down
16 changes: 9 additions & 7 deletions usbrply/pyprinter.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,15 @@ def footer(self):
if not self.wrapper:
return
print('''
def open_dev(usbcontext=None):
def open_dev(vid_want, pid_want, usbcontext=None):
if usbcontext is None:
usbcontext = usb1.USBContext()
print('Scanning for devices...')
for udev in usbcontext.getDeviceList(skip_on_error=True):
vid = udev.getVendorID()
pid = udev.getProductID()
if (vid, pid) == (''' + "0x%04X, 0x%04X" % (self.vid, self.pid) + '''):
if (vid, pid) == (vid_want, pid_want):
print('Found device')
print('Bus %03i Device %03i: ID %04x:%04x' % (
udev.getBusNumber(),
Expand All @@ -120,18 +120,22 @@ def open_dev(usbcontext=None):
return udev.open()
raise Exception("Failed to find a device")
if __name__ == "__main__":
def main():
import argparse
vid_want = ''' + "0x%04X" % (self.vid, ) + '''
pid_want = ''' + "0x%04X" % (self.pid, ) + '''
parser = argparse.ArgumentParser(description='Replay captured USB packets')
args = parser.parse_args()
usbcontext = usb1.USBContext()
dev = open_dev(usbcontext)
dev = open_dev(vid_want, pid_want, usbcontext)
dev.claimInterface(0)
dev.resetDevice()
replay(dev)
if __name__ == "__main__":
main()
''',
file=printer.print_file)

Expand Down Expand Up @@ -188,7 +192,6 @@ def gett(d):
d["wIndex"], data_str))

elif d["type"] == "bulkRead":
data_str = "\"\""
indented("buff = bulkRead(0x%02X, 0x%04X)" % (d["endp"], d["len"]))
indented("validate_read(%s, buff, \"%s\")" % (bytes2AnonArray(
binascii.unhexlify(d["data"])), packet_numbering))
Expand All @@ -199,7 +202,6 @@ def gett(d):
indented("bulkWrite(0x%02X, %s)" % (d["endp"], data_str))

elif d["type"] == "interruptIn":
data_str = "\"\""
indented("buff = interruptRead(0x%02X, 0x%04X)" %
(d["endp"], d["len"]))
indented("validate_read(%s, buff, \"%s\")" % (bytes2AnonArray(
Expand Down

0 comments on commit 4c4edfb

Please sign in to comment.