Fine-Tuning Your First LLM with RL
This guide walks you through fine-tuning a small language model using reinforcement learning, from raw model to aligned assistant. You'll implement everything from scratch, gaining deep intuition about what RLHF actually does under the hood.
By the end of this guide, you'll have:
- A fine-tuned LLM that answers questions in a specific style
- A reward model trained on preference data
- Complete PPO training pipeline
- Practical understanding of KL penalties, reward hacking, and training dynamics
Time required: 2-3 hours (including training time on free Colab GPU)
Prerequisites
This guide assumes you're comfortable with:
- Python and PyTorch: you can write training loops
- Transformers basics: you've used HuggingFace models before
- RL fundamentals: you understand policies, rewards, and value functions
If you need to brush up on RL concepts, see:
- Policy Gradient Methods: how policies are optimized
- PPO: the algorithm we'll use
- RLHF: Reward Modeling, which covers learning from preferences
The Big Picture
We're going to teach a language model to answer questions in a helpful, concise style. The training has three phases:
1. SUPERVISED FINE-TUNING (SFT): Start with a pre-trained model, show it examples of good question-answer pairs. This teaches the format.
2. REWARD MODELING: Train a model to predict which answers humans prefer. This captures "quality" in a differentiable signal.
3. RL OPTIMIZATION (PPO): Use the reward model to guide the policy. The model learns to generate answers that score highly.
Each phase builds on the previous. SFT gets the model into the right ballpark; reward modeling defines "good"; PPO optimizes for it.
Our Training Setup
We'll use:
- Base model: Qwen/Qwen2.5-0.5B, small enough for free Colab, large enough to show interesting behavior
- Dataset: Custom Q&A pairs with simulated preferences
- Hardware: Single T4 GPU (free Colab tier)
- Training time: ~1 hour total
Qwen2.5-0.5B is an excellent choice for learning:
- 500M parameters: fits in Colab memory with room for training
- Instruction-tuned variant available: we can compare before/after
- Modern architecture: representative of current LLMs
- Permissive license: free to experiment
Larger models follow the same process but need more compute.
Environment Setup
# Install required packages
!pip install -q transformers datasets accelerate peft trl bitsandbytes
!pip install -q torch torchvision torchaudio
# Verify GPU is available
import torch
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
print(f"GPU: {torch.cuda.get_device_name(0)}")
print(f"Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
Expected output:
PyTorch version: 2.x.x
CUDA available: True
GPU: Tesla T4
Memory: 15.8 GB
Part 1: Supervised Fine-Tuning
The first step is teaching the model the format of good responses. We show it examples of well-structured Q&A pairs.
Why SFT First?
A raw pre-trained model predicts text based on internet data. It knows facts and language, but it doesn't know:
- That it should answer questions directly
- What format users expect
- How to be concise vs. verbose
SFT shows the model "here's what good answers look like." It's not optimizing for quality yet, just teaching the pattern.
Think of it like teaching someone a new job. First they shadow an expert (SFT), then they get feedback on their own work (RL).
Creating Training Data
# Example Q&A pairs for SFT
# In practice, you'd have hundreds or thousands
SFT_EXAMPLES = [
{
"question": "What is the capital of France?",
"answer": "The capital of France is Paris."
},
{
"question": "How does photosynthesis work?",
"answer": "Photosynthesis is the process by which plants convert sunlight, water, and carbon dioxide into glucose and oxygen. It occurs in chloroplasts using chlorophyll pigments."
},
{
"question": "What is machine learning?",
"answer": "Machine learning is a subset of artificial intelligence where systems learn patterns from data rather than being explicitly programmed. Models improve through experience."
},
{
"question": "Why is the sky blue?",
"answer": "The sky appears blue because of Rayleigh scattering. Sunlight entering the atmosphere scatters off air molecules, with shorter blue wavelengths scattering more than longer red wavelengths."
},
{
"question": "What is reinforcement learning?",
"answer": "Reinforcement learning is a type of machine learning where an agent learns to make decisions by taking actions in an environment and receiving rewards or penalties. The goal is to maximize cumulative reward over time."
},
]
def format_for_training(example):
"""Format as instruction-following prompt."""
return f"""Question: {example['question']}
Answer: {example['answer']}"""
# Preview
print(format_for_training(SFT_EXAMPLES[0]))
Loading and Preparing the Model
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
MODEL_NAME = "Qwen/Qwen2.5-0.5B"
# Load tokenizer
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
# Load model with memory-efficient settings
model = AutoModelForCausalLM.from_pretrained(
MODEL_NAME,
torch_dtype=torch.float16, # Half precision saves memory
device_map="auto" # Automatically place on GPU
)
print(f"Model loaded: {sum(p.numel() for p in model.parameters()) / 1e6:.1f}M parameters")
Key insight: We use float16 (half precision) to fit more in memory. This is standard practice for fine-tuning.
Parameter-Efficient Fine-Tuning with LoRA
Fine-tuning all 500M parameters would be slow and memory-intensive. Instead, we use LoRA (Low-Rank Adaptation):
- Freeze the original model weights
- Add small trainable matrices to attention layers
- Only train the small matrices (~1% of parameters)
The result is nearly as good as full fine-tuning, but much cheaper.
LoRA decomposes weight updates as low-rank matrices:

$W' = W_0 + \Delta W = W_0 + BA$

where:
- $W_0 \in \mathbb{R}^{d \times k}$ is the frozen original weight
- $B \in \mathbb{R}^{d \times r}$ and $A \in \mathbb{R}^{r \times k}$ are trainable
- $r$ is the "rank" (typically 8-64)

This reduces trainable parameters from $d \times k$ to $r(d + k)$.
For a 4096×4096 attention matrix with rank 16:
- Full: 16.7M parameters
- LoRA: 131K parameters (128× fewer)
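To make those counts concrete, here is a quick sanity check of the arithmetic (pure Python, independent of any library):

```python
# LoRA parameter-count sanity check for a d x k weight matrix at rank r.
d, k, r = 4096, 4096, 16

full_params = d * k        # a full-rank update trains every entry of the matrix
lora_params = r * (d + k)  # LoRA trains only B (d x r) and A (r x k)

print(f"Full update: {full_params:,} parameters")     # 16,777,216
print(f"LoRA update: {lora_params:,} parameters")     # 131,072
print(f"Reduction:   {full_params // lora_params}x")  # 128x
```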
from peft import LoraConfig, get_peft_model, TaskType
# LoRA configuration
lora_config = LoraConfig(
r=16, # Rank of update matrices
lora_alpha=32, # Scaling factor
target_modules=["q_proj", "v_proj"], # Which layers to adapt
lora_dropout=0.1, # Dropout for regularization
bias="none", # Don't train biases
task_type=TaskType.CAUSAL_LM
)
# Apply LoRA to the model
model = get_peft_model(model, lora_config)
# Check trainable parameters
trainable, total = model.get_nb_trainable_parameters()
print(f"Trainable: {trainable:,} / {total:,} ({100 * trainable / total:.2f}%)")
Expected output:
Trainable: 851,968 / 495,308,800 (0.17%)
We're only training 0.17% of the model!
The SFT Training Loop
from torch.utils.data import DataLoader, Dataset
from torch.optim import AdamW
from tqdm import tqdm
class QADataset(Dataset):
def __init__(self, examples, tokenizer, max_length=256):
self.examples = examples
self.tokenizer = tokenizer
self.max_length = max_length
def __len__(self):
return len(self.examples)
def __getitem__(self, idx):
text = format_for_training(self.examples[idx])
encodings = self.tokenizer(
text,
truncation=True,
max_length=self.max_length,
padding="max_length",
return_tensors="pt"
)
labels = encodings["input_ids"].squeeze().clone()
labels[encodings["attention_mask"].squeeze() == 0] = -100  # ignore padding in the loss
return {
"input_ids": encodings["input_ids"].squeeze(),
"attention_mask": encodings["attention_mask"].squeeze(),
"labels": labels  # for causal LM, labels = inputs (the model shifts them internally)
}
# Create dataset and dataloader
sft_dataset = QADataset(SFT_EXAMPLES, tokenizer)
sft_dataloader = DataLoader(sft_dataset, batch_size=2, shuffle=True)
# Optimizer
optimizer = AdamW(model.parameters(), lr=1e-4)
# Training loop
model.train()
num_epochs = 3
for epoch in range(num_epochs):
total_loss = 0
for batch in tqdm(sft_dataloader, desc=f"Epoch {epoch+1}"):
batch = {k: v.to(model.device) for k, v in batch.items()}
outputs = model(**batch)
loss = outputs.loss
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
avg_loss = total_loss / len(sft_dataloader)
print(f"Epoch {epoch+1}: Average loss = {avg_loss:.4f}")
Testing the SFT Model
def generate_response(model, tokenizer, question, max_new_tokens=100):
"""Generate a response to a question."""
prompt = f"Question: {question}\n\nAnswer:"
inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
with torch.no_grad():
outputs = model.generate(
**inputs,
max_new_tokens=max_new_tokens,
temperature=0.7,
do_sample=True,
pad_token_id=tokenizer.pad_token_id
)
response = tokenizer.decode(outputs[0], skip_special_tokens=True)
# Extract just the answer part
if "Answer:" in response:
response = response.split("Answer:")[-1].strip()
return response
# Test on a new question
test_question = "What is the difference between supervised and unsupervised learning?"
print(f"Question: {test_question}\n")
print(f"Answer: {generate_response(model, tokenizer, test_question)}")
At this point, the model can answer questions in the right format. But is it good? That's where reward modeling comes in.
Part 2: Reward Modeling
The reward model learns to predict which responses humans prefer. It transforms subjective human judgment into a numerical signal.
Creating Preference Data
In real RLHF, humans compare pairs of responses and pick which is better. We'll simulate this with programmatic rules that capture "quality":
- Prefer responses that are complete sentences
- Prefer responses that directly answer the question
- Penalize responses that are too short or too long
- Penalize responses with filler phrases
This is obviously simplified; real preference data captures much subtler patterns. But the mechanics are the same.
import random
# Generate preference pairs by comparing model outputs
def create_preference_data(model, tokenizer, prompts, n_samples=4):
"""
Generate responses and create preference pairs.
For each prompt, generate n_samples responses and create
all possible pairs with simulated preferences.
"""
preference_data = []
for prompt in prompts:
# Generate multiple responses
responses = []
for _ in range(n_samples):
response = generate_response(model, tokenizer, prompt, max_new_tokens=100)
responses.append(response)
# Create pairs with simulated preferences
for i in range(len(responses)):
for j in range(i + 1, len(responses)):
# Simulate human preference based on heuristics
score_i = score_response(prompt, responses[i])
score_j = score_response(prompt, responses[j])
if score_i > score_j:
chosen, rejected = responses[i], responses[j]
elif score_j > score_i:
chosen, rejected = responses[j], responses[i]
else:
continue # Skip ties
preference_data.append({
"prompt": prompt,
"chosen": chosen,
"rejected": rejected
})
return preference_data
def score_response(prompt, response):
"""
Heuristic scoring function to simulate human preferences.
In real RLHF, this comes from actual human annotations.
"""
score = 0
# Prefer complete sentences
if response.endswith(('.', '!', '?')):
score += 2
# Prefer reasonable length (20-200 chars)
length = len(response)
if 20 <= length <= 200:
score += 2
elif length < 20:
score -= 2 # Too short
elif length > 300:
score -= 1 # Too long
# Penalize filler phrases
filler_phrases = ["I think", "maybe", "perhaps", "it depends"]
for phrase in filler_phrases:
if phrase.lower() in response.lower():
score -= 1
# Reward direct answers
if response and response[0].isupper():
score += 1
return score
# Create training prompts
TRAINING_PROMPTS = [
"What is Python?",
"How do neural networks learn?",
"Explain gradient descent.",
"What is the purpose of activation functions?",
"How does backpropagation work?",
"What is overfitting?",
"Explain the bias-variance tradeoff.",
"What is a loss function?",
"How do transformers work?",
"What is attention in deep learning?",
]
# Generate preference data
print("Generating preference data...")
preference_data = create_preference_data(model, tokenizer, TRAINING_PROMPTS, n_samples=4)
print(f"Created {len(preference_data)} preference pairs")
# Preview a pair
if preference_data:
example = preference_data[0]
print(f"\nExample preference pair:")
print(f"Prompt: {example['prompt']}")
print(f"Chosen: {example['chosen'][:100]}...")
print(f"Rejected: {example['rejected'][:100]}...")
The Bradley-Terry Model
The reward model is trained using the Bradley-Terry model for pairwise comparisons. Given two responses $y_1$ and $y_2$ to prompt $x$, the probability that $y_1$ is preferred:

$P(y_1 \succ y_2 \mid x) = \sigma\big(r_\phi(x, y_1) - r_\phi(x, y_2)\big)$

where:
- $r_\phi(x, y)$ is the learned reward function
- $\sigma$ is the sigmoid function

The training loss is the negative log-likelihood:

$\mathcal{L}(\phi) = -\log \sigma\big(r_\phi(x, y_w) - r_\phi(x, y_l)\big)$

where $y_w$ is the winning (chosen) response and $y_l$ is the losing (rejected) response.
Key insight: We only learn relative rewards. The absolute scale doesn't matter; only the difference between responses.
For more on this, see Reward Modeling.
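To see the Bradley-Terry formula in action, a quick numeric example (toy reward values, not model outputs):

```python
import math

def bt_preference_prob(r_chosen: float, r_rejected: float) -> float:
    """Bradley-Terry probability that the first response is preferred."""
    return 1.0 / (1.0 + math.exp(-(r_chosen - r_rejected)))

# Only the reward *difference* matters: shifting both rewards by a
# constant leaves the preference probability unchanged.
p1 = bt_preference_prob(1.5, 0.5)
p2 = bt_preference_prob(101.5, 100.5)
print(f"{p1:.3f}  {p2:.3f}")  # both ~0.731

# The training loss is the negative log-likelihood of the observed preference
loss = -math.log(bt_preference_prob(1.5, 0.5))
print(f"loss = {loss:.3f}")  # ~0.313
```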
Building the Reward Model
import torch.nn as nn
import torch.nn.functional as F
class RewardModel(nn.Module):
"""
Reward model for RLHF.
Takes (prompt, response) and outputs a scalar reward.
Architecture: base LM + linear head on last token.
"""
def __init__(self, base_model_name, device="cuda"):
super().__init__()
self.device = device
# Load a fresh copy of the base model
self.base = AutoModelForCausalLM.from_pretrained(
base_model_name,
torch_dtype=torch.float16,
device_map="auto"
)
# Freeze base model
for param in self.base.parameters():
param.requires_grad = False
# Add reward head
hidden_size = self.base.config.hidden_size
self.reward_head = nn.Sequential(
nn.Linear(hidden_size, hidden_size // 2),
nn.ReLU(),
nn.Linear(hidden_size // 2, 1)
).to(device).half() # Match base model precision
def forward(self, input_ids, attention_mask=None):
"""
Compute reward for input sequence.
Returns scalar reward per sequence in batch.
"""
# Get hidden states from base model
outputs = self.base(
input_ids,
attention_mask=attention_mask,
output_hidden_states=True
)
# Use last token's hidden state
hidden = outputs.hidden_states[-1]
if attention_mask is not None:
# Find position of last non-padding token
seq_lengths = attention_mask.sum(dim=1) - 1
batch_size = hidden.shape[0]
last_hidden = hidden[torch.arange(batch_size, device=hidden.device), seq_lengths]
else:
last_hidden = hidden[:, -1, :]
# Compute reward
reward = self.reward_head(last_hidden).squeeze(-1)
return reward
# Initialize reward model
reward_model = RewardModel(MODEL_NAME)
print("Reward model initialized")
Training the Reward Model
class PreferenceDataset(Dataset):
"""Dataset for reward model training."""
def __init__(self, preference_data, tokenizer, max_length=256):
self.data = preference_data
self.tokenizer = tokenizer
self.max_length = max_length
def __len__(self):
return len(self.data)
def __getitem__(self, idx):
item = self.data[idx]
# Encode chosen response
chosen_text = f"Question: {item['prompt']}\n\nAnswer: {item['chosen']}"
chosen_enc = self.tokenizer(
chosen_text,
truncation=True,
max_length=self.max_length,
padding="max_length",
return_tensors="pt"
)
# Encode rejected response
rejected_text = f"Question: {item['prompt']}\n\nAnswer: {item['rejected']}"
rejected_enc = self.tokenizer(
rejected_text,
truncation=True,
max_length=self.max_length,
padding="max_length",
return_tensors="pt"
)
return {
"chosen_ids": chosen_enc["input_ids"].squeeze(),
"chosen_mask": chosen_enc["attention_mask"].squeeze(),
"rejected_ids": rejected_enc["input_ids"].squeeze(),
"rejected_mask": rejected_enc["attention_mask"].squeeze(),
}
# Create dataset
rm_dataset = PreferenceDataset(preference_data, tokenizer)
rm_dataloader = DataLoader(rm_dataset, batch_size=2, shuffle=True)
# Training
rm_optimizer = AdamW(reward_model.reward_head.parameters(), lr=1e-4)
reward_model.train()
num_epochs = 5
for epoch in range(num_epochs):
total_loss = 0
total_correct = 0
total_pairs = 0
for batch in tqdm(rm_dataloader, desc=f"RM Epoch {epoch+1}"):
# Move to device
chosen_ids = batch["chosen_ids"].to(reward_model.device)
chosen_mask = batch["chosen_mask"].to(reward_model.device)
rejected_ids = batch["rejected_ids"].to(reward_model.device)
rejected_mask = batch["rejected_mask"].to(reward_model.device)
# Get rewards
r_chosen = reward_model(chosen_ids, chosen_mask)
r_rejected = reward_model(rejected_ids, rejected_mask)
# Bradley-Terry loss
loss = -F.logsigmoid(r_chosen - r_rejected).mean()
# Backward pass
rm_optimizer.zero_grad()
loss.backward()
rm_optimizer.step()
# Track metrics
total_loss += loss.item()
total_correct += (r_chosen > r_rejected).sum().item()
total_pairs += len(r_chosen)
accuracy = total_correct / total_pairs
avg_loss = total_loss / len(rm_dataloader)
print(f"Epoch {epoch+1}: Loss = {avg_loss:.4f}, Accuracy = {accuracy:.2%}")
What to expect: Accuracy should climb above 70-80% as the reward model learns to distinguish good from bad responses.
Part 3: RL Optimization with PPO
Now we use the reward model to optimize the policy. This is where the magic happens.
The RLHF Objective
We want to maximize:

$J(\theta) = \mathbb{E}_{x \sim \mathcal{D},\, y \sim \pi_\theta}\big[ r_\phi(x, y) \big] - \beta \, D_{\mathrm{KL}}\big( \pi_\theta(y \mid x) \,\|\, \pi_{\mathrm{ref}}(y \mid x) \big)$

where:
- $r_\phi(x, y)$ is the reward model score
- $\pi_{\mathrm{ref}}$ is the SFT model (frozen reference)
- $\beta$ is the KL penalty coefficient
The KL penalty is crucial: Without it, the policy will find degenerate outputs that fool the reward model but aren't actually good. See PPO for Language Models for details.
Think of it as:
"Generate responses that score highly, but don't stray too far from what you learned in SFT."
The KL penalty is like a leash: it lets the model improve but prevents it from going crazy.
What happens without KL penalty:
- Model finds weird tokens that spike reward
- Responses become repetitive or nonsensical
- Reward goes up but quality goes down
This is reward hacking, and it's surprisingly easy to trigger.
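The shaping that counteracts this can be seen in a tiny example. A sketch with made-up log-probabilities (the real values come from the policy and reference models): each token is penalized in proportion to how far the policy's log-prob drifts above the reference's, and the reward model's score arrives only at the final token.

```python
import torch

beta = 0.1          # KL penalty coefficient
reward_score = 1.5  # reward model's score for the whole sequence (assumed value)

# Made-up per-token log-probs under the policy and the frozen reference
log_probs     = torch.tensor([-1.2, -0.8, -2.0])
ref_log_probs = torch.tensor([-1.0, -1.5, -2.0])

# Per-token KL penalty: negative when the policy is more confident than the reference
token_rewards = -beta * (log_probs - ref_log_probs)
token_rewards[-1] += reward_score  # sequence-level reward lands on the last token

print(token_rewards)  # ~[0.02, -0.07, 1.50]
```

Token 1 drifted above the reference and is penalized; token 0 stayed below it and gets a small bonus. This is the same shaping the trainer's compute_advantages performs.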
Setting Up PPO
from dataclasses import dataclass
from typing import List, Dict, Tuple
import numpy as np
@dataclass
class PPOConfig:
"""Configuration for PPO training."""
# PPO hyperparameters
clip_range: float = 0.2
ppo_epochs: int = 4
# KL penalty
kl_coef: float = 0.1
target_kl: float = 0.02
# Training
learning_rate: float = 1e-5
batch_size: int = 4
max_new_tokens: int = 100
temperature: float = 0.7
# GAE
gamma: float = 1.0
lam: float = 0.95
config = PPOConfig()
The PPO Training Loop
class PPOTrainer:
"""
Simplified PPO trainer for RLHF.
This implementation focuses on clarity over efficiency.
Production code would optimize memory and compute.
"""
def __init__(self, policy, ref_policy, reward_model, tokenizer, config):
self.policy = policy
self.ref_policy = ref_policy
self.reward_model = reward_model
self.tokenizer = tokenizer
self.config = config
# Freeze reference policy
for param in self.ref_policy.parameters():
param.requires_grad = False
# Optimizer for policy only
self.optimizer = AdamW(
[p for p in self.policy.parameters() if p.requires_grad],
lr=config.learning_rate
)
self.kl_coef = config.kl_coef
def generate_and_score(self, prompts: List[str]) -> Dict:
"""
Generate responses and compute rewards.
Returns dict with responses, log probs, rewards, etc.
"""
self.policy.eval()
all_responses = []
all_log_probs = []
all_rewards = []
all_ref_log_probs = []
for prompt in prompts:
formatted = f"Question: {prompt}\n\nAnswer:"
inputs = self.tokenizer(formatted, return_tensors="pt").to(self.policy.device)
# Generate with the policy
with torch.no_grad():
outputs = self.policy.generate(
**inputs,
max_new_tokens=self.config.max_new_tokens,
temperature=self.config.temperature,
do_sample=True,
return_dict_in_generate=True,
output_scores=True,
pad_token_id=self.tokenizer.pad_token_id
)
# Extract generated tokens (excluding prompt)
prompt_len = inputs['input_ids'].shape[1]
response_ids = outputs.sequences[0, prompt_len:]
# Compute log probs for generated tokens
log_probs = []
for i, score in enumerate(outputs.scores):
probs = F.softmax(score[0], dim=-1)  # generate() returns processed scores (temperature already applied)
token_id = response_ids[i]
log_prob = torch.log(probs[token_id] + 1e-10)
log_probs.append(log_prob.item())
# Get reference log probs
with torch.no_grad():
full_ids = outputs.sequences
ref_outputs = self.ref_policy(full_ids)
ref_logits = ref_outputs.logits[0, prompt_len-1:-1] # Align with response
ref_probs = F.softmax(ref_logits / self.config.temperature, dim=-1)
ref_log_probs = [
torch.log(ref_probs[i, response_ids[i]] + 1e-10).item()
for i in range(len(response_ids))
]
# Compute reward
with torch.no_grad():
full_text = self.tokenizer.decode(outputs.sequences[0])
reward_input = self.tokenizer(
full_text, return_tensors="pt"
).to(self.reward_model.device)
reward = self.reward_model(
reward_input['input_ids'],
reward_input['attention_mask']
).item()
# Decode response
response = self.tokenizer.decode(response_ids, skip_special_tokens=True)
all_responses.append(response)
all_log_probs.append(log_probs)
all_ref_log_probs.append(ref_log_probs)
all_rewards.append(reward)
return {
"prompts": prompts,
"responses": all_responses,
"log_probs": all_log_probs,
"ref_log_probs": all_ref_log_probs,
"rewards": all_rewards
}
def compute_advantages(self, rewards: List[float], log_probs: List[List[float]],
ref_log_probs: List[List[float]]) -> Tuple[List, List]:
"""
Compute advantages with KL penalty.
KL penalty is applied per-token, reward at end.
"""
all_advantages = []
all_returns = []
for i in range(len(rewards)):
n_tokens = len(log_probs[i])
# Per-token rewards: KL penalty + final reward
token_rewards = []
for t in range(n_tokens):
kl_penalty = -self.kl_coef * (log_probs[i][t] - ref_log_probs[i][t])
r = kl_penalty
if t == n_tokens - 1: # Add reward score at last token
r += rewards[i]
token_rewards.append(r)
# Compute returns (simplified: no value function, just cumulative reward)
returns = []
cumulative = 0
for r in reversed(token_rewards):
cumulative = r + self.config.gamma * cumulative
returns.insert(0, cumulative)
# Advantages = returns (no baseline in this simplified version)
advantages = returns
all_advantages.append(advantages)
all_returns.append(returns)
return all_advantages, all_returns
def ppo_update(self, data: Dict, advantages: List, returns: List) -> Dict:
"""
Perform PPO policy update.
"""
self.policy.train()
metrics = {"pg_loss": [], "kl": [], "clip_frac": []}
for epoch in range(self.config.ppo_epochs):
for i, prompt in enumerate(data["prompts"]):
if len(data["log_probs"][i]) == 0:
continue
formatted = f"Question: {prompt}\n\nAnswer: {data['responses'][i]}"
inputs = self.tokenizer(formatted, return_tensors="pt").to(self.policy.device)
# Forward pass
outputs = self.policy(**inputs)
logits = outputs.logits
# Get new log probs for response tokens
prompt_formatted = f"Question: {prompt}\n\nAnswer:"
prompt_len = len(self.tokenizer(prompt_formatted)['input_ids'])
response_ids = inputs['input_ids'][0, prompt_len:]
new_log_probs = []
for t in range(len(response_ids)):
probs = F.softmax(logits[0, prompt_len + t - 1] / self.config.temperature, dim=-1)
lp = torch.log(probs[response_ids[t]] + 1e-10)
new_log_probs.append(lp)
if len(new_log_probs) == 0:
continue
new_log_probs = torch.stack(new_log_probs)
old_log_probs = torch.tensor(data["log_probs"][i][:len(new_log_probs)],
device=self.policy.device)
advs = torch.tensor(advantages[i][:len(new_log_probs)],
device=self.policy.device, dtype=torch.float16)
# Normalize advantages
advs = (advs - advs.mean()) / (advs.std() + 1e-8)
# Probability ratio
ratio = torch.exp(new_log_probs - old_log_probs)
# Clipped objective
pg_loss1 = -advs * ratio
pg_loss2 = -advs * torch.clamp(ratio, 1 - self.config.clip_range,
1 + self.config.clip_range)
pg_loss = torch.max(pg_loss1, pg_loss2).mean()
# Backward
self.optimizer.zero_grad()
pg_loss.backward()
torch.nn.utils.clip_grad_norm_(self.policy.parameters(), 1.0)
self.optimizer.step()
# Track metrics
with torch.no_grad():
kl = (old_log_probs - new_log_probs).mean().item()
clip_frac = ((ratio - 1).abs() > self.config.clip_range).float().mean().item()
metrics["pg_loss"].append(pg_loss.item())
metrics["kl"].append(kl)
metrics["clip_frac"].append(clip_frac)
return {k: np.mean(v) if v else 0 for k, v in metrics.items()}
def train_step(self, prompts: List[str]) -> Dict:
"""Complete training step: generate, score, update."""
# Generate and score
data = self.generate_and_score(prompts)
# Compute advantages
advantages, returns = self.compute_advantages(
data["rewards"], data["log_probs"], data["ref_log_probs"]
)
# PPO update
metrics = self.ppo_update(data, advantages, returns)
# Add reward stats
metrics["mean_reward"] = np.mean(data["rewards"])
metrics["mean_response_len"] = np.mean([len(r) for r in data["responses"]])
# Adaptive KL coefficient
if metrics["kl"] > self.config.target_kl * 1.5:
self.kl_coef *= 1.2
elif metrics["kl"] < self.config.target_kl / 1.5:
self.kl_coef /= 1.2
self.kl_coef = max(0.01, min(1.0, self.kl_coef))
metrics["kl_coef"] = self.kl_coef
return metrics
Running RL Training
# Create reference policy (frozen copy of SFT model)
ref_policy = AutoModelForCausalLM.from_pretrained(
MODEL_NAME,
torch_dtype=torch.float16,
device_map="auto"
)
# Note: In practice, you'd save and reload the SFT model as reference
# Initialize trainer
ppo_trainer = PPOTrainer(
policy=model,
ref_policy=ref_policy,
reward_model=reward_model,
tokenizer=tokenizer,
config=config
)
# Training prompts (you'd want many more in practice)
rl_prompts = TRAINING_PROMPTS * 5 # Repeat for more training
# Training loop
num_iterations = 20
print("Starting RL training...")
for iteration in range(num_iterations):
# Sample batch of prompts
batch_prompts = random.sample(rl_prompts, min(config.batch_size, len(rl_prompts)))
# Training step
metrics = ppo_trainer.train_step(batch_prompts)
# Logging
print(f"Iteration {iteration+1}/{num_iterations}:")
print(f" Reward: {metrics['mean_reward']:.3f}")
print(f" KL: {metrics['kl']:.4f}")
print(f" PG Loss: {metrics['pg_loss']:.4f}")
print(f"  KL Coef: {metrics['kl_coef']:.4f}")
What to look for:
- Reward increasing: Model is learning to generate preferred responses
- KL staying moderate: Model isn't diverging too much from reference
- Loss decreasing: Policy updates are working
Part 4: Evaluation and Analysis
Comparing Before and After
# Evaluation prompts (different from training)
EVAL_PROMPTS = [
"What is the difference between a list and a tuple in Python?",
"Explain how convolutional neural networks work.",
"What is the purpose of regularization in machine learning?",
"How does batch normalization help training?",
"What is the vanishing gradient problem?",
]
print("=" * 60)
print("COMPARISON: Before vs After RL Training")
print("=" * 60)
for prompt in EVAL_PROMPTS[:3]:
print(f"\nQuestion: {prompt}\n")
# Generate with reference (pre-RL)
ref_response = generate_response(ref_policy, tokenizer, prompt)
print(f"Before RL: {ref_response[:200]}...")
# Get reward
full_text = f"Question: {prompt}\n\nAnswer: {ref_response}"
inputs = tokenizer(full_text, return_tensors="pt").to(reward_model.device)
with torch.no_grad():
ref_reward = reward_model(inputs['input_ids']).item()
print(f"Reward: {ref_reward:.3f}")
print()
# Generate with trained policy
rl_response = generate_response(model, tokenizer, prompt)
print(f"After RL: {rl_response[:200]}...")
# Get reward
full_text = f"Question: {prompt}\n\nAnswer: {rl_response}"
inputs = tokenizer(full_text, return_tensors="pt").to(reward_model.device)
with torch.no_grad():
rl_reward = reward_model(inputs['input_ids']).item()
print(f"Reward: {rl_reward:.3f}")
print("-" * 60)
Checking for Reward Hacking
Watch for these patterns that indicate reward hacking:
- Repetitive responses: Same phrases or structures regardless of prompt
- Excessive length: Model learned "longer = higher reward"
- Format gaming: Overuse of lists, headers, or other structural elements
- Nonsense with high reward: Responses score well but make no sense
If you see these, try:
- Increasing the KL coefficient
- Improving the reward model
- Adding explicit penalties for problematic behaviors
def check_for_reward_hacking(model, tokenizer, prompts):
"""Check for common reward hacking patterns."""
responses = [generate_response(model, tokenizer, p) for p in prompts]
print("Reward Hacking Check:")
print("-" * 40)
# Check for repetition
unique_responses = set(responses)
repetition_rate = 1 - len(unique_responses) / len(responses)
print(f"Repetition rate: {repetition_rate:.1%}")
if repetition_rate > 0.3:
print(" ⚠️ High repetition - possible mode collapse")
# Check for length distribution
lengths = [len(r) for r in responses]
avg_length = np.mean(lengths)
std_length = np.std(lengths)
print(f"Average length: {avg_length:.0f} chars (std: {std_length:.0f})")
if avg_length > 300:
print(" ⚠️ Responses are very long - possible length gaming")
# Check for common patterns
pattern_counts = {}
for response in responses:
# Check for overused phrases
for phrase in ["I think", "In conclusion", "First,", "Additionally,"]:
if phrase in response:
pattern_counts[phrase] = pattern_counts.get(phrase, 0) + 1
overused = [p for p, c in pattern_counts.items() if c > len(prompts) * 0.5]
if overused:
print(f" ⚠️ Overused phrases: {overused}")
print("-" * 40)
return responses
# Run the check
test_prompts = [
"What is a function?",
"Explain variables.",
"What is a class?",
"How do loops work?",
"What is an algorithm?",
]
_ = check_for_reward_hacking(model, tokenizer, test_prompts)
Key Takeaways
- SFT teaches format, RL teaches quality: SFT gets the model into the right ballpark; RL fine-tunes for what humans actually prefer.
- Reward models are imperfect proxies: They capture patterns in preferences, including biases. Always evaluate beyond the reward signal.
- KL penalty prevents reward hacking: Without it, models find degenerate optima. The penalty keeps the model grounded.
- LoRA makes fine-tuning accessible: You can train powerful models on consumer hardware by only updating a small fraction of parameters.
- Evaluation is hard: High reward doesn't mean good quality. Always do qualitative evaluation alongside metrics.
Next Steps
To go deeper:
- RLHF Chapter: full theoretical treatment
- PPO: deep dive into the algorithm
- Reward Modeling: Bradley-Terry and preference learning
To experiment:
- Try different reward model architectures
- Experiment with KL coefficient values
- Add more training data
- Try DPO as an alternative to PPO
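If you try the DPO route, here is a minimal sketch of its loss under the same Bradley-Terry assumptions (variable names are illustrative; TRL ships a full DPOTrainer). DPO skips the explicit reward model and PPO loop by treating beta times the policy/reference log-ratio as an implicit reward and applying the preference loss directly:

```python
import torch
import torch.nn.functional as F

def dpo_loss(policy_chosen_logps, policy_rejected_logps,
             ref_chosen_logps, ref_rejected_logps, beta=0.1):
    """DPO loss sketch: optimize preferences directly, no reward model or RL loop.

    Each argument is the summed log-probability of a response under the
    policy or the frozen reference model; beta plays the role of the
    KL coefficient.
    """
    chosen_ratio = policy_chosen_logps - ref_chosen_logps
    rejected_ratio = policy_rejected_logps - ref_rejected_logps
    # Bradley-Terry loss on the implicit rewards beta * log(pi / pi_ref)
    return -F.logsigmoid(beta * (chosen_ratio - rejected_ratio)).mean()

# Toy values: the policy already favors the chosen response relative to the reference
loss = dpo_loss(torch.tensor([-10.0]), torch.tensor([-14.0]),
                torch.tensor([-12.0]), torch.tensor([-12.0]))
print(f"{loss.item():.4f}")
```

As the policy shifts probability mass from rejected toward chosen responses (relative to the reference), the loss drops toward zero.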
Production considerations:
- Use larger models (7B+) for better quality
- Collect real human preferences
- Implement ensemble reward models
- Add safety evaluations
Full Notebook
The complete, runnable notebook is available here:
Download Jupyter Notebook
Or run directly in Colab:
Open in Colab