-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsshClient.py
281 lines (233 loc) · 9.38 KB
/
sshClient.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
# -*- coding:utf-8 -*-
import paramiko
import time
import httpService
#import paho.mqtt.client as mqtt
# Connection settings used by main() to build the RemoteControl instance.
# NOTE(review): the SSH credentials below are stored in plain text in this
# file — consider loading them from the environment or a config file.
hostname = "192.168.1.10"  # X3 machine, connected directly over the Ethernet port
mqtt_ip = "119.23.212.113"  # production MQTT server
port = 22
username = "root"
password = "lFi@NovaBot"
# Disabled MQTT client sketch. It is wrapped in a string literal so it never
# executes; it depends on the paho import that is commented out above.
'''
class Mqtt():
def on_connect(self, client, userdata, flags, rc):
print("hello mqtt")
def on_message(self, client, userdata, msg):
print(msg.topic + " " + str(msg.payload))
def connect(self):
client = mqtt.Client()
client.on_connect = self.on_connect
client.on_message = self.on_message
client.connect(mqtt_ip, 1883, 60)
client.loop_start()
'''
class RemoteControl():
    """Thin SSH/SFTP wrapper around paramiko for one remote host.

    The connection is not opened by the constructor; call reconnect() first.
    """

    def __init__(self, hostname, port, username, password):
        """Store the connection parameters and create an unconnected client."""
        self.hostname = hostname
        self.port = port
        self.username = username
        self.password = password
        self.ssh = paramiko.SSHClient()
        # Created by reconnect(); None until a connection has succeeded so
        # that close() is safe to call at any time.
        self.sftp = None

    def reconnect(self):
        """Drop any existing session and open a fresh SSH + SFTP connection.

        Uses the parameters stored on this instance. Errors are printed
        rather than raised.

        Returns:
            True when the connection was established, False otherwise.
        """
        try:
            # Start from a fresh SSH object.
            self.ssh.close()
            self.sftp = None
            self.ssh = paramiko.SSHClient()
            # Allow hosts that are not listed in ~/.ssh/known_hosts.
            self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            # Connect to the server.
            self.ssh.connect(self.hostname, self.port, self.username, self.password)
            # Create the SFTP client on top of the same transport.
            self.sftp = paramiko.SFTPClient.from_transport(self.ssh.get_transport())
            print("SSH 连接成功")
            return True
        except Exception as e:
            print(e)
            return False

    def print(self):
        """Print the connection parameters held by this instance."""
        print(self.hostname, self.port, self.username, self.password)

    def remoteCmd(self, cmd, retries=1):
        """Run a shell command on the remote host and print its output.

        stdout is printed when non-empty, otherwise stderr. On failure the
        connection is re-established and the command is re-run, at most
        ``retries`` times.

        Args:
            cmd: Command line to execute remotely.
            retries: Number of reconnect-and-retry attempts after a failure.

        Returns:
            A completion message string on success, or None when every
            attempt failed.
        """
        for attempt in range(retries + 1):
            try:
                print("执行命令:", cmd)
                stdin, stdout, stderr = self.ssh.exec_command(cmd)
                # Feeds a line to the command's stdin; only needed when the
                # command interacts with the terminal.
                stdin.write("终端等待输入...\n")
                stdin.flush()
                # Collect the command result.
                res, err = stdout.read(), stderr.read()
                result = res if res else err
                # errors="replace": undecodable output must not be mistaken
                # for a connection failure and trigger a re-execution.
                print(result.decode(encoding="utf-8", errors="replace"))
                return "Remote CMD:" + cmd + " execute completed"
            except Exception as e:
                print("执行命令失败:", e)
                # Reconnect and retry, but give up once the retry budget is
                # spent or the link cannot be restored.
                if attempt >= retries or not self.reconnect():
                    break
        return None

    def uploadFile(self, localPath, remotePath):
        """Upload a local file to the remote host over SFTP."""
        self.sftp.put(localPath, remotePath, confirm=True)

    def downloadFile(self, remotePath, localPath):
        """Download a remote file to the local machine over SFTP."""
        self.sftp.get(remotePath, localPath)

    def interactive(self):
        """Run a simple interactive shell session on the remote host.

        Reads a command from the user, sends it, prints whatever the shell
        returns, and prompts again after one second without output. Returns
        when the remote side closes the channel.
        """
        import socket  # local import: only this method needs it

        shell = self.ssh.invoke_shell()
        shell.settimeout(1)
        try:
            shell.send(input(">>>") + "\n")
            while True:
                try:
                    chunk = shell.recv(512)
                except socket.timeout:
                    # No output within the timeout: ask for the next command.
                    shell.send(input(">>>") + "\n")
                    continue
                if not chunk:
                    # Empty read means the channel was closed remotely.
                    break
                # A 512-byte read can split a multi-byte character.
                print(chunk.decode(errors="replace"))
        finally:
            shell.close()

    def close(self):
        """Close the SFTP channel (if any) and the SSH session."""
        if self.sftp is not None:
            self.sftp.close()
            self.sftp = None
        self.ssh.close()
