Policy Gradient Methods

Introduction to Policy Gradient Methods

So far, we’ve focused on value-based methods that learn value functions and derive policies from them. Policy gradient methods take a different approach: they directly optimize the policy without explicitly computing value functions.

Key insight: Instead of learning “how good is this state?” (value), learn “what should I do in this state?” (policy).

Why Policy Gradient Methods?

Advantages over Value-Based Methods

  1. Natural handling of continuous action spaces
    • Value-based: Need to find \max_a Q(s,a) (difficult in continuous spaces)
    • Policy-based: Directly sample from \pi(a|s)
  2. Stochastic policies
    • Can naturally represent stochastic optimal policies
    • Built-in exploration through policy stochasticity
  3. Smoother convergence
    • Small parameter changes lead to small policy changes
    • More stable than value-based methods in some cases
  4. Works with function approximation
    • Can directly optimize parameterized policies
    • No need for separate value function approximation

Disadvantages

  1. High variance
    • Gradient estimates can be very noisy
    • Requires many samples
  2. Slow convergence
    • Generally slower than value-based methods
    • Can get stuck in local optima
  3. Sample efficiency
    • Typically less sample efficient than value-based methods

Policy Parameterization

We parameterize the policy as \pi(a|s, \boldsymbol{\theta}) where \boldsymbol{\theta} are the parameters.

Discrete Actions: Softmax

\pi(a|s, \boldsymbol{\theta}) = \frac{\exp(\boldsymbol{\theta}^T \boldsymbol{\phi}(s, a))}{\sum_{a'} \exp(\boldsymbol{\theta}^T \boldsymbol{\phi}(s, a'))}

Where \boldsymbol{\phi}(s, a) are state-action features.
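
As a rough illustration of the softmax parameterization above, here is a minimal sketch in plain Python. The function name `softmax_policy`, the feature vectors, and the parameter values are hypothetical and chosen only for the example; subtracting the largest preference before exponentiating is a standard numerical-stability trick and does not change the resulting probabilities.

```python
import math

def softmax_policy(theta, features):
    """Return action probabilities for one state.

    theta:    list of floats, the policy parameters
    features: list of feature vectors, one phi(s, a) per action
    """
    # Action preferences h(s, a) = theta^T phi(s, a)
    prefs = [sum(t * f for t, f in zip(theta, phi)) for phi in features]
    # Subtract the max preference before exponentiating (numerical stability)
    m = max(prefs)
    exps = [math.exp(h - m) for h in prefs]
    total = sum(exps)
    return [e / total for e in exps]

# Hypothetical example: 2 parameters, 3 actions
theta = [0.5, -1.0]
features = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
probs = softmax_policy(theta, features)
print(probs)  # action 0 has the largest preference, so the largest probability
```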

Continuous Actions: Gaussian

\pi(a|s, \boldsymbol{\theta}) = \frac{1}{\sqrt{2\pi\sigma^2}} \exp\left(-\frac{(a - \mu(s, \boldsymbol{\theta}))^2}{2\sigma^2}\right)

Where \mu(s, \boldsymbol{\theta}) is the mean (policy output) and \sigma is the standard deviation.
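
The Gaussian policy can be sketched the same way. The snippet below assumes a scalar action and a fixed \sigma; the function names are hypothetical. It also shows the derivative of the log-density with respect to the mean, (a - \mu)/\sigma^2, which is the quantity that gets chained into the gradient with respect to \boldsymbol{\theta}.

```python
import math
import random

def gaussian_log_prob(a, mu, sigma):
    """Log-density of action a under N(mu, sigma^2)."""
    return -0.5 * math.log(2 * math.pi * sigma ** 2) - (a - mu) ** 2 / (2 * sigma ** 2)

def sample_action(mu, sigma, rng=random):
    """Draw one action from the Gaussian policy."""
    return rng.gauss(mu, sigma)

def grad_log_prob_wrt_mu(a, mu, sigma):
    """d/d(mu) of log pi(a|s): positive when the sampled action lies above the mean."""
    return (a - mu) / sigma ** 2

# Hypothetical usage: mean 0.3, standard deviation 0.5
a = sample_action(0.3, 0.5)
print(a, gaussian_log_prob(a, 0.3, 0.5), grad_log_prob_wrt_mu(a, 0.3, 0.5))
```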

The Policy Gradient Theorem

Objective: Maximize the expected return from the start of an episode, J(\boldsymbol{\theta}) = \mathbb{E}_{\pi_\theta}[G_0]

Policy Gradient Theorem (REINFORCE form): \nabla_{\boldsymbol{\theta}} J(\boldsymbol{\theta}) \propto \mathbb{E}_{\pi_\theta}\left[\nabla_{\boldsymbol{\theta}} \log \pi(A_t|S_t, \boldsymbol{\theta}) \cdot G_t\right], where the constant of proportionality can be absorbed into the step size.

Intuition:
  • If the return G_t is high, increase the probability of action A_t in state S_t
  • If the return G_t is low, decrease the probability of action A_t in state S_t
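
To make the intuition concrete, the sketch below assumes a softmax policy with linear preferences, for which the score function has the closed form \nabla_{\boldsymbol{\theta}} \log \pi(a|s) = \boldsymbol{\phi}(s,a) - \sum_b \pi(b|s) \boldsymbol{\phi}(s,b). The helper names and the two-action feature set are hypothetical. A step with a positive return raises the probability of the sampled action, and a step with a negative return lowers it.

```python
import math

def softmax_probs(theta, features):
    """Action probabilities of a linear-softmax policy for one state."""
    prefs = [sum(t * f for t, f in zip(theta, phi)) for phi in features]
    m = max(prefs)
    exps = [math.exp(h - m) for h in prefs]
    z = sum(exps)
    return [e / z for e in exps]

def grad_log_pi(theta, features, action):
    """Score function: phi(s, a) - sum_b pi(b|s) phi(s, b)."""
    probs = softmax_probs(theta, features)
    expected_phi = [sum(p * phi[i] for p, phi in zip(probs, features))
                    for i in range(len(theta))]
    return [features[action][i] - expected_phi[i] for i in range(len(theta))]

def reinforce_step(theta, features, action, G, alpha):
    """One stochastic gradient-ascent step: theta + alpha * G * grad log pi."""
    g = grad_log_pi(theta, features, action)
    return [t + alpha * G * gi for t, gi in zip(theta, g)]

# Hypothetical one-state problem with two actions and one-hot features
features = [[1.0, 0.0], [0.0, 1.0]]
theta = [0.0, 0.0]
after_good = reinforce_step(theta, features, action=0, G=2.0, alpha=0.1)
after_bad = reinforce_step(theta, features, action=0, G=-2.0, alpha=0.1)
print(softmax_probs(after_good, features)[0])  # above 0.5
print(softmax_probs(after_bad, features)[0])   # below 0.5
```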

REINFORCE Algorithm

Monte Carlo Policy Gradient

Initialize policy parameters θ
For each episode:
    Generate episode: S₀, A₀, R₁, S₁, A₁, R₂, ..., S_{T-1}, A_{T-1}, R_T
    For t = 0, 1, ..., T-1:
        G = sum of discounted rewards from time t
        θ = θ + α * G * ∇_θ log π(A_t|S_t, θ)

Update rule: \boldsymbol{\theta}_{t+1} = \boldsymbol{\theta}_t + \alpha G_t \nabla_{\boldsymbol{\theta}} \log \pi(A_t|S_t, \boldsymbol{\theta}_t)
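
The quantity G in the update is the discounted return from each time step. One way to compute all of them in a single backward pass over the episode is sketched below; the function name `discounted_returns` is hypothetical, and the reward list is assumed to hold R_1, ..., R_T in order.

```python
def discounted_returns(rewards, gamma):
    """G_t = R_{t+1} + gamma * R_{t+2} + ... for every t in one episode."""
    returns = [0.0] * len(rewards)
    G = 0.0
    # Walk backwards so each return reuses the one after it: G_t = R_{t+1} + gamma * G_{t+1}
    for t in reversed(range(len(rewards))):
        G = rewards[t] + gamma * G
        returns[t] = G
    return returns

print(discounted_returns([1.0, 1.0, 1.0], gamma=0.5))  # [1.75, 1.5, 1.0]
```

Filling a preallocated list avoids repeatedly inserting at the front of a list, which costs time quadratic in the episode length.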

REINFORCE with Baseline

Problem: High variance in gradient estimates.

Solution: Subtract a baseline b(s) from returns: \boldsymbol{\theta}_{t+1} = \boldsymbol{\theta}_t + \alpha (G_t - b(S_t)) \nabla_{\boldsymbol{\theta}} \log \pi(A_t|S_t, \boldsymbol{\theta}_t)

Common baseline: State value function V(s)

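Why this works: The baseline depends only on the state, not on the action, so \mathbb{E}[b(S_t) \nabla_{\boldsymbol{\theta}} \log \pi(A_t|S_t, \boldsymbol{\theta})] = 0. Subtracting it leaves the gradient estimate unbiased and, with a well-chosen baseline, lowers its variance.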
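
The zero-expectation property follows in a few lines. A sketch for a fixed state s and a discrete action set, assuming the sum and the gradient can be interchanged:

```latex
\begin{aligned}
\mathbb{E}_{A_t \sim \pi}\left[ b(s)\, \nabla_{\boldsymbol{\theta}} \log \pi(A_t|s, \boldsymbol{\theta}) \right]
  &= b(s) \sum_a \pi(a|s, \boldsymbol{\theta})\, \nabla_{\boldsymbol{\theta}} \log \pi(a|s, \boldsymbol{\theta}) \\
  &= b(s) \sum_a \nabla_{\boldsymbol{\theta}} \pi(a|s, \boldsymbol{\theta}) \\
  &= b(s)\, \nabla_{\boldsymbol{\theta}} \sum_a \pi(a|s, \boldsymbol{\theta})
   = b(s)\, \nabla_{\boldsymbol{\theta}} 1 = 0
\end{aligned}
```

The second line uses \pi \nabla \log \pi = \nabla \pi, and the last line uses the fact that the action probabilities sum to one for every \boldsymbol{\theta}.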

Actor-Critic Methods

Key insight: Use value function to reduce variance.

  • Actor: Policy \pi(a|s, \boldsymbol{\theta}) (what to do)
  • Critic: Value function V(s, \boldsymbol{w}) (how good is this state)

One-Step Actor-Critic

Update rules:
  • Critic: \boldsymbol{w}_{t+1} = \boldsymbol{w}_t + \alpha_w \delta_t \nabla_{\boldsymbol{w}} V(S_t, \boldsymbol{w}_t)
  • Actor: \boldsymbol{\theta}_{t+1} = \boldsymbol{\theta}_t + \alpha_\theta \delta_t \nabla_{\boldsymbol{\theta}} \log \pi(A_t|S_t, \boldsymbol{\theta}_t)

Where \delta_t = R_{t+1} + \gamma V(S_{t+1}, \boldsymbol{w}_t) - V(S_t, \boldsymbol{w}_t) is the TD error.

Algorithm:

Initialize actor θ and critic w
For each episode:
    Initialize S
    For each step:
        Choose A ~ π(·|S, θ)
        Take action A, observe R, S'
        δ = R + γV(S', w) - V(S, w)
        w = w + α_w * δ * ∇_w V(S, w)
        θ = θ + α_θ * δ * ∇_θ log π(A|S, θ)
        S = S'
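
A single iteration of the inner loop can be sketched for the tabular case, where the critic stores one value per state and the actor stores one softmax preference per state-action pair. All names here are hypothetical, and the `done` flag (treating a terminal next state as having value zero) is an assumption added for the example rather than part of the pseudocode above.

```python
import math

def softmax(prefs):
    m = max(prefs)
    exps = [math.exp(h - m) for h in prefs]
    z = sum(exps)
    return [e / z for e in exps]

def actor_critic_step(V, H, s, a, r, s_next, done,
                      gamma=0.99, alpha_w=0.1, alpha_theta=0.1):
    """Apply one actor-critic update in place and return the TD error.

    V: dict state -> value estimate (tabular critic, so grad_w V(s) is 1 at s)
    H: dict state -> list of action preferences (tabular softmax actor)
    """
    # TD error; a terminal next state contributes zero value (assumption)
    target = r + (0.0 if done else gamma * V[s_next])
    delta = target - V[s]
    # Policy probabilities before the actor is changed
    probs = softmax(H[s])
    # Critic: w <- w + alpha_w * delta * grad_w V(S, w)
    V[s] += alpha_w * delta
    # Actor: the gradient of log softmax w.r.t. preference b is 1[b == a] - pi(b|s)
    for b in range(len(H[s])):
        indicator = 1.0 if b == a else 0.0
        H[s][b] += alpha_theta * delta * (indicator - probs[b])
    return delta

# Hypothetical two-state example
V = {0: 0.0, 1: 1.0}
H = {0: [0.0, 0.0], 1: [0.0, 0.0]}
delta = actor_critic_step(V, H, s=0, a=1, r=1.0, s_next=1, done=False, gamma=0.5)
print(delta, V[0], H[0])  # positive TD error raises V[0] and the preference for action 1
```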

Advanced Policy Gradient Methods

Natural Policy Gradients

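Problem: The same step size in parameter space can change the policy's action distribution by very different amounts, depending on where \boldsymbol{\theta} currently is.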

Solution: Use natural gradients that account for the geometry of the policy space.

Natural gradient: \tilde{\nabla}_{\boldsymbol{\theta}} J(\boldsymbol{\theta}) = F(\boldsymbol{\theta})^{-1} \nabla_{\boldsymbol{\theta}} J(\boldsymbol{\theta})

Where F(\boldsymbol{\theta}) is the Fisher information matrix.
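
A one-parameter example shows the effect. The sketch assumes a hypothetical two-armed bandit with a Bernoulli policy \pi(a=1) = \text{sigmoid}(\theta) and fixed rewards r1 and r0, so J(\theta) = p r1 + (1-p) r0 and the Fisher information is the scalar p(1-p). The vanilla gradient shrinks toward zero when the policy is nearly deterministic, while the natural gradient stays at r1 - r0.

```python
import math

def sigmoid(x):
    return 1.0 / (1.0 + math.exp(-x))

def vanilla_gradient(theta, r1, r0):
    """dJ/dtheta for J = p * r1 + (1 - p) * r0 with p = sigmoid(theta)."""
    p = sigmoid(theta)
    return p * (1.0 - p) * (r1 - r0)

def fisher_information(theta):
    """E[(d/dtheta log pi(A))^2] for the Bernoulli policy, which equals p * (1 - p)."""
    p = sigmoid(theta)
    return p * (1.0 - p)

def natural_gradient(theta, r1, r0):
    """F(theta)^{-1} times the vanilla gradient (a scalar division here)."""
    return vanilla_gradient(theta, r1, r0) / fisher_information(theta)

# Policy that almost always picks the worse arm (theta = -5)
print(vanilla_gradient(-5.0, 1.0, 0.0))  # tiny, so plain gradient ascent barely moves
print(natural_gradient(-5.0, 1.0, 0.0))  # close to r1 - r0
```

For very large |\theta| the scalar Fisher term underflows, so practical implementations usually add damping before inverting; that detail is left out of this sketch.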

Trust Region Policy Optimization (TRPO)

Problem: Large policy updates can be harmful.

Solution: Constrain policy updates to a trust region: \max_{\boldsymbol{\theta}} J(\boldsymbol{\theta}) \text{ subject to } KL(\pi_{\boldsymbol{\theta}_{\text{old}}} || \pi_{\boldsymbol{\theta}}) \leq \delta
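
The constraint itself is easy to check for a discrete policy in a single state. The sketch below only evaluates the KL term for two categorical distributions and is not the TRPO optimization procedure; the function names and the threshold value are hypothetical, and the new policy is assumed to give every action nonzero probability.

```python
import math

def kl_divergence(p_old, p_new):
    """KL(p_old || p_new) for two categorical distributions over the same actions."""
    return sum(po * math.log(po / pn) for po, pn in zip(p_old, p_new) if po > 0.0)

def within_trust_region(p_old, p_new, delta):
    """True if the candidate policy satisfies the KL constraint in this state."""
    return kl_divergence(p_old, p_new) <= delta

old = [0.5, 0.5]
print(within_trust_region(old, [0.52, 0.48], delta=0.01))  # small change: accepted
print(within_trust_region(old, [0.90, 0.10], delta=0.01))  # large change: rejected
```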

Proximal Policy Optimization (PPO)

A simpler alternative to TRPO that replaces the hard KL constraint with a clipped surrogate objective:

Clipped objective: L^{CLIP}(\boldsymbol{\theta}) = \mathbb{E}_t\left[\min\left(r_t(\boldsymbol{\theta}) \hat{A}_t, \text{clip}\left(r_t(\boldsymbol{\theta}), 1-\epsilon, 1+\epsilon\right) \hat{A}_t\right)\right]

Where r_t(\boldsymbol{\theta}) = \frac{\pi_{\boldsymbol{\theta}}(A_t|S_t)}{\pi_{\boldsymbol{\theta}_{\text{old}}}(A_t|S_t)} is the probability ratio, \hat{A}_t is an estimate of the advantage at time t (written with a hat to distinguish it from the action A_t), and \epsilon is the clipping parameter.
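
For a single sample, the term inside the expectation of the clipped objective reduces to a few lines. The function name `ppo_clip_term` and the default \epsilon = 0.2 are assumptions made for this sketch.

```python
def ppo_clip_term(ratio, advantage, epsilon=0.2):
    """Per-sample clipped surrogate: min(r * A, clip(r, 1 - eps, 1 + eps) * A)."""
    clipped_ratio = max(1.0 - epsilon, min(ratio, 1.0 + epsilon))
    return min(ratio * advantage, clipped_ratio * advantage)

# Positive advantage: the gain from raising the ratio is capped at 1 + epsilon
print(ppo_clip_term(1.5, 1.0))
# Negative advantage: the gain from lowering the ratio is capped at 1 - epsilon
print(ppo_clip_term(0.5, -1.0))
# Moves that make the objective worse are not clipped
print(ppo_clip_term(0.5, 1.0), ppo_clip_term(1.5, -1.0))
```

Taking the minimum makes the objective a pessimistic bound: the policy gets no extra credit for pushing the ratio outside the clipping range in the favorable direction.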

Deterministic Policy Gradients

For continuous control: Use deterministic policies \mu(s|\boldsymbol{\theta})

Policy gradient: \nabla_{\boldsymbol{\theta}} J(\boldsymbol{\theta}) = \mathbb{E}_{s \sim \rho}\left[\nabla_{\boldsymbol{\theta}} \mu(s|\boldsymbol{\theta}) \nabla_a Q(s,a)|_{a=\mu(s|\boldsymbol{\theta})}\right]

Deep Deterministic Policy Gradient (DDPG):
  • Actor: \mu(s|\boldsymbol{\theta}^\mu) (deterministic policy)
  • Critic: Q(s,a|\boldsymbol{\theta}^Q) (action-value function)
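
The chain rule in the deterministic policy gradient can be seen on a toy problem. The sketch assumes a scalar linear policy \mu(s) = \theta s and a known critic Q(s, a) = -(a - 2s)^2, so the best action is a = 2s and the best parameter is \theta = 2. Both functions and the state sample are hypothetical; in DDPG the critic would be learned rather than given.

```python
def dpg_gradient(theta, states):
    """Average of grad_theta mu(s) * grad_a Q(s, a), evaluated at a = mu(s)."""
    total = 0.0
    for s in states:
        a = theta * s                  # deterministic action mu(s | theta)
        dmu_dtheta = s                 # grad_theta mu(s | theta)
        dq_da = -2.0 * (a - 2.0 * s)   # grad_a Q(s, a) for Q = -(a - 2 s)^2
        total += dmu_dtheta * dq_da
    return total / len(states)

theta = 0.0
states = [-1.0, -0.5, 0.5, 1.0]
for _ in range(200):
    theta += 0.1 * dpg_gradient(theta, states)  # gradient ascent on J
print(round(theta, 3))  # 2.0
```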

Interactive Example: REINFORCE on CartPole

import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim

# Simple CartPole-like environment
class SimpleCartPole:
    def __init__(self):
        self.reset()
        self.max_steps = 200
        
    def reset(self):
        self.state = np.random.uniform(-0.1, 0.1, 4)  # [position, velocity, angle, angular_velocity]
        self.steps = 0
        return self.state
    
    def step(self, action):
        # Simple physics simulation
        force = 10.0 if action == 1 else -10.0
        
        # Update state (simplified dynamics)
        x, x_dot, theta, theta_dot = self.state
        
        # Angular acceleration
        theta_ddot = (9.8 * np.sin(theta) + np.cos(theta) * (-force - 0.1 * theta_dot**2 * np.sin(theta))) / (4.0/3.0 - 0.1 * np.cos(theta)**2)
        
        # Linear acceleration
        x_ddot = (force + 0.1 * (theta_dot**2 * np.sin(theta) - theta_ddot * np.cos(theta))) / 1.1
        
        # Update velocities and positions
        dt = 0.02
        x_dot += x_ddot * dt
        x += x_dot * dt
        theta_dot += theta_ddot * dt
        theta += theta_dot * dt
        
        self.state = np.array([x, x_dot, theta, theta_dot])
        self.steps += 1
        
        # Check if episode is done
        done = (abs(x) > 2.4 or abs(theta) > 0.5 or self.steps >= self.max_steps)
        
        # Reward: +1 for each step the pole stays upright
        reward = 1.0 if not done else 0.0
        
        return self.state, reward, done

class PolicyNetwork(nn.Module):
    def __init__(self, state_size, action_size, hidden_size=128):
        super().__init__()
        self.network = nn.Sequential(
            nn.Linear(state_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, action_size),
            nn.Softmax(dim=-1)
        )
    
    def forward(self, state):
        return self.network(state)

class REINFORCE:
    def __init__(self, state_size, action_size, lr=0.001):
        self.policy = PolicyNetwork(state_size, action_size)
        self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)
        
    def select_action(self, state):
        state = torch.from_numpy(state).float().unsqueeze(0)
        probs = self.policy(state)
        action_dist = torch.distributions.Categorical(probs)
        action = action_dist.sample()
        return action.item(), action_dist.log_prob(action)
    
    def update(self, log_probs, rewards, gamma=0.99):
        # Calculate returns (discounted cumulative rewards)
        returns = []
        G = 0
        for reward in reversed(rewards):
            G = reward + gamma * G
            returns.insert(0, G)
        
        # Convert to tensor and normalize
        returns = torch.tensor(returns, dtype=torch.float32)
        if len(returns) > 1:
            returns = (returns - returns.mean()) / (returns.std() + 1e-8)
        
        # Calculate policy loss
        policy_loss = []
        for log_prob, G in zip(log_probs, returns):
            policy_loss.append(-log_prob * G)
        
        # Update policy
        self.optimizer.zero_grad()
        policy_loss = torch.stack(policy_loss).sum()
        policy_loss.backward()
        self.optimizer.step()
        
        return policy_loss.item()

class REINFORCEWithBaseline:
    def __init__(self, state_size, action_size, lr=0.001):
        self.policy = PolicyNetwork(state_size, action_size)
        self.value_net = nn.Sequential(
            nn.Linear(state_size, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, 1)
        )
        self.policy_optimizer = optim.Adam(self.policy.parameters(), lr=lr)
        self.value_optimizer = optim.Adam(self.value_net.parameters(), lr=lr)
    
    def select_action(self, state):
        state = torch.from_numpy(state).float().unsqueeze(0)
        probs = self.policy(state)
        action_dist = torch.distributions.Categorical(probs)
        action = action_dist.sample()
        return action.item(), action_dist.log_prob(action)
    
    def update(self, states, log_probs, rewards, gamma=0.99):
        # Calculate returns
        returns = []
        G = 0
        for reward in reversed(rewards):
            G = reward + gamma * G
            returns.insert(0, G)
        
        states = torch.tensor(np.array(states), dtype=torch.float32)
        returns = torch.tensor(returns, dtype=torch.float32)
        
        # Get value estimates
        values = self.value_net(states).squeeze(-1)  # shape [T]; squeeze(-1) keeps it 1-D even for a one-step episode
        
        # Calculate advantages
        advantages = returns - values
        
        # Update value network
        value_loss = nn.MSELoss()(values, returns)
        self.value_optimizer.zero_grad()
        value_loss.backward()
        self.value_optimizer.step()
        
        # Update policy network
        policy_loss = []
        for log_prob, advantage in zip(log_probs, advantages.detach()):
            policy_loss.append(-log_prob * advantage)
        
        self.policy_optimizer.zero_grad()
        policy_loss = torch.stack(policy_loss).sum()
        policy_loss.backward()
        self.policy_optimizer.step()
        
        return policy_loss.item(), value_loss.item()

def train_reinforce(agent_type='vanilla', num_episodes=500):
    env = SimpleCartPole()
    
    if agent_type == 'vanilla':
        agent = REINFORCE(state_size=4, action_size=2, lr=0.01)
    else:
        agent = REINFORCEWithBaseline(state_size=4, action_size=2, lr=0.01)
    
    episode_rewards = []
    episode_lengths = []
    policy_losses = []
    value_losses = []
    
    for episode in range(num_episodes):
        state = env.reset()
        log_probs = []
        rewards = []
        states = []
        
        while True:
            action, log_prob = agent.select_action(state)
            next_state, reward, done = env.step(action)
            
            log_probs.append(log_prob)
            rewards.append(reward)
            states.append(state)
            
            state = next_state
            
            if done:
                break
        
        # Update agent
        if agent_type == 'vanilla':
            policy_loss = agent.update(log_probs, rewards)
            policy_losses.append(policy_loss)
            value_losses.append(0)  # No value network
        else:
            policy_loss, value_loss = agent.update(states, log_probs, rewards)
            policy_losses.append(policy_loss)
            value_losses.append(value_loss)
        
        episode_rewards.append(sum(rewards))
        episode_lengths.append(len(rewards))
        
        if episode % 100 == 0:
            avg_reward = np.mean(episode_rewards[-100:])
            print(f"Episode {episode}, Average Reward: {avg_reward:.2f}")
    
    return episode_rewards, episode_lengths, policy_losses, value_losses

# Train both variants
print("Training REINFORCE (vanilla)...")
vanilla_rewards, vanilla_lengths, vanilla_p_losses, _ = train_reinforce('vanilla', 400)

print("\nTraining REINFORCE with Baseline...")
baseline_rewards, baseline_lengths, baseline_p_losses, baseline_v_losses = train_reinforce('baseline', 400)

# Create comprehensive visualizations
fig, axes = plt.subplots(2, 3, figsize=(18, 10))

# Learning curves
window = 20
vanilla_smooth = np.convolve(vanilla_rewards, np.ones(window)/window, mode='valid')
baseline_smooth = np.convolve(baseline_rewards, np.ones(window)/window, mode='valid')

axes[0, 0].plot(range(window-1, len(vanilla_rewards)), vanilla_smooth, 
                label='REINFORCE', linewidth=2, color='blue')
axes[0, 0].plot(range(window-1, len(baseline_rewards)), baseline_smooth, 
                label='REINFORCE + Baseline', linewidth=2, color='red')
axes[0, 0].set_xlabel('Episode')
axes[0, 0].set_ylabel('Episode Reward')
axes[0, 0].set_title('Learning Progress (20-episode moving average)')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)

# Episode lengths
vanilla_length_smooth = np.convolve(vanilla_lengths, np.ones(window)/window, mode='valid')
baseline_length_smooth = np.convolve(baseline_lengths, np.ones(window)/window, mode='valid')

axes[0, 1].plot(range(window-1, len(vanilla_lengths)), vanilla_length_smooth, 
                label='REINFORCE', linewidth=2, color='blue')
axes[0, 1].plot(range(window-1, len(baseline_lengths)), baseline_length_smooth, 
                label='REINFORCE + Baseline', linewidth=2, color='red')
axes[0, 1].set_xlabel('Episode')
axes[0, 1].set_ylabel('Episode Length')
axes[0, 1].set_title('Episode Length Over Time')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)

# Policy losses
vanilla_p_smooth = np.convolve(vanilla_p_losses, np.ones(window)/window, mode='valid')
baseline_p_smooth = np.convolve(baseline_p_losses, np.ones(window)/window, mode='valid')

axes[0, 2].plot(range(window-1, len(vanilla_p_losses)), vanilla_p_smooth, 
                label='REINFORCE', linewidth=2, color='blue')
axes[0, 2].plot(range(window-1, len(baseline_p_losses)), baseline_p_smooth, 
                label='REINFORCE + Baseline', linewidth=2, color='red')
axes[0, 2].set_xlabel('Episode')
axes[0, 2].set_ylabel('Policy Loss')
axes[0, 2].set_title('Policy Loss Over Time')
axes[0, 2].legend()
axes[0, 2].grid(True, alpha=0.3)

# Value loss for baseline agent
baseline_v_smooth = np.convolve(baseline_v_losses, np.ones(window)/window, mode='valid')
axes[1, 0].plot(range(window-1, len(baseline_v_losses)), baseline_v_smooth, 
                linewidth=2, color='green')
axes[1, 0].set_xlabel('Episode')
axes[1, 0].set_ylabel('Value Loss')
axes[1, 0].set_title('Value Network Loss (Baseline Agent)')
axes[1, 0].grid(True, alpha=0.3)

# Policy visualization - show action probabilities over different states
def visualize_policy(agent, title):
    test_states = []
    for pos in [-1, 0, 1]:
        for angle in [-0.2, 0, 0.2]:
            test_states.append([pos, 0, angle, 0])  # position, velocity, angle, angular_velocity
    
    test_states = np.array(test_states)
    
    with torch.no_grad():
        state_tensor = torch.tensor(test_states, dtype=torch.float32)
        # Both agent classes expose their policy network as `agent.policy`
        action_probs = agent.policy(state_tensor).numpy()
    
    return action_probs

# Create a trained baseline agent for visualization
env = SimpleCartPole()
vis_agent = REINFORCEWithBaseline(state_size=4, action_size=2, lr=0.01)

# Train briefly for visualization
for _ in range(100):
    state = env.reset()
    log_probs, rewards, states = [], [], []
    
    while True:
        action, log_prob = vis_agent.select_action(state)
        next_state, reward, done = env.step(action)
        
        log_probs.append(log_prob)
        rewards.append(reward)
        states.append(state)
        
        state = next_state
        if done:
            break
    
    vis_agent.update(states, log_probs, rewards)

# Visualize learned policy
action_probs = visualize_policy(vis_agent, 'Trained Policy')
im = axes[1, 1].imshow(action_probs.T, cmap='RdYlBu', aspect='auto')
axes[1, 1].set_title('Action Probabilities\n(color = probability of each action)')
axes[1, 1].set_xlabel('State Configuration')
axes[1, 1].set_ylabel('Action')
axes[1, 1].set_yticks([0, 1])
axes[1, 1].set_yticklabels(['Left', 'Right'])
plt.colorbar(im, ax=axes[1, 1])

# Performance comparison
final_vanilla = np.mean(vanilla_rewards[-50:])
final_baseline = np.mean(baseline_rewards[-50:])

axes[1, 2].bar(['REINFORCE', 'REINFORCE\n+ Baseline'], [final_vanilla, final_baseline], 
               color=['blue', 'red'], alpha=0.7)
axes[1, 2].set_ylabel('Final Average Reward')
axes[1, 2].set_title('Final Performance Comparison\n(Last 50 Episodes)')
axes[1, 2].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print(f"\nFinal Performance Summary:")
print(f"REINFORCE (vanilla): {final_vanilla:.2f}")
print(f"REINFORCE + Baseline: {final_baseline:.2f}")
print(f"Improvement with baseline: {((final_baseline - final_vanilla) / final_vanilla * 100):.1f}%")

print(f"\nKey Insights:")
print(f"• Baseline reduces variance and improves learning stability")
print(f"• Policy gradients learn stochastic policies directly")
print(f"• Higher rewards lead to increased action probabilities")
print(f"• Variance reduction is crucial for policy gradient methods")

Output:

Training REINFORCE (vanilla)...
Episode 0, Average Reward: 34.00
Episode 100, Average Reward: 16.28
Episode 200, Average Reward: 16.03
Episode 300, Average Reward: 16.04

Training REINFORCE with Baseline...
Episode 0, Average Reward: 30.00
Episode 100, Average Reward: 42.02
Episode 200, Average Reward: 16.44
Episode 300, Average Reward: 29.41

[Figure: REINFORCE Learning on CartPole Environment]

Final Performance Summary:
REINFORCE (vanilla): 15.90
REINFORCE + Baseline: 82.18
Improvement with baseline: 416.9%

Key Insights:
• Baseline reduces variance and improves learning stability
• Policy gradients learn stochastic policies directly
• Higher rewards lead to increased action probabilities
• Variance reduction is crucial for policy gradient methods

Applications

  1. Continuous control: Robot locomotion, manipulation
  2. Game playing: AlphaGo, OpenAI Five
  3. Dialogue systems: Conversational AI
  4. Recommendation systems: Content recommendation
  5. Autonomous driving: Path planning, decision making

Key Takeaways

  1. Direct policy optimization: Learn the policy directly; a value function is optional and serves only as a baseline or critic
  2. Natural for continuous actions: Handle continuous action spaces naturally
  3. High variance challenge: Requires variance reduction techniques
  4. Actor-critic combination: Combines benefits of policy and value methods
  5. Modern deep RL: Foundation for state-of-the-art algorithms

Policy gradient methods provide a powerful framework for reinforcement learning, especially in continuous control and when stochastic policies are beneficial. They form the foundation for many modern deep RL algorithms!