forked from adafruit/circuitpython
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathupip_utarfile.py
99 lines (78 loc) · 2.48 KB
/
upip_utarfile.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# SPDX-FileCopyrightText: 2014 MicroPython & CircuitPython contributors (https://github.com/adafruit/circuitpython/graphs/contributors)
#
# SPDX-License-Identifier: MIT
import uctypes
# http://www.gnu.org/software/tar/manual/html_node/Standard.html
# Partial layout of a tar header block, written as a uctypes structure
# descriptor (it is passed to uctypes.struct() by TarFile.next()).
# Only the two fields this module reads are described. Each entry has the
# form (uctypes.ARRAY | byte_offset, uctypes.UINT8 | element_count).
TAR_HEADER = {
# Member path: 100 bytes at offset 0. The reader decodes it as UTF-8 and
# strips trailing NUL padding.
"name": (uctypes.ARRAY | 0, uctypes.UINT8 | 100),
# Member size: 11 bytes at offset 124, parsed by the reader as base-8 text.
# NOTE(review): the tar standard defines this field as 12 bytes; presumably
# only 11 are mapped so the trailing terminator byte is left off - confirm.
"size": (uctypes.ARRAY | 124, uctypes.UINT8 | 11),
}
# Values assigned to TarInfo.type: DIRTYPE when the member name ends with
# "/", REGTYPE otherwise.
DIRTYPE = "dir"
REGTYPE = "file"
def roundup(val, align):
    """Round ``val`` up to the nearest multiple of ``align``.

    The result is computed with bit masking, so ``align`` must be a power of
    two for the answer to be a true multiple. Values that are already aligned
    are returned unchanged.
    """
    mask = align - 1
    # Bump past the boundary, then clear the low bits to land on it.
    bumped = val + mask
    return bumped & ~mask
class FileSection:
    """Bounded, forward-only view of one member's data inside a tar stream.

    The section shares the underlying file object ``f`` with its creator and
    never seeks; it only tracks how many content bytes are still unread
    (``content_len``) and how many padding bytes follow the content
    (``align``).
    """

    def __init__(self, f, content_len, aligned_len):
        """Wrap ``f`` for a member of ``content_len`` bytes.

        ``aligned_len`` is the member's length including trailing padding;
        the difference is remembered so ``skip()`` can consume it.
        """
        self.f = f
        self.content_len = content_len
        self.align = aligned_len - content_len

    def read(self, sz=65536):
        """Read up to ``sz`` bytes of the remaining content.

        A negative ``sz`` means "everything that is left in this section".
        Returns ``b""`` once the content is exhausted.
        """
        if self.content_len == 0:
            return b""
        # Clamp to the section. A negative size must not be forwarded to the
        # underlying stream, where it would read past this member.
        if sz < 0 or sz > self.content_len:
            sz = self.content_len
        data = self.f.read(sz)
        # The stream may return fewer bytes than requested.
        self.content_len -= len(data)
        return data

    def readinto(self, buf):
        """Fill ``buf`` with remaining content and return the byte count.

        At most ``content_len`` bytes are written; returns 0 once the content
        is exhausted.
        """
        if self.content_len == 0:
            return 0
        if len(buf) > self.content_len:
            # Restrict the destination so the read cannot run past the member.
            buf = memoryview(buf)[: self.content_len]
        sz = self.f.readinto(buf)
        self.content_len -= sz
        return sz

    def skip(self):
        """Consume any unread content plus the trailing padding.

        Afterwards the section is marked as fully consumed, so calling
        ``skip()`` again or reading from the stale section is a no-op and
        cannot eat bytes that belong to later entries.
        """
        sz = self.content_len + self.align
        self.content_len = 0
        self.align = 0
        if sz:
            # Small scratch buffer: the data is discarded, only the stream
            # position matters.
            buf = bytearray(16)
            while sz:
                s = min(sz, 16)
                # Two-argument readinto(buf, nbytes) as called by the original
                # code; the stream must support that form.
                self.f.readinto(buf, s)
                sz -= s
class TarInfo:
    """Plain record describing one archive member.

    The class defines no attributes itself; ``name``, ``type`` and ``size``
    must be assigned on the instance before it is converted to a string.
    """

    def __str__(self):
        """Return a one-line summary of the member's name, type and size."""
        fields = (self.name, self.type, self.size)
        return "TarInfo(%r, %s, %d)" % fields
class TarFile:
    """Forward-only reader for an uncompressed tar stream.

    Members are visited strictly in archive order, either by calling
    ``next()`` or by iterating over the object. The underlying file object is
    never rewound, so a member's data must be read before advancing to the
    following member.
    """

    def __init__(self, name=None, fileobj=None):
        """Read from ``fileobj`` if given, otherwise open ``name`` in binary mode."""
        if fileobj:
            self.f = fileobj
        else:
            self.f = open(name, "rb")
        # Data section of the most recently returned member, if any.
        self.subf = None

    def next(self):
        """Advance to the next member.

        Returns a ``TarInfo`` with ``name``, ``size``, ``type`` and ``subf``
        set, or ``None`` at the end of the archive.
        """
        if self.subf:
            # Drop whatever the caller left unread of the previous member,
            # including its padding, so the stream sits on the next header.
            self.subf.skip()
            # Forget the section so a repeated call after the end of the
            # archive does not skip a second time.
            self.subf = None
        buf = self.f.read(512)
        # A missing or short block is treated as the end of the archive. The
        # header is mapped by raw address below, so a short buffer must never
        # reach uctypes.struct().
        if not buf or len(buf) < 512:
            return None

        h = uctypes.struct(uctypes.addressof(buf), TAR_HEADER, uctypes.LITTLE_ENDIAN)

        # Empty block means end of archive
        if h.name[0] == 0:
            return None

        d = TarInfo()
        d.name = str(h.name, "utf-8").rstrip("\0")
        # The size field holds base-8 text.
        d.size = int(bytes(h.size), 8)
        # The name is non-empty here because its first byte is non-zero.
        d.type = DIRTYPE if d.name.endswith("/") else REGTYPE
        # Member data is padded to a multiple of 512 bytes.
        self.subf = d.subf = FileSection(self.f, d.size, roundup(d.size, 512))
        return d

    def __iter__(self):
        """Return ``self``; the object is its own iterator."""
        return self

    def __next__(self):
        """Return the next ``TarInfo`` or raise ``StopIteration`` at the end."""
        v = self.next()
        if v is None:
            raise StopIteration
        return v

    def extractfile(self, tarinfo):
        """Return the data section object attached to ``tarinfo``."""
        return tarinfo.subf