Evaluating RL Policies

A guide to evaluating and analyzing trained reinforcement learning policies.

Evaluation Metrics

Success Rate

import numpy as np

def evaluate_success_rate(model, env, n_episodes=100):
    """Run n_episodes with a deterministic policy and report the success rate."""
    successes = []

    for _ in range(n_episodes):
        obs, info = env.reset()
        done = False

        while not done:
            action, _ = model.predict(obs, deterministic=True)
            obs, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated

        # Environments following the Gymnasium convention report task
        # success via the 'is_success' key in the final info dict
        successes.append(float(info.get('is_success', 0)))

    return np.mean(successes), np.std(successes)

# Usage
success_rate, std = evaluate_success_rate(model, eval_env)
print(f"Success rate: {success_rate:.2%} ± {std:.2%}")

Episode Return

from stable_baselines3.common.evaluation import evaluate_policy

mean_reward, std_reward = evaluate_policy(
    model,
    eval_env,
    n_eval_episodes=100,
    deterministic=True
)

print(f"Mean reward: {mean_reward:.2f} ± {std_reward:.2f}")

Sample Efficiency

Track how quickly the agent learns:

import matplotlib.pyplot as plt

# Collected during training: one entry per periodic evaluation
eval_timesteps = []
eval_returns = []
eval_success_rates = []

# Plot the learning curve
plt.plot(eval_timesteps, eval_returns)
plt.xlabel('Environment Steps')
plt.ylabel('Average Return')
plt.title('Learning Curve')
plt.show()
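With Stable-Baselines3, an EvalCallback can collect this data automatically: it periodically evaluates the policy during training and writes the results to evaluations.npz under log_path:

from stable_baselines3.common.callbacks import EvalCallback
import numpy as np
import matplotlib.pyplot as plt

eval_callback = EvalCallback(
    eval_env,
    log_path='./eval_logs/',
    eval_freq=10_000,        # evaluate every 10k environment steps
    n_eval_episodes=20,
    deterministic=True
)
model.learn(total_timesteps=1_000_000, callback=eval_callback)

# Load and plot the logged learning curve
data = np.load('./eval_logs/evaluations.npz')
plt.plot(data['timesteps'], data['results'].mean(axis=1))
plt.xlabel('Environment Steps')
plt.ylabel('Mean Eval Return')
plt.show()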

Visualization

Recording Videos

from stable_baselines3.common.vec_env import VecVideoRecorder

# The environment must already be a VecEnv whose underlying envs were
# created with render_mode="rgb_array", or no frames can be captured
video_folder = './videos/'
video_length = 500

env = VecVideoRecorder(
    env,
    video_folder,
    record_video_trigger=lambda x: x == 0,  # start recording at step 0
    video_length=video_length
)

# Evaluate (VecEnv API: reset returns obs only, step returns a 4-tuple)
obs = env.reset()
for _ in range(video_length):
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, done, info = env.step(action)

env.close()  # flush and save the video file

Action Distribution Analysis

import numpy as np
import matplotlib.pyplot as plt

def analyze_actions(model, env, n_steps=1000):
    """Roll out the policy and plot a histogram per action dimension."""
    actions = []
    obs, _ = env.reset()

    for _ in range(n_steps):
        action, _ = model.predict(obs, deterministic=True)
        actions.append(action)
        obs, _, terminated, truncated, _ = env.step(action)
        if terminated or truncated:
            obs, _ = env.reset()

    actions = np.array(actions)

    # Plot one histogram per action dimension
    n_dims = actions.shape[1]
    fig, axes = plt.subplots(1, n_dims, figsize=(15, 3))
    for i, ax in enumerate(np.atleast_1d(axes)):
        ax.hist(actions[:, i], bins=50)
        ax.set_title(f'Action Dim {i}')
        ax.set_xlabel('Value')
        ax.set_ylabel('Frequency')

    plt.tight_layout()
    plt.show()
    return actions
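One useful check on these histograms is action saturation: a policy that constantly pushes against its bounds often signals poorly scaled actions or rewards. A small sketch, assuming actions are normalized to [-1, 1]:

def action_saturation(actions, low=-1.0, high=1.0, tol=0.01):
    """Fraction of samples within tol of each bound, per action dimension."""
    near_low = np.mean(actions <= low + tol, axis=0)
    near_high = np.mean(actions >= high - tol, axis=0)
    return near_low + near_high

actions = analyze_actions(model, env)
for i, s in enumerate(action_saturation(actions)):
    print(f"Action dim {i}: {s:.1%} saturated")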

Sim-to-Real Evaluation

Domain Randomization Testing

def evaluate_robustness(model, env, n_episodes=50):
    """Evaluate policy success under varying domain randomization settings."""
    results = {
        'mass': [],
        'friction': [],
        'lighting': []
    }

    for param_name in results.keys():
        # Sweep each parameter from 0.5x to 1.5x its nominal value
        for param_value in np.linspace(0.5, 1.5, 10):
            # set_randomization is a placeholder for whatever API your
            # simulator exposes for overriding physics parameters
            env.set_randomization({param_name: param_value})

            success_rate, _ = evaluate_success_rate(model, env, n_episodes)
            results[param_name].append((param_value, success_rate))

    return results

# Plot robustness curves
results = evaluate_robustness(model, env)
for param_name, values in results.items():
    x, y = zip(*values)
    plt.plot(x, y, label=param_name)

plt.xlabel('Randomization Factor')
plt.ylabel('Success Rate')
plt.legend()
plt.show()

Ablation Studies

Component Analysis

# Evaluate with different input modalities disabled.
# create_model_variant is a placeholder for project-specific code that
# masks or zeroes out the named observation components.
configs = [
    {'name': 'Full Model', 'remove': []},
    {'name': 'No Vision', 'remove': ['vision']},
    {'name': 'No Proprioception', 'remove': ['state']},
    {'name': 'No History', 'remove': ['history']}
]

results = {}
for config in configs:
    model_variant = create_model_variant(model, config['remove'])
    success_rate, _ = evaluate_success_rate(model_variant, env)
    results[config['name']] = success_rate

# Plot
plt.bar(results.keys(), results.values())
plt.ylabel('Success Rate')
plt.title('Ablation Study')
plt.xticks(rotation=45)
plt.show()

Real-World Deployment

Safety Checks

import numpy as np
from collections import Counter

class SafetyMonitor:
    """Filters policy actions through safety checks before execution.

    The _in_workspace, _clamp_action, _exceeds_velocity_limits,
    _scale_action and _would_collide helpers are robot-specific and
    must be implemented for your platform.
    """

    def __init__(self, limits):
        self.limits = limits
        self.violations = []

    def check_action(self, action, state):
        # Check workspace bounds
        if not self._in_workspace(state['ee_position']):
            self.violations.append('workspace_violation')
            return self._clamp_action(action, state)

        # Check velocity limits
        if self._exceeds_velocity_limits(action):
            self.violations.append('velocity_violation')
            return self._scale_action(action)

        # Check collision
        if self._would_collide(action, state):
            self.violations.append('collision_risk')
            return np.zeros_like(action)  # Emergency stop

        return action

    def get_violation_stats(self):
        return Counter(self.violations)
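A sketch of how the monitor slots into a deployment loop; the limits schema and the 'robot_state' info key are placeholders for your own interfaces:

monitor = SafetyMonitor(limits={'max_joint_velocity': 0.5})

obs, info = env.reset()
done = False
while not done:
    action, _ = model.predict(obs, deterministic=True)
    # Filter the action through the safety checks before executing it
    safe_action = monitor.check_action(action, info.get('robot_state', {}))
    obs, reward, terminated, truncated, info = env.step(safe_action)
    done = terminated or truncated

print(monitor.get_violation_stats())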

Benchmarking

Comparing Algorithms

from stable_baselines3 import PPO, SAC, TD3

algorithms = {
    'PPO': PPO('MlpPolicy', env),
    'SAC': SAC('MlpPolicy', env),
    'TD3': TD3('MlpPolicy', env)
}

results = {}

for name, model in algorithms.items():
    print(f"Training {name}...")
    model.learn(total_timesteps=1_000_000)

    print(f"Evaluating {name}...")
    mean_reward, std_reward = evaluate_policy(model, eval_env, n_eval_episodes=100)

    results[name] = {
        'mean_reward': mean_reward,
        'std_reward': std_reward
    }

# Plot comparison
fig, ax = plt.subplots()
names = list(results.keys())
means = [results[n]['mean_reward'] for n in names]
stds = [results[n]['std_reward'] for n in names]

ax.bar(names, means, yerr=stds)
ax.set_ylabel('Mean Reward')
ax.set_title('Algorithm Comparison')
plt.show()
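A single run per algorithm is rarely conclusive, since RL performance varies heavily across random seeds. A sketch extending the comparison over several seeds:

n_seeds = 5
seed_results = {name: [] for name in ['PPO', 'SAC', 'TD3']}

for seed in range(n_seeds):
    for name, algo_cls in [('PPO', PPO), ('SAC', SAC), ('TD3', TD3)]:
        model = algo_cls('MlpPolicy', env, seed=seed)
        model.learn(total_timesteps=1_000_000)
        mean_reward, _ = evaluate_policy(model, eval_env, n_eval_episodes=100)
        seed_results[name].append(mean_reward)

for name, rewards in seed_results.items():
    print(f"{name}: {np.mean(rewards):.2f} ± {np.std(rewards):.2f} over {n_seeds} seeds")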

Statistical Significance

import numpy as np
from scipy import stats

def evaluate_episode(policy, env):
    """Run one deterministic episode and return the total reward."""
    obs, _ = env.reset()
    done, total_reward = False, 0.0
    while not done:
        action, _ = policy.predict(obs, deterministic=True)
        obs, reward, terminated, truncated, _ = env.step(action)
        total_reward += reward
        done = terminated or truncated
    return total_reward

def compare_policies(policy1, policy2, env, n_episodes=100):
    """Statistical comparison of two policies via a two-sample t-test"""
    # Collect per-episode returns for each policy
    returns1 = [evaluate_episode(policy1, env) for _ in range(n_episodes)]
    returns2 = [evaluate_episode(policy2, env) for _ in range(n_episodes)]

    # Two-sample t-test on the return distributions
    t_stat, p_value = stats.ttest_ind(returns1, returns2)

    print(f"Policy 1: {np.mean(returns1):.2f} ± {np.std(returns1):.2f}")
    print(f"Policy 2: {np.mean(returns2):.2f} ± {np.std(returns2):.2f}")
    print(f"p-value: {p_value:.4f}")

    if p_value < 0.05:
        better_policy = 1 if np.mean(returns1) > np.mean(returns2) else 2
        print(f"Policy {better_policy} is significantly better (p < 0.05)")
    else:
        print("No significant difference")

    return p_value
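Note that ttest_ind pools variances by default, which RL return distributions rarely justify. Welch's correction or a non-parametric test is usually safer; a minimal sketch:

def compare_policies_robust(returns1, returns2):
    """Variance-robust alternatives to the pooled t-test above."""
    # Welch's t-test: does not assume equal variances
    _, p_welch = stats.ttest_ind(returns1, returns2, equal_var=False)
    # Mann-Whitney U: non-parametric, compares rank distributions
    _, p_mwu = stats.mannwhitneyu(returns1, returns2)
    return p_welch, p_mwu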

Failure Analysis

from collections import Counter

def analyze_failures(model, env, n_episodes=100):
    """Collect and categorize episodes where the policy fails"""
    failures = []

    for episode_idx in range(n_episodes):
        obs, info = env.reset()
        trajectory = []
        done = False

        while not done:
            action, _ = model.predict(obs, deterministic=True)
            next_obs, reward, terminated, truncated, info = env.step(action)
            trajectory.append({
                'obs': obs,
                'action': action,
                'reward': reward,
                'next_obs': next_obs
            })
            obs = next_obs
            done = terminated or truncated

        if not info.get('is_success', False):
            failures.append({
                'episode': episode_idx,
                'trajectory': trajectory,
                'final_state': obs,
                # 'failure_reason' is a custom info key your env may expose
                'reason': info.get('failure_reason', 'unknown')
            })

    # Tally failure modes
    failure_reasons = Counter([f['reason'] for f in failures])
    print("Failure modes:")
    for reason, count in failure_reasons.most_common():
        print(f"  {reason}: {count} ({count / n_episodes * 100:.1f}%)")

    return failures
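The returned trajectories can then be replayed or inspected directly, e.g. to look at the last few steps before a failure:

failures = analyze_failures(model, env)
if failures:
    for step in failures[0]['trajectory'][-10:]:
        print(step['action'], step['reward'])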

Next Steps