# On-Policy Reinforcement Learning
On-policy algorithms learn from data collected by the current policy, providing stable and reliable training for robotics.
## Overview

On-policy means the agent learns from experiences collected by the current policy:

- Collect data with policy \(\pi_\theta\)
- Update \(\theta\) to improve the policy
- Discard the old data and collect new data with \(\pi_{\theta'}\)
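This collect-update-discard loop can be sketched in a few lines; `env`, `policy`, and `update_policy` below are hypothetical stand-ins, not a specific library API.

```python
# Minimal sketch of the on-policy loop: collect with the current policy,
# update, throw the data away, repeat with the new policy.
def on_policy_loop(env, policy, update_policy, num_iterations, rollout_length):
    for _ in range(num_iterations):
        rollout = []
        state = env.reset()
        for _ in range(rollout_length):
            action = policy(state)
            next_state, reward, done, _ = env.step(action)
            rollout.append((state, action, reward, done))
            state = env.reset() if done else next_state
        update_policy(rollout)  # improve theta on fresh data only
        # rollout is discarded; the next iteration collects with the updated policy
```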
Key algorithms:

- **PPO** (Proximal Policy Optimization) - the most popular
- **TRPO** (Trust Region Policy Optimization) - theoretically grounded
- **A3C/A2C** (Advantage Actor-Critic) - parallelizable
## Proximal Policy Optimization (PPO)
PPO is the most widely used RL algorithm in robotics due to its simplicity, stability, and performance.
Paper: Schulman et al., "Proximal Policy Optimization Algorithms", 2017
### Mathematical Foundation
PPO optimizes a clipped surrogate objective:

$$L^{CLIP}(\theta) = \hat{\mathbb{E}}_t \left[ \min\left( r_t(\theta) \hat{A}_t,\; \text{clip}(r_t(\theta), 1 - \epsilon, 1 + \epsilon) \hat{A}_t \right) \right]$$

Where:

- \(r_t(\theta) = \frac{\pi_\theta(a_t|s_t)}{\pi_{\theta_{old}}(a_t|s_t)}\) (probability ratio)
- \(\hat{A}_t\) = advantage estimate
- \(\epsilon\) = clip parameter (typically 0.2)
Intuition: Prevent large policy updates by clipping the ratio.
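A tiny numeric illustration of the clipping (plain Python, separate from the implementation below): with \(\epsilon = 0.2\) and a positive advantage, ratios outside \([0.8, 1.2]\) stop contributing extra objective.

```python
# Per-sample clipped surrogate: min(r * A, clip(r, 1-eps, 1+eps) * A)
def clipped_surrogate(ratio, advantage, eps=0.2):
    clipped = max(1.0 - eps, min(ratio, 1.0 + eps))
    return min(ratio * advantage, clipped * advantage)

# ratio 1.5 is clipped to 1.2; ratio 0.5 keeps its (pessimistic) value
examples = [clipped_surrogate(r, 1.0) for r in (0.5, 1.0, 1.5)]
```

Taking the `min` keeps the objective pessimistic: it never rewards pushing the ratio further than the clip range.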
### Complete Implementation
```python
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Normal
import numpy as np


class PPO:
    """
    Proximal Policy Optimization for continuous control.

    Reference: Schulman et al., 2017
    """

    def __init__(
        self,
        state_dim,
        action_dim,
        hidden_dim=256,
        lr=3e-4,
        gamma=0.99,
        gae_lambda=0.95,
        clip_epsilon=0.2,
        value_coef=0.5,
        entropy_coef=0.01,
        max_grad_norm=0.5,
        ppo_epochs=10,
        mini_batch_size=64,
    ):
        self.gamma = gamma
        self.gae_lambda = gae_lambda
        self.clip_epsilon = clip_epsilon
        self.value_coef = value_coef
        self.entropy_coef = entropy_coef
        self.max_grad_norm = max_grad_norm
        self.ppo_epochs = ppo_epochs
        self.mini_batch_size = mini_batch_size

        # Actor network (policy)
        self.actor = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh(),
        )
        self.actor_mean = nn.Linear(hidden_dim, action_dim)
        self.actor_logstd = nn.Parameter(torch.zeros(action_dim))

        # Critic network (value function)
        self.critic = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, 1),
        )

        # One optimizer over all trainable parameters
        self.params = (
            list(self.actor.parameters())
            + list(self.actor_mean.parameters())
            + [self.actor_logstd]
            + list(self.critic.parameters())
        )
        self.optimizer = optim.Adam(self.params, lr=lr)

    def get_action(self, state, deterministic=False):
        """Sample an action from the Gaussian policy."""
        state = torch.FloatTensor(state)
        features = self.actor(state)
        mean = self.actor_mean(features)
        std = torch.exp(self.actor_logstd)
        if deterministic:
            return mean.detach().numpy()
        dist = Normal(mean, std)
        action = dist.sample()
        return action.detach().numpy()

    def evaluate_actions(self, states, actions):
        """Evaluate log-probability and entropy of actions under the current policy."""
        features = self.actor(states)
        mean = self.actor_mean(features)
        std = torch.exp(self.actor_logstd)
        dist = Normal(mean, std)
        log_probs = dist.log_prob(actions).sum(dim=-1)
        entropy = dist.entropy().sum(dim=-1)
        return log_probs, entropy

    def compute_gae(self, rewards, values, dones, next_value):
        """
        Generalized Advantage Estimation (GAE).

        Paper: Schulman et al., "High-Dimensional Continuous Control Using
        Generalized Advantage Estimation", 2016

        GAE(lambda) trades off bias against variance:
            lambda = 0: high bias, low variance (one-step TD)
            lambda = 1: low bias, high variance (Monte Carlo)
        """
        advantages = torch.zeros_like(rewards)
        last_gae = 0.0
        # Append the bootstrap value so values[t + 1] is defined at the last step
        values = torch.cat([values, next_value])
        for t in reversed(range(len(rewards))):
            # TD error
            delta = rewards[t] + self.gamma * values[t + 1] * (1 - dones[t]) - values[t]
            # Recursive GAE accumulator (reset across episode boundaries by dones)
            last_gae = delta + self.gamma * self.gae_lambda * (1 - dones[t]) * last_gae
            advantages[t] = last_gae
        returns = advantages + values[:-1]
        return advantages, returns

    def update(self, rollout_buffer):
        """
        PPO update using the clipped objective.

        Performs multiple epochs of minibatch updates on the same rollout.
        """
        states = torch.FloatTensor(rollout_buffer['states'])
        actions = torch.FloatTensor(rollout_buffer['actions'])
        old_log_probs = torch.FloatTensor(rollout_buffer['log_probs'])
        rewards = torch.FloatTensor(rollout_buffer['rewards'])
        dones = torch.FloatTensor(rollout_buffer['dones'])
        values = torch.FloatTensor(rollout_buffer['values'])
        next_value = torch.FloatTensor([rollout_buffer['next_value']])

        # Compute advantages and returns
        advantages, returns = self.compute_gae(rewards, values, dones, next_value)

        # Normalize advantages (important for stability)
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        for _ in range(self.ppo_epochs):
            # Fresh random minibatches each epoch
            indices = np.random.permutation(len(states))
            for start in range(0, len(states), self.mini_batch_size):
                batch_indices = indices[start:start + self.mini_batch_size]

                batch_states = states[batch_indices]
                batch_actions = actions[batch_indices]
                batch_old_log_probs = old_log_probs[batch_indices]
                batch_advantages = advantages[batch_indices]
                batch_returns = returns[batch_indices]

                # Evaluate current policy on the old actions
                log_probs, entropy = self.evaluate_actions(batch_states, batch_actions)
                values_pred = self.critic(batch_states).squeeze(-1)

                # Probability ratio r_t(theta)
                ratio = torch.exp(log_probs - batch_old_log_probs)

                # Clipped surrogate objective
                surr1 = ratio * batch_advantages
                surr2 = torch.clamp(ratio, 1 - self.clip_epsilon, 1 + self.clip_epsilon) * batch_advantages
                actor_loss = -torch.min(surr1, surr2).mean()

                # Value loss (MSE against GAE returns)
                value_loss = nn.MSELoss()(values_pred, batch_returns)

                # Entropy bonus (encourages exploration)
                entropy_loss = -entropy.mean()

                loss = actor_loss + self.value_coef * value_loss + self.entropy_coef * entropy_loss

                self.optimizer.zero_grad()
                loss.backward()
                # Clip gradients of all optimized parameters
                nn.utils.clip_grad_norm_(self.params, self.max_grad_norm)
                self.optimizer.step()

        return {
            'actor_loss': actor_loss.item(),
            'value_loss': value_loss.item(),
            'entropy': entropy.mean().item(),
        }
```
```python
class RolloutBuffer:
    """Buffer for collecting on-policy rollouts."""

    def __init__(self):
        self.reset()

    def reset(self):
        self.states = []
        self.actions = []
        self.rewards = []
        self.dones = []
        self.values = []
        self.log_probs = []
        self.next_value = None

    def add(self, state, action, reward, done, value, log_prob):
        self.states.append(state)
        self.actions.append(action)
        self.rewards.append(reward)
        self.dones.append(done)
        self.values.append(value)
        self.log_probs.append(log_prob)

    def get(self):
        return {
            'states': np.array(self.states),
            'actions': np.array(self.actions),
            'rewards': np.array(self.rewards),
            'dones': np.array(self.dones),
            'values': np.array(self.values),
            'log_probs': np.array(self.log_probs),
            'next_value': self.next_value,
        }
```
```python
def train_ppo(env, num_timesteps=1_000_000, rollout_length=2048):
    """Training loop for PPO (written against the classic Gym 4-tuple step API)."""
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.shape[0]

    ppo = PPO(state_dim, action_dim)
    buffer = RolloutBuffer()

    state = env.reset()
    episode_reward = 0.0
    episode_length = 0

    for timestep in range(num_timesteps):
        # Once a full rollout has been collected, update the policy
        if timestep % rollout_length == 0 and timestep > 0:
            # Bootstrap value of the current state for GAE
            with torch.no_grad():
                buffer.next_value = ppo.critic(torch.FloatTensor(state)).item()

            metrics = ppo.update(buffer.get())
            buffer.reset()

            # Log once per update
            print(f"Timestep {timestep}:")
            print(f"  Actor Loss: {metrics['actor_loss']:.4f}")
            print(f"  Value Loss: {metrics['value_loss']:.4f}")
            print(f"  Entropy: {metrics['entropy']:.4f}")

        # Sample an action and record the value / log-prob needed by the update
        action = ppo.get_action(state)
        with torch.no_grad():
            state_tensor = torch.FloatTensor(state)
            features = ppo.actor(state_tensor)
            mean = ppo.actor_mean(features)
            std = torch.exp(ppo.actor_logstd)
            dist = Normal(mean, std)
            log_prob = dist.log_prob(torch.FloatTensor(action)).sum().item()
            value = ppo.critic(state_tensor).item()

        # Environment step
        next_state, reward, done, info = env.step(action)

        # Store transition
        buffer.add(state, action, reward, done, value, log_prob)

        episode_reward += reward
        episode_length += 1

        if done:
            print(f"Episode: Reward={episode_reward:.2f}, Length={episode_length}")
            state = env.reset()
            episode_reward = 0.0
            episode_length = 0
        else:
            state = next_state

    return ppo
```
### Hyperparameter Tuning

Critical hyperparameters:

| Parameter | Typical Range | Robotics Default | Notes |
|---|---|---|---|
| `learning_rate` | 1e-5 to 1e-3 | 3e-4 | Lower for fine control |
| `clip_epsilon` | 0.1 to 0.3 | 0.2 | Larger = more aggressive updates |
| `gamma` | 0.95 to 0.999 | 0.99 | Higher for long-horizon tasks |
| `gae_lambda` | 0.9 to 0.98 | 0.95 | Trades off bias vs. variance |
| `ppo_epochs` | 3 to 15 | 10 | More epochs = more optimization per rollout |
| `mini_batch_size` | 32 to 128 | 64 | Depends on rollout size |
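The last three rows interact: a quick sanity check when tuning them together is the number of gradient steps performed per policy update.

```python
# Gradient steps per update = ppo_epochs * ceil(rollout_length / mini_batch_size)
rollout_length = 2048
mini_batch_size = 64
ppo_epochs = 10

minibatches_per_epoch = -(-rollout_length // mini_batch_size)  # ceiling division
gradient_steps = ppo_epochs * minibatches_per_epoch
print(gradient_steps)  # 320 gradient steps on each 2048-step rollout
```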
Tuning tips:
```python
# state_dim and action_dim come from the environment

# For manipulation tasks (short episodes)
ppo_manipulation = PPO(
    state_dim, action_dim,
    lr=3e-4,
    gamma=0.99,
    gae_lambda=0.95,
    clip_epsilon=0.2,
    ppo_epochs=10,
)

# For locomotion tasks (long episodes)
ppo_locomotion = PPO(
    state_dim, action_dim,
    lr=1e-4,
    gamma=0.995,
    gae_lambda=0.97,
    clip_epsilon=0.2,
    ppo_epochs=15,
)

# For highly stochastic environments
ppo_stochastic = PPO(
    state_dim, action_dim,
    lr=3e-4,
    gamma=0.99,
    gae_lambda=0.9,     # lower lambda for high variance
    clip_epsilon=0.3,   # larger clip range
    entropy_coef=0.05,  # more exploration
)
```
## Trust Region Policy Optimization (TRPO)
TRPO guarantees monotonic policy improvement using a trust region constraint.
Paper: Schulman et al., "Trust Region Policy Optimization", ICML 2015
### Mathematical Foundation

TRPO maximizes the surrogate objective:

$$\max_\theta \; \mathbb{E}_t \left[ \frac{\pi_\theta(a_t|s_t)}{\pi_{\theta_{old}}(a_t|s_t)} \hat{A}_t \right]$$

Subject to a trust-region constraint on the policy change:

$$\mathbb{E}_t \left[ \mathrm{KL}\left( \pi_{\theta_{old}}(\cdot|s_t) \,\|\, \pi_\theta(\cdot|s_t) \right) \right] \leq \delta$$

- **Advantage over PPO:** theoretically guaranteed monotonic improvement
- **Disadvantage:** more complex; requires conjugate-gradient optimization
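For Gaussian policies, the constrained quantity has a closed form, sketched below for the univariate case (sum over action dimensions for a diagonal-covariance policy). This illustrates the KL term only, not TRPO's conjugate-gradient machinery.

```python
import math

# KL(N(mu_old, std_old^2) || N(mu_new, std_new^2)) in closed form
def gaussian_kl(mu_old, std_old, mu_new, std_new):
    return (math.log(std_new / std_old)
            + (std_old ** 2 + (mu_old - mu_new) ** 2) / (2.0 * std_new ** 2)
            - 0.5)
```

The KL is zero when the two Gaussians coincide and grows as the new policy's mean or scale drifts, which is exactly what the \(\delta\) bound limits.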
### Key Difference from PPO
| Feature | PPO | TRPO |
|---|---|---|
| Constraint | Clip ratio | KL divergence |
| Implementation | Simple | Complex (CG) |
| Performance | Similar | Similar |
| Computation | Fast | Slower |
Recommendation: Use PPO for most robotics applications.
## Actor-Critic (A2C/A3C)
Simpler on-policy algorithm using parallel workers.
```python
from torch.distributions import Categorical


class A2C(nn.Module):
    """Advantage Actor-Critic (synchronous), here for a discrete action space."""

    def __init__(self, state_dim, num_actions, gamma=0.99):
        super().__init__()
        self.gamma = gamma
        # Actor and critic networks (shared backbone)
        self.shared = nn.Sequential(
            nn.Linear(state_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
        )
        self.actor = nn.Linear(256, num_actions)
        self.critic = nn.Linear(256, 1)
        self.optimizer = optim.Adam(self.parameters(), lr=1e-3)

    def forward(self, state):
        features = self.shared(state)
        action_logits = self.actor(features)
        value = self.critic(features)
        return action_logits, value

    def update(self, states, actions, rewards, next_states, dones):
        """One A2C update on a batch of transitions."""
        logits, values = self.forward(states)
        values = values.squeeze(-1)
        with torch.no_grad():
            _, next_values = self.forward(next_states)
            next_values = next_values.squeeze(-1)

        # One-step TD advantage
        advantages = rewards + self.gamma * next_values * (1 - dones) - values

        # Actor loss (policy gradient, advantage detached from the critic)
        log_probs = Categorical(logits=logits).log_prob(actions)
        actor_loss = -(advantages.detach() * log_probs).mean()

        # Critic loss (squared TD error)
        critic_loss = advantages.pow(2).mean()

        loss = actor_loss + 0.5 * critic_loss
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
```
## Practical Tips

### When to Use On-Policy

✓ **Use when:**

- You need stable, reliable training
- Sample efficiency is not critical (you can collect lots of data)
- You are working in simulation
- Policy stability matters (safety)

✗ **Avoid when:**

- Sample collection is expensive (e.g., a real robot)
- You need maximum sample efficiency
- The environment is very high-dimensional
### Common Issues

- **Problem:** Training is unstable. **Solution:** Reduce the learning rate or reduce `clip_epsilon` (a smaller clip range gives more conservative updates).
- **Problem:** No improvement. **Solution:** Check reward scaling; increase the entropy coefficient.
- **Problem:** Catastrophic forgetting within an update. **Solution:** Reduce the number of PPO epochs; use smaller mini-batches.
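One concrete way to address reward scaling, sketched after the approach used in common PPO implementations (VecNormalize-style wrappers): divide rewards by a running estimate of the discounted return's standard deviation. The class name and details here are illustrative, not a specific library API.

```python
# Running reward scaler: tracks the discounted return with Welford statistics
# and rescales raw rewards so returns have roughly unit variance.
class RunningRewardScaler:
    def __init__(self, gamma=0.99, eps=1e-8):
        self.gamma = gamma
        self.eps = eps
        self.ret = 0.0    # running discounted return
        self.count = 0
        self.mean = 0.0   # Welford running mean of the return
        self.m2 = 0.0     # Welford sum of squared deviations

    def scale(self, reward, done):
        self.ret = self.gamma * self.ret * (0.0 if done else 1.0) + reward
        self.count += 1
        delta = self.ret - self.mean
        self.mean += delta / self.count
        self.m2 += delta * (self.ret - self.mean)
        std = (self.m2 / max(self.count - 1, 1)) ** 0.5
        return reward if std < self.eps else reward / std
```

Apply `scale` to each reward before it enters the rollout buffer; the policy then trains on rewards of a consistent magnitude across tasks.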
## References

### Papers

- **PPO:** Schulman et al., "Proximal Policy Optimization Algorithms", arXiv 2017
- **TRPO:** Schulman et al., "Trust Region Policy Optimization", ICML 2015
- **GAE:** Schulman et al., "High-Dimensional Continuous Control Using Generalized Advantage Estimation", ICLR 2016
### Books
- Sutton & Barto, "Reinforcement Learning: An Introduction", 2nd Edition, 2018
- Bertsekas, "Reinforcement Learning and Optimal Control", 2019
### Code Implementations
- Stable-Baselines3: https://github.com/DLR-RM/stable-baselines3
- CleanRL: https://github.com/vwxyzjn/cleanrl
- SpinningUp: https://spinningup.openai.com/
### Tutorials
- OpenAI Spinning Up: https://spinningup.openai.com/en/latest/
- Lil'Log PPO Tutorial: https://lilianweng.github.io/posts/2018-04-08-policy-gradient/
## Next Steps
- Off-Policy Algorithms (SAC, TD3) - Sample-efficient alternatives
- Model-Based RL - Learn environment models
- Distributed Training - Scale up training
### Framework Guides
- RSL-RL - Isaac Lab integration
- RL Games - Fast GPU RL
- Stable-Baselines3 - Easy-to-use RL