Skip to content

Collecting Demonstration Data

Guide to collecting high-quality demonstration data for imitation learning.

Collection Methods

Teleoperation

Control robot remotely to create demonstrations.

class TeleoperationInterface:
    """Collect demonstrations by teleoperating a robot with an input device."""

    def __init__(self, robot, input_device):
        self.robot = robot
        self.device = input_device  # Joystick, SpaceMouse, VR controller
        self.demonstrations = []    # successful demos accumulated across episodes

    def scale_action(self, action):
        """Map a raw device action into the robot's action space.

        Default is an identity passthrough. Override in a subclass (or
        replace on the instance) to apply device-specific scaling or
        clipping. Previously this method was called but never defined,
        which raised AttributeError at runtime.
        """
        return action

    def collect_demonstration(self):
        """Run one teleoperated episode and return the recorded demo.

        Returns:
            dict with 'observations', 'actions', and 'metadata' lists.
            The demo is also appended to ``self.demonstrations`` when the
            final step's info reports ``is_success``.
        """
        demo = {
            'observations': [],
            'actions': [],
            'metadata': {}
        }

        obs = self.robot.reset()
        done = False
        info = {}  # defensive default; the loop always runs at least once

        while not done:
            # Get teleoperation input from the device
            action = self.device.get_action()

            # Scale and apply action
            robot_action = self.scale_action(action)
            next_obs, reward, done, info = self.robot.step(robot_action)

            # Record the (observation, action) pair for this step
            demo['observations'].append(obs)
            demo['actions'].append(robot_action)

            obs = next_obs

        # Keep only successful episodes in the stored collection
        if info.get('is_success', False):
            self.demonstrations.append(demo)

        return demo

Kinesthetic Teaching

Physically guide robot through motions.

class KinestheticTeaching:
    """Record demonstrations by physically guiding a compliant robot arm."""

    def __init__(self, robot):
        self.robot = robot

    def start_recording(self):
        """Put the arm into gravity-compensation mode and reset the buffer."""
        self.robot.set_compliance_mode(True)
        self.recording_data = []

    def record_step(self):
        """Snapshot the robot's current state into the recording buffer."""
        state = self.robot.get_state()
        sample = {
            'timestamp': time.time(),
            'joint_positions': state['q'],
            'joint_velocities': state['dq'],
            'ee_pose': state['ee_pose'],
        }
        self.recording_data.append(sample)

    def stop_recording(self):
        """Leave compliance mode and return (raw samples, derived actions)."""
        self.robot.set_compliance_mode(False)
        # Turn the recorded state trajectory into a sequence of actions
        actions = self.compute_actions_from_trajectory(self.recording_data)
        return self.recording_data, actions

VR Interface

Use VR controllers for intuitive teleoperation.

class VRTeleoperation:
    """Teleoperate a robot with a VR controller and record demonstrations."""

    def __init__(self, robot):
        self.robot = robot
        self.vr_system = initialize_vr()

    def collect_demo(self):
        """Collect one demonstration episode.

        The episode ends when the operator presses the 'grip' button or
        when the environment reports ``done`` (previously ``done`` was
        captured but ignored, so the loop could only exit via the button).

        Returns:
            list of per-step dicts with 'obs', 'action', and 'vr_pose'.
        """
        obs = self.robot.reset()
        demo = []

        while True:
            # Get VR controller pose
            controller_pose = self.vr_system.get_controller_pose()

            # Map controller pose to a robot action via inverse kinematics
            target_ee_pose = self.map_vr_to_robot(controller_pose)
            action = self.robot.compute_ik(target_ee_pose)

            # Execute one control step
            next_obs, _, done, _ = self.robot.step(action)

            demo.append({
                'obs': obs,
                'action': action,
                'vr_pose': controller_pose
            })

            # Gripper / termination control from controller buttons
            if self.vr_system.button_pressed('trigger'):
                self.robot.close_gripper()
            elif self.vr_system.button_pressed('grip'):
                break  # Operator ended the demonstration

            # BUG FIX: honor the environment's episode-end signal
            if done:
                break

            obs = next_obs

        return demo

Data Quality

Quality Metrics

def assess_demonstration_quality(demo):
    """Score a demonstration on smoothness, success, efficiency, and safety.

    Args:
        demo: non-empty list of per-step dicts. Each step carries an
            'action' array and may carry 'is_success' / 'collision' flags.

    Returns:
        dict with the four individual metrics plus a weighted
        'quality_score'.

    Raises:
        ValueError: if demo is empty.
    """
    if not demo:
        raise ValueError("cannot assess an empty demonstration")

    metrics = {}

    # 1. Smoothness: negated mean |jerk| (second difference of actions).
    # np.diff(n=2) needs >= 3 samples; shorter demos previously produced
    # NaN (mean of an empty array) — treat them as perfectly smooth.
    actions = np.array([step['action'] for step in demo])
    if len(actions) >= 3:
        jerk = np.diff(actions, n=2, axis=0)
        metrics['smoothness'] = -np.mean(np.abs(jerk))
    else:
        metrics['smoothness'] = 0.0

    # 2. Success: taken from the final step's flag
    metrics['success'] = demo[-1].get('is_success', False)

    # 3. Efficiency: shorter trajectories score higher
    metrics['efficiency'] = 1.0 / len(demo)

    # 4. Safety: no collision at any step
    metrics['safety'] = not any(step.get('collision', False) for step in demo)

    # Overall weighted score; booleans are cast to float explicitly
    metrics['quality_score'] = (
        0.3 * metrics['smoothness'] +
        0.4 * float(metrics['success']) +
        0.2 * metrics['efficiency'] +
        0.1 * float(metrics['safety'])
    )

    return metrics

