
DAgger & Interactive Imitation Learning

DAgger (Dataset Aggregation) solves behavioral cloning's distribution shift problem through iterative expert feedback.

Overview

DAgger addresses the fundamental limitation of behavioral cloning: distribution shift between expert and learned policy.

Key idea: Train on states visited by the learned policy (not just expert states):

  1. Train policy \(\pi\) on current dataset \(\mathcal{D}\)
  2. Execute \(\pi\) to collect states \(s \sim \pi\)
  3. Query expert for actions \(a^* = \pi^*(s)\) at those states
  4. Aggregate: \(\mathcal{D} \leftarrow \mathcal{D} \cup \{(s, a^*)\}\)
  5. Repeat
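
The loop above can be run end-to-end on a toy problem. Below is a minimal sketch assuming a 1-D linear system with a known linear expert (the dynamics, gains, and variable names are illustrative, not from the DAgger paper):

```python
import numpy as np

rng = np.random.default_rng(0)
expert = lambda s: -0.5 * s        # expert: a stabilizing linear controller
D_states, D_actions = [], []       # aggregated dataset D
w = 0.0                            # learner: a = w * s, fit by least squares

for iteration in range(5):
    s = 2.0
    for t in range(20):            # step 2: roll out the *learned* policy
        a = w * s
        D_states.append(s)         # steps 3-4: label visited state, aggregate
        D_actions.append(expert(s))
        s = s + a + 0.01 * rng.normal()   # toy dynamics
    S, A = np.array(D_states), np.array(D_actions)
    w = (S @ A) / (S @ S)          # step 1 (next round): retrain on all of D

print(round(w, 2))  # → -0.5 (the learner recovers the expert controller)
```

Because the learner is labeled on the states it actually visits, it converges to the expert even though it starts from a policy that does nothing.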

Advantages over BC:

  • ✓ Trains on the learned policy's own state distribution, eliminating distribution shift
  • ✓ Provably reduces the worst-case error bound from \(O(\epsilon T^2)\) to \(O(\epsilon T)\)
  • ✓ Needs fewer expert demonstrations up front
  • ✓ Learns to recover from its own mistakes

Challenges:

  • ✗ Requires the expert to be available during training
  • ✗ Expert labeling can be expensive
  • ✗ Assumes the expert can label arbitrary states (may be infeasible, e.g., for a human shown a state they would never reach)

Mathematical Foundation

Error Bounds

Behavioral Cloning error: $$ \mathbb{E}[\text{cost}(\pi_{BC})] \leq \mathbb{E}[\text{cost}(\pi^*)] + O(\epsilon T^2) $$

DAgger error (with \(N\) iterations): $$ \mathbb{E}[\text{cost}(\pi_{DAgger})] \leq \mathbb{E}[\text{cost}(\pi^*)] + O(\epsilon T) + O\left(\frac{T \log T}{N}\right) $$

Where:

  • \(\epsilon\) = supervised learning error on the aggregated dataset
  • \(T\) = task horizon
  • \(N\) = number of DAgger iterations (the no-regret term vanishes as \(N\) grows)

Key insight: the compounding-error bound improves from quadratic to linear in the horizon \(T\)!
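
To make the gap concrete, plug a small per-step error \(\epsilon\) and a moderate horizon \(T\) into the two bounds:

```python
# Per-step supervised error eps and horizon T (example values)
eps, T = 0.01, 100

bc_bound = eps * T**2      # behavioral cloning: errors compound quadratically
dagger_bound = eps * T     # DAgger: error grows only linearly in the horizon

print(bc_bound, dagger_bound)  # → 100.0 1.0
```

At horizon 100, the same per-step error is a hundred times more damaging under behavioral cloning's worst-case bound.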

Algorithm

Initialize dataset D with expert demonstrations
for iteration i = 1 to N do:
    Train policy π_i on D
    Execute π_i to collect trajectories
    Query expert for actions at visited states
    Aggregate: D ← D ∪ {(s, a*)}
end for
return π_N

Complete Implementation

Core DAgger Algorithm

import time
from collections import deque

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

class DAgger:
    """
    Dataset Aggregation (DAgger)

    Reference: Ross et al., AISTATS 2011
    """
    def __init__(
        self,
        state_dim,
        action_dim,
        expert_policy,
        hidden_dims=[256, 256],
        lr=1e-3,
        batch_size=256,
        device='cuda'
    ):
        self.expert_policy = expert_policy
        self.batch_size = batch_size
        self.device = device

        # Build policy network (same as BC)
        self.policy = self._build_network(
            state_dim, action_dim, hidden_dims
        ).to(device)

        self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)

        # Aggregated dataset
        self.dataset_states = []
        self.dataset_actions = []

    def _build_network(self, input_dim, output_dim, hidden_dims):
        """Build MLP policy"""
        layers = []
        prev_dim = input_dim

        for hidden_dim in hidden_dims:
            layers.append(nn.Linear(prev_dim, hidden_dim))
            layers.append(nn.ReLU())
            prev_dim = hidden_dim

        layers.append(nn.Linear(prev_dim, output_dim))

        return nn.Sequential(*layers)

    def dagger_iteration(self, env, num_rollouts=50, mixing_ratio=1.0):
        """
        Single DAgger iteration

        Args:
            env: Environment
            num_rollouts: Number of trajectories to collect
            mixing_ratio: β parameter (1.0 = always expert, 0.0 = never expert)
        """
        new_states = []
        new_actions = []

        for rollout in range(num_rollouts):
            state, _ = env.reset()
            done = False

            while not done:
                # Decide whether to execute the expert or the learned policy
                use_expert = np.random.random() < mixing_ratio

                if use_expert:
                    action = self.expert_policy(state)
                else:
                    action = self.predict(state)

                # Always query the expert for the training label
                expert_action = self.expert_policy(state)

                # Store (state, expert_action) pair
                new_states.append(state)
                new_actions.append(expert_action)

                # Environment step (gymnasium 5-tuple API)
                state, reward, terminated, truncated, info = env.step(action)
                done = terminated or truncated

        # Aggregate into dataset
        self.dataset_states.extend(new_states)
        self.dataset_actions.extend(new_actions)

        print(f"Dataset size: {len(self.dataset_states)}")

    def train_on_aggregated_dataset(self, num_epochs=10):
        """Train policy on full aggregated dataset"""
        states = torch.FloatTensor(np.array(self.dataset_states)).to(self.device)
        actions = torch.FloatTensor(np.array(self.dataset_actions)).to(self.device)

        dataset = torch.utils.data.TensorDataset(states, actions)
        dataloader = torch.utils.data.DataLoader(
            dataset, batch_size=self.batch_size, shuffle=True
        )

        self.policy.train()  # predict() switches to eval mode; switch back
        for epoch in range(num_epochs):
            total_loss = 0.0

            for batch_states, batch_actions in dataloader:
                # Forward pass
                pred_actions = self.policy(batch_states)

                # MSE loss
                loss = nn.MSELoss()(pred_actions, batch_actions)

                # Backward pass
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

                total_loss += loss.item()

            if epoch % 5 == 0:
                avg_loss = total_loss / len(dataloader)
                print(f"Epoch {epoch}: Loss = {avg_loss:.6f}")

    def train(
        self,
        env,
        num_iterations=10,
        rollouts_per_iteration=50,
        epochs_per_iteration=10,
        mixing_schedule='linear'
    ):
        """
        Full DAgger training loop

        Args:
            env: Environment
            num_iterations: Number of DAgger iterations
            rollouts_per_iteration: Trajectories per iteration
            epochs_per_iteration: Training epochs per iteration
            mixing_schedule: 'linear', 'exponential', or 'constant'
        """
        print("Starting DAgger training...")

        for iteration in range(num_iterations):
            print(f"\n=== DAgger Iteration {iteration + 1}/{num_iterations} ===")

            # Compute mixing ratio (β)
            if mixing_schedule == 'linear':
                mixing_ratio = 1.0 - (iteration / num_iterations)
            elif mixing_schedule == 'exponential':
                mixing_ratio = 0.5 ** iteration
            else:  # constant
                mixing_ratio = 1.0

            print(f"Mixing ratio (β): {mixing_ratio:.3f}")

            # Collect data
            self.dagger_iteration(
                env,
                num_rollouts=rollouts_per_iteration,
                mixing_ratio=mixing_ratio
            )

            # Train on aggregated dataset
            self.train_on_aggregated_dataset(num_epochs=epochs_per_iteration)

            # Evaluate
            if iteration % 2 == 0:
                avg_reward = self.evaluate(env, num_episodes=10)
                print(f"Average reward: {avg_reward:.2f}")

        print("\nDAgger training complete!")

    def predict(self, state):
        """Predict action"""
        self.policy.eval()
        with torch.no_grad():
            state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
            action = self.policy(state_tensor).cpu().numpy()[0]
        return action

    def evaluate(self, env, num_episodes=10):
        """Evaluate current policy"""
        total_rewards = []

        for episode in range(num_episodes):
            state, _ = env.reset()
            episode_reward = 0
            done = False

            while not done:
                action = self.predict(state)
                state, reward, terminated, truncated, info = env.step(action)
                done = terminated or truncated
                episode_reward += reward

            total_rewards.append(episode_reward)

        return np.mean(total_rewards)

    def save(self, path):
        """Save policy"""
        torch.save({
            'policy_state_dict': self.policy.state_dict(),
            'dataset_states': self.dataset_states,
            'dataset_actions': self.dataset_actions
        }, path)

    def load(self, path):
        """Load policy"""
        checkpoint = torch.load(path)
        self.policy.load_state_dict(checkpoint['policy_state_dict'])
        self.dataset_states = checkpoint['dataset_states']
        self.dataset_actions = checkpoint['dataset_actions']

Example Usage

import gymnasium as gym

# Create environment
env = gym.make('HalfCheetah-v4')

# Define expert policy (e.g., pre-trained RL agent)
def expert_policy(state):
    # Your expert implementation
    # Could be: pre-trained SAC, human teleop, etc.
    return expert_model.predict(state)

# Create DAgger agent
dagger = DAgger(
    state_dim=env.observation_space.shape[0],
    action_dim=env.action_space.shape[0],
    expert_policy=expert_policy,
    hidden_dims=[256, 256],
    lr=1e-3
)

# Train
dagger.train(
    env=env,
    num_iterations=10,
    rollouts_per_iteration=50,
    epochs_per_iteration=10,
    mixing_schedule='linear'
)

# Evaluate
final_reward = dagger.evaluate(env, num_episodes=100)
print(f"Final average reward: {final_reward:.2f}")

# Save
dagger.save('dagger_policy.pth')

Variants & Improvements

SafeDAgger

Safe exploration during data collection:

class SafeDAgger(DAgger):
    """DAgger with safety constraints"""

    def dagger_iteration(
        self,
        env,
        num_rollouts=50,
        mixing_ratio=1.0,
        safety_threshold=0.1
    ):
        """DAgger iteration with safety fallback"""
        new_states = []
        new_actions = []

        for rollout in range(num_rollouts):
            state, _ = env.reset()
            done = False

            while not done:
                # Get learned policy action
                learned_action = self.predict(state)

                # Get expert action
                expert_action = self.expert_policy(state)

                # Treat a large deviation from the expert as unsafe
                action_diff = np.linalg.norm(learned_action - expert_action)

                if action_diff > safety_threshold:
                    # Fall back to the expert (unsafe deviation)
                    action = expert_action
                else:
                    # Deviation is safe; apply the usual β-mixing
                    use_expert = np.random.random() < mixing_ratio
                    action = expert_action if use_expert else learned_action

                # Store expert label
                new_states.append(state)
                new_actions.append(expert_action)

                state, reward, terminated, truncated, info = env.step(action)
                done = terminated or truncated

        self.dataset_states.extend(new_states)
        self.dataset_actions.extend(new_actions)

HG-DAgger (Human-Gated DAgger)

Query expert only when uncertain:

class HGDAgger(DAgger):
    """Human-Gated DAgger - query expert selectively"""

    def __init__(self, state_dim, action_dim, *args, uncertainty_threshold=0.1, **kwargs):
        super().__init__(state_dim, action_dim, *args, **kwargs)
        self.uncertainty_threshold = uncertainty_threshold

        # Ensemble for uncertainty estimation (members should be trained on
        # bootstrapped subsets of the aggregated dataset; training omitted here)
        self.ensemble = nn.ModuleList([
            self._build_network(state_dim, action_dim, kwargs.get('hidden_dims', [256, 256]))
            for _ in range(5)
        ]).to(self.device)

    def estimate_uncertainty(self, state):
        """Estimate uncertainty using ensemble disagreement"""
        state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)

        predictions = []
        for model in self.ensemble:
            model.eval()
            with torch.no_grad():
                pred = model(state_tensor).cpu().numpy()[0]
                predictions.append(pred)

        predictions = np.array(predictions)
        uncertainty = np.std(predictions, axis=0).mean()

        return uncertainty

    def dagger_iteration(self, env, num_rollouts=50):
        """Query expert only when uncertain"""
        new_states = []
        new_actions = []
        num_queries = 0

        for rollout in range(num_rollouts):
            state, _ = env.reset()
            done = False

            while not done:
                # Get learned action
                action = self.predict(state)

                # Estimate uncertainty
                uncertainty = self.estimate_uncertainty(state)

                # Query expert only when uncertain
                if uncertainty > self.uncertainty_threshold:
                    expert_action = self.expert_policy(state)
                    new_states.append(state)
                    new_actions.append(expert_action)
                    num_queries += 1

                    # Execute the expert action in uncertain states
                    action = expert_action

                state, reward, terminated, truncated, info = env.step(action)
                done = terminated or truncated

        print(f"Expert queries: {num_queries}")

        self.dataset_states.extend(new_states)
        self.dataset_actions.extend(new_actions)
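
The gating idea rests on ensemble disagreement growing on unfamiliar states. A standalone sketch with bootstrapped linear models (all data, gains, and names here are illustrative):

```python
import numpy as np

rng = np.random.default_rng(0)
X = rng.uniform(-1, 1, 200)                  # training states in [-1, 1]
Y = 0.7 * X + 0.05 * rng.normal(size=200)    # expert actions with label noise

# Five linear models, each fit on a bootstrap resample of the data
ws = []
for _ in range(5):
    idx = rng.integers(0, 200, 200)
    xb, yb = X[idx], Y[idx]
    ws.append((xb @ yb) / (xb @ xb))
ws = np.array(ws)

def uncertainty(s):
    """Ensemble disagreement: std of member predictions at state s."""
    return np.std(ws * s)

print(uncertainty(0.5) < uncertainty(5.0))  # → True
```

Disagreement is small where the models saw data (|s| ≤ 1) and grows off-distribution, which is exactly when HG-DAgger wants to hand control to the expert.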

DART (Disturbances for Augmenting Robot Trajectories)

Inject noise during data collection for robustness. (The original DART injects noise into the expert's own demonstrations; the sketch below applies the same idea to the learned policy's rollouts.)

class DART(DAgger):
    """
    DAgger-style training with noise injected during data collection

    Reference: Laskey et al., "DART: Noise Injection for Robust
    Imitation Learning", CoRL 2017
    """

    def dagger_iteration(
        self,
        env,
        num_rollouts=50,
        noise_level=0.1,
        noise_schedule='constant'
    ):
        """DAgger with adversarial noise"""
        new_states = []
        new_actions = []

        for rollout in range(num_rollouts):
            state, _ = env.reset()
            done = False
            timestep = 0

            while not done:
                # Optionally decay the noise over the episode
                if noise_schedule == 'decay':
                    current_noise = noise_level * (0.9 ** timestep)
                else:
                    current_noise = noise_level

                # Get policy action
                base_action = self.predict(state)

                # Add Gaussian noise and clip to the action range
                noise = np.random.normal(0, current_noise, size=base_action.shape)
                noisy_action = np.clip(base_action + noise, -1, 1)

                # Query expert for the label at the visited state
                expert_action = self.expert_policy(state)
                new_states.append(state)
                new_actions.append(expert_action)

                # Execute the noisy action
                state, reward, terminated, truncated, info = env.step(noisy_action)
                done = terminated or truncated
                timestep += 1

        self.dataset_states.extend(new_states)
        self.dataset_actions.extend(new_actions)
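
Why inject noise at all? It widens the distribution of states that get expert labels, so the learner sees supervision near, not just on, the nominal trajectory. A toy check (the controller and dynamics are illustrative):

```python
import numpy as np

rng = np.random.default_rng(0)

def state_spread(noise_std, horizon=50):
    """Std of states visited under a stabilizing controller with injected action noise."""
    s, visited = 2.0, []
    for _ in range(horizon):
        a = -0.5 * s + rng.normal(0, noise_std)  # controller action + injected noise
        visited.append(s)
        s = s + a                                # toy dynamics
    return np.std(visited)

# Injected noise broadens the set of states the expert gets to label
print(state_spread(0.0) < state_spread(0.5))
```

The noisy rollout visits a wider band of states, which is precisely the coverage that makes the resulting policy robust to small perturbations.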

Practical Considerations

Expert Query Budget

Limit expert labeling cost:

class BudgetedDAgger(HGDAgger):
    """DAgger with a fixed expert-query budget
    (inherits estimate_uncertainty from HGDAgger)"""

    def __init__(self, *args, total_budget=10000, **kwargs):
        super().__init__(*args, **kwargs)
        self.total_budget = total_budget
        self.queries_used = 0

    def dagger_iteration(self, env, num_rollouts=50, prioritize_states=True):
        """Prioritize which states to query"""
        # Collect states
        collected_states = []
        collected_info = []

        for rollout in range(num_rollouts):
            state, _ = env.reset()
            done = False
            timestep = 0

            while not done:
                action = self.predict(state)

                # Store state and metadata
                collected_states.append(state)
                collected_info.append({
                    'timestep': timestep,
                    'rollout': rollout
                })

                state, reward, terminated, truncated, info = env.step(action)
                done = terminated or truncated
                timestep += 1

        # Prioritize states to query
        if prioritize_states:
            # Query states with high uncertainty (using ensemble)
            uncertainties = [
                self.estimate_uncertainty(s) for s in collected_states
            ]
            sorted_indices = np.argsort(uncertainties)[::-1]
        else:
            # Random sampling
            sorted_indices = np.random.permutation(len(collected_states))

        # Query up to budget
        queries_remaining = self.total_budget - self.queries_used
        num_to_query = min(queries_remaining, len(collected_states))

        for idx in sorted_indices[:num_to_query]:
            state = collected_states[idx]
            expert_action = self.expert_policy(state)

            self.dataset_states.append(state)
            self.dataset_actions.append(expert_action)
            self.queries_used += 1

        print(f"Queries used: {self.queries_used}/{self.total_budget}")

Asynchronous Expert Queries

For human experts (slower feedback):

class AsyncDAgger(DAgger):
    """DAgger with asynchronous expert labeling"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.query_queue = deque()
        self.labeled_queue = deque()

    def collect_states_for_labeling(self, env, num_rollouts=50):
        """Collect states, queue for expert labeling"""
        for rollout in range(num_rollouts):
            state, _ = env.reset()
            done = False

            while not done:
                action = self.predict(state)

                # Add to query queue
                self.query_queue.append({
                    'state': state.copy(),
                    'timestamp': time.time()
                })

                state, reward, terminated, truncated, info = env.step(action)
                done = terminated or truncated

        print(f"Queued {len(self.query_queue)} states for labeling")

    def process_expert_labels(self, labeled_data):
        """Process expert-labeled data"""
        for item in labeled_data:
            state = item['state']
            action = item['action']

            self.dataset_states.append(state)
            self.dataset_actions.append(action)

        print(f"Processed {len(labeled_data)} expert labels")

    def train_async(self, env, check_interval=60):
        """Asynchronous training loop"""
        iteration = 0

        while True:
            # Collect states
            self.collect_states_for_labeling(env, num_rollouts=10)

            # Wait for expert labels
            print(f"Waiting for expert labels... (checking every {check_interval}s)")
            time.sleep(check_interval)

            # Check for new labels (placeholder - implement your labeling interface)
            labeled_data = check_for_new_labels()

            if len(labeled_data) > 0:
                # Process labels
                self.process_expert_labels(labeled_data)

                # Train on updated dataset
                self.train_on_aggregated_dataset(num_epochs=10)

                iteration += 1
                print(f"Completed iteration {iteration}")

Comparison with Other Methods

DAgger vs BC vs DAgger Variants

Method     | Sample Complexity | Expert Queries    | Stability | Best For
-----------|-------------------|-------------------|-----------|------------------------------
BC         | High              | 0 (after initial) | High      | Large offline datasets
DAgger     | Medium            | Many              | Medium    | Interactive expert available
SafeDAgger | Medium            | Many              | Very High | Safety-critical applications
HG-DAgger  | Low               | Few               | Medium    | Limited expert budget
DART       | Medium            | Many              | Very High | Robust real-world deployment

Common Issues & Solutions

Problem: Expert Fatigue

Symptoms: Expert quality degrades over time

Solutions:

# 1. Batch queries (query_expert_in_batches is a placeholder for your labeling interface)
batch_size = 100
query_expert_in_batches(states, batch_size)

# 2. Prioritize important queries (HG-DAgger-style gating; placeholder)
query_only_uncertain_states()

# 3. Limit session length
max_queries_per_session = 500

Problem: Policy Oscillation

Symptoms: Policy keeps changing, doesn't converge

Solutions:

# 1. Reduce learning rate over time
def get_lr_schedule(iteration):
    initial_lr = 1e-3
    return initial_lr * (0.9 ** iteration)

# 2. Use an exponential moving average of the policy weights
import copy

class EMAPolicy:
    def __init__(self, policy, alpha=0.1):
        self.policy = policy
        self.ema_params = copy.deepcopy(list(policy.parameters()))
        self.alpha = alpha

    def update_ema(self):
        for ema_param, param in zip(self.ema_params, self.policy.parameters()):
            ema_param.data = (
                self.alpha * param.data +
                (1 - self.alpha) * ema_param.data
            )

# 3. Early stopping
if validation_loss_not_improving_for_n_iterations(n=3):
    break

Problem: Expensive Expert

Symptoms: Expert labeling is prohibitively expensive

Solutions:

# 1. Use BC initialization
# Start with large BC dataset, then fine-tune with DAgger

# 2. Active learning (HG-DAgger)
# Query only uncertain/important states

# 3. Synthetic expert
# Use simulation/planning as "expert"
def synthetic_expert(state):
    # MPC, sampling-based planning, etc.
    return mpc_planner.plan(state)

When to Use DAgger

Use DAgger when: - ✓ Expert available during training - ✓ Expert can label any state (even off-distribution) - ✓ Long-horizon tasks (BC fails) - ✓ Limited initial demonstration data

Use HG-DAgger when: - ✓ Expert expensive/limited - ✓ Want to minimize expert queries - ✓ Can estimate uncertainty

Use DART when: - ✓ Need robustness to perturbations - ✓ Deploying to real world - ✓ Safety is critical

References

Papers

  1. DAgger: Ross, Gordon & Bagnell, "A Reduction of Imitation Learning and Structured Prediction to No-Regret Online Learning", AISTATS 2011
  2. SafeDAgger: Zhang & Cho, "Query-Efficient Imitation Learning for End-to-End Simulated Driving", AAAI 2017
  3. HG-DAgger: Kelly et al., "HG-DAgger: Interactive Imitation Learning with Human Experts", ICRA 2019
  4. DART: Laskey et al., "DART: Noise Injection for Robust Imitation Learning", CoRL 2017

Books

  1. Sutton & Barto, "Reinforcement Learning: An Introduction", 2nd Edition (general RL background)

Code

  • Imitation Library: https://github.com/HumanCompatibleAI/imitation
  • Includes DAgger implementation
  • IRIS: https://github.com/NVlabs/IRIS

Tutorials

  • Interactive IL Tutorial: https://sites.google.com/view/icra19-ilbc

Next Steps