# Source snapshot: commit fb188bc, file size 9,846 bytes.
# ==============================================================================
# PiT_MNIST_V1.0.py
#
# ML-Engineer LLM Agent Implementation
#
# Description:
# This script implements a Pixel Transformer (PiT) for MNIST classification,
# based on the paper "An Image is Worth More Than 16x16 Patches"
# (arXiv:2406.09415). It treats each pixel as an individual token, forgoing
# the patch-based approach of traditional Vision Transformers.
#
# Designed for Google Colab using the sample_data/mnist_*.csv files.
# ==============================================================================
import torch
import torch.nn as nn
import pandas as pd
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import math
# --- 1. Configuration & Hyperparameters ---
# Values sized for MNIST, loosely following the paper's "Tiny"/"Small" variants.
CONFIG = dict(
    train_file="/content/sample_data/mnist_train_small.csv",
    test_file="/content/sample_data/mnist_test.csv",
    image_size=28,
    num_classes=10,
    embed_dim=128,       # 'd' in the paper: width of every pixel embedding.
    num_layers=6,        # Depth of the Transformer encoder stack.
    num_heads=8,         # Attention heads; embed_dim must be divisible by this.
    mlp_dim=512,         # Feed-forward hidden width (the usual 4 * embed_dim).
    dropout=0.1,
    batch_size=128,
    epochs=25,           # Extra epochs to help convergence on the small dataset.
    learning_rate=1e-4,
    device="cuda" if torch.cuda.is_available() else "cpu",
)
# One token per pixel: 28 * 28 = 784 for MNIST. Added last so it prints last.
CONFIG["sequence_length"] = CONFIG["image_size"] ** 2

# Echo the effective settings so every run log is self-describing.
print("--- Configuration ---")
for name, setting in CONFIG.items():
    print(f"{name}: {setting}")
print("---------------------\n")
# --- 2. Data Loading and Preprocessing ---
class MNIST_CSV_Dataset(Dataset):
    """PyTorch Dataset that serves MNIST samples stored as CSV rows.

    Each row is expected to hold the class label in the first column followed
    by the flattened pixel intensities (0-255) in the remaining columns.

    The Colab ``sample_data/mnist_*.csv`` files ship without a header row, so
    the file is read with ``header=None``; otherwise pandas would consume the
    first sample as column names and silently drop it. CSVs that do carry a
    textual header row are still supported: the header is detected and removed.

    Args:
        file_path: Path or file-like object accepted by ``pandas.read_csv``.
    """

    def __init__(self, file_path):
        df = pd.read_csv(file_path, header=None)

        # A textual header makes pandas parse the columns as non-numeric.
        # In that case the first row is the header: drop it and convert the
        # remaining rows back to numbers.
        all_numeric = all(
            pd.api.types.is_numeric_dtype(dtype) for dtype in df.dtypes
        )
        if not all_numeric:
            df = df.iloc[1:].apply(pd.to_numeric)

        self.labels = torch.tensor(df.iloc[:, 0].to_numpy(), dtype=torch.long)
        # Scale pixel intensities from [0, 255] to [0, 1] as float32.
        self.pixels = (
            torch.tensor(df.iloc[:, 1:].to_numpy(), dtype=torch.float32) / 255.0
        )

    def __len__(self):
        """Return the number of samples."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(pixels, label)`` for sample ``idx``.

        The pixel vector gets a trailing singleton dimension, i.e. shape
        ``(num_pixels, 1)``, so that each pixel is a one-feature token.
        """
        return self.pixels[idx].unsqueeze(-1), self.labels[idx]
