# Collecting Demonstration Data

A guide to collecting high-quality demonstration data for imitation learning.

## Collection Methods

### Teleoperation

Control the robot remotely with an input device to create demonstrations.
```python
class TeleoperationInterface:
    def __init__(self, robot, input_device):
        self.robot = robot
        self.device = input_device  # Joystick, SpaceMouse, VR controller
        self.demonstrations = []

    def collect_demonstration(self):
        demo = {
            'observations': [],
            'actions': [],
            'metadata': {}
        }

        obs = self.robot.reset()
        done = False

        while not done:
            # Get teleoperation input
            action = self.device.get_action()

            # Scale and apply action
            robot_action = self.scale_action(action)
            next_obs, reward, done, info = self.robot.step(robot_action)

            # Record the transition
            demo['observations'].append(obs)
            demo['actions'].append(robot_action)
            obs = next_obs

        # Save the demonstration only if the episode ended in success
        if info.get('is_success', False):
            self.demonstrations.append(demo)

        return demo
```
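A minimal usage sketch: collect demonstrations until a target number of successes is saved. `make_robot_env` and `SpaceMouseDevice` are hypothetical placeholders for your environment and input-device wrappers.

```python
robot_env = make_robot_env()        # placeholder environment constructor
spacemouse = SpaceMouseDevice()     # placeholder input-device wrapper

teleop = TeleoperationInterface(robot_env, spacemouse)

# Collect until 50 successful demonstrations have been recorded
while len(teleop.demonstrations) < 50:
    teleop.collect_demonstration()

print(f"Collected {len(teleop.demonstrations)} successful demonstrations")
```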
### Kinesthetic Teaching

Physically guide the robot through the motion while recording its state.
```python
import time


class KinestheticTeaching:
    def __init__(self, robot):
        self.robot = robot
        self.recording_data = []

    def start_recording(self):
        # Enable gravity compensation so the arm can be moved by hand
        self.robot.set_compliance_mode(True)
        self.recording_data = []

    def record_step(self):
        # Record the current state; actions are derived after recording
        current_state = self.robot.get_state()
        self.recording_data.append({
            'timestamp': time.time(),
            'joint_positions': current_state['q'],
            'joint_velocities': current_state['dq'],
            'ee_pose': current_state['ee_pose']
        })

    def stop_recording(self):
        self.robot.set_compliance_mode(False)

        # Compute actions from the recorded trajectory
        actions = self.compute_actions_from_trajectory(self.recording_data)
        return self.recording_data, actions
```
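`compute_actions_from_trajectory` is left undefined above. A minimal sketch, assuming the policy will be trained on joint-position targets, is to use each next recorded configuration as the action for the current step (velocity or delta actions can be derived with finite differences instead):

```python
import numpy as np


def compute_actions_from_trajectory(recording_data):
    """Derive per-step actions from a kinesthetic recording.

    Assumes a joint-position control interface: the action at step t is
    the joint configuration recorded at step t + 1.
    """
    positions = np.array([step['joint_positions'] for step in recording_data])
    # The next recorded configuration becomes the target for the current step;
    # the final configuration is repeated for the last action.
    return np.vstack([positions[1:], positions[-1:]])
```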
### VR Interface

Use VR controllers for intuitive teleoperation.
```python
class VRTeleoperation:
    def __init__(self, robot):
        self.robot = robot
        self.vr_system = initialize_vr()  # VR runtime setup (platform-specific)

    def collect_demo(self):
        obs = self.robot.reset()
        demo = []

        while True:
            # Get VR controller pose
            controller_pose = self.vr_system.get_controller_pose()

            # Map controller pose to a robot end-effector target
            target_ee_pose = self.map_vr_to_robot(controller_pose)
            action = self.robot.compute_ik(target_ee_pose)

            # Execute
            next_obs, _, done, _ = self.robot.step(action)

            demo.append({
                'obs': obs,
                'action': action,
                'vr_pose': controller_pose
            })

            # Button handling
            if self.vr_system.button_pressed('trigger'):
                self.robot.close_gripper()
            elif self.vr_system.button_pressed('grip'):
                break  # End demonstration

            if done:
                break  # Stop if the environment signals episode end

            obs = next_obs

        return demo
```
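`map_vr_to_robot` is left undefined above. A minimal sketch, assuming 4x4 homogeneous poses, a fixed calibrated transform from the VR tracking frame to the robot base frame, and an axis-aligned workspace clamp (the transform and limits below are placeholders):

```python
import numpy as np

# Placeholder calibration: transform from the VR frame to the robot base frame
T_ROBOT_FROM_VR = np.eye(4)

# Placeholder workspace limits in the robot base frame (meters)
WORKSPACE_MIN = np.array([0.2, -0.4, 0.05])
WORKSPACE_MAX = np.array([0.7, 0.4, 0.6])


def map_vr_to_robot(controller_pose):
    """Map a 4x4 VR controller pose to an end-effector target pose."""
    target = T_ROBOT_FROM_VR @ controller_pose
    # Clamp the translation so the target stays inside the workspace
    target[:3, 3] = np.clip(target[:3, 3], WORKSPACE_MIN, WORKSPACE_MAX)
    return target
```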
## Data Quality

### Quality Metrics

```python
import numpy as np


def assess_demonstration_quality(demo):
    metrics = {}

    # 1. Smoothness (low jerk): second difference of the action sequence
    actions = np.array([step['action'] for step in demo])
    jerk = np.diff(actions, n=2, axis=0)
    metrics['smoothness'] = -np.mean(np.abs(jerk))

    # 2. Success: did the final step report success?
    metrics['success'] = demo[-1].get('is_success', False)

    # 3. Efficiency: shorter trajectories score higher
    metrics['efficiency'] = 1.0 / len(demo)

    # 4. Safety: no collisions anywhere in the trajectory
    metrics['safety'] = not any(step.get('collision', False) for step in demo)

    # Overall weighted score (note: the raw terms live on different scales)
    metrics['quality_score'] = (
        0.3 * metrics['smoothness'] +
        0.4 * metrics['success'] +
        0.2 * metrics['efficiency'] +
        0.1 * metrics['safety']
    )
    return metrics
```
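As written, the terms are not on a common scale: smoothness is a non-positive jerk penalty, efficiency is roughly one over the trajectory length, and success and safety are booleans, so the weighted sum is hard to threshold directly. One way to make the score comparable across demonstrations is to map each term to [0, 1] before weighting; a sketch is below (`jerk_scale` and `nominal_length` are illustrative constants to tune per robot and task):

```python
import numpy as np


def normalized_quality_score(metrics, jerk_scale=1.0, nominal_length=200):
    """Combine the raw metrics on a common [0, 1] scale with the same weights."""
    smoothness = np.exp(metrics['smoothness'] / jerk_scale)        # 1.0 when jerk is zero
    efficiency = min(1.0, nominal_length * metrics['efficiency'])  # 1.0 at or below nominal length
    success = float(metrics['success'])
    safety = float(metrics['safety'])
    return 0.3 * smoothness + 0.4 * success + 0.2 * efficiency + 0.1 * safety
```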
### Data Filtering

```python
def filter_demonstrations(demos, quality_threshold=0.7):
    filtered = []

    for demo in demos:
        quality = assess_demonstration_quality(demo)

        # Keep only demos at or above the threshold; choose the threshold
        # on the same scale as quality_score
        if quality['quality_score'] >= quality_threshold:
            filtered.append(demo)
        else:
            print(f"Rejecting demo with quality {quality['quality_score']:.2f}")

    print(f"Kept {len(filtered)}/{len(demos)} demonstrations")
    return filtered
```
## Data Augmentation

```python
import numpy as np


def augment_demonstration(demo, augmentation_config):
    augmented_demos = [demo]  # Include the original

    # Time warping: replay the demonstration faster or slower
    if augmentation_config.get('time_warp', False):
        for factor in [0.8, 0.9, 1.1, 1.2]:
            augmented_demos.append(time_warp_demo(demo, factor))

    # Noise injection: small perturbations of the recorded actions
    if augmentation_config.get('noise', False):
        for noise_level in [0.01, 0.02]:
            augmented_demos.append(add_noise_to_demo(demo, noise_level))

    # Spatial perturbation: shift targets by small offsets
    if augmentation_config.get('spatial', False):
        for delta in generate_spatial_perturbations():
            augmented_demos.append(perturb_demo(demo, delta))

    return augmented_demos


def time_warp_demo(demo, factor):
    """Speed up or slow down a demonstration by resampling its steps."""
    indices = np.linspace(0, len(demo) - 1, int(len(demo) * factor))
    return [demo[int(i)] for i in indices]


def add_noise_to_demo(demo, noise_level):
    """Add small Gaussian noise to the recorded actions."""
    noisy_demo = []
    for step in demo:
        noisy_step = step.copy()
        noisy_step['action'] = step['action'] + np.random.randn(*step['action'].shape) * noise_level
        noisy_demo.append(noisy_step)
    return noisy_demo
```
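`generate_spatial_perturbations` and `perturb_demo` are referenced but not defined above. A minimal sketch, assuming the first three action dimensions hold an end-effector position target that can be offset by a small translation (purely illustrative; `magnitude` and `count` are placeholder parameters):

```python
import numpy as np


def generate_spatial_perturbations(magnitude=0.01, count=4, seed=0):
    """Yield small random 3D translation offsets (in meters)."""
    rng = np.random.default_rng(seed)
    for _ in range(count):
        yield rng.uniform(-magnitude, magnitude, size=3)


def perturb_demo(demo, delta):
    """Shift the positional part of each action by a fixed offset."""
    perturbed = []
    for step in demo:
        new_step = step.copy()
        action = np.array(step['action'], dtype=float)
        action[:3] = action[:3] + delta  # assumes action[:3] is a position target
        new_step['action'] = action
        perturbed.append(new_step)
    return perturbed
```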
## Saving in LeRobot Format

```python
import json
import os
from pathlib import Path

import cv2
import pandas as pd

from lerobot.common.datasets.lerobot_dataset import LeRobotDataset


def save_demos_lerobot_format(demonstrations, output_dir):
    """Save demonstrations in a LeRobot-style directory layout."""
    output_dir = Path(output_dir)
    os.makedirs(output_dir / 'episodes', exist_ok=True)
    os.makedirs(output_dir / 'videos', exist_ok=True)
    os.makedirs(output_dir / 'meta', exist_ok=True)

    for episode_idx, demo in enumerate(demonstrations):
        # Create episode dataframe
        episode_data = []
        for step_idx, step in enumerate(demo):
            episode_data.append({
                'episode_index': episode_idx,
                'frame_index': step_idx,
                'timestamp': step['timestamp'],
                'observation.state': step['obs']['state'],
                'observation.image': f"videos/episode_{episode_idx:06d}_frame_{step_idx:06d}.png",
                'action': step['action'],
                'next.done': step_idx == len(demo) - 1
            })

        # Save episode parquet
        df = pd.DataFrame(episode_data)
        df.to_parquet(output_dir / f'episodes/episode_{episode_idx:06d}.parquet')

        # Save images
        for step_idx, step in enumerate(demo):
            if 'image' in step['obs']:
                cv2.imwrite(
                    str(output_dir / f"videos/episode_{episode_idx:06d}_frame_{step_idx:06d}.png"),
                    step['obs']['image']
                )

    # Save metadata
    metadata = {
        'total_episodes': len(demonstrations),
        'total_frames': sum(len(demo) for demo in demonstrations),
        'fps': 30,
        'robot': 'franka',
        'task': 'pick_and_place'
    }
    with open(output_dir / 'meta' / 'info.json', 'w') as f:
        json.dump(metadata, f)
```
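A quick usage sketch tying the pipeline together; `all_demos` stands in for whatever list of per-step demonstrations you collected with the methods above, and the output path is a placeholder:

```python
from pathlib import Path

good_demos = filter_demonstrations(all_demos, quality_threshold=0.7)
save_demos_lerobot_format(good_demos, Path("data/pick_and_place_demos"))
```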
## Best Practices

### Collection Guidelines

- Consistency: use the same strategy in the same situations
- Diversity: cover different scenarios and edge cases (see the coverage sketch after this list)
- Quality: collect smooth, successful demonstrations
- Quantity: start with 50-100 demonstrations and add more if needed
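A small sketch of how scenario coverage could be tracked during a collection session, so the diversity and quantity targets above are explicit. The scenario names, the per-scenario target, and `collect_one_demo` are placeholders for your own setup:

```python
from collections import Counter

# Placeholder scenarios and per-scenario demonstration target
SCENARIOS = ['object_left', 'object_right', 'object_occluded', 'cluttered_table']
TARGET_PER_SCENARIO = 20


def next_scenario(coverage):
    """Return the least-covered scenario, or None once every target is met."""
    remaining = {s: TARGET_PER_SCENARIO - coverage[s] for s in SCENARIOS}
    scenario, deficit = max(remaining.items(), key=lambda kv: kv[1])
    return scenario if deficit > 0 else None


# Example session loop: collect_one_demo is a placeholder for any method above
coverage = Counter()
while (scenario := next_scenario(coverage)) is not None:
    demo = collect_one_demo(scenario)
    if assess_demonstration_quality(demo)['success']:
        coverage[scenario] += 1
```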
### Common Mistakes
| Mistake | Impact | Solution |
|---|---|---|
| Inconsistent expert | Confuses policy | Use single expert or filter |
| Limited diversity | Poor generalization | Vary object poses, lighting |
| Demonstrating too fast | Hard to track | Record at a higher frequency |
| No failures | Can't learn recovery | Include some corrective demos |