def genGDC(control):
    """Run gdc_map.py on the remote host.

    The script receives the preposition intrinsic JSON, the preposition
    layout JSON and gdc_map_preposition.txt as its three arguments
    (presumably the last one is the output file — confirm).
    """
    command = (
        " python3 /userdata/lfi/camera_params/gdc_map.py"
        " /userdata/lfi/camera_params/preposition_intrinsic.json"
        " /userdata/lfi/camera_params/layout_preposition.json"
        " /userdata/lfi/camera_params/gdc_map_preposition.txt"
    )
    control.remoteCmd(command)
def checkFile(control):
    """List the camera parameter directory and show the 'code' lines of json_config.json."""
    inspection_commands = (
        "ls -al /userdata/lfi/camera_params/",
        "cat /userdata/lfi/json_config.json | grep code",
    )
    for command in inspection_commands:
        control.remoteCmd(command)
def uploadCameraFile(control):
    """Upload the preposition and panoramic camera intrinsic files.

    The two camera serial numbers come from httpService.getPrePanoCameraSn();
    each selects a local file under ./camera/ that is uploaded to a fixed
    name in /userdata/lfi/camera_params/ after reconnecting.
    """
    pre_serial, pano_serial = httpService.getPrePanoCameraSn()
    transfers = (
        (
            "./camera/preposition/" + pre_serial + ".json",
            "/userdata/lfi/camera_params/preposition_intrinsic.json",
        ),
        (
            "./camera/panoramic/" + pano_serial + ".yaml",
            "/userdata/lfi/camera_params/panoramic_intrinsic.yaml",
        ),
    )
    control.reconnect()
    for local_file, remote_file in transfers:
        control.uploadFile(local_file, remote_file)
def copyGDCScript(control):
    """Copy gdc_map.py and layout_preposition.json from the OTA library into the camera parameter directory."""
    for command in (
        "cp /root/novabot/ota_lib/camera_params/gdc_map.py /userdata/lfi/camera_params/",
        "cp /root/novabot/ota_lib/camera_params/layout_preposition.json /userdata/lfi/camera_params/",
    ):
        control.remoteCmd(command)
def upgrade(control):
    """Upload a factory .deb, stage the OTA upgrade, reboot, then clean up.

    Steps, in order: reconnect, upload the package, extract it, copy
    run_ota.sh and write the upgrade flag, force a reboot, wait, reconnect,
    remove leftovers, then repeat checkFile() until the operator enters "1".
    The connection is closed at the end.
    """
    run = control.remoteCmd

    print("开始测试=================")
    control.reconnect()
    control.uploadFile("d:/lfimvpfactory20231014475.deb", "/root/lfimvpfactory20231014475.deb")
    print("文件上传成功")
    run("dpkg -x lfimvpfactory20231014475.deb novabot.new")
    print("文件解压完成")

    # Stage the OTA script and raise the upgrade flag.
    for staging_command in (
        "ls",
        "cp /root/novabot.new/scripts/run_ota.sh /userdata/ota/",
        "echo 1 > /userdata/ota/upgrade.txt",
        "cat /userdata/ota/upgrade.txt",
    ):
        run(staging_command)
    time.sleep(5)

    print("远程命令执行完成,开始重启机器")
    run("reboot -f")
    print("等待30s后继续执行数据清理...")
    time.sleep(30)

    # Back on the rebooted machine: remove the package and the backup tree.
    control.reconnect()
    for cleanup_command in (
        "rm -rf /root/lfimvpfactory20231014475.deb",
        "rm -rf /root/novabot.bak",
        "cat /userdata/ota/upgrade.txt",
        "ls",
    ):
        run(cleanup_command)
    time.sleep(40)

    # Keep checking until the operator confirms the result.
    finished = False
    while not finished:
        checkFile(control)
        answer = input("是否结束?0:继续检测,1:结束")
        finished = answer == "1"

    print("机器已完成检测")
    control.close()
    print("完成测试============")
