|
import os |
|
import torch |
|
import numpy as np |
|
|
|
from loguru import logger |
|
from tqdm import tqdm |
|
from dotenv import load_dotenv |
|
|
|
from fastapi import APIRouter |
|
from datetime import datetime |
|
from datasets import load_dataset |
|
from sklearn.metrics import accuracy_score, precision_score, recall_score |
|
|
|
from .utils.evaluation import ImageEvaluationRequest |
|
from .utils.emissions import tracker, clean_emissions_data, get_space_info |
|
|
|
from ultralytics import YOLO |
|
from ultralytics import RTDETR |
|
from torch.utils.data import DataLoader |
|
from torchvision import transforms |
|
|
|
from dotenv import load_dotenv |
|
|
|
# Pull settings such as HF_TOKEN (read via os.getenv further down) into the
# process environment before anything else needs them.
load_dotenv()

# Router that the image-evaluation endpoint below is registered on.
router = APIRouter()

# Human-readable description and URL path of the image task endpoint.
DESCRIPTION = "Image to detect smoke"
ROUTE = "/image"

# Prefer the GPU, but fall back to the CPU so the module stays usable on hosts
# without CUDA instead of failing the first time the device is used.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
|
|
|
|
def parse_boxes(annotation_string):
    """Split a flat annotation string into a list of boxes.

    The string is a whitespace-separated sequence of numbers grouped in
    records of five: ``class_id x_center y_center width height``. The class id
    is dropped, so every returned box is ``[x_center, y_center, width, height]``
    as floats. A trailing record with fewer than five numbers is ignored.

    NOTE: this module defines ``parse_boxes`` a second time further down; the
    later definition is the one that stays bound after import.
    """
    numbers = list(map(float, annotation_string.strip().split()))
    complete_records = len(numbers) // 5
    return [
        numbers[record * 5 + 1:record * 5 + 5]
        for record in range(complete_records)
    ]
|
|
|
|
|
def compute_iou(box1, box2):
    """Return the Intersection over Union of two YOLO-format boxes.

    Each box is ``(x_center, y_center, width, height)``. Both boxes are
    converted to corner coordinates, the overlap rectangle is measured, and
    the ratio ``intersection / (union + 1e-6)`` is returned; the small constant
    keeps the division defined when both boxes have zero area.

    NOTE: this module defines ``compute_iou`` a second time further down; the
    later definition is the one that stays bound after import.
    """

    def _to_corners(box):
        # (x_center, y_center, width, height) -> [left, top, right, bottom]
        cx, cy, w, h = box
        return np.array([cx - w / 2, cy - h / 2, cx + w / 2, cy + h / 2])

    left_a, top_a, right_a, bottom_a = _to_corners(box1)
    left_b, top_b, right_b, bottom_b = _to_corners(box2)

    # Overlap extent along each axis, floored at zero for disjoint boxes.
    overlap_w = max(0, min(right_a, right_b) - max(left_a, left_b))
    overlap_h = max(0, min(bottom_a, bottom_b) - max(top_a, top_b))
    intersection = overlap_w * overlap_h

    area_a = (right_a - left_a) * (bottom_a - top_a)
    area_b = (right_b - left_b) * (bottom_b - top_b)

    return intersection / (area_a + area_b - intersection + 1e-6)
|
|
|
|
|
def compute_max_iou(true_boxes, pred_box):
    """Return the best IoU that ``pred_box`` reaches against any true box.

    ``true_boxes`` is an iterable of ground-truth boxes and ``pred_box`` a
    single predicted box, all in the format ``compute_iou`` accepts. The
    result is 0 when ``true_boxes`` is empty.

    NOTE: this module defines ``compute_max_iou`` a second time further down;
    the later definition is the one that stays bound after import.
    """
    # Seeding the candidates with 0 gives the same floor as starting a running
    # maximum at 0.
    return max([0, *(compute_iou(true_box, pred_box) for true_box in true_boxes)])
|
|
|
|
|
class ClampTransform:
    """Callable transform that clamps a tensor into ``[min_val, max_val]``."""

    def __init__(self, min_val=0.0, max_val=1.0):
        """Store the lower and upper clamp bounds (defaults: 0.0 and 1.0)."""
        self.min_val, self.max_val = min_val, max_val

    def __call__(self, tensor):
        """Return ``tensor`` with every element limited to the stored bounds."""
        return torch.clamp(tensor, self.min_val, self.max_val)
