forked from Flipper-XFW/Xtreme-Firmware
-
Notifications
You must be signed in to change notification settings - Fork 0
/
wifi_board.py
executable file
·240 lines (192 loc) · 7.67 KB
/
wifi_board.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
#!/usr/bin/env python3
from flipper.app import App
from serial.tools.list_ports_common import ListPortInfo
import logging
import os
import tempfile
import subprocess
import serial.tools.list_ports as list_ports
import json
import requests
import tarfile
class UpdateDownloader:
UPDATE_SERVER = "https://update.flipperzero.one"
UPDATE_PROJECT = "/blackmagic-firmware"
UPDATE_INDEX = UPDATE_SERVER + UPDATE_PROJECT + "/directory.json"
UPDATE_TYPE = "full_tgz"
CHANNEL_ID_ALIAS = {
"dev": "development",
"rc": "release-candidate",
"r": "release",
"rel": "release",
}
def __init__(self):
self.logger = logging.getLogger()
def download(self, channel_id: str, dir: str) -> bool:
# Aliases
if channel_id in self.CHANNEL_ID_ALIAS:
channel_id = self.CHANNEL_ID_ALIAS[channel_id]
# Make directory
if not os.path.exists(dir):
self.logger.info(f"Creating directory {dir}")
os.makedirs(dir)
# Download json index
self.logger.info(f"Downloading {self.UPDATE_INDEX}")
response = requests.get(self.UPDATE_INDEX)
if response.status_code != 200:
self.logger.error(f"Failed to download {self.UPDATE_INDEX}")
return False
# Parse json index
try:
index = json.loads(response.content)
except Exception as e:
self.logger.error(f"Failed to parse json index: {e}")
return False
# Find channel
channel = None
for channel_candidate in index["channels"]:
if channel_candidate["id"] == channel_id:
channel = channel_candidate
break
# Check if channel found
if channel is None:
self.logger.error(
f"Channel '{channel_id}' not found. Valid channels: {', '.join([c['id'] for c in index['channels']])}"
)
return False
self.logger.info(f"Using channel '{channel_id}'")
# Get latest version
try:
version = channel["versions"][0]
except Exception as e:
self.logger.error(f"Failed to get version: {e}")
return False
self.logger.info(f"Using version '{version['version']}'")
# Get changelog
changelog = None
try:
changelog = version["changelog"]
except Exception as e:
self.logger.error(f"Failed to get changelog: {e}")
# print changelog
if changelog is not None:
self.logger.info(f"Changelog:")
for line in changelog.split("\n"):
if line.strip() == "":
continue
self.logger.info(f" {line}")
# Find file
file_url = None
for file_candidate in version["files"]:
if file_candidate["type"] == self.UPDATE_TYPE:
file_url = file_candidate["url"]
break
if file_url is None:
self.logger.error(f"File not found")
return False
# Make file path
file_name = file_url.split("/")[-1]
file_path = os.path.join(dir, file_name)
# Download file
self.logger.info(f"Downloading {file_url} to {file_path}")
with open(file_path, "wb") as f:
response = requests.get(file_url)
f.write(response.content)
# Unzip tgz
self.logger.info(f"Unzipping {file_path}")
with tarfile.open(file_path, "r") as tar:
tar.extractall(dir)
return True
class Main(App):
def init(self):
self.parser.add_argument("-p", "--port", help="CDC Port", default="auto")
self.parser.add_argument(
"-c", "--channel", help="Channel name", default="release"
)
self.parser.set_defaults(func=self.update)
# logging
self.logger = logging.getLogger()
def find_wifi_board(self) -> bool:
# idk why, but python thinks that list_ports.grep returns tuple[str, str, str]
blackmagics: list[ListPortInfo] = list(list_ports.grep("blackmagic")) # type: ignore
daps: list[ListPortInfo] = list(list_ports.grep("CMSIS-DAP")) # type: ignore
return len(blackmagics) > 0 or len(daps) > 0
def find_wifi_board_bootloader(self):
# idk why, but python thinks that list_ports.grep returns tuple[str, str, str]
ports: list[ListPortInfo] = list(list_ports.grep("ESP32-S2")) # type: ignore
if len(ports) == 0:
# Blackmagic probe serial port not found, will be handled later
pass
elif len(ports) > 1:
raise Exception("More than one WiFi board found")
else:
port = ports[0]
if os.name == "nt":
port.device = f"\\\\.\\{port.device}"
return port.device
def update(self):
try:
port = self.find_wifi_board_bootloader()
except Exception as e:
self.logger.error(f"{e}")
return 1
if self.args.port != "auto":
port = self.args.port
available_ports = [p[0] for p in list(list_ports.comports())]
if port not in available_ports:
self.logger.error(f"Port {port} not found")
return 1
if port is None:
if self.find_wifi_board():
self.logger.error("WiFi board found, but not in bootloader mode.")
self.logger.info("Please hold down BOOT button and press RESET button")
else:
self.logger.error("WiFi board not found")
self.logger.info(
"Please connect WiFi board to your computer, hold down BOOT button and press RESET button"
)
return 1
# get temporary dir
with tempfile.TemporaryDirectory() as temp_dir:
downloader = UpdateDownloader()
# download latest channel update
try:
if not downloader.download(self.args.channel, temp_dir):
self.logger.error(f"Cannot download update")
return 1
except Exception as e:
self.logger.error(f"Cannot download update: {e}")
return 1
with open(os.path.join(temp_dir, "flash.command"), "r") as f:
flash_command = f.read()
flash_command = flash_command.replace("\n", "").replace("\r", "")
flash_command = flash_command.replace("(PORT)", port)
# We can't reset the board after flashing via usb
flash_command = flash_command.replace(
"--after hard_reset", "--after no_reset_stub"
)
args = flash_command.split(" ")[0:]
args = list(filter(None, args))
esptool_params = []
esptool_params.extend(args)
self.logger.info(f'Running command: "{" ".join(args)}" in "{temp_dir}"')
process = subprocess.Popen(
esptool_params,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
cwd=temp_dir,
bufsize=1,
universal_newlines=True,
)
while process.poll() is None:
if process.stdout is not None:
for line in process.stdout:
self.logger.debug(f"{line.strip()}")
if process.returncode != 0:
self.logger.error(f"Failed to flash WiFi board")
else:
self.logger.info("WiFi board flashed successfully")
self.logger.info("Press RESET button on WiFi board to start it")
return process.returncode
if __name__ == "__main__":
Main()()