Evaluating RL Policies¶
Guide to evaluating and analyzing reinforcement learning policies.
Evaluation Metrics¶
Success Rate¶
import numpy as np

def evaluate_success_rate(model, env, n_episodes=100):
    """Run n_episodes with a deterministic policy and average the success flag."""
    successes = []
    for _ in range(n_episodes):
        obs, info = env.reset()
        done = False
        while not done:
            action, _ = model.predict(obs, deterministic=True)
            obs, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
        # Many goal-based envs report success in the final step's info dict
        successes.append(float(info.get('is_success', 0)))
    return np.mean(successes), np.std(successes)

# Usage
success_rate, std = evaluate_success_rate(model, eval_env)
print(f"Success rate: {success_rate:.2%} ± {std:.2%}")
Episode Return¶
from stable_baselines3.common.evaluation import evaluate_policy

mean_reward, std_reward = evaluate_policy(
    model,
    eval_env,
    n_eval_episodes=100,
    deterministic=True
)
print(f"Mean reward: {mean_reward:.2f} ± {std_reward:.2f}")
Sample Efficiency¶
Sample efficiency measures how quickly the agent improves per environment step. Log evaluation returns at regular intervals during training, then plot them against steps (one way to collect these values is sketched after the plot code):

import matplotlib.pyplot as plt

# Collected during training, e.g. from periodic evaluations
timesteps = []       # environment steps at each evaluation
mean_returns = []    # mean evaluation return at each point

# Plot learning curve
plt.plot(timesteps, mean_returns)
plt.xlabel('Environment Steps')
plt.ylabel('Average Return')
plt.title('Learning Curve')
plt.show()
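With Stable-Baselines3, a simple way to collect these values is an EvalCallback, which periodically evaluates the policy and saves results to evaluations.npz; a minimal sketch (paths and frequencies are illustrative):

from stable_baselines3.common.callbacks import EvalCallback

eval_callback = EvalCallback(
    eval_env,
    log_path='./logs/',   # writes ./logs/evaluations.npz
    eval_freq=10_000,
    n_eval_episodes=10,
    deterministic=True
)
model.learn(total_timesteps=1_000_000, callback=eval_callback)

data = np.load('./logs/evaluations.npz')
plt.plot(data['timesteps'], data['results'].mean(axis=1))
plt.xlabel('Environment Steps')
plt.ylabel('Average Return')
plt.show()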
Visualization¶
Recording Videos¶
import gymnasium as gym
from stable_baselines3.common.vec_env import DummyVecEnv, VecVideoRecorder

# VecVideoRecorder expects a VecEnv whose underlying env renders rgb_array
# frames; replace env_id with your task's registered id
env = DummyVecEnv([lambda: gym.make(env_id, render_mode='rgb_array')])

video_folder = './videos/'
video_length = 500

env = VecVideoRecorder(
    env,
    video_folder,
    record_video_trigger=lambda x: x == 0,  # record from the first step
    video_length=video_length
)

# Evaluate (VecEnv API: reset returns obs only; step returns a 4-tuple)
obs = env.reset()
for _ in range(video_length):
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, done, info = env.step(action)

env.close()  # finalize and write the video file
Action Distribution Analysis¶
def analyze_actions(model, env, n_steps=1000):
    """Collect actions from a rollout and plot per-dimension histograms
    (assumes a continuous, vector-valued action space)."""
    actions = []
    obs, _ = env.reset()
    for _ in range(n_steps):
        action, _ = model.predict(obs, deterministic=True)
        actions.append(action)
        obs, _, terminated, truncated, _ = env.step(action)
        if terminated or truncated:
            obs, _ = env.reset()
    actions = np.array(actions)

    # Plot distribution per action dimension
    fig, axes = plt.subplots(1, actions.shape[1], figsize=(15, 3), squeeze=False)
    for i, ax in enumerate(axes[0]):
        ax.hist(actions[:, i], bins=50)
        ax.set_title(f'Action Dim {i}')
        ax.set_xlabel('Value')
        ax.set_ylabel('Frequency')
    plt.tight_layout()
    plt.show()
Sim-to-Real Evaluation¶
Domain Randomization Testing¶
def evaluate_robustness(model, env, n_episodes=50):
    """Evaluate the policy across a sweep of domain randomization settings.

    Assumes the environment exposes a set_randomization() hook; this is
    environment-specific, not a standard Gymnasium API (see the wrapper
    sketch below).
    """
    results = {
        'mass': [],
        'friction': [],
        'lighting': []
    }
    for param_name in results.keys():
        for param_value in np.linspace(0.5, 1.5, 10):
            # Set randomization parameter
            env.set_randomization({param_name: param_value})
            # Evaluate
            success_rate, _ = evaluate_success_rate(model, env, n_episodes)
            results[param_name].append((param_value, success_rate))
    return results
# Plot robustness
results = evaluate_robustness(model, eval_env)
for param_name, values in results.items():
    x, y = zip(*values)
    plt.plot(x, y, label=param_name)
plt.xlabel('Randomization Factor')
plt.ylabel('Success Rate')
plt.legend()
plt.show()
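One way to provide the set_randomization hook is a thin Gymnasium wrapper that stores the requested factors and applies them on reset; the _apply step below is a placeholder you would fill in for your simulator:

class RandomizationWrapper(gym.Wrapper):
    """Illustrative wrapper exposing set_randomization(); how the factors
    map onto the simulator is up to your environment."""

    def __init__(self, env):
        super().__init__(env)
        self.factors = {}

    def set_randomization(self, factors):
        self.factors.update(factors)

    def reset(self, **kwargs):
        self._apply(self.factors)
        return self.env.reset(**kwargs)

    def _apply(self, factors):
        # Placeholder: scale e.g. body masses or friction coefficients
        # in self.env.unwrapped before the episode starts.
        pass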
Ablation Studies¶
Component Analysis¶
# Evaluate with different input components disabled.
# create_model_variant is a user-supplied helper; one simple masking
# alternative is sketched below.
configs = [
    {'name': 'Full Model', 'remove': []},
    {'name': 'No Vision', 'remove': ['vision']},
    {'name': 'No Proprioception', 'remove': ['state']},
    {'name': 'No History', 'remove': ['history']}
]

results = {}
for config in configs:
    model_variant = create_model_variant(model, config['remove'])
    success_rate, _ = evaluate_success_rate(model_variant, env)
    results[config['name']] = success_rate

# Plot
plt.bar(results.keys(), results.values())
plt.ylabel('Success Rate')
plt.title('Ablation Study')
plt.xticks(rotation=45)
plt.show()
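If you have no create_model_variant helper, a lightweight alternative that needs no model surgery is to ablate an input at evaluation time by zeroing it out with an observation wrapper (the slice boundaries are illustrative):

class MaskObservation(gym.ObservationWrapper):
    """Zero out part of a flat observation vector at evaluation time."""

    def __init__(self, env, mask_slice):
        super().__init__(env)
        self.mask_slice = mask_slice

    def observation(self, obs):
        obs = np.array(obs, copy=True)
        obs[self.mask_slice] = 0.0
        return obs

# e.g. if proprioceptive state occupies obs[0:12] in your layout:
masked_env = MaskObservation(env, slice(0, 12))
success_rate, _ = evaluate_success_rate(model, masked_env)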
Real-World Deployment¶
Safety Checks¶
from collections import Counter

class SafetyMonitor:
    """Filter policy actions against safety limits before sending them to
    the robot. The _in_workspace, _clamp_action, _exceeds_velocity_limits,
    _scale_action, and _would_collide helpers are robot-specific and must
    be implemented for your platform."""

    def __init__(self, limits):
        self.limits = limits
        self.violations = []

    def check_action(self, action, state):
        # Check workspace bounds
        if not self._in_workspace(state['ee_position']):
            self.violations.append('workspace_violation')
            return self._clamp_action(action, state)
        # Check velocity limits
        if self._exceeds_velocity_limits(action):
            self.violations.append('velocity_violation')
            return self._scale_action(action)
        # Check collision
        if self._would_collide(action, state):
            self.violations.append('collision_risk')
            return np.zeros_like(action)  # Stop
        return action

    def get_violation_stats(self):
        return Counter(self.violations)
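A hypothetical deployment loop that routes every policy action through the monitor; the limits schema and the assumption that the end-effector position sits in obs[:3] are illustrative:

monitor = SafetyMonitor(limits={'max_ee_speed': 0.25})

obs, _ = env.reset()
for _ in range(1000):
    action, _ = model.predict(obs, deterministic=True)
    state = {'ee_position': obs[:3]}  # assumed observation layout
    safe_action = monitor.check_action(action, state)
    obs, reward, terminated, truncated, _ = env.step(safe_action)
    if terminated or truncated:
        break

print(monitor.get_violation_stats())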
Benchmarking¶
Comparing Algorithms¶
from stable_baselines3 import PPO, SAC, TD3

algorithms = {
    'PPO': PPO('MlpPolicy', env),
    'SAC': SAC('MlpPolicy', env),
    'TD3': TD3('MlpPolicy', env)
}

results = {}
for name, model in algorithms.items():
    print(f"Training {name}...")
    model.learn(total_timesteps=1_000_000)
    print(f"Evaluating {name}...")
    mean_reward, std_reward = evaluate_policy(model, eval_env, n_eval_episodes=100)
    results[name] = {
        'mean_reward': mean_reward,
        'std_reward': std_reward
    }
# Plot comparison
fig, ax = plt.subplots()
names = list(results.keys())
means = [results[n]['mean_reward'] for n in names]
stds = [results[n]['std_reward'] for n in names]
ax.bar(names, means, yerr=stds)
ax.set_ylabel('Mean Reward')
ax.set_title('Algorithm Comparison')
plt.show()
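A single training run per algorithm is noisy; a fairer comparison averages over several random seeds. A sketch (the seed count is illustrative):

seeds = [0, 1, 2, 3, 4]
per_seed = {'PPO': [], 'SAC': [], 'TD3': []}

for seed in seeds:
    for name, algo_cls in [('PPO', PPO), ('SAC', SAC), ('TD3', TD3)]:
        model = algo_cls('MlpPolicy', env, seed=seed)
        model.learn(total_timesteps=1_000_000)
        mean_reward, _ = evaluate_policy(model, eval_env, n_eval_episodes=100)
        per_seed[name].append(mean_reward)

for name, rewards in per_seed.items():
    print(f"{name}: {np.mean(rewards):.2f} ± {np.std(rewards):.2f} over {len(seeds)} seeds")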
Statistical Significance¶
from scipy import stats

def compare_policies(policy1, policy2, env, n_episodes=100):
    """Statistical comparison of two policies.

    Relies on an evaluate_episode helper that runs one episode and
    returns its total reward (a sketch is given below)."""
    # Collect returns
    returns1 = [evaluate_episode(policy1, env) for _ in range(n_episodes)]
    returns2 = [evaluate_episode(policy2, env) for _ in range(n_episodes)]

    # Welch's t-test (does not assume equal variances)
    t_stat, p_value = stats.ttest_ind(returns1, returns2, equal_var=False)

    print(f"Policy 1: {np.mean(returns1):.2f} ± {np.std(returns1):.2f}")
    print(f"Policy 2: {np.mean(returns2):.2f} ± {np.std(returns2):.2f}")
    print(f"p-value: {p_value:.4f}")

    if p_value < 0.05:
        better_policy = 1 if np.mean(returns1) > np.mean(returns2) else 2
        print(f"Policy {better_policy} is significantly better (p < 0.05)")
    else:
        print("No significant difference")
    return p_value
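A minimal sketch of the evaluate_episode helper assumed above, using the Gymnasium step API:

def evaluate_episode(model, env):
    """Run one deterministic episode and return its total reward."""
    obs, _ = env.reset()
    total_reward = 0.0
    done = False
    while not done:
        action, _ = model.predict(obs, deterministic=True)
        obs, reward, terminated, truncated, _ = env.step(action)
        total_reward += reward
        done = terminated or truncated
    return total_reward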
Failure Analysis¶
def analyze_failures(model, env, n_episodes=100):
    """Collect full trajectories for episodes where the policy fails."""
    failures = []
    for episode_idx in range(n_episodes):
        obs, info = env.reset()
        trajectory = []
        done = False
        while not done:
            action, _ = model.predict(obs, deterministic=True)
            next_obs, reward, terminated, truncated, info = env.step(action)
            trajectory.append({
                'obs': obs,
                'action': action,
                'reward': reward,
                'next_obs': next_obs
            })
            obs = next_obs
            done = terminated or truncated
        if not info.get('is_success', False):
            failures.append({
                'episode': episode_idx,
                'trajectory': trajectory,
                'final_state': obs,
                # 'failure_reason' is an env-specific info key
                'reason': info.get('failure_reason', 'unknown')
            })

    # Analyze failure modes
    failure_reasons = Counter([f['reason'] for f in failures])
    print("Failure modes:")
    for reason, count in failure_reasons.most_common():
        print(f"  {reason}: {count} ({count/n_episodes*100:.1f}%)")
    return failures
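A usage sketch that persists the collected failures for offline inspection or replay (the file name is illustrative):

import pickle

failures = analyze_failures(model, eval_env)
with open('failures.pkl', 'wb') as f:
    pickle.dump(failures, f)  # reload later to replay or visualize trajectories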
Next Steps¶
- Algorithms - Try different RL algorithms
- Training - Improve training procedure
- Simulators - Test in different environments