Spaces:
Runtime error
Runtime error
import os | |
from pathlib import Path | |
from tqdm.notebook import tqdm | |
import numpy as np | |
import pandas as pd | |
from PIL import Image as PILImage | |
import torch | |
import cv2 | |
import pickle | |
import shutil | |
from detectron2.config import get_cfg | |
from detectron2 import model_zoo | |
from detectron2.engine import DefaultPredictor | |
from detectron2.data import DatasetCatalog, MetadataCatalog | |
from detectron2.utils.visualizer import ColorMode, Visualizer | |
from math import ceil | |
import uuid | |
from flask import Flask, request, send_file | |
import matplotlib | |
matplotlib.use('Agg') | |
app = Flask(__name__) | |
def get_vinbigdata_dicts_test(imgdir: Path, test_meta: pd.DataFrame, use_cache: bool = True, debug: bool = True): | |
debug_str = f"_debug{int(debug)}" | |
cache_path = Path(".") / f"dataset_dicts_cache_test{debug_str}.pkl" | |
if not use_cache or not cache_path.exists(): | |
print("Creating data...") | |
if debug: | |
test_meta = test_meta.iloc[:500] # For debug | |
# Load 1 image to get image size. | |
image_id = test_meta.loc[0, "image_id"] | |
image_path = os.path.join(imgdir, f"{image_id}.png") | |
image = cv2.imread(image_path) | |
resized_height, resized_width, ch = image.shape | |
dataset_dicts = [] | |
for index, test_meta_row in tqdm(test_meta.iterrows(), total=len(test_meta)): | |
record = {} | |
image_id, height, width = test_meta_row.values | |
filename = os.path.join(imgdir, f"{image_id}.png") | |
record["file_name"] = filename | |
record["image_id"] = image_id | |
record["height"] = resized_height | |
record["width"] = resized_width | |
dataset_dicts.append(record) | |
with open(cache_path, mode="wb") as f: | |
pickle.dump(dataset_dicts, f) | |
print(f"Load from cache {cache_path}") | |
with open(cache_path, mode="rb") as f: | |
dataset_dicts = pickle.load(f) | |
return dataset_dicts | |
def format_pred(labels: np.ndarray, boxes: np.ndarray, scores: np.ndarray) -> str: | |
pred_strings = [] | |
for label, score, bbox in zip(labels, scores, boxes): | |
xmin, ymin, xmax, ymax = bbox.astype(np.int64) | |
pred_strings.append(f"{label} {score} {xmin} {ymin} {xmax} {ymax}") | |
return " ".join(pred_strings) | |
def predict_batch(predictor: DefaultPredictor, im_list: list) -> list: | |
with torch.no_grad(): | |
inputs_list = [] | |
for original_image in im_list: | |
if predictor.input_format == "RGB": | |
original_image = original_image[:, :, ::-1] | |
height, width = original_image.shape[:2] | |
image = torch.as_tensor(original_image.astype("float32").transpose(2, 0, 1)) | |
inputs = {"image": image, "height": height, "width": width} | |
inputs_list.append(inputs) | |
predictions = predictor.model(inputs_list) | |
return predictions | |
def csv_create(new_image_path, image_id): | |
image = PILImage.open(new_image_path) | |
width, height = image.size | |
directory = os.path.dirname(new_image_path) | |
sample_submission_data = { | |
'image_id': [image_id], | |
'PredictionString': ['14 1 0 0 1 1'] | |
} | |
sample_submission_df = pd.DataFrame(sample_submission_data) | |
sample_submission_path = os.path.join(directory, 'sample_submission.csv') | |
sample_submission_df.to_csv(sample_submission_path, index=False) | |
test_meta_data = { | |
'image_id': [image_id], | |
'dim0': [width], | |
'dim1': [height] | |
} | |
test_meta_df = pd.DataFrame(test_meta_data) | |
test_meta_path = os.path.join(directory, 'test_meta.csv') | |
test_meta_df.to_csv(test_meta_path, index=False) | |
print("CSV files have been generated successfully.") | |
return sample_submission_path, test_meta_path | |
def prediction(image_id_main, local_image_path, model_path): | |
thing_classes = [ | |
"Aortic enlargement", "Atelectasis", "Calcification", "Cardiomegaly", | |
"Consolidation", "ILD", "Infiltration", "Lung Opacity", "Nodule/Mass", | |
"Other lesion", "Pleural effusion", "Pleural thickening", "Pneumothorax", "Pulmonary fibrosis" | |
] | |
category_name_to_id = {class_name: index for index, class_name in enumerate(thing_classes)} | |
debug = False | |
outdir = 'result_images' | |
os.makedirs(outdir, exist_ok=True) | |
imgdir = f'processed_images_{image_id_main}' | |
os.makedirs(imgdir, exist_ok=True) | |
shutil.copy(local_image_path, imgdir) | |
new_image_path = os.path.join(imgdir, os.path.basename(local_image_path)) | |
sample_submission, test_meta = csv_create(new_image_path, image_id_main) | |
cfg = get_cfg() | |
cfg.OUTPUT_DIR = outdir | |
cfg.merge_from_file(model_zoo.get_config_file("COCO-Detection/faster_rcnn_R_50_FPN_3x.yaml")) | |
cfg.DATASETS.TEST = () | |
cfg.DATALOADER.NUM_WORKERS = 2 | |
cfg.MODEL.WEIGHTS = model_path | |
cfg.SOLVER.IMS_PER_BATCH = 2 | |
cfg.SOLVER.BASE_LR = 0.001 | |
cfg.SOLVER.MAX_ITER = 30000 | |
cfg.MODEL.ROI_HEADS.BATCH_SIZE_PER_IMAGE = 512 | |
cfg.MODEL.ROI_HEADS.NUM_CLASSES = len(thing_classes) | |
cfg.MODEL.WEIGHTS = model_path | |
cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.0 | |
predictor = DefaultPredictor(cfg) | |
unique_id = f"bigdata2_{uuid.uuid4().hex[:8]}" | |
DatasetCatalog.register(unique_id, lambda: get_vinbigdata_dicts_test(imgdir, pd.read_csv(test_meta), debug=debug)) | |
MetadataCatalog.get(unique_id).set(thing_classes=thing_classes) | |
metadata = MetadataCatalog.get(unique_id) | |
dataset_dicts = get_vinbigdata_dicts_test(imgdir, pd.read_csv(test_meta), debug=debug) | |
if debug: | |
dataset_dicts = dataset_dicts[:100] | |
results_list = [] | |
batch_size = 4 | |
for i in tqdm(range(ceil(len(dataset_dicts) / batch_size))): | |
inds = list(range(batch_size * i, min(batch_size * (i + 1), len(dataset_dicts)))) | |
dataset_dicts_batch = [dataset_dicts[i] for i in inds] | |
im_list = [cv2.imread(d["file_name"]) for d in dataset_dicts_batch] | |
outputs_list = predict_batch(predictor, im_list) | |
for im, outputs, d in zip(im_list, outputs_list, dataset_dicts_batch): | |
resized_height, resized_width, ch = im.shape | |
if outputs["instances"].has("pred_classes"): | |
fields = outputs["instances"].get_fields() | |
pred_classes = fields["pred_classes"] | |
pred_scores = fields["scores"] | |
pred_boxes = fields["pred_boxes"].tensor | |
h_ratio = d["height"] / resized_height | |
w_ratio = d["width"] / resized_width | |
pred_boxes[:, [0, 2]] *= w_ratio | |
pred_boxes[:, [1, 3]] *= h_ratio | |
pred_classes_array = pred_classes.cpu().numpy() | |
pred_boxes_array = pred_boxes.cpu().numpy() | |
pred_scores_array = pred_scores.cpu().numpy() | |
result = { | |
"image_id": d["image_id"], | |
"PredictionString": format_pred(pred_classes_array, pred_boxes_array, pred_scores_array) | |
} | |
else: | |
result = {"image_id": d["image_id"], "PredictionString": "14 1 0 0 1 1"} | |
results_list.append(result) | |
submission_det = pd.DataFrame(results_list, columns=['image_id', 'PredictionString']) | |
submission_det_path = os.path.join(outdir, "submission_det.csv") | |
submission_det.to_csv(submission_det_path, index=False) | |
return submission_det_path | |
def predict(): | |
image_id = request.form['image_id'] | |
image_file = request.files['image'] | |
model_path = "model_final.pth" | |
local_image_path = os.path.join("input_images", image_file.filename) | |
os.makedirs("input_images", exist_ok=True) | |
image_file.save(local_image_path) | |
submission_det_path = prediction(image_id, local_image_path, model_path) | |
return send_file(submission_det_path, as_attachment=True) | |
if __name__ == '__main__': | |
app.run(host='0.0.0.0', port=8888) | |