|
|
|
|
|
def collate_fn(batch):
    """Collate dataset items into one image tensor plus their annotations.

    Every item must provide an ``'image'`` entry; ``'annotations'`` is optional
    and defaults to an empty string. Each image goes through ``ToTensor``, is
    clamped to ``[0.0, 1.0]`` and resized to 640x640, and the results are
    stacked along a new leading dimension.

    Returns:
        dict with ``'image'`` (the stacked tensor) and ``'annotations'`` (a
        list with one annotation per item, in batch order).
    """
    preprocess = transforms.Compose([
        transforms.ToTensor(),
        ClampTransform(min_val=0.0, max_val=1.0),
        transforms.Resize((640, 640)),
    ])

    raw_images = [sample['image'] for sample in batch]
    labels = [sample.get('annotations', '') for sample in batch]

    stacked = torch.stack([preprocess(picture) for picture in raw_images])
    return {'image': stacked, 'annotations': labels}
|
|
|
|
|
def parse_boxes(annotation_string):
    """Parse every bounding box contained in one annotation string.

    The annotation is a run of whitespace-separated numbers in which each box
    takes five values: class_id, x_center, y_center, width, height. Only the
    four geometry values are kept per box; leftover values that do not form a
    complete box are skipped.

    NOTE: a definition with the same name and behaviour appears earlier in
    this module; this later one is the binding callers see.
    """
    tokens = annotation_string.strip().split()
    values = [float(token) for token in tokens]

    boxes = []
    start = 0
    while start + 5 <= len(values):
        # First value of the record is the class id, which is not returned.
        _class_id, *geometry = values[start:start + 5]
        boxes.append(geometry)
        start += 5
    return boxes
|
|
|
|
|
def compute_iou(box1, box2):
    """Compute Intersection over Union (IoU) between two YOLO format boxes.

    Both arguments are ``(x_center, y_center, width, height)`` sequences. The
    returned value is ``intersection / (union + 1e-6)``; the added constant
    avoids a division by zero for degenerate boxes.

    NOTE: a definition with the same name and behaviour appears earlier in
    this module; this later one is the binding callers see.
    """

    def _edges(box):
        # Centre/size representation -> [x_min, y_min, x_max, y_max].
        x_center, y_center, width, height = box
        half_w = width / 2
        half_h = height / 2
        return np.array([
            x_center - half_w,
            y_center - half_h,
            x_center + half_w,
            y_center + half_h,
        ])

    def _area(corners):
        return (corners[2] - corners[0]) * (corners[3] - corners[1])

    first = _edges(box1)
    second = _edges(box2)

    # Width and height of the overlap rectangle, zero when there is none.
    inter_w = max(0, min(first[2], second[2]) - max(first[0], second[0]))
    inter_h = max(0, min(first[3], second[3]) - max(first[1], second[1]))
    overlap = inter_w * inter_h

    union = _area(first) + _area(second) - overlap
    return overlap / (union + 1e-6)
|
|
|
|
|
def compute_max_iou(true_boxes, pred_box):
    """Compute the highest IoU between one predicted box and all true boxes.

    Returns 0 when ``true_boxes`` is empty, otherwise the largest value that
    ``compute_iou`` yields for ``pred_box`` against any entry of ``true_boxes``.

    NOTE: a definition with the same name and behaviour appears earlier in
    this module; this later one is the binding callers see.
    """
    best = 0
    for score in (compute_iou(candidate, pred_box) for candidate in true_boxes):
        if score > best:
            best = score
    return best
|
|
|
|
|
def _load_detector(model_path):
    """Build the ultralytics detector that matches ``model_path``.

    The model family is inferred from the path itself: a path containing
    ``"detr"`` loads an RT-DETR model, otherwise a path containing ``"yolo"``
    loads a YOLO detection model. RT-DETR is checked first so that a path
    containing both substrings still resolves to RT-DETR without building a
    YOLO model that is immediately thrown away.

    Raises:
        HTTPException: status 400 when the path names neither model family.
    """
    # NOTE(security): model_path is supplied by the API caller and is handed to
    # the ultralytics loaders unchanged. Loading .pt checkpoints presumably
    # goes through pickle-based deserialisation, so consider restricting this
    # value to an allow-list of known weight files -- TODO confirm and harden.
    if "detr" in model_path:
        return RTDETR(model_path)
    if "yolo" in model_path:
        return YOLO(model_path, task="detect")

    # Local import: only needed on the error path, and keeps this block
    # independent of the module's import section.
    from fastapi import HTTPException

    raise HTTPException(
        status_code=400,
        detail=(
            f"Unsupported model_path {model_path!r}: "
            "the path must contain 'yolo' or 'detr'."
        ),
    )