Data Filtering

def filter_demonstrations(demos, quality_threshold=0.7):
    """Keep only demonstrations whose quality score meets the threshold.

    Prints a rejection line for each discarded demo and a final summary.
    """
    kept = []

    for candidate in demos:
        quality = assess_demonstration_quality(candidate)
        score = quality['quality_score']
        if score < quality_threshold:
            print(f"Rejecting demo with quality {quality['quality_score']:.2f}")
            continue
        kept.append(candidate)

    print(f"Kept {len(kept)}/{len(demos)} demonstrations")
    return kept

Data Augmentation

def augment_demonstration(demo, augmentation_config):
    """Return the original demo plus any configured augmented variants.

    Supported config flags: 'time_warp', 'noise', 'spatial'.
    """
    variants = [demo]  # always keep the untouched original first

    # Time warping: resample the trajectory at several playback speeds
    if augmentation_config.get('time_warp', False):
        variants.extend(time_warp_demo(demo, factor) for factor in (0.8, 0.9, 1.1, 1.2))

    # Noise injection: jitter the recorded actions slightly
    if augmentation_config.get('noise', False):
        variants.extend(add_noise_to_demo(demo, level) for level in (0.01, 0.02))

    # Spatial perturbation: shift the trajectory by sampled offsets
    if augmentation_config.get('spatial', False):
        variants.extend(perturb_demo(demo, delta) for delta in generate_spatial_perturbations())

    return variants

def time_warp_demo(demo, factor):
    """Resample a demonstration to simulate playing it faster or slower.

    factor < 1 shortens the demo (speed-up); factor > 1 lengthens it
    (slow-down) by repeating steps. Steps are picked by truncating evenly
    spaced sample positions over the original trajectory.
    """
    target_length = int(len(demo) * factor)
    sample_points = np.linspace(0, len(demo) - 1, target_length)
    return [demo[int(point)] for point in sample_points]

def add_noise_to_demo(demo, noise_level):
    """Return a copy of demo with Gaussian noise added to each step's action.

    Each step dict is shallow-copied, so the original demo is untouched.
    """
    perturbed = []
    for original_step in demo:
        step = original_step.copy()
        noise = np.random.randn(*step['action'].shape) * noise_level
        step['action'] = step['action'] + noise
        perturbed.append(step)
    return perturbed

Saving in LeRobot Format

from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
import pandas as pd

def save_demos_lerobot_format(demonstrations, output_dir):
    """Save demonstrations in LeRobot format.

    Writes one parquet table per episode under ``episodes/``, per-frame
    PNGs under ``videos/``, and dataset-level metadata to
    ``meta/info.json``.

    Args:
        demonstrations: list of episodes; each episode is a list of step
            dicts with 'timestamp', 'obs' (containing 'state' and
            optionally 'image'), and 'action' keys.
        output_dir: pathlib.Path of the dataset root.
    """
    import json  # hoisted from mid-function to the top for clarity

    # Create the full directory layout up front. BUG FIX: meta/ was never
    # created, so the final json.dump raised FileNotFoundError.
    os.makedirs(output_dir / 'episodes', exist_ok=True)
    os.makedirs(output_dir / 'videos', exist_ok=True)
    os.makedirs(output_dir / 'meta', exist_ok=True)

    for episode_idx, demo in enumerate(demonstrations):
        # Build one row per frame for the episode table
        episode_data = []

        for step_idx, step in enumerate(demo):
            episode_data.append({
                'episode_index': episode_idx,
                'frame_index': step_idx,
                'timestamp': step['timestamp'],
                'observation.state': step['obs']['state'],
                'observation.image': f"videos/episode_{episode_idx:06d}_frame_{step_idx:06d}.png",
                'action': step['action'],
                'next.done': step_idx == len(demo) - 1  # flag only the last frame
            })

        # Save episode parquet
        df = pd.DataFrame(episode_data)
        df.to_parquet(output_dir / f'episodes/episode_{episode_idx:06d}.parquet')

        # Save the camera frames referenced by 'observation.image'
        for step_idx, step in enumerate(demo):
            if 'image' in step['obs']:
                cv2.imwrite(
                    str(output_dir / f"videos/episode_{episode_idx:06d}_frame_{step_idx:06d}.png"),
                    step['obs']['image']
                )

    # Save dataset-level metadata.
    # NOTE(review): fps/robot/task are hard-coded — confirm these match
    # the actual collection setup before publishing the dataset.
    metadata = {
        'total_episodes': len(demonstrations),
        'total_frames': sum(len(demo) for demo in demonstrations),
        'fps': 30,
        'robot': 'franka',
        'task': 'pick_and_place'
    }

    with open(output_dir / 'meta/info.json', 'w') as f:
        json.dump(metadata, f)

Best Practices

Collection Guidelines

  1. Consistency: Same strategy for same situations
  2. Diversity: Cover different scenarios and edge cases
  3. Quality: Smooth, successful demonstrations
  4. Quantity: Start with 50-100, add more if needed

Common Mistakes

| Mistake | Impact | Solution |
| --- | --- | --- |
| Inconsistent expert | Confuses policy | Use single expert or filter |
| Limited diversity | Poor generalization | Vary object poses, lighting |
| Too fast | Hard to track | Record at high frequency |
| No failures | Can't learn recovery | Include some corrective demos |

Next Steps