Spaces:
Sleeping
Sleeping
import torch | |
import torch.nn as nn | |
import torch.optim as optim | |
import torchvision | |
import torchvision.transforms as transforms | |
from torch.utils.data import DataLoader, Subset | |
from model import get_model, save_model | |
from tqdm import tqdm | |
import os | |
from datetime import datetime | |
def get_transforms():
    """
    Build the preprocessing pipelines for training and evaluation.

    Both pipelines resize to 224, convert the image to a tensor and apply
    per-channel normalization. The training pipeline additionally applies
    random augmentation (horizontal flip, rotation up to 15 degrees,
    translation up to 10% per axis, and colour jitter) between the resize
    and the tensor conversion.

    Returns:
        A ``(train_transform, test_transform)`` tuple of ``transforms.Compose``.
    """
    def resize_step():
        # Fresh instance per pipeline so the two Compose objects share nothing.
        return transforms.Resize(224)

    def tensor_steps():
        # Mean/std values match the widely used ImageNet channel statistics.
        return [
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225]),
        ]

    # Augmentations are applied only when training.
    augmentation_steps = [
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(15),
        transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    ]

    training_pipeline = transforms.Compose(
        [resize_step(), *augmentation_steps, *tensor_steps()]
    )
    evaluation_pipeline = transforms.Compose(
        [resize_step(), *tensor_steps()]
    )
    return training_pipeline, evaluation_pipeline
def get_data(subset_size=None, train=True, batch_size=32, num_workers=2):
    """
    Load CIFAR-10 (downloading it into ``./data`` if needed) and wrap it in a
    DataLoader.

    Args:
        subset_size: If truthy, keep only this many samples, chosen uniformly
            at random without replacement. ``None`` (or 0) keeps the full
            split. Values larger than the split simply keep every sample.
        train: Selects the training split and the augmenting transform when
            true; the test split and the plain transform otherwise. Also
            controls whether the loader shuffles.
        batch_size: Number of samples per batch (default 32).
        num_workers: Number of DataLoader worker processes (default 2).

    Returns:
        A ``torch.utils.data.DataLoader`` over the selected (sub)set.
    """
    train_transform, test_transform = get_transforms()
    transform = train_transform if train else test_transform

    dataset = torchvision.datasets.CIFAR10(
        root='./data',
        train=train,
        download=True,
        transform=transform,
    )

    if subset_size:
        # Convert to plain Python ints so the wrapped dataset is indexed with
        # ints rather than 0-d tensors.
        indices = torch.randperm(len(dataset))[:subset_size].tolist()
        dataset = Subset(dataset, indices)

    return DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=bool(train),  # shuffle only the training data
        num_workers=num_workers,
    )
def evaluate_model(model, testloader, device):
    """
    Compute top-1 accuracy of ``model`` over ``testloader``.

    The model is switched to eval mode and left in that mode; callers that
    continue training must call ``model.train()`` again.

    Args:
        model: Callable module mapping a batch of inputs to per-class scores
            along dimension 1.
        testloader: Iterable yielding ``(inputs, labels)`` tensor pairs.
        device: Device the batches are moved to before the forward pass.

    Returns:
        Accuracy as a percentage in ``[0, 100]``. Returns ``0.0`` when the
        loader yields no samples instead of dividing by zero.
    """
    model.eval()
    correct = 0
    total = 0

    # Gradients are not needed for evaluation.
    with torch.no_grad():
        for inputs, labels in testloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            # Predicted class = index of the largest score in each row.
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

    if total == 0:
        # Empty loader: no samples to score.
        return 0.0
    return 100. * correct / total
