# File size: 9,846 Bytes
# fb188bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
# ==============================================================================
# PiT_MNIST_V1.0.py
#
# ML-Engineer LLM Agent Implementation
#
# Description:
# This script implements a Pixel Transformer (PiT) for MNIST classification,
# based on the paper "An Image is Worth More Than 16x16 Patches"
# (arXiv:2406.09415). It treats each pixel as an individual token, forgoing
# the patch-based approach of traditional Vision Transformers.
#
# Designed for Google Colab using the sample_data/mnist_*.csv files.
# ==============================================================================

import torch
import torch.nn as nn
import pandas as pd
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import math

# --- 1. Configuration & Hyperparameters ---
# Modest, MNIST-appropriate sizes in the spirit of the paper's "Tiny"/"Small"
# variants. Key order matters only for the printout below.
CONFIG = dict(
    train_file="/content/sample_data/mnist_train_small.csv",
    test_file="/content/sample_data/mnist_test.csv",
    image_size=28,
    num_classes=10,
    embed_dim=128,       # 'd' in the paper: width of each pixel embedding.
    num_layers=6,        # Depth of the Transformer Encoder stack.
    num_heads=8,         # Attention heads; embed_dim must be divisible by this.
    mlp_dim=512,         # Feed-forward hidden width (the usual 4 * embed_dim).
    dropout=0.1,
    batch_size=128,
    epochs=25,           # Raised to help convergence on the small training file.
    learning_rate=1e-4,
    device="cuda" if torch.cuda.is_available() else "cpu",
)
# One token per pixel, so the sequence is image_size squared (784 for MNIST).
CONFIG["sequence_length"] = CONFIG["image_size"] ** 2

# Echo the effective settings, one "key: value" pair per line.
print("--- Configuration ---")
print("\n".join(f"{name}: {setting}" for name, setting in CONFIG.items()))
print("---------------------\n")


# --- 2. Data Loading and Preprocessing ---
class MNIST_CSV_Dataset(Dataset):
    """Custom PyTorch Dataset for loading MNIST data from CSV files.

    Every CSV row is one sample: column 0 is the integer class label and the
    remaining columns are the flattened pixel intensities (divided by 255
    here, i.e. assumed to be in 0-255).

    The file may or may not start with a header row. The first row is peeked
    at: if every field parses as a number it is treated as data, otherwise it
    is treated as column names and skipped. Unconditionally treating the first
    row as a header would silently drop one sample from headerless files.

    Args:
        file_path: Path of the CSV file. It is opened twice (peek + full
            read), so it must be re-openable rather than a one-shot stream.
    """
    def __init__(self, file_path):
        # Peek at a single row to decide whether it is a header or real data.
        first_row = pd.read_csv(file_path, header=None, nrows=1)
        has_header = not all(
            pd.api.types.is_numeric_dtype(dtype) for dtype in first_row.dtypes
        )
        df = pd.read_csv(file_path, header=0 if has_header else None)

        self.labels = torch.tensor(df.iloc[:, 0].to_numpy(), dtype=torch.long)
        # Normalize pixel values to [0, 1] and keep as float
        self.pixels = torch.tensor(df.iloc[:, 1:].to_numpy(), dtype=torch.float32) / 255.0

    def __len__(self):
        """Return the number of samples (one per data row)."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(pixels, label)`` for sample ``idx``.

        ``pixels`` has shape (num_pixels, 1): the trailing axis gives every
        pixel its own length-1 feature vector, which is what a per-pixel
        ``nn.Linear(1, embed_dim)`` projection consumes.
        """
        return self.pixels[idx].unsqueeze(-1), self.labels[idx]