# --- 3. Pixel Transformer (PiT) Model Architecture ---
class PixelTransformer(nn.Module):
    """Pixel Transformer (PiT) classifier.

    Every pixel is its own token: a scalar intensity is linearly lifted to
    ``embed_dim``, a learnable CLS token is prepended, learnable position
    embeddings are added, and a Transformer encoder processes the sequence.
    The encoder output at the CLS position feeds a LayerNorm + Linear head.

    Args:
        seq_len: Number of pixel tokens per image (CLS token excluded).
        num_classes: Number of output logits.
        embed_dim: Token embedding width.
        num_layers: Number of encoder layers.
        num_heads: Attention heads per layer.
        mlp_dim: Hidden width of each layer's feed-forward block.
        dropout: Dropout probability (embeddings and encoder layers).
    """

    def __init__(self, seq_len, num_classes, embed_dim, num_layers, num_heads, mlp_dim, dropout):
        super().__init__()

        # Scalar pixel -> embed_dim vector: the "pixels-as-tokens" step.
        self.pixel_projection = nn.Linear(1, embed_dim)

        # Learnable classification token; its final state summarizes the image.
        self.cls_token = nn.Parameter(torch.randn(1, 1, embed_dim))

        # Learnable positions for the CLS slot plus every pixel (seq_len + 1).
        self.position_embedding = nn.Parameter(
            torch.randn(1, seq_len + 1, embed_dim)
        )
        self.dropout = nn.Dropout(dropout)

        # Encoder stack operating on (batch, seq, feature) tensors.
        layer_kwargs = {
            "d_model": embed_dim,
            "nhead": num_heads,
            "dim_feedforward": mlp_dim,
            "dropout": dropout,
            "activation": "gelu",
            "batch_first": True,
        }
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(**layer_kwargs),
            num_layers=num_layers,
        )

        # Classification head applied to the CLS output.
        self.mlp_head = nn.Sequential(
            nn.LayerNorm(embed_dim),
            nn.Linear(embed_dim, num_classes),
        )

    def forward(self, x):
        """Map pixel tokens ``(B, seq_len, 1)`` to logits ``(B, num_classes)``."""
        batch = x.shape[0]

        tokens = self.pixel_projection(x)               # (B, seq_len, embed_dim)
        cls = self.cls_token.expand(batch, -1, -1)      # (B, 1, embed_dim)
        sequence = torch.cat([cls, tokens], dim=1)      # (B, seq_len + 1, embed_dim)

        # Inject positional information, then regularize.
        sequence = self.dropout(sequence + self.position_embedding)

        encoded = self.transformer_encoder(sequence)    # (B, seq_len + 1, embed_dim)

        # Only the CLS position (index 0) is used for classification.
        return self.mlp_head(encoded[:, 0])             # (B, num_classes)