def upgrade2(control):
    """Replace /root/novabot with the contents of a factory .deb and start the service.

    Steps, in order: reconnect, delete the old tree, upload the package,
    extract it into ``novabot``, delete the package, print the Readme, run
    start_service.sh, and close the connection.
    """
    run = control.remoteCmd

    print("开始测试=================")
    control.reconnect()

    # Remove the previous installation before uploading the new package.
    for command in ("rm -rf /root/novabot", "sync"):
        run(command)
    control.uploadFile("d:/lfimvp-factory-20231114489.deb", "/root/lfimvp-factory-20231114489.deb")
    run("sync")
    print("文件上传成功")

    for command in (
        "dpkg -x lfimvp-factory-20231114489.deb novabot",
        "sync",
        "sleep 2s",
    ):
        run(command)
    print("文件解压完成")

    for command in (
        "rm -rf /root/lfimvp-factory-20231114489.deb",
        "cat /root/novabot/Readme.txt",
        "/root/novabot/scripts/start_service.sh",
    ):
        run(command)

    control.close()
    print("完成升级============")
def quit(control):
    """Announce the exit and terminate the program with status 0.

    Args:
        control: Unused; present so the function matches the menu-handler
            signature used by main().

    Raises:
        SystemExit: Always, with exit code 0.
    """
    print("程序退出")
    # Raise SystemExit directly instead of calling exit(): that helper is
    # injected by the site module and is missing under `python -S` or in
    # frozen executables.
    raise SystemExit(0)
def startService(control):
    """Show the tof_camera_node processes and optionally restart the service.

    The operator is asked whether to restart; answering "1" runs
    start_service.sh on the remote host, any other answer does nothing.
    """
    control.remoteCmd("ps -aux | grep tof_camera_node")
    num = input("是否需要重启StartService 1: 重启,0,不重启")
    # input() returns a string, so compare against "1" (comparing with the
    # integer 1 never matched and the restart branch was unreachable).
    if num.strip() == "1":
        print("重启服务")
        control.remoteCmd("~/novabot/scripts/start_service.sh")
    else:
        print("不需要重启服务")
def getMac(control):
    """Print the contents of /userdata/lfi/ble_mac.txt on the remote host."""
    # Disabled alternative: run /usr/bin/startbt6212.sh in the background to
    # obtain the Bluetooth MAC. That call blocks forever, so the script would
    # need rework first.
    # Disabled alternative: cat /bl.cfg | grep "BD Address" | awk '{print $3}'
    mac_file_command = "cat /userdata/lfi/ble_mac.txt "
    control.remoteCmd(mac_file_command)
def recharge(control):
    """Run the test_recharge.sh debug script on the remote host."""
    recharge_script = "/root/novabot/debug_sh/test_recharge.sh "
    control.remoteCmd(recharge_script)
def agingTest(control):
    """Launch chassis_Aging_Test.py on the remote host with nohup, in the background."""
    launch_command = "nohup python3 /root/novabot/debug_sh/chassis_Aging_Test.py&"
    control.remoteCmd(launch_command)
def model2User(control):
    """Rewrite flag=true to flag=false in the factory-test start script (menu: switch to user mode)."""
    command = (
        "sed -i 's/flag=true/flag=false/' "
        "/root/novabot/test_scripts/factory_test/start_test.sh "
    )
    control.remoteCmd(command)
