forked from MarkFzp/mobile-aloha
-
Notifications
You must be signed in to change notification settings - Fork 0
/
robot_utils.py
206 lines (175 loc) · 7.95 KB
/
robot_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
import numpy as np
import time
from constants import DT
from interbotix_xs_msgs.msg import JointSingleCommand
import IPython
e = IPython.embed
class ImageRecorder:
def __init__(self, init_node=True, is_debug=False):
from collections import deque
import rospy
from cv_bridge import CvBridge
from sensor_msgs.msg import Image
self.is_debug = is_debug
self.bridge = CvBridge()
self.camera_names = ['cam_high', 'cam_left_wrist', 'cam_right_wrist'] #['cam_high', 'cam_low', 'cam_left_wrist', 'cam_right_wrist']
if init_node:
rospy.init_node('image_recorder', anonymous=True)
for cam_name in self.camera_names:
setattr(self, f'{cam_name}_image', None)
setattr(self, f'{cam_name}_secs', None)
setattr(self, f'{cam_name}_nsecs', None)
if cam_name == 'cam_high':
callback_func = self.image_cb_cam_high
elif cam_name == 'cam_low':
callback_func = self.image_cb_cam_low
elif cam_name == 'cam_left_wrist':
callback_func = self.image_cb_cam_left_wrist
elif cam_name == 'cam_right_wrist':
callback_func = self.image_cb_cam_right_wrist
else:
raise NotImplementedError
rospy.Subscriber(f"/usb_{cam_name}/image_raw", Image, callback_func)
if self.is_debug:
setattr(self, f'{cam_name}_timestamps', deque(maxlen=50))
time.sleep(0.5)
def image_cb(self, cam_name, data):
setattr(self, f'{cam_name}_image', self.bridge.imgmsg_to_cv2(data, desired_encoding='passthrough'))
setattr(self, f'{cam_name}_secs', data.header.stamp.secs)
setattr(self, f'{cam_name}_nsecs', data.header.stamp.nsecs)
# cv2.imwrite('/home/tonyzhao/Desktop/sample.jpg', cv_image)
if self.is_debug:
getattr(self, f'{cam_name}_timestamps').append(data.header.stamp.secs + data.header.stamp.secs * 1e-9)
def image_cb_cam_high(self, data):
cam_name = 'cam_high'
return self.image_cb(cam_name, data)
def image_cb_cam_low(self, data):
cam_name = 'cam_low'
return self.image_cb(cam_name, data)
def image_cb_cam_left_wrist(self, data):
cam_name = 'cam_left_wrist'
return self.image_cb(cam_name, data)
def image_cb_cam_right_wrist(self, data):
cam_name = 'cam_right_wrist'
return self.image_cb(cam_name, data)
def get_images(self):
image_dict = dict()
for cam_name in self.camera_names:
image_dict[cam_name] = getattr(self, f'{cam_name}_image')
return image_dict
def print_diagnostics(self):
def dt_helper(l):
l = np.array(l)
diff = l[1:] - l[:-1]
return np.mean(diff)
for cam_name in self.camera_names:
image_freq = 1 / dt_helper(getattr(self, f'{cam_name}_timestamps'))
print(f'{cam_name} {image_freq=:.2f}')
print()
class Recorder:
def __init__(self, side, init_node=True, is_debug=False):
from collections import deque
import rospy
from sensor_msgs.msg import JointState
from interbotix_xs_msgs.msg import JointGroupCommand, JointSingleCommand
self.secs = None
self.nsecs = None
self.qpos = None
self.effort = None
self.arm_command = None
self.gripper_command = None
self.is_debug = is_debug
if init_node:
rospy.init_node('recorder', anonymous=True)
rospy.Subscriber(f"/puppet_{side}/joint_states", JointState, self.puppet_state_cb)
rospy.Subscriber(f"/puppet_{side}/commands/joint_group", JointGroupCommand, self.puppet_arm_commands_cb)
rospy.Subscriber(f"/puppet_{side}/commands/joint_single", JointSingleCommand, self.puppet_gripper_commands_cb)
if self.is_debug:
self.joint_timestamps = deque(maxlen=50)
self.arm_command_timestamps = deque(maxlen=50)
self.gripper_command_timestamps = deque(maxlen=50)
time.sleep(0.1)
def puppet_state_cb(self, data):
self.qpos = data.position
self.qvel = data.velocity
self.effort = data.effort
self.data = data
if self.is_debug:
self.joint_timestamps.append(time.time())
def puppet_arm_commands_cb(self, data):
self.arm_command = data.cmd
if self.is_debug:
self.arm_command_timestamps.append(time.time())
def puppet_gripper_commands_cb(self, data):
self.gripper_command = data.cmd
if self.is_debug:
self.gripper_command_timestamps.append(time.time())
def print_diagnostics(self):
def dt_helper(l):
l = np.array(l)
diff = l[1:] - l[:-1]
return np.mean(diff)
joint_freq = 1 / dt_helper(self.joint_timestamps)
arm_command_freq = 1 / dt_helper(self.arm_command_timestamps)
gripper_command_freq = 1 / dt_helper(self.gripper_command_timestamps)
print(f'{joint_freq=:.2f}\n{arm_command_freq=:.2f}\n{gripper_command_freq=:.2f}\n')
def get_arm_joint_positions(bot):
return bot.arm.core.joint_states.position[:6]
def get_arm_gripper_positions(bot):
joint_position = bot.gripper.core.joint_states.position[6]
return joint_position
def move_arms(bot_list, target_pose_list, move_time=1):
num_steps = int(move_time / DT)
curr_pose_list = [get_arm_joint_positions(bot) for bot in bot_list]
traj_list = [np.linspace(curr_pose, target_pose, num_steps) for curr_pose, target_pose in zip(curr_pose_list, target_pose_list)]
for t in range(num_steps):
for bot_id, bot in enumerate(bot_list):
bot.arm.set_joint_positions(traj_list[bot_id][t], blocking=False)
time.sleep(DT)
def move_grippers(bot_list, target_pose_list, move_time):
gripper_command = JointSingleCommand(name="gripper")
num_steps = int(move_time / DT)
curr_pose_list = [get_arm_gripper_positions(bot) for bot in bot_list]
traj_list = [np.linspace(curr_pose, target_pose, num_steps) for curr_pose, target_pose in zip(curr_pose_list, target_pose_list)]
for t in range(num_steps):
for bot_id, bot in enumerate(bot_list):
gripper_command.cmd = traj_list[bot_id][t]
bot.gripper.core.pub_single.publish(gripper_command)
time.sleep(DT)
def setup_puppet_bot(bot):
bot.dxl.robot_reboot_motors("single", "gripper", True)
bot.dxl.robot_set_operating_modes("group", "arm", "position")
bot.dxl.robot_set_operating_modes("single", "gripper", "current_based_position")
torque_on(bot)
def setup_master_bot(bot):
bot.dxl.robot_set_operating_modes("group", "arm", "pwm")
bot.dxl.robot_set_operating_modes("single", "gripper", "current_based_position")
torque_off(bot)
def set_standard_pid_gains(bot):
bot.dxl.robot_set_motor_registers("group", "arm", 'Position_P_Gain', 800)
bot.dxl.robot_set_motor_registers("group", "arm", 'Position_I_Gain', 0)
def set_low_pid_gains(bot):
bot.dxl.robot_set_motor_registers("group", "arm", 'Position_P_Gain', 100)
bot.dxl.robot_set_motor_registers("group", "arm", 'Position_I_Gain', 0)
def torque_off(bot):
bot.dxl.robot_torque_enable("group", "arm", False)
bot.dxl.robot_torque_enable("single", "gripper", False)
def torque_on(bot):
bot.dxl.robot_torque_enable("group", "arm", True)
bot.dxl.robot_torque_enable("single", "gripper", True)
def calibrate_linear_vel(base_action, c=None):
if c is None:
c = 0.
v = base_action[..., 0]
w = base_action[..., 1]
base_action = base_action.copy()
base_action[..., 0] = v - c * w
return base_action
def smooth_base_action(base_action):
return np.stack([
np.convolve(base_action[:, i], np.ones(5)/5, mode='same') for i in range(base_action.shape[1])
], axis=-1).astype(np.float32)
def postprocess_base_action(base_action):
linear_vel, angular_vel = base_action
angular_vel *= 0.9
return np.array([linear_vel, angular_vel])