Behavioral Cloning¶
Behavioral Cloning (BC) is the simplest imitation learning method: directly learn a policy from expert demonstrations using supervised learning.
Overview¶
Behavioral Cloning treats imitation learning as supervised learning:
- Input: Expert demonstration \((s, a)\) pairs
- Output: A policy \(\pi_\theta(a|s)\) that mimics the expert
- Training: Standard supervised learning (cross-entropy, MSE)
Advantages:

- ✓ Simple to implement (just supervised learning)
- ✓ No reward function needed
- ✓ Works well with large datasets
- ✓ Offline learning (safe)

Challenges:

- ✗ Distribution shift (compounding errors)
- ✗ Requires large, diverse datasets
- ✗ Doesn't generalize beyond demonstrations
- ✗ Causal confusion
Mathematical Foundation¶
Basic Formulation¶
Given expert demonstrations \(\mathcal{D} = \{(s_i, a_i)\}_{i=1}^N\), learn policy \(\pi_\theta\) by minimizing:
For continuous actions: $$ \mathcal{L}(\theta) = \mathbb{E}_{(s,a) \sim \mathcal{D}} \left[ \| a - \pi_\theta(s) \|^2 \right] $$
For discrete actions: $$ \mathcal{L}(\theta) = \mathbb{E}_{(s,a) \sim \mathcal{D}} \left[ -\log \pi_\theta(a|s) \right] $$
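These two objectives map directly onto PyTorch's `MSELoss` and `CrossEntropyLoss`. A minimal sketch with stand-in linear "policies" and random tensors (the shapes are chosen purely for illustration):

```python
import torch
import torch.nn as nn

state_dim, action_dim, num_actions, batch = 4, 2, 3, 8
states = torch.randn(batch, state_dim)

# Continuous actions: regression with MSE
policy = nn.Linear(state_dim, action_dim)              # stand-in for a full policy network
expert_actions = torch.randn(batch, action_dim)
loss_continuous = nn.MSELoss()(policy(states), expert_actions)

# Discrete actions: classification with cross-entropy
# (negative log-likelihood of the expert's action)
classifier = nn.Linear(state_dim, num_actions)          # stand-in policy producing logits
expert_action_ids = torch.randint(0, num_actions, (batch,))
loss_discrete = nn.CrossEntropyLoss()(classifier(states), expert_action_ids)
```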
Distribution Shift Problem¶
The key challenge: Training distribution \(p_{expert}(s)\) ≠ deployment distribution \(p_{\pi_\theta}(s)\)
If the policy makes a small error at state \(s_1\):

- It reaches a slightly different state \(s_2'\) (not in the training data)
- There it makes another error → reaches \(s_3'\)
- Errors accumulate over the episode instead of averaging out

Result: the expected cost of behavioral cloning grows as \(O(\varepsilon T^2)\), where \(T\) is the horizon length and \(\varepsilon\) is the per-step error rate on the expert's state distribution.
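A sketch of the standard argument (Ross et al., 2011): if \(\pi_\theta\) disagrees with the expert with probability at most \(\varepsilon\) on states drawn from the expert's own distribution, the first mistake can occur at any of the \(T\) steps, and once the policy has left the expert's state distribution nothing bounds its per-step cost for the remaining \(O(T)\) steps, giving

$$ J(\pi_\theta) \le J(\pi_{expert}) + T^2 \varepsilon $$

Methods that also train on the learner's own state distribution (e.g., DAgger) tighten this to \(O(T \varepsilon)\).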
Complete Implementation¶
Continuous Control¶
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.utils.data import Dataset, DataLoader
class BehavioralCloning:
"""
Behavioral Cloning for continuous control
Simple supervised learning approach to imitation learning
"""
def __init__(
self,
state_dim,
action_dim,
hidden_dims=[256, 256],
activation='relu',
lr=1e-3,
batch_size=256,
device='cuda'
):
self.state_dim = state_dim
self.action_dim = action_dim
self.batch_size = batch_size
self.device = device
# Build policy network
self.policy = self._build_network(
state_dim, action_dim, hidden_dims, activation
).to(device)
# Optimizer
self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)
def _build_network(self, input_dim, output_dim, hidden_dims, activation):
"""Build MLP policy network"""
layers = []
# Activation function
if activation == 'relu':
act_fn = nn.ReLU
elif activation == 'tanh':
act_fn = nn.Tanh
elif activation == 'elu':
act_fn = nn.ELU
else:
raise ValueError(f"Unknown activation: {activation}")
# Hidden layers
prev_dim = input_dim
for hidden_dim in hidden_dims:
layers.append(nn.Linear(prev_dim, hidden_dim))
layers.append(act_fn())
prev_dim = hidden_dim
# Output layer
layers.append(nn.Linear(prev_dim, output_dim))
# Optional: Add tanh to bound actions to [-1, 1]
# layers.append(nn.Tanh())
return nn.Sequential(*layers)
def train(self, expert_dataset, num_epochs=100, val_split=0.1):
"""
Train BC policy on expert demonstrations
Args:
expert_dataset: Dataset of (state, action) pairs
num_epochs: Number of training epochs
val_split: Validation split fraction
"""
# Split into train/val
val_size = int(len(expert_dataset) * val_split)
train_size = len(expert_dataset) - val_size
train_dataset, val_dataset = torch.utils.data.random_split(
expert_dataset, [train_size, val_size]
)
train_loader = DataLoader(
train_dataset, batch_size=self.batch_size, shuffle=True
)
val_loader = DataLoader(
val_dataset, batch_size=self.batch_size, shuffle=False
)
best_val_loss = float('inf')
for epoch in range(num_epochs):
# Training
train_loss = self._train_epoch(train_loader)
# Validation
val_loss = self._validate(val_loader)
# Save best model
if val_loss < best_val_loss:
best_val_loss = val_loss
self.save('best_bc_policy.pth')
if epoch % 10 == 0:
print(f"Epoch {epoch}/{num_epochs}: "
f"Train Loss = {train_loss:.4f}, Val Loss = {val_loss:.4f}")
return train_loss, val_loss
def _train_epoch(self, dataloader):
"""Single training epoch"""
self.policy.train()
total_loss = 0.0
for states, actions in dataloader:
states = states.to(self.device)
actions = actions.to(self.device)
# Forward pass
pred_actions = self.policy(states)
# MSE loss
loss = nn.MSELoss()(pred_actions, actions)
# Backward pass
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
total_loss += loss.item()
return total_loss / len(dataloader)
def _validate(self, dataloader):
"""Validation"""
self.policy.eval()
total_loss = 0.0
with torch.no_grad():
for states, actions in dataloader:
states = states.to(self.device)
actions = actions.to(self.device)
pred_actions = self.policy(states)
loss = nn.MSELoss()(pred_actions, actions)
total_loss += loss.item()
return total_loss / len(dataloader)
def predict(self, state):
"""Predict action for given state"""
self.policy.eval()
with torch.no_grad():
state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
action = self.policy(state_tensor).cpu().numpy()[0]
return action
def save(self, path):
"""Save policy"""
torch.save({
'policy_state_dict': self.policy.state_dict(),
'optimizer_state_dict': self.optimizer.state_dict(),
}, path)
def load(self, path):
"""Load policy"""
checkpoint = torch.load(path)
self.policy.load_state_dict(checkpoint['policy_state_dict'])
self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
class ExpertDataset(Dataset):
"""Dataset of expert demonstrations"""
def __init__(self, states, actions):
self.states = torch.FloatTensor(states)
self.actions = torch.FloatTensor(actions)
def __len__(self):
return len(self.states)
def __getitem__(self, idx):
return self.states[idx], self.actions[idx]
def collect_expert_data(env, expert_policy, num_episodes=100):
"""Collect expert demonstrations"""
states = []
actions = []
for episode in range(num_episodes):
state = env.reset()
done = False
while not done:
# Get expert action
action = expert_policy(state)
# Store transition
states.append(state)
actions.append(action)
# Environment step
state, reward, done, info = env.step(action)
return np.array(states), np.array(actions)
def train_bc_from_demonstrations(states, actions, state_dim, action_dim):
"""Complete BC training pipeline"""
# Create dataset
dataset = ExpertDataset(states, actions)
# Create BC agent
bc = BehavioralCloning(
state_dim=state_dim,
action_dim=action_dim,
hidden_dims=[256, 256],
lr=1e-3,
batch_size=256
)
# Train
train_loss, val_loss = bc.train(dataset, num_epochs=100)
print(f"Training complete. Final validation loss: {val_loss:.4f}")
return bc
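Putting the pieces together, a brief usage sketch. The environment and "expert" below are placeholders (a random policy standing in for a real controller), the classic Gym API used throughout this page is assumed, and note that the helpers above default to `device='cuda'`:

```python
import gym  # assumed: classic Gym API (reset() -> obs, step() -> 4-tuple), as used on this page

env = gym.make('Pendulum-v1')                         # hypothetical example task
expert_policy = lambda s: env.action_space.sample()   # stand-in for a real expert controller

# Collect demonstrations and train (uses the helpers defined above)
states, actions = collect_expert_data(env, expert_policy, num_episodes=50)
bc = train_bc_from_demonstrations(
    states, actions,
    state_dim=states.shape[1],
    action_dim=actions.shape[1],
)

# Deploy the cloned policy
action = bc.predict(env.reset())
```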
Discrete Actions¶
For discrete action spaces (e.g., robotics with discrete primitives):
class BehavioralCloningDiscrete:
"""BC for discrete actions (classification)"""
def __init__(self, state_dim, num_actions, hidden_dims=[256, 256], lr=1e-3, device='cuda'):
self.device = device
# Build classifier
layers = []
prev_dim = state_dim
for hidden_dim in hidden_dims:
layers.append(nn.Linear(prev_dim, hidden_dim))
layers.append(nn.ReLU())
prev_dim = hidden_dim
layers.append(nn.Linear(prev_dim, num_actions))
self.policy = nn.Sequential(*layers).to(device)
self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)
def train_epoch(self, dataloader):
"""Training with cross-entropy loss"""
self.policy.train()
total_loss = 0.0
total_correct = 0
total_samples = 0
for states, actions in dataloader:
states = states.to(self.device)
actions = actions.to(self.device).long()
# Forward pass
logits = self.policy(states)
# Cross-entropy loss
loss = nn.CrossEntropyLoss()(logits, actions)
# Backward pass
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
# Accuracy
pred_actions = torch.argmax(logits, dim=1)
correct = (pred_actions == actions).sum().item()
total_loss += loss.item()
total_correct += correct
total_samples += len(actions)
accuracy = total_correct / total_samples
return total_loss / len(dataloader), accuracy
def predict(self, state):
"""Predict action (greedy)"""
self.policy.eval()
with torch.no_grad():
state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
logits = self.policy(state_tensor)
action = torch.argmax(logits, dim=1).item()
return action
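A minimal usage sketch with synthetic data. The integer expert actions are stored in the same `ExpertDataset` as before (the `.long()` cast in `train_epoch` recovers class indices); the dimensions and action count here are arbitrary:

```python
# Synthetic discrete-action demonstrations, for illustration only
demo_states = np.random.randn(1000, 8).astype(np.float32)   # (N, state_dim)
demo_actions = np.random.randint(0, 4, size=1000)           # (N,) integer action indices

loader = DataLoader(ExpertDataset(demo_states, demo_actions), batch_size=128, shuffle=True)

bc_discrete = BehavioralCloningDiscrete(state_dim=8, num_actions=4, device='cpu')
for epoch in range(20):
    loss, acc = bc_discrete.train_epoch(loader)
    print(f"epoch {epoch}: loss={loss:.4f}, accuracy={acc:.2%}")
```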
Improvements to Basic BC¶
1. Data Augmentation¶
Increase dataset diversity:
def augment_state(state, noise_std=0.01):
"""Add noise to states for data augmentation"""
noise = np.random.normal(0, noise_std, size=state.shape)
return state + noise
def augment_dataset(states, actions, augmentation_factor=3):
"""Augment dataset by adding noisy copies"""
aug_states = [states]
aug_actions = [actions]
for _ in range(augmentation_factor - 1):
noisy_states = augment_state(states, noise_std=0.01)
aug_states.append(noisy_states)
aug_actions.append(actions)
return np.concatenate(aug_states), np.concatenate(aug_actions)
2. Dropout Regularization¶
Prevent overfitting:
class BCWithDropout(nn.Module):
"""BC policy with dropout"""
def __init__(self, state_dim, action_dim, dropout_rate=0.1):
super().__init__()
self.network = nn.Sequential(
nn.Linear(state_dim, 256),
nn.ReLU(),
nn.Dropout(dropout_rate),
nn.Linear(256, 256),
nn.ReLU(),
nn.Dropout(dropout_rate),
nn.Linear(256, action_dim)
)
def forward(self, state):
return self.network(state)
3. Action Regularization¶
Encourage smooth actions:
def train_with_action_regularization(model, dataloader, optimizer, reg_coef=0.01):
"""Train with action smoothness regularization"""
for states, actions in dataloader:
# Prediction loss
pred_actions = model(states)
pred_loss = nn.MSELoss()(pred_actions, actions)
# Action regularization (L2 norm)
action_norm = torch.mean(pred_actions ** 2)
# Total loss
loss = pred_loss + reg_coef * action_norm
optimizer.zero_grad()
loss.backward()
optimizer.step()
4. Ensemble Methods¶
Use ensemble to improve robustness:
class BCEnsemble:
"""Ensemble of BC policies"""
def __init__(self, state_dim, action_dim, num_models=5, hidden_dims=[256, 256]):
self.models = [
BehavioralCloning(state_dim, action_dim, hidden_dims)
for _ in range(num_models)
]
def train_ensemble(self, dataset, num_epochs=100):
"""Train all models in ensemble"""
for i, model in enumerate(self.models):
print(f"Training model {i+1}/{len(self.models)}")
model.train(dataset, num_epochs=num_epochs)
def predict(self, state):
"""Predict by averaging ensemble"""
predictions = [model.predict(state) for model in self.models]
return np.mean(predictions, axis=0)
def predict_with_uncertainty(self, state):
"""Predict with uncertainty estimate"""
predictions = np.array([model.predict(state) for model in self.models])
mean = np.mean(predictions, axis=0)
std = np.std(predictions, axis=0)
return mean, std
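One practical use of the uncertainty estimate is as a crude out-of-distribution check at deployment time: when the ensemble members disagree strongly, the state is likely far from the demonstrations and the controller can fall back to a conservative action. A sketch; the threshold is a hypothetical tuning knob and the zero fallback assumes zero is a safe "stop" command:

```python
def safe_predict(ensemble, state, std_threshold=0.2):
    """Return the ensemble mean action, or a conservative fallback if disagreement is high."""
    mean, std = ensemble.predict_with_uncertainty(state)
    if np.max(std) > std_threshold:   # high disagreement -> likely off-distribution state
        return np.zeros_like(mean)    # conservative fallback (assumes 0 is a safe action)
    return mean
```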
Advanced Techniques¶
Goal-Conditioned BC¶
For multi-task imitation:
class GoalConditionedBC(nn.Module):
"""BC with goal conditioning"""
def __init__(self, state_dim, goal_dim, action_dim):
super().__init__()
# Concatenate state and goal
input_dim = state_dim + goal_dim
self.network = nn.Sequential(
nn.Linear(input_dim, 256),
nn.ReLU(),
nn.Linear(256, 256),
nn.ReLU(),
nn.Linear(256, action_dim)
)
def forward(self, state, goal):
x = torch.cat([state, goal], dim=-1)
return self.network(x)
# Training
def train_goal_conditioned(model, states, goals, actions, num_epochs=100):
"""Train goal-conditioned policy"""
optimizer = optim.Adam(model.parameters(), lr=1e-3)
for epoch in range(num_epochs):
# Forward pass
pred_actions = model(states, goals)
# Loss
loss = nn.MSELoss()(pred_actions, actions)
# Backward pass
optimizer.zero_grad()
loss.backward()
optimizer.step()
BC from Observations (BCO)¶
Learn from observational data (no actions):
class BCO:
"""
Behavioral Cloning from Observations
Learn policy from state-only trajectories using
inverse dynamics model
"""
def __init__(self, state_dim, action_dim):
# Forward dynamics: (s_t, a_t) -> s_{t+1}
self.forward_model = nn.Sequential(
nn.Linear(state_dim + action_dim, 256),
nn.ReLU(),
nn.Linear(256, 256),
nn.ReLU(),
nn.Linear(256, state_dim)
)
# Inverse dynamics: (s_t, s_{t+1}) -> a_t
self.inverse_model = nn.Sequential(
nn.Linear(state_dim * 2, 256),
nn.ReLU(),
nn.Linear(256, 256),
nn.ReLU(),
nn.Linear(256, action_dim)
)
# Policy: s_t -> a_t
self.policy = nn.Sequential(
nn.Linear(state_dim, 256),
nn.ReLU(),
nn.Linear(256, 256),
nn.ReLU(),
nn.Linear(256, action_dim)
)
def train_inverse_model(self, states, next_states, actions, num_epochs=100):
"""Train inverse dynamics model"""
optimizer = optim.Adam(self.inverse_model.parameters(), lr=1e-3)
for epoch in range(num_epochs):
# Predict actions from state transitions
state_pairs = torch.cat([states, next_states], dim=-1)
pred_actions = self.inverse_model(state_pairs)
loss = nn.MSELoss()(pred_actions, actions)
optimizer.zero_grad()
loss.backward()
optimizer.step()
def train_policy_from_observations(self, state_trajectories, num_epochs=100):
"""Train policy from observation-only data"""
optimizer = optim.Adam(self.policy.parameters(), lr=1e-3)
for epoch in range(num_epochs):
for traj in state_trajectories:
states = traj[:-1]
next_states = traj[1:]
# Infer actions using inverse model
state_pairs = torch.cat([states, next_states], dim=-1)
with torch.no_grad():
inferred_actions = self.inverse_model(state_pairs)
# Train policy to match inferred actions
pred_actions = self.policy(states)
loss = nn.MSELoss()(pred_actions, inferred_actions)
optimizer.zero_grad()
loss.backward()
optimizer.step()
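A sketch of how the BCO pieces fit together. The interaction data for the inverse model would normally come from rolling out an exploratory policy in the environment; random tensors stand in for it here, the dimensions are arbitrary, and the default `num_epochs` of the methods above is used:

```python
# Stand-in interaction data (s_t, a_t, s_{t+1}) and expert state-only trajectories
interaction_s  = torch.randn(1000, 8)
interaction_a  = torch.randn(1000, 2)
interaction_s2 = torch.randn(1000, 8)
expert_trajs   = [torch.randn(50, 8) for _ in range(20)]

bco = BCO(state_dim=8, action_dim=2)
bco.train_inverse_model(interaction_s, interaction_s2, interaction_a)  # 1. learn inverse dynamics
bco.train_policy_from_observations(expert_trajs)                       # 2. clone inferred actions

action = bco.policy(torch.randn(1, 8))                                 # 3. act from states alone
```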
Practical Tips¶
Dataset Requirements¶
Minimum dataset size (rule of thumb):
# Rough rule of thumb (task -> number of demonstrations):
#   Simple tasks (CartPole, Reacher):        ~10-50
#   Manipulation (pick-and-place):           ~100-500
#   Locomotion (quadruped walking):          ~500-2000
#   Complex tasks (mobile manipulation):     ~2000-10000
Data quality matters more than quantity:

- ✓ Diverse start states
- ✓ Successful demonstrations only (no failures)
- ✓ Consistent expert behavior
- ✓ Cover the full state distribution
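One way to enforce the "successful demonstrations only" point in practice is to filter episodes before building the dataset. A sketch, assuming each recorded episode is a dict with `states`, `actions`, and a `success` flag (this storage format is an assumption, not something defined above):

```python
def filter_demonstrations(episodes, require_success=True):
    """Keep only successful episodes and flatten them into (state, action) arrays.

    Assumes each episode is a dict: {'states': array, 'actions': array, 'success': bool}.
    """
    kept_states, kept_actions = [], []
    for ep in episodes:
        if require_success and not ep.get('success', False):
            continue
        kept_states.append(np.asarray(ep['states']))
        kept_actions.append(np.asarray(ep['actions']))
    return np.concatenate(kept_states), np.concatenate(kept_actions)
```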
Hyperparameter Tuning¶
# Conservative (prevent overfitting)
bc_conservative = BehavioralCloning(
    state_dim=state_dim, action_dim=action_dim,  # task-specific dimensions
    hidden_dims=[128, 128],
    lr=1e-4,
    batch_size=128
)

# Standard (balanced)
bc_standard = BehavioralCloning(
    state_dim=state_dim, action_dim=action_dim,
    hidden_dims=[256, 256],
    lr=1e-3,
    batch_size=256
)

# Aggressive (large dataset)
bc_aggressive = BehavioralCloning(
    state_dim=state_dim, action_dim=action_dim,
    hidden_dims=[512, 512, 256],
    lr=3e-3,
    batch_size=512
)
Evaluation¶
def evaluate_bc_policy(policy, env, num_episodes=10):
"""Evaluate BC policy"""
successes = 0
total_rewards = []
for episode in range(num_episodes):
state = env.reset()
episode_reward = 0
done = False
while not done:
action = policy.predict(state)
state, reward, done, info = env.step(action)
episode_reward += reward
total_rewards.append(episode_reward)
if info.get('is_success', False):
successes += 1
success_rate = successes / num_episodes
mean_reward = np.mean(total_rewards)
print(f"Success Rate: {success_rate:.2%}")
print(f"Mean Reward: {mean_reward:.2f}")
return success_rate, mean_reward
Common Issues & Solutions¶
Problem: Poor Generalization¶
Symptoms: Works on training states, fails on novel states
Solutions:
# 1. Data augmentation
states_aug, actions_aug = augment_dataset(states, actions, augmentation_factor=5)
# 2. More diverse demonstrations
# Collect demos from different start states, experts, conditions
# 3. Regularization
model_with_dropout = BCWithDropout(state_dim, action_dim, dropout_rate=0.2)
# 4. Ensemble
ensemble = BCEnsemble(state_dim, action_dim, num_models=5)
Problem: Compounding Errors¶
Symptoms: Policy diverges over long horizons
Solutions:
# 1. Use DAgger instead (collect on-policy data)
# See dagger.md
# 2. Limit horizon
max_timesteps = 100 # Cap episode length during deployment
# 3. Add noise injection during training
def train_with_noise_injection(policy, states, actions):
    """Train on perturbed states to simulate distribution shift.

    Assumes `policy.train_step(state, action)` performs a single gradient
    update (not part of the BehavioralCloning class above).
    """
    for state, action in zip(states, actions):
        # Add noise to simulate distribution shift
        noisy_state = state + np.random.normal(0, 0.05, state.shape)
        # Train on the perturbed state
        loss = policy.train_step(noisy_state, action)
Problem: Causal Confusion¶
Symptoms: Policy learns spurious correlations
Example: Robot learns to "open door when door is open" instead of "open door by turning handle"
Solutions:
# 1. Remove confounding observations
# Only include task-relevant observations
# 2. Temporal data augmentation
# Shuffle temporal order to break spurious correlations
# 3. Counterfactual data augmentation
def counterfactual_augmentation(states, actions, background_dims):
    """Randomize task-irrelevant state dimensions while keeping actions fixed,
    to break spurious correlations.

    `background_dims` is an index array of task-irrelevant dimensions
    (e.g., background appearance features); it is task-specific and must
    be supplied by the user.
    """
    aug_states = states.copy()
    aug_states[:, background_dims] = np.random.random(
        (len(states), len(background_dims))
    )
    return aug_states, actions
When to Use Behavioral Cloning¶
Use BC when:

- ✓ You have a large, diverse expert dataset
- ✓ The task is relatively simple (short horizon)
- ✓ The expert policy is deterministic
- ✓ Safety is critical (offline learning)
- ✓ No reward function is available

Don't use BC when:

- ✗ Demonstration data is limited
- ✗ The task is long-horizon (use DAgger or RL)
- ✗ The task is highly stochastic
- ✗ You need to exceed expert performance
References¶
Papers¶
- Behavioral Cloning: Pomerleau, "ALVINN: An Autonomous Land Vehicle in a Neural Network", NeurIPS 1989
- Distribution Shift: Ross et al., "A Reduction of Imitation Learning and Structured Prediction to No-Regret Online Learning", AISTATS 2011
- BCO: Torabi et al., "Behavioral Cloning from Observation", IJCAI 2018
Books¶
- Sutton & Barto, "Reinforcement Learning: An Introduction", 2nd Edition, 2018
- Chapter on imitation learning
Code¶
- Imitation Library: https://github.com/HumanCompatibleAI/imitation
- CleanRL BC: https://github.com/vwxyzjn/cleanrl/blob/master/cleanrl/bc.py
- SB3 BC: Part of Imitation library above
Next Steps¶
- DAgger - Fix distribution shift problem
- Inverse RL - Learn reward function from demonstrations
- Diffusion Policies - State-of-the-art IL with diffusion
- ACT - Action chunking for better performance