Teleoperation for Data Collection¶
Teleoperation is the most common method for collecting high-quality robot demonstrations.
Teleoperation Devices¶
SpaceMouse (Recommended for Arm Manipulation)¶
Pros: - 6-DoF control - Intuitive for manipulation - Affordable ($150-400) - Low latency
Setup:
import spacemouse
# Initialize
device = spacemouse.SpaceMouse()
# Read 6-DoF pose
while True:
state = device.read()
# state.x, state.y, state.z (translation)
# state.roll, state.pitch, state.yaw (rotation)
# state.buttons[0] # Gripper control
# Map to robot actions
action = map_spacemouse_to_robot(state)
robot.execute(action)
Recommended Models: - SpaceMouse Compact: $150, good for beginners - SpaceMouse Pro: $400, better ergonomics - SpaceMouse Wireless: $250, no cables
VR Controllers¶
Pros: - Immersive control - Natural hand movements - Can control bimanual robots - Spatial awareness
Cons: - More expensive ($300-1000) - Requires VR headset - Setup complexity
Setup with Meta Quest:
import oculus_reader
class VRTeleopController:
def __init__(self, robot):
self.robot = robot
self.vr = oculus_reader.OculusReader()
def collect_demonstration(self):
demo = []
while True:
# Get VR controller poses
transforms, buttons = self.vr.get_transformations_and_buttons()
# Right controller -> robot end-effector
right_pos = transforms['r'][:3, 3]
right_rot = transforms['r'][:3, :3]
# Map to robot workspace
robot_pos = self.map_vr_to_robot_workspace(right_pos)
robot_rot = self.map_vr_to_robot_rotation(right_rot)
# Gripper from trigger
gripper = 1.0 if buttons['RTr'] else 0.0
action = np.concatenate([robot_pos, robot_rot, [gripper]])
demo.append({
'obs': self.robot.get_observation(),
'action': action
})
self.robot.execute(action)
# Exit condition
if buttons['A']:
break
return demo
Keyboard + Mouse¶
Pros: - No extra hardware needed - Good for simple tasks - Easy to set up
Cons: - Not intuitive for 6-DoF control - Slower demonstrations - Lower quality
Setup:
import pygame
class KeyboardTeleop:
def __init__(self, robot):
self.robot = robot
pygame.init()
self.screen = pygame.display.set_mode((800, 600))
# Control gains
self.translation_gain = 0.01
self.rotation_gain = 0.05
def run(self):
current_action = np.zeros(7)
while True:
for event in pygame.event.get():
if event.type == pygame.QUIT:
return
keys = pygame.key.get_pressed()
# Translation
if keys[pygame.K_w]: current_action[0] += self.translation_gain
if keys[pygame.K_s]: current_action[0] -= self.translation_gain
if keys[pygame.K_a]: current_action[1] += self.translation_gain
if keys[pygame.K_d]: current_action[1] -= self.translation_gain
if keys[pygame.K_q]: current_action[2] += self.translation_gain
if keys[pygame.K_e]: current_action[2] -= self.translation_gain
# Rotation
if keys[pygame.K_i]: current_action[3] += self.rotation_gain
if keys[pygame.K_k]: current_action[3] -= self.rotation_gain
if keys[pygame.K_j]: current_action[4] += self.rotation_gain
if keys[pygame.K_l]: current_action[4] -= self.rotation_gain
# Gripper
if keys[pygame.K_SPACE]: current_action[6] = 1.0
else: current_action[6] = 0.0
self.robot.execute(current_action)
pygame.time.wait(10)
Haptic Devices (Advanced)¶
Devices like Touch/Phantom Omni provide force feedback:
Pros: - Force feedback improves quality - High precision - Natural feel
Cons: - Expensive ($2000-10000+) - Complex setup - Limited workspace
Game Controllers¶
Xbox/PlayStation controllers for mobile robots:
import inputs
def collect_with_gamepad(robot):
while True:
events = inputs.get_gamepad()
for event in events:
if event.ev_type == 'Absolute':
if event.code == 'ABS_X': # Left stick X
linear_vel = event.state / 32768.0
elif event.code == 'ABS_RZ': # Right stick Y
angular_vel = event.state / 32768.0
elif event.ev_type == 'Key':
if event.code == 'BTN_SOUTH' and event.state == 1:
# A button pressed
robot.stop()
break
robot.set_velocity(linear_vel, angular_vel)
Best Practices¶
1. Workspace Mapping¶
Map teleoperation device workspace to robot workspace:
class WorkspaceMapper:
def __init__(self, teleop_bounds, robot_bounds):
self.teleop_bounds = teleop_bounds
self.robot_bounds = robot_bounds
def map_position(self, teleop_pos):
# Normalize to [0, 1]
normalized = (teleop_pos - self.teleop_bounds['min']) / (
self.teleop_bounds['max'] - self.teleop_bounds['min']
)
# Scale to robot workspace
robot_pos = (
self.robot_bounds['min'] +
normalized * (self.robot_bounds['max'] - self.robot_bounds['min'])
)
# Clamp to safe bounds
robot_pos = np.clip(robot_pos, self.robot_bounds['min'], self.robot_bounds['max'])
return robot_pos
2. Safety Limits¶
Always enforce safety constraints:
class SafeTeleopController:
def __init__(self, robot, max_velocity=0.5, workspace_bounds=None):
self.robot = robot
self.max_velocity = max_velocity
self.workspace_bounds = workspace_bounds
def execute_safe(self, action):
# Limit velocity
action_velocity = np.linalg.norm(action[:3])
if action_velocity > self.max_velocity:
action[:3] = action[:3] / action_velocity * self.max_velocity
# Check workspace bounds
predicted_pos = self.robot.get_ee_position() + action[:3]
if not self.in_workspace(predicted_pos):
print("Warning: Action would leave workspace, clamping")
action[:3] = self.clamp_to_workspace(action[:3])
self.robot.execute(action)
def in_workspace(self, position):
if self.workspace_bounds is None:
return True
return np.all(position >= self.workspace_bounds['min']) and \
np.all(position <= self.workspace_bounds['max'])
3. Recording Pipeline¶
Complete teleoperation recording system:
import cv2
import time
from collections import deque
class TeleopRecorder:
def __init__(self, robot, camera, device, save_dir):
self.robot = robot
self.camera = camera
self.device = device
self.save_dir = Path(save_dir)
self.save_dir.mkdir(parents=True, exist_ok=True)
def record_episode(self, fps=30):
episode_data = []
frame_buffer = deque(maxlen=3) # For temporal smoothing
print("Starting recording in 3...")
time.sleep(1)
print("2...")
time.sleep(1)
print("1...")
time.sleep(1)
print("Recording!")
start_time = time.time()
frame_idx = 0
while True:
loop_start = time.time()
# Get teleoperation input
teleop_input = self.device.read()
# Map to robot action
action = self.map_input_to_action(teleop_input)
# Get robot observation
obs = {
'image': self.camera.get_image(),
'state': self.robot.get_state(),
'ee_pose': self.robot.get_ee_pose()
}
# Smooth actions (optional)
frame_buffer.append(action)
smoothed_action = np.mean(frame_buffer, axis=0)
# Execute
self.robot.execute(smoothed_action)
# Record
episode_data.append({
'frame_idx': frame_idx,
'timestamp': time.time() - start_time,
'observation': obs,
'action': smoothed_action,
'raw_action': action
})
frame_idx += 1
# Check for stop condition
if teleop_input.stop_button:
print("Recording stopped by user")
break
# Maintain FPS
elapsed = time.time() - loop_start
if elapsed < 1.0 / fps:
time.sleep(1.0 / fps - elapsed)
# Save episode
self.save_episode(episode_data)
return episode_data
def save_episode(self, episode_data):
episode_idx = len(list(self.save_dir.glob("episode_*.pkl")))
# Save data
import pickle
with open(self.save_dir / f"episode_{episode_idx:06d}.pkl", 'wb') as f:
pickle.dump(episode_data, f)
# Save video
video_path = self.save_dir / f"episode_{episode_idx:06d}.mp4"
frames = [step['observation']['image'] for step in episode_data]
self.save_video(frames, video_path)
print(f"Saved episode {episode_idx} with {len(episode_data)} frames")
def save_video(self, frames, path, fps=30):
height, width = frames[0].shape[:2]
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
writer = cv2.VideoWriter(str(path), fourcc, fps, (width, height))
for frame in frames:
writer.write(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
writer.release()
Common Issues¶
Issue 1: Latency¶
Symptom: Delay between teleoperation input and robot response
Solutions: - Use wired connections instead of wireless - Reduce control frequency if needed - Optimize observation encoding - Use predictive display
Issue 2: Coordinate Frame Mismatch¶
Symptom: Robot moves in unexpected directions
Solution: Align coordinate frames
def align_frames(teleop_input, camera_frame, robot_frame):
# Rotation from camera to robot frame
R_cam_to_robot = compute_rotation_matrix(camera_frame, robot_frame)
# Transform action
action_robot_frame = R_cam_to_robot @ teleop_input
return action_robot_frame
Issue 3: Unstable Control¶
Symptom: Jerky, oscillating movements
Solutions: - Add low-pass filter - Reduce control gains - Implement dead zones
class StabilizedControl:
def __init__(self, cutoff_freq=5.0, dt=0.03):
self.dt = dt
self.alpha = dt / (dt + 1.0 / (2 * np.pi * cutoff_freq))
self.prev_action = None
def filter(self, action):
if self.prev_action is None:
self.prev_action = action
return action
# Low-pass filter
filtered = self.alpha * action + (1 - self.alpha) * self.prev_action
self.prev_action = filtered
return filtered
Evaluation Metrics¶
Track these metrics during teleoperation:
class TeleopMetrics:
def __init__(self):
self.success_count = 0
self.total_episodes = 0
self.episode_lengths = []
self.action_magnitudes = []
def log_episode(self, episode_data, success):
self.total_episodes += 1
if success:
self.success_count += 1
self.episode_lengths.append(len(episode_data))
actions = [step['action'] for step in episode_data]
self.action_magnitudes.extend([np.linalg.norm(a) for a in actions])
def report(self):
print(f"Success Rate: {self.success_count / self.total_episodes * 100:.1f}%")
print(f"Avg Episode Length: {np.mean(self.episode_lengths):.1f} steps")
print(f"Avg Action Magnitude: {np.mean(self.action_magnitudes):.3f}")
Next Steps¶
- Human Demonstrations - Best practices for collecting demos
- Data Quality - Ensuring high-quality data
- LeRobot Format - Structuring your dataset