
Inverse Reinforcement Learning

Inverse Reinforcement Learning (IRL) learns the reward function that explains expert behavior, enabling generalization beyond demonstrations.

Overview

Inverse RL inverts the reinforcement learning problem:

  • Standard RL: Given reward → Learn policy
  • Inverse RL: Given policy (expert demos) → Learn reward

Key insight: An expert's behavior reveals their underlying objectives.

Advantages:

  • ✓ Learn transferable reward function
  • ✓ Generalize to new states/tasks
  • ✓ Understand expert intentions
  • ✓ Enable reward shaping

Challenges:

  • ✗ Reward ambiguity (many rewards explain the same behavior)
  • ✗ Computationally expensive (requires RL in the inner loop)
  • ✗ Assumes the expert is optimal
  • ✗ Requires environment dynamics

Key algorithms:

  • MaxEnt IRL (Maximum Entropy IRL)
  • GAIL (Generative Adversarial Imitation Learning)
  • AIRL (Adversarial Inverse RL)
  • IQ-Learn (Inverse soft-Q Learning)

Mathematical Foundation

Problem Formulation

Given:

  • MDP without reward: \((S, A, P, \gamma)\)
  • Expert demonstrations: \(\tau^* = \{(s_0, a_0), (s_1, a_1), \ldots\}\)

Find reward function \(r(s, a)\) such that expert policy \(\pi^*\) is optimal.

Reward Ambiguity

Problem: Infinitely many rewards explain the same policy!

Examples:

  • \(r(s, a) = 0\) everywhere (degenerate)
  • \(r(s, a) = c\) (constant) for all expert actions
  • \(r'(s, a) = \alpha \cdot r(s, a)\) for any \(\alpha > 0\) (positive scaling)

Solution approaches:

  1. Maximum margin: require the expert to outperform every other policy by a margin
  2. Maximum entropy: among all policies consistent with the demonstrations, commit to the one with maximum entropy
  3. Apprenticeship learning: match feature expectations (see the identity below)
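
For the apprenticeship-learning approach, "matching feature expectations" has a precise meaning when the reward is linear in state features, \(r(s) = w^\top \phi(s)\): if the discounted feature counts of a policy match the expert's, its return matches the expert's for every such reward (this is the identity behind Abbeel & Ng, 2004):

\[ \mu(\pi) = \mathbb{E}_{\pi}\left[\sum_{t=0}^{\infty} \gamma^t \phi(s_t)\right], \qquad \mu(\pi) = \mu(\pi^*) \;\Rightarrow\; w^\top \mu(\pi) = w^\top \mu(\pi^*) \quad \forall w \]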

Maximum Entropy IRL

Key idea: Model expert as Boltzmann-rational (follows max-ent policy):

\[ \pi^*(a|s) = \frac{\exp(Q^*(s,a))}{\sum_{a'} \exp(Q^*(s,a'))} \]
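
Here \(Q^*\) denotes the soft Q-function of maximum-entropy RL, given by the soft Bellman equations:

\[ Q^*(s, a) = r(s, a) + \gamma \, \mathbb{E}_{s' \sim P(\cdot \mid s, a)}\left[ V^*(s') \right], \qquad V^*(s) = \log \sum_{a'} \exp Q^*(s, a') \]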

Objective: Maximize likelihood of expert demonstrations:

\[ \max_r \; \mathbb{E}_{\tau \sim \text{expert}} \left[ \log P(\tau \mid r) \right] \]

Where \(P(\tau|r) \propto \exp(r(\tau))\) and \(\pi_r^*\) is the max-ent optimal policy for reward \(r\).
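
Written out with its normalizing constant (for deterministic dynamics; stochastic dynamics add transition-probability factors), the trajectory distribution is

\[ P(\tau \mid r) = \frac{1}{Z(r)} \exp\left( \sum_{t} r(s_t, a_t) \right), \qquad Z(r) = \sum_{\tau'} \exp\left( \sum_{t} r(s'_t, a'_t) \right) \]

so maximizing the likelihood trades off raising the reward of expert trajectories against the partition function \(Z(r)\), whose gradient is estimated with samples from \(\pi_r^*\).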

Gradient:

\[ \nabla_r \mathcal{L} = \mathbb{E}_{\tau \sim \text{expert}} \left[ \nabla_r r(\tau) \right] - \mathbb{E}_{\tau \sim \pi_r^*} \left[ \nabla_r r(\tau) \right] \]

Intuition: Increase reward on expert trajectories, decrease on learned policy trajectories.

Complete Implementation

MaxEnt IRL

import gym  # needed for the gym.Wrapper subclasses below
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

class MaxEntIRL:
    """
    Maximum Entropy Inverse Reinforcement Learning

    Reference: Ziebart et al., AAAI 2008
    """
    def __init__(
        self,
        state_dim,
        action_dim,
        reward_hidden_dims=[256, 256],
        rl_algorithm='SAC',  # or 'PPO'
        lr=1e-3,
        gamma=0.99,
        device='cuda'
    ):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.device = device

        # Reward network (parameterized reward function)
        self.reward_net = self._build_reward_network(
            state_dim, action_dim, reward_hidden_dims
        ).to(device)

        self.reward_optimizer = optim.Adam(self.reward_net.parameters(), lr=lr)

        # RL algorithm for learning policy from current reward
        if rl_algorithm == 'SAC':
            from stable_baselines3 import SAC
            self.rl_algorithm_class = SAC
        elif rl_algorithm == 'PPO':
            from stable_baselines3 import PPO
            self.rl_algorithm_class = PPO
        else:
            raise ValueError(f"Unknown RL algorithm: {rl_algorithm}")

    def _build_reward_network(self, state_dim, action_dim, hidden_dims):
        """Build reward network r(s, a)"""
        layers = []

        input_dim = state_dim + action_dim
        prev_dim = input_dim

        for hidden_dim in hidden_dims:
            layers.append(nn.Linear(prev_dim, hidden_dim))
            layers.append(nn.ReLU())
            prev_dim = hidden_dim

        # Output single scalar reward
        layers.append(nn.Linear(prev_dim, 1))

        return nn.Sequential(*layers)

    def get_reward(self, state, action):
        """Compute reward for (state, action) pair"""
        state_tensor = torch.FloatTensor(state).to(self.device)
        action_tensor = torch.FloatTensor(action).to(self.device)

        x = torch.cat([state_tensor, action_tensor], dim=-1)
        reward = self.reward_net(x)

        return reward

    def train(
        self,
        env,
        expert_trajectories,
        num_iterations=100,
        rl_steps_per_iteration=10000
    ):
        """
        Train IRL

        Args:
            env: Gym environment (its true reward is replaced internally via RewardWrapper)
            expert_trajectories: List of expert trajectories
            num_iterations: Number of IRL iterations
            rl_steps_per_iteration: RL training steps per iteration
        """
        print("Starting MaxEnt IRL training...")

        for iteration in range(num_iterations):
            print(f"\n=== IRL Iteration {iteration + 1}/{num_iterations} ===")

            # Step 1: Train policy with current reward estimate
            print("Training policy with current reward...")

            # Wrap environment to use learned reward
            wrapped_env = RewardWrapper(env, self.reward_net, self.device)

            # Train RL agent
            policy = self.rl_algorithm_class(
                "MlpPolicy",
                wrapped_env,
                verbose=0
            )
            policy.learn(total_timesteps=rl_steps_per_iteration)

            # Step 2: Collect trajectories from learned policy
            print("Collecting trajectories from learned policy...")
            learned_trajectories = self._collect_trajectories(
                wrapped_env, policy, num_trajectories=len(expert_trajectories)
            )

            # Step 3: Update reward function
            print("Updating reward function...")
            reward_loss = self._update_reward(
                expert_trajectories, learned_trajectories
            )

            print(f"Reward loss: {reward_loss:.6f}")

            # Step 4: Evaluate
            if iteration % 10 == 0:
                eval_reward = self._evaluate_policy(env, policy, num_episodes=10)
                print(f"Evaluation reward: {eval_reward:.2f}")

        return self.reward_net, policy

    def _collect_trajectories(self, env, policy, num_trajectories=10):
        """Collect trajectories from policy"""
        trajectories = []

        for _ in range(num_trajectories):
            trajectory = []
            obs = env.reset()
            done = False

            while not done:
                action, _ = policy.predict(obs, deterministic=False)
                trajectory.append((obs, action))
                obs, reward, done, info = env.step(action)

            trajectories.append(trajectory)

        return trajectories

    def _update_reward(self, expert_trajectories, learned_trajectories):
        """
        Update reward function using MaxEnt IRL gradient

        Gradient: E[∇r|expert] - E[∇r|learned policy]
        """
        # Compute expert feature expectations
        expert_states, expert_actions = self._flatten_trajectories(expert_trajectories)
        expert_states = torch.FloatTensor(expert_states).to(self.device)
        expert_actions = torch.FloatTensor(expert_actions).to(self.device)

        # Compute learned policy feature expectations
        learned_states, learned_actions = self._flatten_trajectories(learned_trajectories)
        learned_states = torch.FloatTensor(learned_states).to(self.device)
        learned_actions = torch.FloatTensor(learned_actions).to(self.device)

        # Forward pass
        expert_x = torch.cat([expert_states, expert_actions], dim=-1)
        expert_rewards = self.reward_net(expert_x)

        learned_x = torch.cat([learned_states, learned_actions], dim=-1)
        learned_rewards = self.reward_net(learned_x)

        # MaxEnt IRL loss: maximize expert rewards, minimize learned policy rewards
        loss = -expert_rewards.mean() + learned_rewards.mean()

        # Add regularization (optional)
        l2_reg = 0.01 * sum(p.pow(2).sum() for p in self.reward_net.parameters())
        loss += l2_reg

        # Backward pass
        self.reward_optimizer.zero_grad()
        loss.backward()
        self.reward_optimizer.step()

        return loss.item()

    def _flatten_trajectories(self, trajectories):
        """Flatten list of trajectories into states and actions"""
        states = []
        actions = []

        for trajectory in trajectories:
            for state, action in trajectory:
                states.append(state)
                actions.append(action)

        return np.array(states), np.array(actions)

    def _evaluate_policy(self, env, policy, num_episodes=10):
        """Evaluate policy in true environment"""
        total_rewards = []

        for _ in range(num_episodes):
            obs = env.reset()
            episode_reward = 0
            done = False

            while not done:
                action, _ = policy.predict(obs, deterministic=True)
                obs, reward, done, info = env.step(action)
                episode_reward += reward

            total_rewards.append(episode_reward)

        return np.mean(total_rewards)


class RewardWrapper(gym.Wrapper):
    """Wrap environment to use learned reward function"""
    def __init__(self, env, reward_net, device):
        super().__init__(env)
        self.reward_net = reward_net
        self.device = device
        self.last_obs = None

    def reset(self):
        self.last_obs = self.env.reset()
        return self.last_obs

    def step(self, action):
        obs, true_reward, done, info = self.env.step(action)

        # Compute learned reward
        state_tensor = torch.FloatTensor(self.last_obs).unsqueeze(0).to(self.device)
        action_tensor = torch.FloatTensor(action).unsqueeze(0).to(self.device)
        x = torch.cat([state_tensor, action_tensor], dim=-1)

        with torch.no_grad():
            learned_reward = self.reward_net(x).item()

        self.last_obs = obs

        # Return learned reward instead of true reward
        return obs, learned_reward, done, info
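
A minimal usage sketch of the class above, run end to end. The environment name, the expert-demo file, and the loader helper are illustrative assumptions, not part of the text above:

import gym
import numpy as np
import torch

def load_expert_trajectories(path="expert_demos.npz"):
    """Hypothetical helper: load demos as a list of [(state, action), ...] trajectories."""
    data = np.load(path, allow_pickle=True)
    return list(data["trajectories"])

env = gym.make("Pendulum-v1")  # any continuous-control task

irl = MaxEntIRL(
    state_dim=env.observation_space.shape[0],
    action_dim=env.action_space.shape[0],
    rl_algorithm='SAC',
    device='cuda' if torch.cuda.is_available() else 'cpu',
)

reward_net, policy = irl.train(
    env,
    expert_trajectories=load_expert_trajectories(),
    num_iterations=50,
    rl_steps_per_iteration=10_000,
)

# The learned reward can now be queried directly, e.g. for reward shaping
obs = env.reset()
action = env.action_space.sample()
print(irl.get_reward(obs, action).item())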

GAIL: Generative Adversarial Imitation Learning

GAIL is a more practical adversarial variant that imitates the expert directly, without recovering an explicit reward:

Paper: Ho & Ermon, "Generative Adversarial Imitation Learning", NeurIPS 2016

Key Idea

Train discriminator to distinguish expert from policy, train policy to fool discriminator:

  • Discriminator: \(D(s, a) \rightarrow [0, 1]\), trained toward 1 on expert pairs and 0 on policy pairs
  • Policy: Maximize \(\mathbb{E}_{\pi} [-\log(1 - D(s, a))]\), i.e. make its state-action pairs look expert-like

Reward signal: \(r(s, a) = -\log(1 - D(s, a))\)

Implementation

class GAIL:
    """
    Generative Adversarial Imitation Learning

    Reference: Ho & Ermon, NeurIPS 2016
    """
    def __init__(
        self,
        state_dim,
        action_dim,
        disc_hidden_dims=[256, 256],
        lr_disc=3e-4,
        lr_policy=3e-4,
        device='cuda'
    ):
        self.device = device

        # Discriminator: D(s,a) ∈ [0,1]
        # Output 1 for expert, 0 for policy
        self.discriminator = self._build_discriminator(
            state_dim, action_dim, disc_hidden_dims
        ).to(device)

        self.disc_optimizer = optim.Adam(
            self.discriminator.parameters(), lr=lr_disc
        )

        # Policy (use any RL algorithm, e.g., PPO)
        from stable_baselines3 import PPO
        self.policy_class = PPO

    def _build_discriminator(self, state_dim, action_dim, hidden_dims):
        """Build discriminator network"""
        layers = []

        input_dim = state_dim + action_dim
        prev_dim = input_dim

        for hidden_dim in hidden_dims:
            layers.append(nn.Linear(prev_dim, hidden_dim))
            layers.append(nn.Tanh())
            prev_dim = hidden_dim

        # Output probability: expert (1) vs policy (0)
        layers.append(nn.Linear(prev_dim, 1))
        layers.append(nn.Sigmoid())

        return nn.Sequential(*layers)

    def train(
        self,
        env,
        expert_trajectories,
        num_iterations=100,
        policy_steps=2048,
        disc_steps=4
    ):
        """Train GAIL"""
        print("Starting GAIL training...")

        # Create policy with GAIL reward wrapper
        gail_env = GAILRewardWrapper(env, self.discriminator, self.device)
        policy = self.policy_class("MlpPolicy", gail_env, verbose=1)

        for iteration in range(num_iterations):
            print(f"\n=== GAIL Iteration {iteration + 1}/{num_iterations} ===")

            # Step 1: Train the policy on the GAIL reward, then collect its trajectories
            policy.learn(total_timesteps=policy_steps)

            policy_trajectories = self._collect_trajectories(
                env, policy, num_trajectories=len(expert_trajectories)
            )

            # Step 2: Update discriminator
            for _ in range(disc_steps):
                disc_loss = self._update_discriminator(
                    expert_trajectories, policy_trajectories
                )

            print(f"Discriminator loss: {disc_loss:.6f}")

            # Step 3: Evaluate
            if iteration % 10 == 0:
                eval_reward = self._evaluate(env, policy, num_episodes=10)
                print(f"Evaluation reward: {eval_reward:.2f}")

        return policy

    def _update_discriminator(self, expert_traj, policy_traj):
        """
        Update discriminator using binary cross-entropy

        Expert samples → label 1
        Policy samples → label 0
        """
        # Flatten trajectories
        expert_states, expert_actions = self._flatten_trajectories(expert_traj)
        policy_states, policy_actions = self._flatten_trajectories(policy_traj)

        expert_states = torch.FloatTensor(expert_states).to(self.device)
        expert_actions = torch.FloatTensor(expert_actions).to(self.device)
        policy_states = torch.FloatTensor(policy_states).to(self.device)
        policy_actions = torch.FloatTensor(policy_actions).to(self.device)

        # Discriminator predictions
        expert_x = torch.cat([expert_states, expert_actions], dim=-1)
        expert_preds = self.discriminator(expert_x)

        policy_x = torch.cat([policy_states, policy_actions], dim=-1)
        policy_preds = self.discriminator(policy_x)

        # Binary cross-entropy loss
        expert_loss = nn.BCELoss()(expert_preds, torch.ones_like(expert_preds))
        policy_loss = nn.BCELoss()(policy_preds, torch.zeros_like(policy_preds))

        loss = expert_loss + policy_loss

        # Gradient penalty (optional, for stability)
        gp = self._gradient_penalty(expert_x, policy_x)
        loss += 10.0 * gp

        # Backward pass
        self.disc_optimizer.zero_grad()
        loss.backward()
        self.disc_optimizer.step()

        return loss.item()

    def _gradient_penalty(self, expert_data, policy_data):
        """Gradient penalty for discriminator stability"""
        # Expert and policy batches can have different sizes (trajectory lengths vary),
        # so truncate both to a common length before interpolating
        n = min(expert_data.size(0), policy_data.size(0))
        expert_data, policy_data = expert_data[:n], policy_data[:n]

        alpha = torch.rand(n, 1).to(self.device)
        interpolates = alpha * expert_data + (1 - alpha) * policy_data
        interpolates.requires_grad_(True)

        disc_interpolates = self.discriminator(interpolates)

        gradients = torch.autograd.grad(
            outputs=disc_interpolates,
            inputs=interpolates,
            grad_outputs=torch.ones_like(disc_interpolates),
            create_graph=True,
            retain_graph=True
        )[0]

        gradient_penalty = ((gradients.norm(2, dim=1) - 1) ** 2).mean()
        return gradient_penalty

    def _collect_trajectories(self, env, policy, num_trajectories):
        """Collect trajectories from policy"""
        trajectories = []

        for _ in range(num_trajectories):
            trajectory = []
            obs = env.reset()
            done = False

            while not done:
                action, _ = policy.predict(obs, deterministic=False)
                trajectory.append((obs, action))
                obs, reward, done, info = env.step(action)

            trajectories.append(trajectory)

        return trajectories

    def _flatten_trajectories(self, trajectories):
        """Flatten trajectories to states and actions"""
        states = []
        actions = []

        for traj in trajectories:
            for state, action in traj:
                states.append(state)
                actions.append(action)

        return np.array(states), np.array(actions)

    def _evaluate(self, env, policy, num_episodes):
        """Evaluate policy"""
        rewards = []

        for _ in range(num_episodes):
            obs = env.reset()
            episode_reward = 0
            done = False

            while not done:
                action, _ = policy.predict(obs, deterministic=True)
                obs, reward, done, info = env.step(action)
                episode_reward += reward

            rewards.append(episode_reward)

        return np.mean(rewards)


class GAILRewardWrapper(gym.Wrapper):
    """Wrap environment to use GAIL discriminator as reward"""
    def __init__(self, env, discriminator, device):
        super().__init__(env)
        self.discriminator = discriminator
        self.device = device
        self.last_obs = None

    def reset(self):
        self.last_obs = self.env.reset()
        return self.last_obs

    def step(self, action):
        obs, true_reward, done, info = self.env.step(action)

        # GAIL reward: -log(1 - D(s,a))
        state_tensor = torch.FloatTensor(self.last_obs).unsqueeze(0).to(self.device)
        action_tensor = torch.FloatTensor(action).unsqueeze(0).to(self.device)
        x = torch.cat([state_tensor, action_tensor], dim=-1)

        with torch.no_grad():
            d = self.discriminator(x).item()
            gail_reward = -np.log(max(1 - d, 1e-8))  # Avoid log(0)

        self.last_obs = obs

        return obs, gail_reward, done, info
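
A corresponding usage sketch for GAIL; again the environment and the demo file are illustrative assumptions (the loader from the MaxEnt example is reused):

env = gym.make("HalfCheetah-v3")

gail = GAIL(
    state_dim=env.observation_space.shape[0],
    action_dim=env.action_space.shape[0],
    device='cuda' if torch.cuda.is_available() else 'cpu',
)

policy = gail.train(
    env,
    expert_trajectories=load_expert_trajectories("halfcheetah_demos.npz"),
    num_iterations=200,
    policy_steps=2048,
    disc_steps=4,
)

policy.save("gail_halfcheetah")  # stable-baselines3 PPO model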

AIRL: Adversarial Inverse RL

AIRL recovers a disentangled reward function (separating reward from dynamics):

Paper: Fu et al., "Learning Robust Rewards with Adversarial Inverse Reinforcement Learning", ICLR 2018

Key Advantage

GAIL reward depends on dynamics; AIRL extracts transferable reward:

\[ D(s, a, s') = \frac{\exp(f(s, a, s'))}{\exp(f(s, a, s')) + \pi(a|s)} \]

Where \(f(s, a, s') = r(s, a) + \gamma V(s') - V(s)\)
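
Rearranging shows the discriminator is a logistic classifier on \(f\) shifted by the policy log-probability, and that the reward AIRL feeds back to the policy is \(f\) minus that log-probability (this is the form used in the AIRL paper):

\[ D(s, a, s') = \sigma\big(f(s, a, s') - \log \pi(a \mid s)\big), \qquad \hat{r}(s, a, s') = \log D - \log(1 - D) = f(s, a, s') - \log \pi(a \mid s) \]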

Implementation (Simplified)

class AIRL:
    """
    Adversarial Inverse Reinforcement Learning

    Learns disentangled reward function
    """
    def __init__(self, state_dim, action_dim, device='cuda'):
        self.device = device

        # Reward function r(s, a)
        self.reward_net = nn.Sequential(
            nn.Linear(state_dim + action_dim, 256),
            nn.Tanh(),
            nn.Linear(256, 256),
            nn.Tanh(),
            nn.Linear(256, 1)
        ).to(device)

        # Value function V(s)
        self.value_net = nn.Sequential(
            nn.Linear(state_dim, 256),
            nn.Tanh(),
            nn.Linear(256, 256),
            nn.Tanh(),
            nn.Linear(256, 1)
        ).to(device)

        self.optimizer = optim.Adam(
            list(self.reward_net.parameters()) +
            list(self.value_net.parameters()),
            lr=3e-4
        )

    def discriminator(self, state, action, next_state):
        """AIRL discriminator"""
        state_tensor = torch.FloatTensor(state).to(self.device)
        action_tensor = torch.FloatTensor(action).to(self.device)
        next_state_tensor = torch.FloatTensor(next_state).to(self.device)

        # Compute f(s, a, s') = r(s,a) + γV(s') - V(s)
        sa = torch.cat([state_tensor, action_tensor], dim=-1)
        r = self.reward_net(sa)
        v = self.value_net(state_tensor)
        v_next = self.value_net(next_state_tensor)

        f = r + 0.99 * v_next - v

        # Discriminator output
        # Exact form: D = exp(f) / (exp(f) + π(a|s)) = sigmoid(f - log π(a|s))
        # The log π(a|s) term is dropped here for simplicity (see update_discriminator below)
        d = torch.sigmoid(f)

        return d

    # Training similar to GAIL but using discriminator with next_state
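
    # The update below is a sketch, not from the original text: it assumes the policy
    # exposes a log-probability function (here `policy_log_prob_fn`, a hypothetical name)
    # and uses the exact discriminator form D = sigmoid(f - log π(a|s)).
    def update_discriminator(self, expert_batch, policy_batch, policy_log_prob_fn):
        """
        One binary cross-entropy step on the AIRL discriminator.

        Each batch is a dict of tensors: 'states', 'actions', 'next_states'.
        policy_log_prob_fn(states, actions) -> log π(a|s), shape (batch, 1).
        """
        def logits(batch):
            sa = torch.cat([batch['states'], batch['actions']], dim=-1)
            f = (self.reward_net(sa)
                 + 0.99 * self.value_net(batch['next_states'])
                 - self.value_net(batch['states']))
            # Classifier logit is f - log π(a|s), so sigmoid(logit) = D(s, a, s');
            # the policy log-prob is treated as a constant w.r.t. the discriminator
            log_pi = policy_log_prob_fn(batch['states'], batch['actions']).detach()
            return f - log_pi

        expert_logits = logits(expert_batch)
        policy_logits = logits(policy_batch)

        # Expert transitions -> label 1, policy transitions -> label 0
        bce = nn.BCEWithLogitsLoss()
        loss = (bce(expert_logits, torch.ones_like(expert_logits))
                + bce(policy_logits, torch.zeros_like(policy_logits)))

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss.item()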

Practical Tips

Choosing an IRL Method

| Method     | Computational Cost           | Reward Transfer   | Best For             |
|------------|------------------------------|-------------------|----------------------|
| MaxEnt IRL | Very high (needs RL in loop) | Good              | Small state spaces   |
| GAIL       | Medium                       | Poor (entangled)  | General imitation    |
| AIRL       | Medium-high                  | Excellent         | Transfer learning    |
| IQ-Learn   | Low                          | Good              | Large-scale problems |

Hyperparameter Tuning

# GAIL hyperparameters
gail_config = {
    'disc_hidden_dims': [256, 256],
    'lr_disc': 3e-4,
    'policy_steps': 2048,  # Per iteration
    'disc_steps': 4,  # Discriminator updates per policy update
    'gradient_penalty_coef': 10.0
}

# For sparse reward tasks
gail_config['policy_steps'] = 4096  # More exploration
gail_config['disc_steps'] = 8  # More discriminator training

Common Issues

Problem: Discriminator overfits

# Solution: Add gradient penalty, dropout
def build_discriminator_with_dropout(state_dim, action_dim):
    """Discriminator with dropout to reduce overfitting to the expert data"""
    input_dim = state_dim + action_dim
    return nn.Sequential(
        nn.Linear(input_dim, 256),
        nn.Tanh(),
        nn.Dropout(0.2),  # Dropout
        nn.Linear(256, 256),
        nn.Tanh(),
        nn.Dropout(0.2),
        nn.Linear(256, 1),
        nn.Sigmoid()
    )

Problem: Training unstable

# Solution: Spectral normalization
from torch.nn.utils import spectral_norm

def build_stable_discriminator(state_dim, action_dim):
    """Discriminator with spectral normalization for more stable training"""
    input_dim = state_dim + action_dim
    return nn.Sequential(
        spectral_norm(nn.Linear(input_dim, 256)),
        nn.Tanh(),
        spectral_norm(nn.Linear(256, 256)),
        nn.Tanh(),
        spectral_norm(nn.Linear(256, 1)),
        nn.Sigmoid()
    )

References

Papers

  1. MaxEnt IRL: Ziebart et al., "Maximum Entropy Inverse Reinforcement Learning", AAAI 2008
  2. GAIL: Ho & Ermon, "Generative Adversarial Imitation Learning", NeurIPS 2016 (arXiv)
  3. AIRL: Fu et al., "Learning Robust Rewards with Adversarial Inverse RL", ICLR 2018 (arXiv)
  4. IQ-Learn: Garg et al., "IQ-Learn: Inverse soft-Q Learning for Imitation", NeurIPS 2021
  5. Apprenticeship learning: Abbeel & Ng, "Apprenticeship Learning via Inverse Reinforcement Learning", ICML 2004 (foundational)

Code

  • Imitation Library: https://github.com/HumanCompatibleAI/imitation
  • Includes GAIL, AIRL implementations
  • IQ-Learn: https://github.com/Div99/IQ-Learn
  • SQIL: https://github.com/saxenasaurabh/SQIL

Next Steps