@router.post(ROUTE, tags=["Image Task"], description=DESCRIPTION)
async def evaluate_image(model_path: str = "models/yolo11s_best.pt", request: ImageEvaluationRequest = ImageEvaluationRequest()):
    """
    Evaluate smoke classification and smoke localisation on the "val" split.

    A YOLO or RT-DETR detector (selected from ``model_path``) is run on every
    image of the validation split of ``request.dataset_name``. An image is
    predicted as "smoke" when the detector returns at least one box.

    Args:
        model_path: Path of the weights to load; it must contain "yolo" or
            "detr" so the model family can be selected.
        request: Evaluation settings; ``dataset_name`` selects the dataset,
            while ``test_size`` and ``test_seed`` are only echoed back in the
            response.

    Returns:
        dict with submission metadata, classification accuracy / precision /
        recall, ``mean_iou``, the tracked energy and emission figures, and the
        dataset configuration.

    Raises:
        HTTPException: status 400 when ``model_path`` names no supported model.

    Metrics:
    - Classification: whether an image contains smoke or not
    - Object detection: for every box predicted on an image that really
      contains smoke, the best IoU against that image's true boxes, averaged
      over all such predicted boxes
    """
    username, space_url = get_space_info()

    dataset = load_dataset(request.dataset_name, token=os.getenv("HF_TOKEN"))
    test_dataset = dataset["val"]

    model = _load_detector(model_path)

    # Only the inference loop is wrapped by the emissions tracker.
    tracker.start()
    tracker.start_task("inference")

    predictions = []
    true_labels = []
    pred_boxes = []
    true_boxes_list = []

    for example in tqdm(test_dataset):
        # A missing or None annotation is treated the same as an empty one.
        annotation = (example.get("annotations") or "").strip()
        has_smoke = len(annotation) > 0
        true_labels.append(int(has_smoke))

        image = example["image"]
        detections = model(image, verbose=False)
        boxes = detections[0].boxes.xywh.tolist()

        predictions.append(int(len(boxes) > 0))

        if has_smoke:
            image_true_boxes = parse_boxes(annotation)
            # image.size is (width, height); dividing by it puts the predicted
            # boxes on the same relative scale as the parsed annotations
            # (presumably normalised YOLO coordinates -- TODO confirm).
            image_width, image_height = image.size
            for x, y, w, h in boxes:
                pred_boxes.append([
                    x / image_width,
                    y / image_height,
                    w / image_width,
                    h / image_height,
                ])
                # One true-box list per predicted box keeps both lists aligned
                # for the pairwise IoU computation below.
                true_boxes_list.append(image_true_boxes)
            # NOTE(review): a smoke image with no predicted box adds nothing
            # here, so missed detections do not lower mean_iou.

    emissions_data = tracker.stop_task()

    classification_accuracy = accuracy_score(true_labels, predictions)
    classification_precision = precision_score(true_labels, predictions)
    classification_recall = recall_score(true_labels, predictions)

    ious = [
        compute_max_iou(true_boxes, pred_box)
        for true_boxes, pred_box in zip(true_boxes_list, pred_boxes)
    ]
    mean_iou = float(np.mean(ious)) if ious else 0.0

    response = {
        "username": username,
        "space_url": space_url,
        "submission_timestamp": datetime.now().isoformat(),
        "model_description": DESCRIPTION,
        "classification_accuracy": float(classification_accuracy),
        "classification_precision": float(classification_precision),
        "classification_recall": float(classification_recall),
        "mean_iou": mean_iou,
        # Tracker figures are scaled by 1000 to match the units in the key names.
        "energy_consumed_wh": emissions_data.energy_consumed * 1000,
        "emissions_gco2eq": emissions_data.emissions * 1000,
        "emissions_data": clean_emissions_data(emissions_data),
        "api_route": ROUTE,
        "dataset_config": {
            "dataset_name": request.dataset_name,
            "test_size": request.test_size,
            "test_seed": request.test_seed,
        },
    }

    return response