"""classification.ipynb

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1JuZNV3fqC5XQ0L-jhIyVRbIDPfWWGkVI
"""

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, models, transforms
from torch.utils.data import DataLoader, random_split

import copy
import os
import random

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from PIL import Image

# Locations on Google Drive: the raw training images and the directory that
# will hold the augmented copies.
data_dir = 'drive/MyDrive/Ai_Hackathon_2024/plant_data/data_for_training'
augmented_data_dir = 'drive/MyDrive/Ai_Hackathon_2024/plant_data/augmented_data'

# Target number of images per class after augmentation.
N = 50

# Random augmentations used to synthesize extra images for classes with fewer
# than N samples.
augmentation_transforms = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(30),
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),
    transforms.RandomResizedCrop(size=(224, 224), scale=(0.8, 1.0)),
    transforms.Pad(padding=10, padding_mode='reflect'),
    transforms.ToTensor(),
])

print('loading dataset...')
dataset = datasets.ImageFolder(data_dir)
class_names = dataset.classes
print('loaded dataset.')


def save_image(img, path, idx):
    img.save(os.path.join(path, f'{idx}.png'))


if not os.path.exists(augmented_data_dir):
    os.makedirs(augmented_data_dir)

print('starting augmentation process...')
for class_idx in range(len(dataset.classes)):
    print(f"class_idx = {class_idx}")
    class_dir = os.path.join(augmented_data_dir, dataset.classes[class_idx])
    if not os.path.exists(class_dir):
        os.makedirs(class_dir)

    class_images = [img_path for img_path, label in dataset.samples if label == class_idx]
    current_count = 0

    # Copy every original image of this class into the augmented directory.
    for img_path in class_images:
        img = Image.open(img_path).convert('RGB')  # convert guards against grayscale/RGBA inputs
        save_image(img, class_dir, current_count)
        current_count += 1

    # Generate randomly augmented copies until the class has N images.
    while current_count < N:
        img_path = random.choice(class_images)
        img = Image.open(img_path).convert('RGB')
        img = augmentation_transforms(img)
        img = transforms.ToPILImage()(img)
        save_image(img, class_dir, current_count)
        current_count += 1

print('Data augmentation completed.')


# Train on the augmented dataset from here on.
data_dir = augmented_data_dir

# Fix the random seed so the train/validation split is reproducible.
seed = 42
torch.manual_seed(seed)

data_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(30),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

full_dataset = datasets.ImageFolder(data_dir, transform=data_transforms)

# 80/20 train/validation split with a fixed generator for reproducibility.
train_size = int(0.8 * len(full_dataset))
val_size = len(full_dataset) - train_size
train_dataset, val_dataset = random_split(full_dataset, [train_size, val_size], generator=torch.Generator().manual_seed(seed))

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

# Load an ImageNet-pretrained ResNet-50 and freeze all of its weights so it
# acts as a fixed feature extractor.
resnet50 = models.resnet50(weights='ResNet50_Weights.DEFAULT')
for param in resnet50.parameters():
    param.requires_grad = False

# Replace the final fully connected layer with an identity so the backbone
# outputs raw features for the custom head below.
num_ftrs = resnet50.fc.in_features
resnet50.fc = nn.Identity()


class CustomNet(nn.Module):
    """Frozen ResNet-50 backbone followed by a small trainable classification head."""

    def __init__(self, num_ftrs, num_classes):
        super(CustomNet, self).__init__()
        self.resnet50 = resnet50
        self.hidden = nn.Linear(num_ftrs, 512)
        self.relu = nn.ReLU()
        self.output = nn.Linear(512, num_classes)

    def forward(self, x):
        x = self.resnet50(x)
        x = self.hidden(x)
        x = self.relu(x)
        x = self.output(x)
        return x


num_classes = len(full_dataset.classes)
model = CustomNet(num_ftrs, num_classes)

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = model.to(device)

criterion = nn.CrossEntropyLoss()
# The backbone is frozen (requires_grad=False), so only the new head receives
# gradients and is updated by the optimizer.
optimizer = optim.Adam(model.parameters(), lr=0.001)


def train_model(model, dataloaders, criterion, optimizer, num_epochs=10):
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    train_losses = []
    val_losses = []

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        # Each epoch has a training phase and a validation phase.
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            for inputs, labels in dataloaders[phase]:
                inputs, labels = inputs.to(device), labels.to(device)

                optimizer.zero_grad()

                # Track gradients only during the training phase.
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            if phase == 'train':
                train_losses.append(epoch_loss)
            else:
                val_losses.append(epoch_loss)

            # Keep a copy of the weights with the best validation accuracy.
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

    print('Best val Acc: {:.4f}'.format(best_acc))

    # Restore the best-performing weights before returning.
    model.load_state_dict(best_model_wts)

    # Plot the training and validation loss curves.
    plt.figure(figsize=(10, 5))
    plt.plot(train_losses, label='Training Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

    return model


dataloaders = {'train': train_loader, 'val': val_loader}

model = train_model(model, dataloaders, criterion, optimizer, num_epochs=10)

# Save the fine-tuned weights back to Drive.
torch.save(model.state_dict(), 'drive/MyDrive/Ai_Hackathon_2024/plant_data/fine_tuned_plant_classifier.pth')


def evaluate_model(model, dataloader):
    """Run the model over a dataloader and return accuracy plus raw predictions and labels."""
    model.eval()
    correct = 0
    total = 0

    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

            correct += (preds == labels).sum().item()
            total += labels.size(0)

    accuracy = correct / total
    return accuracy, all_preds, all_labels


# Sanity check on the augmented dataset. Note that full_dataset includes the
# images the model was trained on, so this is not a held-out evaluation.
dataloader = DataLoader(full_dataset, batch_size=32, shuffle=True)
accuracy, all_preds, all_labels = evaluate_model(model, dataloader)

correct_preds = sum(np.array(all_preds) == np.array(all_labels))
incorrect_preds = len(all_labels) - correct_preds

print(f'Total images: {len(all_labels)}')
print(f'Correct predictions: {correct_preds}')
print(f'Incorrect predictions: {incorrect_preds}')
print(f'Accuracy: {accuracy:.4f}')


# Evaluate on the original (non-augmented) images as well.
real_dataset = datasets.ImageFolder('drive/MyDrive/Ai_Hackathon_2024/plant_data/data_for_training', transform=data_transforms)

dataloader = DataLoader(real_dataset, batch_size=32, shuffle=True)
accuracy, all_preds, all_labels = evaluate_model(model, dataloader)

correct_preds = sum(np.array(all_preds) == np.array(all_labels))
incorrect_preds = len(all_labels) - correct_preds
print('-' * 10)
print(f'Total images: {len(all_labels)}')
print(f'Correct predictions: {correct_preds}')
print(f'Incorrect predictions: {incorrect_preds}')
print(f'Accuracy: {accuracy:.4f}')


def process_image(image_path):
    data_transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])
    image = Image.open(image_path).convert('RGB')
    image = data_transform(image)
    image = image.unsqueeze(0)
    return image


def predict_single_image(image_path, model):
    image = process_image(image_path)

    model.eval()
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    with torch.no_grad():
        image = image.to(device)
        outputs = model(image)
        probabilities = torch.nn.functional.softmax(outputs[0], dim=0)

    # Return per-class probabilities, highest first.
    return pd.Series(probabilities.cpu().numpy(), index=class_names).sort_values(ascending=False)


def classify(img_path):
    image_path = img_path

    model = CustomNet(num_ftrs, num_classes)
    # Load the trained weights; map_location lets this run on CPU-only runtimes.
    model.load_state_dict(torch.load('./fine_tuned_plant_classifier.pth', map_location=device))

    class_probabilities = predict_single_image(image_path, model)
    return class_probabilities
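

# Example usage, a minimal sketch: 'example_leaf.png' below is a hypothetical
# filename used only for illustration; point it at any image on disk.
# classify() returns a pandas Series of per-class probabilities, sorted from
# most to least likely.
example_image = 'example_leaf.png'  # hypothetical path, replace with a real image
if os.path.exists(example_image):
    class_probs = classify(example_image)
    print(class_probs.head(3))   # three most probable classes
    print(class_probs.idxmax())  # name of the single most likely class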


# Utility cell: match raw source images to their class folders by filename ID
# and save a center crop of each into a new training directory.

import os
import shutil
from PIL import Image

# Placeholder paths: the raw images, the existing class-labelled training set,
# and the output directory for the cropped copies.
source_dir = 'path/to/source_images'
target_base_dir = 'path/to/training_images'
new_base_dir = 'path/to/training_images_2'

# Existing class folders in the labelled training set.
class_folders = [d for d in os.listdir(target_base_dir) if os.path.isdir(os.path.join(target_base_dir, d))]


def extract_id(filename):
    # Filenames are assumed to start with an ID followed by an underscore.
    return filename.split('_')[0]


def crop_middle_section(image):
    # Crop the central third of the image in both dimensions.
    width, height = image.size
    new_width = width // 3
    new_height = height // 3
    left = (width - new_width) // 2
    top = (height - new_height) // 2
    right = left + new_width
    bottom = top + new_height
    return image.crop((left, top, right, bottom))


os.makedirs(new_base_dir, exist_ok=True)

# Map each file ID to the class folder it already belongs to in the labelled set.
id_to_class_folder = {}
for class_folder in class_folders:
    class_folder_path = os.path.join(target_base_dir, class_folder)
    for filename in os.listdir(class_folder_path):
        if os.path.isfile(os.path.join(class_folder_path, filename)):
            file_id = extract_id(filename)
            id_to_class_folder[file_id] = class_folder

# Crop and copy every source image whose ID matches a known class.
for filename in os.listdir(source_dir):
    if os.path.isfile(os.path.join(source_dir, filename)):
        file_id = extract_id(filename)
        if file_id in id_to_class_folder:
            target_class_folder = id_to_class_folder[file_id]
            new_class_folder_path = os.path.join(new_base_dir, target_class_folder)
            os.makedirs(new_class_folder_path, exist_ok=True)

            target_path = os.path.join(new_class_folder_path, filename)

            image_path = os.path.join(source_dir, filename)
            with Image.open(image_path) as img:
                cropped_img = crop_middle_section(img)
                cropped_img.save(target_path)

            print(f'Copied and cropped {filename} to {new_class_folder_path}')

print('Image processing and copying completed.')