# --- 4. Training and Evaluation Functions ---
def train_one_epoch(model, dataloader, criterion, optimizer, device):
    """Run one optimization pass over ``dataloader`` and return the mean loss.

    The returned value is the per-sample average loss over the epoch. Batch
    losses are weighted by batch size so that a short final batch does not
    count as much as a full one. This assumes ``criterion`` returns the mean
    loss of the batch (the default reduction of ``nn.CrossEntropyLoss``).

    Args:
        model: Module to train; switched to training mode.
        dataloader: Iterable yielding ``(pixels, labels)`` batches.
        criterion: Loss callable taking ``(logits, labels)``.
        optimizer: Optimizer updating ``model``'s parameters.
        device: Device the batches are moved to.

    Returns:
        Average training loss per sample as a float.

    Raises:
        ZeroDivisionError: If ``dataloader`` yields no samples.
    """
    model.train()
    loss_sum = 0.0
    sample_count = 0
    progress_bar = tqdm(dataloader, desc="Training", leave=False)
    for pixels, labels in progress_bar:
        pixels, labels = pixels.to(device), labels.to(device)

        # Forward pass
        logits = model(pixels)
        loss = criterion(logits, labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Read the scalar once: .item() forces a host/device synchronization.
        batch_loss = loss.item()
        batch_size = labels.size(0)
        loss_sum += batch_loss * batch_size
        sample_count += batch_size
        progress_bar.set_postfix(loss=batch_loss)
    return loss_sum / sample_count
def evaluate(model, dataloader, criterion, device):
    """Compute the mean loss and accuracy of ``model`` over ``dataloader``.

    Runs in evaluation mode with gradients disabled. The loss is averaged per
    sample: batch losses are weighted by batch size so that a short final
    batch does not skew the result. This assumes ``criterion`` returns the
    mean loss of the batch (the default reduction of ``nn.CrossEntropyLoss``).

    Args:
        model: Module to evaluate; switched to evaluation mode.
        dataloader: Iterable yielding ``(pixels, labels)`` batches.
        criterion: Loss callable taking ``(logits, labels)``.
        device: Device the batches are moved to.

    Returns:
        Tuple ``(avg_loss, accuracy)`` where ``accuracy`` is a percentage
        in ``[0, 100]``.

    Raises:
        ZeroDivisionError: If ``dataloader`` yields no samples.
    """
    model.eval()
    loss_sum = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        progress_bar = tqdm(dataloader, desc="Evaluating", leave=False)
        for pixels, labels in progress_bar:
            pixels, labels = pixels.to(device), labels.to(device)
            logits = model(pixels)
            loss = criterion(logits, labels)

            batch_size = labels.size(0)
            loss_sum += loss.item() * batch_size

            # Predicted class is the index of the largest logit.
            predicted = logits.argmax(dim=1)
            total += batch_size
            correct += (predicted == labels).sum().item()
            progress_bar.set_postfix(acc=100. * correct / total)
    avg_loss = loss_sum / total
    accuracy = 100. * correct / total
    return avg_loss, accuracy
# --- 5. Main Execution Block ---
if __name__ == "__main__":
    device = CONFIG["device"]
    checkpoint_path = "PiT_MNIST_best.pth"

    # Load the training CSV and hold out 10% for validation so overfitting
    # on the small training set can be monitored.
    full_train_dataset = MNIST_CSV_Dataset(CONFIG["train_file"])
    train_indices, val_indices = train_test_split(
        range(len(full_train_dataset)),
        test_size=0.1,  # 10% for validation
        random_state=42,
    )
    train_dataset = torch.utils.data.Subset(full_train_dataset, train_indices)
    val_dataset = torch.utils.data.Subset(full_train_dataset, val_indices)
    test_dataset = MNIST_CSV_Dataset(CONFIG["test_file"])

    train_loader = DataLoader(train_dataset, batch_size=CONFIG["batch_size"], shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=CONFIG["batch_size"], shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=CONFIG["batch_size"], shuffle=False)

    print("\nData loaded.")
    print(f" Training samples: {len(train_dataset)}")
    print(f" Validation samples: {len(val_dataset)}")
    print(f" Test samples: {len(test_dataset)}\n")

    # Initialize model, loss function, and optimizer
    model = PixelTransformer(
        seq_len=CONFIG["sequence_length"],
        num_classes=CONFIG["num_classes"],
        embed_dim=CONFIG["embed_dim"],
        num_layers=CONFIG["num_layers"],
        num_heads=CONFIG["num_heads"],
        mlp_dim=CONFIG["mlp_dim"],
        dropout=CONFIG["dropout"],
    ).to(device)
    total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f"Model initialized on {device}.")
    print(f"Total trainable parameters: {total_params:,}\n")

    criterion = nn.CrossEntropyLoss()
    # AdamW is often preferred for Transformers
    optimizer = torch.optim.AdamW(model.parameters(), lr=CONFIG["learning_rate"])

    # Training loop. Starting from -inf guarantees the first epoch always
    # writes a checkpoint, so the final evaluation never loads a stale file
    # left over from a previous run.
    best_val_acc = float("-inf")
    checkpoint_saved = False
    print("--- Starting Training ---")
    for epoch in range(CONFIG["epochs"]):
        train_loss = train_one_epoch(model, train_loader, criterion, optimizer, device)
        val_loss, val_acc = evaluate(model, val_loader, criterion, device)
        print(
            f"Epoch {epoch+1:02}/{CONFIG['epochs']} | "
            f"Train Loss: {train_loss:.4f} | "
            f"Val Loss: {val_loss:.4f} | "
            f"Val Acc: {val_acc:.2f}%"
        )
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            print(" -> New best validation accuracy! Saving model state.")
            torch.save(model.state_dict(), checkpoint_path)
            checkpoint_saved = True
    print("--- Training Finished ---\n")

    # Final evaluation on the test set using the best model
    print("--- Evaluating on Test Set ---")
    if checkpoint_saved:
        # map_location keeps the load valid regardless of the saving device.
        model.load_state_dict(torch.load(checkpoint_path, map_location=device))
    test_loss, test_acc = evaluate(model, test_loader, criterion, device)
    print(f"Final Test Loss: {test_loss:.4f}")
    print(f"Final Test Accuracy: {test_acc:.2f}%")
    print("----------------------------\n")