def model2Factory(control):
    """Rewrite flag=false to flag=true in the factory-test start script (menu: switch to factory mode)."""
    command = (
        "sed -i 's/flag=false/flag=true/' "
        "/root/novabot/test_scripts/factory_test/start_test.sh "
    )
    control.remoteCmd(command)
def modelStatus(control):
    """Print the 'flag=' lines of the factory-test start script."""
    command = (
        "grep 'flag=' "
        "/root/novabot/test_scripts/factory_test/start_test.sh "
    )
    control.remoteCmd(command)
def checkGDC(control):
    """Upload verify_txt.py and run it against the generated GDC map file.

    The script is uploaded with a path relative to the SFTP session's
    default directory, which is where the follow-up command looks for it.
    """
    # SFTP does not perform shell tilde expansion, so "~/verify_txt.py" is
    # not the home directory; a relative path resolves against the login
    # directory instead.
    control.uploadFile("d:/verify_txt.py", "verify_txt.py")
    control.remoteCmd("python3 verify_txt.py -g /userdata/lfi/camera_params/gdc_map_preposition.txt")
def checkBT(control):
    """Run hciconfig on the remote host (menu: check the Bluetooth driver)."""
    bluetooth_command = "hciconfig"
    control.remoteCmd(bluetooth_command)
def checkMotorSpeed(control):
    """Echo the blade_speed_get ROS 2 topic on the remote host (motor speed)."""
    # Disabled: source /opt/ros/galactic/setup.bash before echoing.
    # NOTE(review): `ros2 topic echo` normally streams until interrupted, so
    # this call may never return — confirm.
    speed_topic_command = "ros2 topic echo blade_speed_get"
    control.remoteCmd(speed_topic_command)
def checkMotorCurrent(control):
    """Echo the motor_current ROS 2 topic on the remote host (motor current)."""
    # NOTE(review): `ros2 topic echo` normally streams until interrupted, so
    # this call may never return — confirm.
    current_topic_command = "ros2 topic echo motor_current"
    control.remoteCmd(current_topic_command)
def main():
    """Run the interactive operator menu.

    Connects to the machine with the module-level connection settings, then
    loops forever: print the menu, read an operation ID, and call the
    matching handler with the RemoteControl instance. A numeric ID that is
    not in the table falls through to quit(); non-numeric input is rejected
    and the menu is shown again.
    """
    switch_dict = {
        0: quit,
        1: genGDC,
        2: uploadCameraFile,
        3: upgrade2,
        4: checkFile,
        5: getMac,
        6: copyGDCScript,
        7: startService,
        8: recharge,
        9: agingTest,
        10: model2User,
        11: model2Factory,
        12: modelStatus,
        13: checkBT,
        14: checkMotorCurrent,
        15: checkMotorSpeed,
    }
    control = RemoteControl(hostname, port, username, password)
    # Connect to the machine directly over the Ethernet port via SSH.
    print("请先通过网口直连Novabot机器")
    control.reconnect()

    menu = (
        "操作说明:\n"
        "0:退出\n"
        "1:生成GDC文件\n"
        "2:上传相机内参文件\n"
        "3:手动OTA升级\n"
        "4:文件完整性检测\n"
        "5: 获取X3蓝牙MAC\n"
        "6: 拷贝GDC执行脚本文件\n"
        "7: 检查是否需要重启Service\n"
        "8: TTT===自动回充测试\n"
        "9: TTT===老化测试\n"
        "10: ===>切换到用户模式\n"
        "11: ===>切换到工厂模式\n"
        "12: ===>工作模式状态查询\n"
        "13: 检查蓝牙驱动\n"
        "14: 电机电流\n"
        "15: 电机转速\n"
    )
    while True:
        print(menu)
        num = input("请输入操作项ID:")
        try:
            choice = int(num)
        except ValueError:
            # A typo or empty line must not crash the tool mid-session.
            print("无效输入,请输入数字ID")
            continue
        switch_dict.get(choice, quit)(control)


if __name__ == "__main__":
    main()