# --- 3. Pixel Transformer (PiT) Model Architecture ---
class PixelTransformer(nn.Module):
    """
    Pixel Transformer (PiT) classifier.

    Each pixel is embedded as its own token; a learnable CLS token is placed
    in front, learnable position embeddings are added, and the sequence is run
    through a Transformer Encoder. Class logits are read off the CLS position.

    Args:
        seq_len: Number of pixel tokens per image (CLS token not included).
        num_classes: Size of the output logit vector.
        embed_dim: Width of every token embedding.
        num_layers: Number of stacked encoder layers.
        num_heads: Attention heads per encoder layer.
        mlp_dim: Hidden width of each layer's feed-forward block.
        dropout: Dropout probability, used after the embeddings and inside
            the encoder layers.
    """
    def __init__(self, seq_len, num_classes, embed_dim, num_layers, num_heads, mlp_dim, dropout):
        super().__init__()

        # Pixels-as-tokens: lift every scalar pixel into an embed_dim vector.
        self.pixel_projection = nn.Linear(in_features=1, out_features=embed_dim)

        # Learnable classification token; its final state feeds the head.
        self.cls_token = nn.Parameter(torch.randn(1, 1, embed_dim))

        # Learnable positions for the CLS slot plus all seq_len pixel slots.
        self.position_embedding = nn.Parameter(torch.randn(1, seq_len + 1, embed_dim))

        self.dropout = nn.Dropout(p=dropout)

        # Encoder stack; batch_first so tensors stay (batch, seq, feature).
        layer_template = nn.TransformerEncoderLayer(
            embed_dim,
            num_heads,
            dim_feedforward=mlp_dim,
            dropout=dropout,
            activation="gelu",
            batch_first=True,
        )
        self.transformer_encoder = nn.TransformerEncoder(layer_template, num_layers)

        # Head: normalise the CLS state, then map it linearly to class logits.
        self.mlp_head = nn.Sequential(
            nn.LayerNorm(embed_dim),
            nn.Linear(embed_dim, num_classes),
        )

    def forward(self, x):
        """Map pixels of shape (B, seq_len, 1) to logits of shape (B, num_classes)."""
        batch = x.size(0)

        # (B, seq_len, 1) -> (B, seq_len, embed_dim)
        tokens = self.pixel_projection(x)

        # Put one CLS token in front of every sequence -> (B, seq_len + 1, embed_dim)
        cls = self.cls_token.expand(batch, -1, -1)
        tokens = torch.cat([cls, tokens], dim=1)

        # Position information is broadcast across the batch, then dropout.
        tokens = self.dropout(tokens + self.position_embedding)

        encoded = self.transformer_encoder(tokens)

        # Index 0 along the sequence axis is the CLS token's final state.
        return self.mlp_head(encoded[:, 0])


# --- 4. Training and Evaluation Functions ---
def train_one_epoch(model, dataloader, criterion, optimizer, device):
    """Train `model` for one pass over `dataloader`.

    Each batch is moved to `device`, the loss from `criterion` is
    back-propagated and `optimizer` takes one step. Returns the sum of the
    per-batch loss values divided by the number of batches.
    """
    model.train()
    batch_losses = []
    batches = tqdm(dataloader, desc="Training", leave=False)
    for pixels, labels in batches:
        pixels = pixels.to(device)
        labels = labels.to(device)

        # Forward pass and loss for this batch.
        loss = criterion(model(pixels), labels)

        # Clear stale gradients, back-propagate, update the weights.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        loss_value = loss.item()
        batch_losses.append(loss_value)
        batches.set_postfix(loss=loss_value)

    return sum(batch_losses) / len(dataloader)

def evaluate(model, dataloader, criterion, device):
    """Score `model` on `dataloader` without tracking gradients.

    Returns a tuple `(avg_loss, accuracy)`: the summed per-batch loss divided
    by the number of batches, and the percentage of samples whose
    highest-scoring class equals the label.
    """
    model.eval()
    loss_sum = 0
    n_correct = 0
    n_seen = 0
    with torch.no_grad():
        batches = tqdm(dataloader, desc="Evaluating", leave=False)
        for pixels, labels in batches:
            pixels = pixels.to(device)
            labels = labels.to(device)

            logits = model(pixels)
            loss_sum += criterion(logits, labels).item()

            # Predicted class = index of the largest logit in each row.
            predicted = torch.max(logits, dim=1).indices
            n_seen += labels.size(0)
            n_correct += predicted.eq(labels).sum().item()
            batches.set_postfix(acc=100. * n_correct / n_seen)

    return loss_sum / len(dataloader), 100. * n_correct / n_seen


