
Teleoperation for Data Collection

Teleoperation is the most common method for collecting high-quality robot demonstrations.

Teleoperation Devices

SpaceMouse (3D Mouse)

Pros:
- 6-DoF control
- Intuitive for manipulation
- Affordable ($150-400)
- Low latency

Setup:

import spacemouse  # your SpaceMouse driver wrapper (e.g. built on the pyspacemouse package)

# Initialize
device = spacemouse.SpaceMouse()

# Read 6-DoF pose
while True:
    state = device.read()
    # state.x, state.y, state.z (translation)
    # state.roll, state.pitch, state.yaw (rotation)
    # state.buttons[0]  # Gripper control

    # Map to robot actions (map_spacemouse_to_robot and robot are defined elsewhere)
    action = map_spacemouse_to_robot(state)
    robot.execute(action)

Recommended Models:
- SpaceMouse Compact: $150, good for beginners
- SpaceMouse Pro: $400, better ergonomics
- SpaceMouse Wireless: $250, no cables
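Raw SpaceMouse axis readings are noisy near zero, so most setups apply a dead-band and a per-axis gain before mapping to robot deltas. A minimal sketch (`apply_deadband` and the default values here are illustrative, not part of any driver API):

```python
import numpy as np

def apply_deadband(axes, deadband=0.05, gain=0.02):
    """Zero out small axis values and scale the rest to robot deltas.

    axes: 6 values in [-1, 1] (x, y, z, roll, pitch, yaw).
    deadband and gain are illustrative defaults; tune per setup.
    """
    axes = np.asarray(axes, dtype=float)
    mag = np.abs(axes)
    # Rescale so motion ramps smoothly from zero at the dead-band edge
    scaled = np.sign(axes) * (mag - deadband) / (1.0 - deadband)
    return np.where(mag < deadband, 0.0, scaled) * gain
```

Tune `deadband` upward if the arm drifts when the puck is released, and `gain` to trade off speed against precision.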

VR Controllers

Pros:
- Immersive control
- Natural hand movements
- Can control bimanual robots
- Spatial awareness

Cons:
- More expensive ($300-1000)
- Requires VR headset
- Setup complexity

Setup with Meta Quest:

import numpy as np
import oculus_reader

class VRTeleopController:
    def __init__(self, robot):
        self.robot = robot
        self.vr = oculus_reader.OculusReader()

    def collect_demonstration(self):
        demo = []

        while True:
            # Get VR controller poses
            transforms, buttons = self.vr.get_transformations_and_buttons()

            # Right controller -> robot end-effector
            right_pos = transforms['r'][:3, 3]
            right_rot = transforms['r'][:3, :3]

            # Map to robot workspace
            robot_pos = self.map_vr_to_robot_workspace(right_pos)
            robot_rot = self.map_vr_to_robot_rotation(right_rot)

            # Gripper from trigger
            gripper = 1.0 if buttons['RTr'] else 0.0

            action = np.concatenate([robot_pos, robot_rot, [gripper]])
            demo.append({
                'obs': self.robot.get_observation(),
                'action': action
            })

            self.robot.execute(action)

            # Exit condition
            if buttons['A']:
                break

        return demo

Keyboard + Mouse

Pros:
- No extra hardware needed
- Good for simple tasks
- Easy to set up

Cons:
- Not intuitive for 6-DoF control
- Slower demonstrations
- Lower quality

Setup:

import numpy as np
import pygame

class KeyboardTeleop:
    def __init__(self, robot):
        self.robot = robot
        pygame.init()
        self.screen = pygame.display.set_mode((800, 600))

        # Control gains
        self.translation_gain = 0.01
        self.rotation_gain = 0.05

    def run(self):
        current_action = np.zeros(7)

        while True:
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    return

            keys = pygame.key.get_pressed()

            # Translation
            if keys[pygame.K_w]: current_action[0] += self.translation_gain
            if keys[pygame.K_s]: current_action[0] -= self.translation_gain
            if keys[pygame.K_a]: current_action[1] += self.translation_gain
            if keys[pygame.K_d]: current_action[1] -= self.translation_gain
            if keys[pygame.K_q]: current_action[2] += self.translation_gain
            if keys[pygame.K_e]: current_action[2] -= self.translation_gain

            # Rotation
            if keys[pygame.K_i]: current_action[3] += self.rotation_gain
            if keys[pygame.K_k]: current_action[3] -= self.rotation_gain
            if keys[pygame.K_j]: current_action[4] += self.rotation_gain
            if keys[pygame.K_l]: current_action[4] -= self.rotation_gain

            # Gripper
            if keys[pygame.K_SPACE]: current_action[6] = 1.0
            else: current_action[6] = 0.0

            self.robot.execute(current_action)
            pygame.time.wait(10)

Haptic Devices (Advanced)

Devices like Touch/Phantom Omni provide force feedback:

Pros:
- Force feedback improves quality
- High precision
- Natural feel

Cons:
- Expensive ($2000-10000+)
- Complex setup
- Limited workspace

Game Controllers

Xbox/PlayStation controllers for mobile robots:

import inputs

def collect_with_gamepad(robot):
    linear_vel = 0.0
    angular_vel = 0.0

    while True:
        events = inputs.get_gamepad()
        for event in events:
            if event.ev_type == 'Absolute':
                if event.code == 'ABS_X':  # Left stick X
                    linear_vel = event.state / 32768.0
                elif event.code == 'ABS_RZ':  # Right stick Y
                    angular_vel = event.state / 32768.0

            elif event.ev_type == 'Key':
                if event.code == 'BTN_SOUTH' and event.state == 1:
                    # A button pressed: stop and exit (a bare break would
                    # only leave the inner for-loop, not the while-loop)
                    robot.stop()
                    return

        robot.set_velocity(linear_vel, angular_vel)

Best Practices

1. Workspace Mapping

Map teleoperation device workspace to robot workspace:

import numpy as np

class WorkspaceMapper:
    def __init__(self, teleop_bounds, robot_bounds):
        self.teleop_bounds = teleop_bounds
        self.robot_bounds = robot_bounds

    def map_position(self, teleop_pos):
        # Normalize to [0, 1]
        normalized = (teleop_pos - self.teleop_bounds['min']) / (
            self.teleop_bounds['max'] - self.teleop_bounds['min']
        )

        # Scale to robot workspace
        robot_pos = (
            self.robot_bounds['min'] +
            normalized * (self.robot_bounds['max'] - self.robot_bounds['min'])
        )

        # Clamp to safe bounds
        robot_pos = np.clip(robot_pos, self.robot_bounds['min'], self.robot_bounds['max'])

        return robot_pos

2. Safety Limits

Always enforce safety constraints:

import numpy as np

class SafeTeleopController:
    def __init__(self, robot, max_velocity=0.5, workspace_bounds=None):
        self.robot = robot
        self.max_velocity = max_velocity
        self.workspace_bounds = workspace_bounds

    def execute_safe(self, action):
        # Limit velocity
        action_velocity = np.linalg.norm(action[:3])
        if action_velocity > self.max_velocity:
            action[:3] = action[:3] / action_velocity * self.max_velocity

        # Check workspace bounds
        predicted_pos = self.robot.get_ee_position() + action[:3]
        if not self.in_workspace(predicted_pos):
            print("Warning: Action would leave workspace, clamping")
            action[:3] = self.clamp_to_workspace(action[:3])

        self.robot.execute(action)

    def in_workspace(self, position):
        if self.workspace_bounds is None:
            return True
        return np.all(position >= self.workspace_bounds['min']) and \
               np.all(position <= self.workspace_bounds['max'])

    def clamp_to_workspace(self, delta):
        # Shrink the translation so the predicted position stays in bounds
        current = self.robot.get_ee_position()
        target = np.clip(current + delta,
                         self.workspace_bounds['min'],
                         self.workspace_bounds['max'])
        return target - current

3. Recording Pipeline

Complete teleoperation recording system:

import cv2
import pickle
import time
import numpy as np
from collections import deque
from pathlib import Path

class TeleopRecorder:
    def __init__(self, robot, camera, device, save_dir):
        self.robot = robot
        self.camera = camera
        self.device = device
        self.save_dir = Path(save_dir)
        self.save_dir.mkdir(parents=True, exist_ok=True)

    def record_episode(self, fps=30):
        episode_data = []
        frame_buffer = deque(maxlen=3)  # For temporal smoothing

        print("Starting recording in 3...")
        time.sleep(1)
        print("2...")
        time.sleep(1)
        print("1...")
        time.sleep(1)
        print("Recording!")

        start_time = time.time()
        frame_idx = 0

        while True:
            loop_start = time.time()

            # Get teleoperation input
            teleop_input = self.device.read()

            # Map to robot action
            action = self.map_input_to_action(teleop_input)

            # Get robot observation
            obs = {
                'image': self.camera.get_image(),
                'state': self.robot.get_state(),
                'ee_pose': self.robot.get_ee_pose()
            }

            # Smooth actions (optional)
            frame_buffer.append(action)
            smoothed_action = np.mean(frame_buffer, axis=0)

            # Execute
            self.robot.execute(smoothed_action)

            # Record
            episode_data.append({
                'frame_idx': frame_idx,
                'timestamp': time.time() - start_time,
                'observation': obs,
                'action': smoothed_action,
                'raw_action': action
            })

            frame_idx += 1

            # Check for stop condition
            if teleop_input.stop_button:
                print("Recording stopped by user")
                break

            # Maintain FPS
            elapsed = time.time() - loop_start
            if elapsed < 1.0 / fps:
                time.sleep(1.0 / fps - elapsed)

        # Save episode
        self.save_episode(episode_data)

        return episode_data

    def save_episode(self, episode_data):
        episode_idx = len(list(self.save_dir.glob("episode_*.pkl")))

        # Save data
        import pickle
        with open(self.save_dir / f"episode_{episode_idx:06d}.pkl", 'wb') as f:
            pickle.dump(episode_data, f)

        # Save video
        video_path = self.save_dir / f"episode_{episode_idx:06d}.mp4"
        frames = [step['observation']['image'] for step in episode_data]
        self.save_video(frames, video_path)

        print(f"Saved episode {episode_idx} with {len(episode_data)} frames")

    def save_video(self, frames, path, fps=30):
        height, width = frames[0].shape[:2]
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        writer = cv2.VideoWriter(str(path), fourcc, fps, (width, height))

        for frame in frames:
            writer.write(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))

        writer.release()
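Recorded episodes eventually need to be read back for training. A loader sketch, assuming the episode_*.pkl layout written by save_episode above:

```python
import pickle
from pathlib import Path

def load_episodes(save_dir):
    """Load every recorded episode from save_dir, sorted by index.

    Each episode is a list of per-step dicts with 'observation',
    'action', etc., matching the format written by save_episode.
    """
    episodes = []
    for path in sorted(Path(save_dir).glob("episode_*.pkl")):
        with open(path, "rb") as f:
            episodes.append(pickle.load(f))
    return episodes
```

The zero-padded `episode_%06d` naming makes lexicographic `sorted()` match numeric order, so episodes load in the order they were collected.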

Common Issues

Issue 1: Latency

Symptom: Delay between teleoperation input and robot response

Solutions:
- Use wired connections instead of wireless
- Reduce control frequency if needed
- Optimize observation encoding
- Use predictive display
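Before optimizing, measure where the time actually goes. A simple round-trip timing sketch (`read_fn` and `execute_fn` stand in for your device read and robot command calls):

```python
import time

def measure_loop_latency(read_fn, execute_fn, n=100):
    """Time n read->execute round trips; return (mean_ms, p95_ms)."""
    samples = []
    for _ in range(n):
        t0 = time.perf_counter()
        cmd = read_fn()      # device input
        execute_fn(cmd)      # robot command
        samples.append((time.perf_counter() - t0) * 1000.0)
    samples.sort()
    mean_ms = sum(samples) / len(samples)
    p95_ms = samples[int(0.95 * (len(samples) - 1))]
    return mean_ms, p95_ms
```

As a rough rule of thumb, round-trip latency much above ~100 ms makes precise manipulation noticeably harder; the 95th percentile matters more than the mean, since occasional stalls are what operators feel.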

Issue 2: Coordinate Frame Mismatch

Symptom: Robot moves in unexpected directions

Solution: Align coordinate frames

def align_frames(teleop_input, camera_frame, robot_frame):
    # Rotation from camera to robot frame
    # (compute_rotation_matrix comes from your calibration code)
    R_cam_to_robot = compute_rotation_matrix(camera_frame, robot_frame)

    # Transform action
    action_robot_frame = R_cam_to_robot @ teleop_input

    return action_robot_frame
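For a fixed camera, the camera-to-robot rotation is a constant matrix derived from calibration (or, roughly, from the mounting geometry). An illustrative concrete case, with a camera facing the robot head-on so two axes flip (the matrix below is a made-up convention, not your calibration):

```python
import numpy as np

# Illustrative only: a camera looking back at the robot sees the
# robot's x and z axes reversed. Replace with your calibrated rotation.
R_CAM_TO_ROBOT = np.array([
    [-1.0, 0.0,  0.0],
    [ 0.0, 1.0,  0.0],
    [ 0.0, 0.0, -1.0],
])

def to_robot_frame(delta_cam):
    """Rotate a translation delta from the camera frame into the robot frame."""
    return R_CAM_TO_ROBOT @ np.asarray(delta_cam, dtype=float)
```

A quick sanity check: push the controller "right" as seen on camera and confirm the arm moves the expected direction; if not, the frame alignment is wrong before anything else is.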

Issue 3: Unstable Control

Symptom: Jerky, oscillating movements

Solutions:
- Add low-pass filter
- Reduce control gains
- Implement dead zones

import numpy as np

class StabilizedControl:
    def __init__(self, cutoff_freq=5.0, dt=0.03):
        self.dt = dt
        self.alpha = dt / (dt + 1.0 / (2 * np.pi * cutoff_freq))
        self.prev_action = None

    def filter(self, action):
        if self.prev_action is None:
            self.prev_action = action
            return action

        # Low-pass filter
        filtered = self.alpha * action + (1 - self.alpha) * self.prev_action
        self.prev_action = filtered

        return filtered
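The low-pass filter above handles oscillation; a dead zone handles drift, so the arm holds still when the operator's hand is nominally at rest. A minimal sketch (the threshold value is illustrative):

```python
import numpy as np

def dead_zone(action, threshold=0.01):
    """Suppress per-dimension commands below the noise floor.

    threshold is illustrative; set it just above the idle jitter
    you observe from your input device.
    """
    action = np.asarray(action, dtype=float)
    return np.where(np.abs(action) < threshold, 0.0, action)
```

Apply the dead zone before the low-pass filter, so idle jitter is zeroed rather than smoothed into a slow drift.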

Evaluation Metrics

Track these metrics during teleoperation:

import numpy as np

class TeleopMetrics:
    def __init__(self):
        self.success_count = 0
        self.total_episodes = 0
        self.episode_lengths = []
        self.action_magnitudes = []

    def log_episode(self, episode_data, success):
        self.total_episodes += 1
        if success:
            self.success_count += 1

        self.episode_lengths.append(len(episode_data))

        actions = [step['action'] for step in episode_data]
        self.action_magnitudes.extend([np.linalg.norm(a) for a in actions])

    def report(self):
        print(f"Success Rate: {self.success_count / self.total_episodes * 100:.1f}%")
        print(f"Avg Episode Length: {np.mean(self.episode_lengths):.1f} steps")
        print(f"Avg Action Magnitude: {np.mean(self.action_magnitudes):.3f}")

Next Steps