πŸ“ AI Generated

Fine-Tuning Your First LLM with RL

A hands-on guide to fine-tuning a small language model using reinforcement learning. Build intuition by training a real model end-to-end.

This guide walks you through fine-tuning a small language model using reinforcement learningβ€”from raw model to aligned assistant. You’ll implement everything from scratch, gaining deep intuition about what RLHF actually does under the hood.

What You'll Build

By the end of this guide, you’ll have:

  • A fine-tuned LLM that answers questions in a specific style
  • A reward model trained on preference data
  • Complete PPO training pipeline
  • Practical understanding of KL penalties, reward hacking, and training dynamics

Time required: 2-3 hours (including training time on free Colab GPU)

Prerequisites

This guide assumes you’re comfortable with:

  • Python and PyTorch β€” you can write training loops
  • Transformers basics β€” you’ve used HuggingFace models before
  • RL fundamentals β€” you understand policies, rewards, and value functions

If you need to brush up on RL concepts, revisit those fundamentals before diving in.

The Big Picture

We’re going to teach a language model to answer questions in a helpful, concise style. The training has three phases:

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                                                                     β”‚
β”‚  1. SUPERVISED FINE-TUNING (SFT)                                   β”‚
β”‚     Start with a pre-trained model, show it examples of good       β”‚
β”‚     question-answer pairs. This teaches the format.                β”‚
β”‚                                                                     β”‚
β”‚  2. REWARD MODELING                                                β”‚
β”‚     Train a model to predict which answers humans prefer.          β”‚
β”‚     This captures "quality" in a differentiable signal.            β”‚
β”‚                                                                     β”‚
β”‚  3. RL OPTIMIZATION (PPO)                                          β”‚
β”‚     Use the reward model to guide the policy. The model            β”‚
β”‚     learns to generate answers that score highly.                  β”‚
β”‚                                                                     β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Each phase builds on the previous. SFT gets the model into the right ballpark; reward modeling defines β€œgood”; PPO optimizes for it.

Our Training Setup

We’ll use:

  • Base model: Qwen/Qwen2.5-0.5B β€” small enough for free Colab, large enough to show interesting behavior
  • Dataset: Custom Q&A pairs with simulated preferences
  • Hardware: Single T4 GPU (free Colab tier)
  • Training time: ~1 hour total

Why This Model?

Qwen2.5-0.5B is an excellent choice for learning:

  • 500M parameters β€” fits in Colab memory with room for training
  • Instruction-tuned variant available β€” we can compare before/after
  • Modern architecture β€” representative of current LLMs
  • Permissive license β€” free to experiment

Larger models follow the same process but need more compute.

Environment Setup

Implementation
# Install required packages
!pip install -q transformers datasets accelerate peft trl bitsandbytes
!pip install -q torch torchvision torchaudio

# Verify GPU is available
import torch
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")

Expected output:

PyTorch version: 2.x.x
CUDA available: True
GPU: Tesla T4
Memory: 15.8 GB

Part 1: Supervised Fine-Tuning

The first step is teaching the model the format of good responses. We show it examples of well-structured Q&A pairs.

Why SFT First?

A raw pre-trained model predicts text based on internet data. It knows facts and language, but it doesn’t know:

  • That it should answer questions directly
  • What format users expect
  • How to be concise vs. verbose

SFT shows the model β€œhere’s what good answers look like.” It’s not optimizing for quality yetβ€”just teaching the pattern.

Think of it like teaching someone a new job. First they shadow an expert (SFT), then they get feedback on their own work (RL).
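Under the hood, SFT is ordinary next-token prediction: the same cross-entropy loss as pre-training, computed on demonstration text. A toy sketch of the shifted loss, with random tensors standing in for real model outputs:

```python
import torch
import torch.nn.functional as F

torch.manual_seed(0)

# Toy logits for a 4-token sequence (batch=1, seq=4, vocab=10) -- not a real model
logits = torch.randn(1, 4, 10)
input_ids = torch.tensor([[2, 5, 1, 7]])

# Causal LM loss: logits at position t predict the token at position t+1
shift_logits = logits[:, :-1, :].reshape(-1, 10)
shift_labels = input_ids[:, 1:].reshape(-1)
loss = F.cross_entropy(shift_logits, shift_labels)
print(f"SFT loss on toy batch: {loss.item():.4f}")
```

HuggingFace models do this shift internally when you pass `labels`, which is why the training loop later simply reads `outputs.loss`.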

Creating Training Data

Implementation
# Example Q&A pairs for SFT
# In practice, you'd have hundreds or thousands

SFT_EXAMPLES = [
    {
        "question": "What is the capital of France?",
        "answer": "The capital of France is Paris."
    },
    {
        "question": "How does photosynthesis work?",
        "answer": "Photosynthesis is the process by which plants convert sunlight, water, and carbon dioxide into glucose and oxygen. It occurs in chloroplasts using chlorophyll pigments."
    },
    {
        "question": "What is machine learning?",
        "answer": "Machine learning is a subset of artificial intelligence where systems learn patterns from data rather than being explicitly programmed. Models improve through experience."
    },
    {
        "question": "Why is the sky blue?",
        "answer": "The sky appears blue because of Rayleigh scattering. Sunlight entering the atmosphere scatters off air molecules, with shorter blue wavelengths scattering more than longer red wavelengths."
    },
    {
        "question": "What is reinforcement learning?",
        "answer": "Reinforcement learning is a type of machine learning where an agent learns to make decisions by taking actions in an environment and receiving rewards or penalties. The goal is to maximize cumulative reward over time."
    },
]

def format_for_training(example):
    """Format as instruction-following prompt."""
    return f"""Question: {example['question']}

Answer: {example['answer']}"""

# Preview
print(format_for_training(SFT_EXAMPLES[0]))

Loading and Preparing the Model

Implementation
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch

MODEL_NAME = "Qwen/Qwen2.5-0.5B"

# Load tokenizer
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

# Load model with memory-efficient settings
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    torch_dtype=torch.float16,  # Half precision saves memory
    device_map="auto"           # Automatically place on GPU
)

print(f"Model loaded: {sum(p.numel() for p in model.parameters()) / 1e6:.1f}M parameters")

Key insight: We use float16 (half precision) to fit more in memory. This is standard practice for fine-tuning.
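The arithmetic behind that choice, as a back-of-envelope sketch (weight memory only; gradients, optimizer state, and activations add more during training, and the parameter count here is rounded):

```python
# Rough memory footprint of ~0.5B parameters in different precisions
n_params = 495_000_000  # approximate; the exact count is printed above

fp32_gb = n_params * 4 / 1e9  # 4 bytes per fp32 parameter
fp16_gb = n_params * 2 / 1e9  # 2 bytes per fp16 parameter

print(f"fp32 weights: ~{fp32_gb:.1f} GB, fp16 weights: ~{fp16_gb:.1f} GB")
```

Halving the weight memory leaves far more of the T4's ~16 GB for activations and optimizer state.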

Parameter-Efficient Fine-Tuning with LoRA

Fine-tuning all 500M parameters would be slow and memory-intensive. Instead, we use LoRA (Low-Rank Adaptation):

  • Freeze the original model weights
  • Add small trainable matrices to attention layers
  • Only train the small matrices (~1% of parameters)

The result is nearly as good as full fine-tuning, but much cheaper.

Mathematical Details

LoRA decomposes weight updates as low-rank matrices:

W_{new} = W_{original} + \Delta W = W_{original} + BA

where:

  • W_{original} \in \mathbb{R}^{d \times k} is the frozen original weight
  • B \in \mathbb{R}^{d \times r} and A \in \mathbb{R}^{r \times k} are trainable
  • r \ll \min(d, k) is the β€œrank” (typically 8-64)

This reduces trainable parameters from d \times k to r \times (d + k).

For a 4096Γ—4096 attention matrix with rank 16:

  • Full: 16.7M parameters
  • LoRA: 131K parameters (128Γ— fewer)
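The same decomposition can be written from scratch in a few lines. This is an illustrative sketch, not what PEFT does internally verbatim (init scales and weight merging differ):

```python
import torch
import torch.nn as nn

class LoRALinear(nn.Module):
    """Frozen linear layer plus a trainable low-rank update (illustrative sketch)."""

    def __init__(self, d, k, r=16, alpha=32):
        super().__init__()
        self.weight = nn.Parameter(torch.randn(d, k) * 0.02, requires_grad=False)  # frozen W
        self.A = nn.Parameter(torch.randn(r, k) * 0.01)  # trainable, random init
        self.B = nn.Parameter(torch.zeros(d, r))         # trainable, zero init => Delta W = 0 at start
        self.scale = alpha / r

    def forward(self, x):
        # y = x W^T + scale * x (BA)^T
        return x @ self.weight.T + self.scale * (x @ self.A.T) @ self.B.T

layer = LoRALinear(4096, 4096, r=16)
trainable = sum(p.numel() for p in layer.parameters() if p.requires_grad)
print(f"Trainable LoRA parameters: {trainable:,}")  # 131,072 = 16 * (4096 + 4096)
```

Zero-initializing B means the adapted layer starts out exactly equal to the frozen layer, so training begins from the pre-trained behavior.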
Implementation
from peft import LoraConfig, get_peft_model, TaskType

# LoRA configuration
lora_config = LoraConfig(
    r=16,                          # Rank of update matrices
    lora_alpha=32,                 # Scaling factor
    target_modules=["q_proj", "v_proj"],  # Which layers to adapt
    lora_dropout=0.1,              # Dropout for regularization
    bias="none",                   # Don't train biases
    task_type=TaskType.CAUSAL_LM
)

# Apply LoRA to the model
model = get_peft_model(model, lora_config)

# Check trainable parameters
trainable, total = model.get_nb_trainable_parameters()
print(f"Trainable: {trainable:,} / {total:,} ({100 * trainable / total:.2f}%)")

Expected output:

Trainable: 851,968 / 495,308,800 (0.17%)

We’re only training 0.17% of the model!

The SFT Training Loop

Implementation
from torch.utils.data import DataLoader, Dataset
from torch.optim import AdamW
from tqdm import tqdm

class QADataset(Dataset):
    def __init__(self, examples, tokenizer, max_length=256):
        self.examples = examples
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.examples)

    def __getitem__(self, idx):
        text = format_for_training(self.examples[idx])
        encodings = self.tokenizer(
            text,
            truncation=True,
            max_length=self.max_length,
            padding="max_length",
            return_tensors="pt"
        )
        input_ids = encodings["input_ids"].squeeze()
        attention_mask = encodings["attention_mask"].squeeze()
        labels = input_ids.clone()
        labels[attention_mask == 0] = -100  # Mask padding so it doesn't contribute to the loss
        return {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "labels": labels  # For causal LM, labels = inputs (the model shifts them internally)
        }

# Create dataset and dataloader
sft_dataset = QADataset(SFT_EXAMPLES, tokenizer)
sft_dataloader = DataLoader(sft_dataset, batch_size=2, shuffle=True)

# Optimizer
optimizer = AdamW(model.parameters(), lr=1e-4)

# Training loop
model.train()
num_epochs = 3

for epoch in range(num_epochs):
    total_loss = 0
    for batch in tqdm(sft_dataloader, desc=f"Epoch {epoch+1}"):
        batch = {k: v.to(model.device) for k, v in batch.items()}

        outputs = model(**batch)
        loss = outputs.loss

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    avg_loss = total_loss / len(sft_dataloader)
    print(f"Epoch {epoch+1}: Average loss = {avg_loss:.4f}")

Testing the SFT Model

Implementation
def generate_response(model, tokenizer, question, max_new_tokens=100):
    """Generate a response to a question."""
    prompt = f"Question: {question}\n\nAnswer:"

    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)

    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            temperature=0.7,
            do_sample=True,
            pad_token_id=tokenizer.pad_token_id
        )

    response = tokenizer.decode(outputs[0], skip_special_tokens=True)
    # Extract just the answer part
    if "Answer:" in response:
        response = response.split("Answer:")[-1].strip()
    return response

# Test on a new question
test_question = "What is the difference between supervised and unsupervised learning?"
print(f"Question: {test_question}\n")
print(f"Answer: {generate_response(model, tokenizer, test_question)}")

At this point, the model can answer questions in the right format. But is it good? That’s where reward modeling comes in.


Part 2: Reward Modeling

The reward model learns to predict which responses humans prefer. It transforms subjective human judgment into a numerical signal.

Creating Preference Data

In real RLHF, humans compare pairs of responses and pick which is better. We’ll simulate this with programmatic rules that capture β€œquality”:

  • Prefer responses that are complete sentences
  • Prefer responses that directly answer the question
  • Penalize responses that are too short or too long
  • Penalize responses with filler phrases

This is obviously simplifiedβ€”real preference data captures much subtler patterns. But the mechanics are the same.

Implementation
import random

# Generate preference pairs by comparing model outputs
def create_preference_data(model, tokenizer, prompts, n_samples=4):
    """
    Generate responses and create preference pairs.

    For each prompt, generate n_samples responses and create
    all possible pairs with simulated preferences.
    """
    preference_data = []

    for prompt in prompts:
        # Generate multiple responses
        responses = []
        for _ in range(n_samples):
            response = generate_response(model, tokenizer, prompt, max_new_tokens=100)
            responses.append(response)

        # Create pairs with simulated preferences
        for i in range(len(responses)):
            for j in range(i + 1, len(responses)):
                # Simulate human preference based on heuristics
                score_i = score_response(prompt, responses[i])
                score_j = score_response(prompt, responses[j])

                if score_i > score_j:
                    chosen, rejected = responses[i], responses[j]
                elif score_j > score_i:
                    chosen, rejected = responses[j], responses[i]
                else:
                    continue  # Skip ties

                preference_data.append({
                    "prompt": prompt,
                    "chosen": chosen,
                    "rejected": rejected
                })

    return preference_data

def score_response(prompt, response):
    """
    Heuristic scoring function to simulate human preferences.

    In real RLHF, this comes from actual human annotations.
    """
    score = 0

    # Prefer complete sentences
    if response.endswith(('.', '!', '?')):
        score += 2

    # Prefer reasonable length (20-200 chars)
    length = len(response)
    if 20 <= length <= 200:
        score += 2
    elif length < 20:
        score -= 2  # Too short
    elif length > 300:
        score -= 1  # Too long

    # Penalize filler phrases
    filler_phrases = ["I think", "maybe", "perhaps", "it depends"]
    for phrase in filler_phrases:
        if phrase.lower() in response.lower():
            score -= 1

    # Reward direct answers
    if response and response[0].isupper():
        score += 1

    return score

# Create training prompts
TRAINING_PROMPTS = [
    "What is Python?",
    "How do neural networks learn?",
    "Explain gradient descent.",
    "What is the purpose of activation functions?",
    "How does backpropagation work?",
    "What is overfitting?",
    "Explain the bias-variance tradeoff.",
    "What is a loss function?",
    "How do transformers work?",
    "What is attention in deep learning?",
]

# Generate preference data
print("Generating preference data...")
preference_data = create_preference_data(model, tokenizer, TRAINING_PROMPTS, n_samples=4)
print(f"Created {len(preference_data)} preference pairs")

# Preview a pair
if preference_data:
    example = preference_data[0]
    print(f"\nExample preference pair:")
    print(f"Prompt: {example['prompt']}")
    print(f"Chosen: {example['chosen'][:100]}...")
    print(f"Rejected: {example['rejected'][:100]}...")

The Bradley-Terry Model

Mathematical Details

The reward model is trained using the Bradley-Terry model for pairwise comparisons. Given two responses y_1 and y_2 to prompt x, the probability that y_1 is preferred:

P(y_1 \succ y_2 \mid x) = \sigma(r(x, y_1) - r(x, y_2))

where:

  • r(x, y) is the learned reward function
  • \sigma is the sigmoid function

The training loss is the negative log-likelihood:

L = -\mathbb{E}_{(x, y_w, y_l)} \left[ \log \sigma(r(x, y_w) - r(x, y_l)) \right]

where y_w is the winning (chosen) response and y_l is the losing (rejected) response.

Key insight: We only learn relative rewards. The absolute scale doesn’t matterβ€”only the difference between responses.

For more on this, see Reward Modeling.
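A numeric example, with made-up scores, shows both the loss and the relative-reward property directly:

```python
import torch
import torch.nn.functional as F

# Hypothetical reward-model scores for one preference pair
r_chosen = torch.tensor([1.5])
r_rejected = torch.tensor([0.3])

# Bradley-Terry: probability the chosen response wins
p_win = torch.sigmoid(r_chosen - r_rejected)

# Training loss: negative log-likelihood of the observed preference
loss = -F.logsigmoid(r_chosen - r_rejected).mean()

# Shifting both rewards by a constant changes nothing -- only differences matter
loss_shifted = -F.logsigmoid((r_chosen + 10.0) - (r_rejected + 10.0)).mean()
print(f"p_win = {p_win.item():.3f}, loss = {loss.item():.3f}")
```

This `-F.logsigmoid(...)` expression is exactly the loss used in the training loop below.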

Building the Reward Model

Implementation
import torch.nn as nn
import torch.nn.functional as F

class RewardModel(nn.Module):
    """
    Reward model for RLHF.

    Takes (prompt, response) and outputs a scalar reward.
    Architecture: base LM + linear head on last token.
    """

    def __init__(self, base_model_name, device="cuda"):
        super().__init__()
        self.device = device

        # Load a fresh copy of the base model
        self.base = AutoModelForCausalLM.from_pretrained(
            base_model_name,
            torch_dtype=torch.float16,
            device_map="auto"
        )

        # Freeze base model
        for param in self.base.parameters():
            param.requires_grad = False

        # Add reward head
        hidden_size = self.base.config.hidden_size
        self.reward_head = nn.Sequential(
            nn.Linear(hidden_size, hidden_size // 2),
            nn.ReLU(),
            nn.Linear(hidden_size // 2, 1)
        ).to(device).half()  # Match base model precision

    def forward(self, input_ids, attention_mask=None):
        """
        Compute reward for input sequence.

        Returns scalar reward per sequence in batch.
        """
        # Get hidden states from base model
        outputs = self.base(
            input_ids,
            attention_mask=attention_mask,
            output_hidden_states=True
        )

        # Use last token's hidden state
        hidden = outputs.hidden_states[-1]

        if attention_mask is not None:
            # Find position of last non-padding token
            seq_lengths = attention_mask.sum(dim=1) - 1
            batch_size = hidden.shape[0]
            last_hidden = hidden[torch.arange(batch_size, device=hidden.device), seq_lengths]
        else:
            last_hidden = hidden[:, -1, :]

        # Compute reward
        reward = self.reward_head(last_hidden).squeeze(-1)
        return reward

# Initialize reward model
reward_model = RewardModel(MODEL_NAME)
print(f"Reward model initialized")

Training the Reward Model

Implementation
class PreferenceDataset(Dataset):
    """Dataset for reward model training."""

    def __init__(self, preference_data, tokenizer, max_length=256):
        self.data = preference_data
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        item = self.data[idx]

        # Encode chosen response
        chosen_text = f"Question: {item['prompt']}\n\nAnswer: {item['chosen']}"
        chosen_enc = self.tokenizer(
            chosen_text,
            truncation=True,
            max_length=self.max_length,
            padding="max_length",
            return_tensors="pt"
        )

        # Encode rejected response
        rejected_text = f"Question: {item['prompt']}\n\nAnswer: {item['rejected']}"
        rejected_enc = self.tokenizer(
            rejected_text,
            truncation=True,
            max_length=self.max_length,
            padding="max_length",
            return_tensors="pt"
        )

        return {
            "chosen_ids": chosen_enc["input_ids"].squeeze(),
            "chosen_mask": chosen_enc["attention_mask"].squeeze(),
            "rejected_ids": rejected_enc["input_ids"].squeeze(),
            "rejected_mask": rejected_enc["attention_mask"].squeeze(),
        }

# Create dataset
rm_dataset = PreferenceDataset(preference_data, tokenizer)
rm_dataloader = DataLoader(rm_dataset, batch_size=2, shuffle=True)

# Training
rm_optimizer = AdamW(reward_model.reward_head.parameters(), lr=1e-4)

reward_model.train()
num_epochs = 5

for epoch in range(num_epochs):
    total_loss = 0
    total_correct = 0
    total_pairs = 0

    for batch in tqdm(rm_dataloader, desc=f"RM Epoch {epoch+1}"):
        # Move to device
        chosen_ids = batch["chosen_ids"].to(reward_model.device)
        chosen_mask = batch["chosen_mask"].to(reward_model.device)
        rejected_ids = batch["rejected_ids"].to(reward_model.device)
        rejected_mask = batch["rejected_mask"].to(reward_model.device)

        # Get rewards
        r_chosen = reward_model(chosen_ids, chosen_mask)
        r_rejected = reward_model(rejected_ids, rejected_mask)

        # Bradley-Terry loss
        loss = -F.logsigmoid(r_chosen - r_rejected).mean()

        # Backward pass
        rm_optimizer.zero_grad()
        loss.backward()
        rm_optimizer.step()

        # Track metrics
        total_loss += loss.item()
        total_correct += (r_chosen > r_rejected).sum().item()
        total_pairs += len(r_chosen)

    accuracy = total_correct / total_pairs
    avg_loss = total_loss / len(rm_dataloader)
    print(f"Epoch {epoch+1}: Loss = {avg_loss:.4f}, Accuracy = {accuracy:.2%}")

What to expect: Accuracy should climb into the 70-80% range (or higher) as the reward model learns to distinguish good from bad responses.


Part 3: RL Optimization with PPO

Now we use the reward model to optimize the policy. This is where the magic happens.

The RLHF Objective

Mathematical Details

We want to maximize:

J(\theta) = \mathbb{E}_{x \sim D,\, y \sim \pi_\theta} \left[ r_\phi(x, y) \right] - \beta \cdot D_{KL}(\pi_\theta \| \pi_{ref})

where:

  • r_\phi(x, y) is the reward model score
  • \pi_{ref} is the SFT model (frozen reference)
  • \beta is the KL penalty coefficient

The KL penalty is crucial: Without it, the policy will find degenerate outputs that fool the reward model but aren’t actually good. See PPO for Language Models for details.

Think of it as:

β€œGenerate responses that score highly, but don’t stray too far from what you learned in SFT.”

The KL penalty is like a leashβ€”it lets the model improve but prevents it from going crazy.

What happens without KL penalty:

  • Model finds weird tokens that spike reward
  • Responses become repetitive or nonsensical
  • Reward goes up but quality goes down

This is reward hacking, and it’s surprisingly easy to trigger.
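Here is the shaping in miniature, with made-up numbers mirroring how compute_advantages assembles per-token rewards:

```python
import torch

kl_coef = 0.1       # beta in the objective above
reward_score = 2.0  # reward-model score, credited at the final token

# Made-up per-token log probs under the policy and the frozen reference
log_probs = torch.tensor([-1.2, -0.8, -2.5])
ref_log_probs = torch.tensor([-1.0, -0.9, -1.1])

# Per-token penalty: the more the policy diverges from the reference, the
# more negative the shaped reward; the RM score is added only at the end
token_rewards = -kl_coef * (log_probs - ref_log_probs)
token_rewards[-1] += reward_score
print(token_rewards)
```

Note the last token here diverges most from the reference, so it pays the largest KL penalty; the leash tightens exactly where the policy drifts.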

Setting Up PPO

Implementation
from dataclasses import dataclass
from typing import List, Dict, Tuple
import numpy as np

@dataclass
class PPOConfig:
    """Configuration for PPO training."""
    # PPO hyperparameters
    clip_range: float = 0.2
    ppo_epochs: int = 4

    # KL penalty
    kl_coef: float = 0.1
    target_kl: float = 0.02

    # Training
    learning_rate: float = 1e-5
    batch_size: int = 4
    max_new_tokens: int = 100
    temperature: float = 0.7

    # GAE
    gamma: float = 1.0
    lam: float = 0.95

config = PPOConfig()

The PPO Training Loop

Implementation
class PPOTrainer:
    """
    Simplified PPO trainer for RLHF.

    This implementation focuses on clarity over efficiency.
    Production code would optimize memory and compute.
    """

    def __init__(self, policy, ref_policy, reward_model, tokenizer, config):
        self.policy = policy
        self.ref_policy = ref_policy
        self.reward_model = reward_model
        self.tokenizer = tokenizer
        self.config = config

        # Freeze reference policy
        for param in self.ref_policy.parameters():
            param.requires_grad = False

        # Optimizer for policy only
        self.optimizer = AdamW(
            [p for p in self.policy.parameters() if p.requires_grad],
            lr=config.learning_rate
        )

        self.kl_coef = config.kl_coef

    def generate_and_score(self, prompts: List[str]) -> Dict:
        """
        Generate responses and compute rewards.

        Returns dict with responses, log probs, rewards, etc.
        """
        self.policy.eval()

        all_responses = []
        all_log_probs = []
        all_rewards = []
        all_ref_log_probs = []

        for prompt in prompts:
            formatted = f"Question: {prompt}\n\nAnswer:"
            inputs = self.tokenizer(formatted, return_tensors="pt").to(self.policy.device)

            # Generate with the policy
            with torch.no_grad():
                outputs = self.policy.generate(
                    **inputs,
                    max_new_tokens=self.config.max_new_tokens,
                    temperature=self.config.temperature,
                    do_sample=True,
                    return_dict_in_generate=True,
                    output_scores=True,
                    pad_token_id=self.tokenizer.pad_token_id
                )

            # Extract generated tokens (excluding prompt)
            prompt_len = inputs['input_ids'].shape[1]
            response_ids = outputs.sequences[0, prompt_len:]

            # Compute log probs for generated tokens
            log_probs = []
            for i, score in enumerate(outputs.scores):
                probs = F.softmax(score[0], dim=-1)  # scores from generate are already temperature-scaled
                token_id = response_ids[i]
                log_prob = torch.log(probs[token_id] + 1e-10)
                log_probs.append(log_prob.item())

            # Get reference log probs
            with torch.no_grad():
                full_ids = outputs.sequences
                ref_outputs = self.ref_policy(full_ids)
                ref_logits = ref_outputs.logits[0, prompt_len-1:-1]  # Align with response
                ref_probs = F.softmax(ref_logits / self.config.temperature, dim=-1)
                ref_log_probs = [
                    torch.log(ref_probs[i, response_ids[i]] + 1e-10).item()
                    for i in range(len(response_ids))
                ]

            # Compute reward
            with torch.no_grad():
                full_text = self.tokenizer.decode(outputs.sequences[0])
                reward_input = self.tokenizer(
                    full_text, return_tensors="pt"
                ).to(self.reward_model.device)
                reward = self.reward_model(
                    reward_input['input_ids'],
                    reward_input['attention_mask']
                ).item()

            # Decode response
            response = self.tokenizer.decode(response_ids, skip_special_tokens=True)

            all_responses.append(response)
            all_log_probs.append(log_probs)
            all_ref_log_probs.append(ref_log_probs)
            all_rewards.append(reward)

        return {
            "prompts": prompts,
            "responses": all_responses,
            "log_probs": all_log_probs,
            "ref_log_probs": all_ref_log_probs,
            "rewards": all_rewards
        }

    def compute_advantages(self, rewards: List[float], log_probs: List[List[float]],
                          ref_log_probs: List[List[float]]) -> Tuple[List, List]:
        """
        Compute advantages with KL penalty.

        KL penalty is applied per-token, reward at end.
        """
        all_advantages = []
        all_returns = []

        for i in range(len(rewards)):
            n_tokens = len(log_probs[i])

            # Per-token rewards: KL penalty + final reward
            token_rewards = []
            for t in range(n_tokens):
                kl_penalty = -self.kl_coef * (log_probs[i][t] - ref_log_probs[i][t])
                r = kl_penalty
                if t == n_tokens - 1:  # Add reward score at last token
                    r += rewards[i]
                token_rewards.append(r)

            # Compute returns (simplified: no value function, just cumulative reward)
            returns = []
            cumulative = 0
            for r in reversed(token_rewards):
                cumulative = r + self.config.gamma * cumulative
                returns.insert(0, cumulative)

            # Advantages = returns (no baseline in this simplified version)
            advantages = returns

            all_advantages.append(advantages)
            all_returns.append(returns)

        return all_advantages, all_returns

    def ppo_update(self, data: Dict, advantages: List, returns: List) -> Dict:
        """
        Perform PPO policy update.
        """
        self.policy.train()

        metrics = {"pg_loss": [], "kl": [], "clip_frac": []}

        for epoch in range(self.config.ppo_epochs):
            for i, prompt in enumerate(data["prompts"]):
                if len(data["log_probs"][i]) == 0:
                    continue

                formatted = f"Question: {prompt}\n\nAnswer: {data['responses'][i]}"
                inputs = self.tokenizer(formatted, return_tensors="pt").to(self.policy.device)

                # Forward pass
                outputs = self.policy(**inputs)
                logits = outputs.logits

                # Get new log probs for response tokens
                prompt_formatted = f"Question: {prompt}\n\nAnswer:"
                prompt_len = len(self.tokenizer(prompt_formatted)['input_ids'])
                response_ids = inputs['input_ids'][0, prompt_len:]

                new_log_probs = []
                for t in range(len(response_ids)):
                    probs = F.softmax(logits[0, prompt_len + t - 1] / self.config.temperature, dim=-1)
                    lp = torch.log(probs[response_ids[t]] + 1e-10)
                    new_log_probs.append(lp)

                if len(new_log_probs) == 0:
                    continue

                new_log_probs = torch.stack(new_log_probs)
                old_log_probs = torch.tensor(data["log_probs"][i][:len(new_log_probs)],
                                            device=self.policy.device)
                advs = torch.tensor(advantages[i][:len(new_log_probs)],
                                   device=self.policy.device, dtype=torch.float16)

                # Normalize advantages
                advs = (advs - advs.mean()) / (advs.std() + 1e-8)

                # Probability ratio
                ratio = torch.exp(new_log_probs - old_log_probs)

                # Clipped objective
                pg_loss1 = -advs * ratio
                pg_loss2 = -advs * torch.clamp(ratio, 1 - self.config.clip_range,
                                               1 + self.config.clip_range)
                pg_loss = torch.max(pg_loss1, pg_loss2).mean()

                # Backward
                self.optimizer.zero_grad()
                pg_loss.backward()
                torch.nn.utils.clip_grad_norm_(self.policy.parameters(), 1.0)
                self.optimizer.step()

                # Track metrics
                with torch.no_grad():
                    kl = (old_log_probs - new_log_probs).mean().item()
                    clip_frac = ((ratio - 1).abs() > self.config.clip_range).float().mean().item()

                metrics["pg_loss"].append(pg_loss.item())
                metrics["kl"].append(kl)
                metrics["clip_frac"].append(clip_frac)

        return {k: np.mean(v) if v else 0 for k, v in metrics.items()}

    def train_step(self, prompts: List[str]) -> Dict:
        """Complete training step: generate, score, update."""
        # Generate and score
        data = self.generate_and_score(prompts)

        # Compute advantages
        advantages, returns = self.compute_advantages(
            data["rewards"], data["log_probs"], data["ref_log_probs"]
        )

        # PPO update
        metrics = self.ppo_update(data, advantages, returns)

        # Add reward stats
        metrics["mean_reward"] = np.mean(data["rewards"])
        metrics["mean_response_len"] = np.mean([len(r) for r in data["responses"]])

        # Adaptive KL coefficient
        if metrics["kl"] > self.config.target_kl * 1.5:
            self.kl_coef *= 1.2
        elif metrics["kl"] < self.config.target_kl / 1.5:
            self.kl_coef /= 1.2
        self.kl_coef = max(0.01, min(1.0, self.kl_coef))

        metrics["kl_coef"] = self.kl_coef

        return metrics

Running RL Training

Implementation
# Create reference policy (frozen copy of SFT model)
ref_policy = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    torch_dtype=torch.float16,
    device_map="auto"
)
# Note: this loads the base pretrained weights. In practice, save the SFT
# checkpoint and reload it here, so the KL penalty anchors the policy to the
# SFT model rather than to the raw base model.

# Initialize trainer
ppo_trainer = PPOTrainer(
    policy=model,
    ref_policy=ref_policy,
    reward_model=reward_model,
    tokenizer=tokenizer,
    config=config
)

# Training prompts (you'd want many more in practice)
rl_prompts = TRAINING_PROMPTS * 5  # Repeat for more training

# Training loop
num_iterations = 20

print("Starting RL training...")
for iteration in range(num_iterations):
    # Sample batch of prompts
    batch_prompts = random.sample(rl_prompts, min(config.batch_size, len(rl_prompts)))

    # Training step
    metrics = ppo_trainer.train_step(batch_prompts)

    # Logging
    print(f"Iteration {iteration+1}/{num_iterations}:")
    print(f"  Reward: {metrics['mean_reward']:.3f}")
    print(f"  KL: {metrics['kl']:.4f}")
    print(f"  PG Loss: {metrics['pg_loss']:.4f}")
    print(f"  KL Coef: {metrics['kl_coef']:.4f}")

What to look for:

  β€’ Reward increasing: Model is learning to generate preferred responses
  β€’ KL staying moderate: Model isn’t diverging too much from reference
  β€’ PG loss small and noisy: In PPO the policy loss doesn’t steadily decrease, so reward and KL are the more reliable progress signals; a clip fraction near 1.0 means most updates are being clipped away

Part 4: Evaluation and Analysis

Comparing Before and After

Implementation
# Evaluation prompts (different from training)
EVAL_PROMPTS = [
    "What is the difference between a list and a tuple in Python?",
    "Explain how convolutional neural networks work.",
    "What is the purpose of regularization in machine learning?",
    "How does batch normalization help training?",
    "What is the vanishing gradient problem?",
]

print("=" * 60)
print("COMPARISON: Before vs After RL Training")
print("=" * 60)

def score_response(prompt: str, response: str) -> float:
    """Score a prompt/response pair with the reward model."""
    full_text = f"Question: {prompt}\n\nAnswer: {response}"
    inputs = tokenizer(full_text, return_tensors="pt").to(reward_model.device)
    with torch.no_grad():
        return reward_model(inputs['input_ids']).item()

for prompt in EVAL_PROMPTS[:3]:
    print(f"\nQuestion: {prompt}\n")

    # Generate with reference (pre-RL)
    ref_response = generate_response(ref_policy, tokenizer, prompt)
    print(f"Before RL: {ref_response[:200]}...")
    print(f"Reward: {score_response(prompt, ref_response):.3f}")

    print()

    # Generate with trained policy
    rl_response = generate_response(model, tokenizer, prompt)
    print(f"After RL: {rl_response[:200]}...")
    print(f"Reward: {score_response(prompt, rl_response):.3f}")

    print("-" * 60)

Checking for Reward Hacking

Implementation
def check_for_reward_hacking(model, tokenizer, prompts):
    """Check for common reward hacking patterns."""
    responses = [generate_response(model, tokenizer, p) for p in prompts]

    print("Reward Hacking Check:")
    print("-" * 40)

    # Check for repetition
    unique_responses = set(responses)
    repetition_rate = 1 - len(unique_responses) / len(responses)
    print(f"Repetition rate: {repetition_rate:.1%}")
    if repetition_rate > 0.3:
        print("  ⚠️ High repetition - possible mode collapse")

    # Check for length distribution
    lengths = [len(r) for r in responses]
    avg_length = np.mean(lengths)
    std_length = np.std(lengths)
    print(f"Average length: {avg_length:.0f} chars (std: {std_length:.0f})")
    if avg_length > 300:
        print("  ⚠️ Responses are very long - possible length gaming")

    # Check for common patterns
    pattern_counts = {}
    for response in responses:
        # Check for overused phrases
        for phrase in ["I think", "In conclusion", "First,", "Additionally,"]:
            if phrase in response:
                pattern_counts[phrase] = pattern_counts.get(phrase, 0) + 1

    overused = [p for p, c in pattern_counts.items() if c > len(prompts) * 0.5]
    if overused:
        print(f"  ⚠️ Overused phrases: {overused}")

    print("-" * 40)
    return responses

# Run the check
test_prompts = [
    "What is a function?",
    "Explain variables.",
    "What is a class?",
    "How do loops work?",
    "What is an algorithm?",
]
_ = check_for_reward_hacking(model, tokenizer, test_prompts)
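Exact-match repetition misses near-duplicates, where the model reuses the same sentences with small variations. A distinct-n measure (a standard text-generation diversity metric; the implementation below is a sketch, not part of the check above) catches those too:

```python
def distinct_n(responses: list, n: int = 2) -> float:
    """Fraction of unique n-grams across all responses.

    Near 0 = heavy repetition / mode collapse; near 1 = diverse outputs.
    """
    ngrams = []
    for response in responses:
        tokens = response.split()
        ngrams.extend(tuple(tokens[i:i + n]) for i in range(len(tokens) - n + 1))
    return len(set(ngrams)) / len(ngrams) if ngrams else 0.0
```

A sharp drop in distinct-2 over the course of RL training is one of the earliest visible signs of mode collapse, often before rewards or exact-duplicate rates show anything unusual.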

Key Takeaways

ℹ️What You've Learned
  1. SFT teaches format, RL teaches quality: SFT gets the model into the right ballpark; RL fine-tunes for what humans actually prefer.

  2. Reward models are imperfect proxies: They capture patterns in preferences, including biases. Always evaluate beyond the reward signal.

  3. KL penalty prevents reward hacking: Without it, models find degenerate optima. The penalty keeps the model grounded.

  4. LoRA makes fine-tuning accessible: You can train powerful models on consumer hardware by only updating a small fraction of parameters.

  5. Evaluation is hard: High reward doesn’t mean good quality. Always do qualitative evaluation alongside metrics.
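Takeaway 3 in one line of code: the quantity RLHF actually optimizes is the reward-model score minus a penalty on how far the policy's log-probabilities have moved from the reference model's. A minimal sketch (the function name is mine; inputs are per-sequence log-probs like those tracked in the trainer):

```python
def kl_shaped_reward(reward: float, log_prob: float,
                     ref_log_prob: float, kl_coef: float) -> float:
    """Reward-model score minus a KL penalty toward the reference policy.

    r_shaped = r_RM - kl_coef * (log pi(y|x) - log pi_ref(y|x))
    """
    return reward - kl_coef * (log_prob - ref_log_prob)
```

If the policy makes a response much more likely than the reference did (a common symptom of reward hacking), the penalty term grows and eats into the reward, which is exactly what keeps the model "grounded".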

Next Steps

To go deeper:

To experiment:

  • Try different reward model architectures
  • Experiment with KL coefficient values
  • Add more training data
  • Try DPO as an alternative to PPO

Production considerations:

  • Use larger models (7B+) for better quality
  • Collect real human preferences
  • Implement ensemble reward models
  • Add safety evaluations

Full Notebook

The complete, runnable notebook is available here:

Download Jupyter Notebook

Or run directly in Colab:

Open in Colab