# --- 5. Main Execution Block ---
if __name__ == "__main__":
    # Script entry point: load the data, train the PiT while tracking
    # validation accuracy, then score the best checkpoint on the test set.
    device = CONFIG["device"]
    checkpoint_path = "PiT_MNIST_best.pth"

    # Load full training data and split into train/validation sets
    # This helps monitor overfitting, as mnist_train_small is quite small.
    full_train_dataset = MNIST_CSV_Dataset(CONFIG["train_file"])
    train_indices, val_indices = train_test_split(
        range(len(full_train_dataset)),
        test_size=0.1,  # 10% for validation
        random_state=42
    )
    train_dataset = torch.utils.data.Subset(full_train_dataset, train_indices)
    val_dataset = torch.utils.data.Subset(full_train_dataset, val_indices)
    test_dataset = MNIST_CSV_Dataset(CONFIG["test_file"])

    # Only the training loader shuffles; evaluation order is irrelevant.
    train_loader = DataLoader(train_dataset, batch_size=CONFIG["batch_size"], shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=CONFIG["batch_size"], shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=CONFIG["batch_size"], shuffle=False)

    print("\nData loaded.")
    print(f"  Training samples:   {len(train_dataset)}")
    print(f"  Validation samples: {len(val_dataset)}")
    print(f"  Test samples:       {len(test_dataset)}\n")

    # Initialize model, loss function, and optimizer
    model = PixelTransformer(
        seq_len=CONFIG["sequence_length"],
        num_classes=CONFIG["num_classes"],
        embed_dim=CONFIG["embed_dim"],
        num_layers=CONFIG["num_layers"],
        num_heads=CONFIG["num_heads"],
        mlp_dim=CONFIG["mlp_dim"],
        dropout=CONFIG["dropout"]
    ).to(device)

    total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f"Model initialized on {device}.")
    print(f"Total trainable parameters: {total_params:,}\n")

    criterion = nn.CrossEntropyLoss()
    # AdamW is often preferred for Transformers
    optimizer = torch.optim.AdamW(model.parameters(), lr=CONFIG["learning_rate"])

    # Training loop
    # Start below any reachable accuracy so the first epoch always writes a
    # checkpoint; otherwise a run stuck at 0% would leave none (or a stale
    # file from an earlier run) for the final test evaluation to load.
    best_val_acc = float("-inf")
    print("--- Starting Training ---")
    for epoch in range(CONFIG["epochs"]):
        train_loss = train_one_epoch(model, train_loader, criterion, optimizer, device)
        val_loss, val_acc = evaluate(model, val_loader, criterion, device)

        print(
            f"Epoch {epoch+1:02}/{CONFIG['epochs']} | "
            f"Train Loss: {train_loss:.4f} | "
            f"Val Loss: {val_loss:.4f} | "
            f"Val Acc: {val_acc:.2f}%"
        )

        if val_acc > best_val_acc:
            best_val_acc = val_acc
            print("  -> New best validation accuracy! Saving model state.")
            torch.save(model.state_dict(), checkpoint_path)

    print("--- Training Finished ---\n")

    # Final evaluation on the test set using the best model
    print("--- Evaluating on Test Set ---")
    # map_location puts the tensors on the active device; weights_only limits
    # unpickling to tensors and plain containers, which is all a state_dict holds.
    best_state = torch.load(checkpoint_path, map_location=device, weights_only=True)
    model.load_state_dict(best_state)
    test_loss, test_acc = evaluate(model, test_loader, criterion, device)
    print(f"Final Test Loss: {test_loss:.4f}")
    print(f"Final Test Accuracy: {test_acc:.2f}%")
    print("----------------------------\n")