def train_model(model, trainloader, testloader, epochs=100, device='cuda', target_acc=70.0):
    """
    Train ``model`` with AdamW and a one-cycle learning-rate schedule, logging
    per-epoch metrics to a timestamped markdown file under ``logs/``.

    After every epoch the model is evaluated on ``testloader``. Whenever the
    test accuracy improves on the best seen so far, the model is saved to
    ``best_model.pth``. Training stops early once the test accuracy exceeds
    ``target_acc``.

    Args:
        model: Network to train; moved to ``device``.
        trainloader: Loader yielding ``(inputs, labels)`` training batches.
            Its length is used as the scheduler's steps per epoch.
        testloader: Loader used for the per-epoch evaluation.
        epochs: Maximum number of epochs.
        device: Device used for training and evaluation.
        target_acc: Test accuracy (percent) above which training stops early.

    Returns:
        None. Results are written to the log file and ``best_model.pth``.
    """
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()

    # NOTE: OneCycleLR drives the learning rate itself (it starts from
    # max_lr / div_factor), so the lr given to AdamW here is overridden as
    # soon as the scheduler is constructed.
    optimizer = optim.AdamW(model.parameters(), lr=0.0001, weight_decay=0.01)
    scheduler = optim.lr_scheduler.OneCycleLR(
        optimizer,
        max_lr=0.001,
        epochs=epochs,
        steps_per_epoch=len(trainloader),
        pct_start=0.2,  # warm up for the first 20% of training
    )

    # Create a markdown file for logging.
    log_dir = 'logs'
    os.makedirs(log_dir, exist_ok=True)
    log_file = os.path.join(log_dir, f'training_log_{datetime.now().strftime("%Y%m%d_%H%M%S")}.md')
    with open(log_file, 'w') as f:
        f.write("# Training Log\n\n")
        f.write("| Epoch | Train Loss | Train Acc | Test Acc | Best Acc |\n")
        f.write("|-------|------------|-----------|-----------|----------|\n")

    best_acc = 0.0
    epoch_pbar = tqdm(range(epochs), desc='Training Progress', position=0)
    for epoch in epoch_pbar:
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        # Per-batch progress bar, drawn below the epoch bar.
        batch_pbar = tqdm(trainloader,
                          desc=f'Epoch {epoch+1}',
                          position=1,
                          leave=True)
        for inputs, labels in batch_pbar:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            scheduler.step()  # one-cycle schedule advances once per batch

            running_loss += loss.item()
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

            batch_pbar.set_postfix({'loss': f'{loss.item():.3f}'})

        epoch_acc = 100. * correct / total
        avg_loss = running_loss / len(trainloader)

        # Evaluate on test data.
        test_acc = evaluate_model(model, testloader, device)
        epoch_pbar.write(f'Epoch {epoch+1}: Train Loss: {avg_loss:.3f} | Train Acc: {epoch_acc:.2f}% | Test Acc: {test_acc:.2f}%')

        # Update the best score BEFORE logging so the "Best Acc" column
        # reflects this epoch rather than lagging one epoch behind.
        is_new_best = test_acc > best_acc
        if is_new_best:
            best_acc = test_acc
            save_model(model, 'best_model.pth')
            epoch_pbar.write(f'New best test accuracy: {test_acc:.2f}%')

        # Mark new bests inside the row: a free-standing line between rows
        # would terminate the markdown table.
        best_cell = f"{best_acc:.2f}%"
        if is_new_best:
            best_cell = f"**{best_cell}** (new best)"
        with open(log_file, 'a') as f:
            f.write(f"| {epoch+1:5d} | {avg_loss:.3f} | {epoch_acc:.2f}% | {test_acc:.2f}% | {best_cell} |\n")

        if test_acc > target_acc:
            epoch_pbar.write(f"\nReached target accuracy of {target_acc:g}% on test data!")
            with open(log_file, 'a') as f:
                f.write(f"\n**Training stopped at epoch {epoch+1} after reaching target accuracy of {target_acc:g}%**\n")
            break

    # Add final summary to the markdown log.
    with open(log_file, 'a') as f:
        f.write("\n## Training Summary\n")
        f.write(f"- Final Test Accuracy: {test_acc:.2f}%\n")
        f.write(f"- Best Test Accuracy: {best_acc:.2f}%\n")
        f.write(f"- Total Epochs: {epoch+1}\n")
if __name__ == "__main__":
    # Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
    if torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        device = torch.device("cpu")
    print(f"Using device: {device}")

    # Random subsets of the CIFAR-10 splits: 10000 training samples and
    # 2000 test samples.
    trainloader = get_data(train=True, subset_size=10000)
    testloader = get_data(train=False, subset_size=2000)

    # Ten output classes, one per CIFAR-10 category.
    model = get_model(num_classes=10)

    # Train for at most 100 epochs on the selected device.
    train_model(model, trainloader, testloader, device=device, epochs=100)