"""Flask service that runs a Detectron2 Faster R-CNN detector on an uploaded image.

A POST to ``/`` with form field ``image_id`` and file field ``image`` returns a
CSV attachment with one row: the image id and a space-separated prediction
string of ``label score xmin ymin xmax ymax`` groups.
"""
import os
import pickle
import shutil
import uuid
from functools import lru_cache
from math import ceil
from pathlib import Path

import cv2
import matplotlib
import numpy as np
import pandas as pd
import torch
from detectron2 import model_zoo
from detectron2.config import get_cfg
from detectron2.data import DatasetCatalog, MetadataCatalog
from detectron2.engine import DefaultPredictor
from detectron2.utils.visualizer import ColorMode, Visualizer
from flask import Flask, request, send_file
from PIL import Image as PILImage
from tqdm.notebook import tqdm

matplotlib.use('Agg')

app = Flask(__name__)

# Class names in label-index order; the model head is sized from this list.
_THING_CLASSES = (
    "Aortic enlargement",
    "Atelectasis",
    "Calcification",
    "Cardiomegaly",
    "Consolidation",
    "ILD",
    "Infiltration",
    "Lung Opacity",
    "Nodule/Mass",
    "Other lesion",
    "Pleural effusion",
    "Pleural thickening",
    "Pneumothorax",
    "Pulmonary fibrosis",
)

# Placeholder prediction string emitted when the model yields no detections.
_NO_DETECTION_PREDICTION = "14 1 0 0 1 1"


def get_vinbigdata_dicts_test(
    imgdir: Path, test_meta: pd.DataFrame, use_cache: bool = True, debug: bool = True
):
    """Build the list of per-image records used for inference.

    Args:
        imgdir: Directory holding ``{image_id}.png`` files.
        test_meta: Frame with an ``image_id`` column, one row per image.
        use_cache: When true and the cache file exists, return its contents
            instead of rebuilding. When false, the records are rebuilt and
            the cache file is overwritten.
        debug: When true, only the first 500 rows are used (and a separate
            cache file name is used).

    Returns:
        A list of dicts with keys ``file_name``, ``image_id``, ``height`` and
        ``width``. Every record carries the height/width of the FIRST image,
        i.e. all images are assumed to share one size.

    Raises:
        FileNotFoundError: If the first image cannot be read.
    """
    debug_str = f"_debug{int(debug)}"
    cache_path = Path(".") / f"dataset_dicts_cache_test{debug_str}.pkl"

    if use_cache and cache_path.exists():
        print(f"Load from cache {cache_path}")
        # NOTE: pickle.load executes arbitrary code from the file; the cache
        # must only ever be a file this process wrote itself.
        with open(cache_path, mode="rb") as f:
            return pickle.load(f)

    print("Creating data...")
    if debug:
        test_meta = test_meta.iloc[:500]  # For debug

    dataset_dicts = []
    if len(test_meta) > 0:
        # Load 1 image to get image size. Positional access so a non-default
        # index on test_meta does not break the lookup.
        first_id = test_meta["image_id"].iloc[0]
        first_path = os.path.join(imgdir, f"{first_id}.png")
        first_image = cv2.imread(first_path)
        if first_image is None:
            raise FileNotFoundError(f"Could not read image: {first_path}")
        resized_height, resized_width = first_image.shape[:2]

        for image_id in tqdm(test_meta["image_id"], total=len(test_meta)):
            dataset_dicts.append(
                {
                    "file_name": os.path.join(imgdir, f"{image_id}.png"),
                    "image_id": image_id,
                    "height": resized_height,
                    "width": resized_width,
                }
            )

    with open(cache_path, mode="wb") as f:
        pickle.dump(dataset_dicts, f)
    # Return the freshly built records directly rather than re-reading the
    # cache file, so a concurrent writer cannot hand us someone else's data.
    return dataset_dicts


def format_pred(labels: np.ndarray, boxes: np.ndarray, scores: np.ndarray) -> str:
    """Format detections as ``label score xmin ymin xmax ymax`` groups.

    Box coordinates are truncated to integers. Groups are joined with single
    spaces; an empty input yields an empty string.
    """
    pred_strings = []
    for label, score, bbox in zip(labels, scores, boxes):
        xmin, ymin, xmax, ymax = bbox.astype(np.int64)
        pred_strings.append(f"{label} {score} {xmin} {ymin} {xmax} {ymax}")
    return " ".join(pred_strings)


def predict_batch(predictor: DefaultPredictor, im_list: list) -> list:
    """Run the predictor's model on a list of images in one forward pass.

    Args:
        predictor: Predictor whose ``model`` and ``input_format`` are used.
        im_list: Images as H x W x C arrays, as returned by ``cv2.imread``.

    Returns:
        Whatever ``predictor.model`` returns for the batch (indexed per image
        by the caller).
    """
    with torch.no_grad():
        inputs_list = []
        for original_image in im_list:
            if predictor.input_format == "RGB":
                # Reverse the channel axis (cv2 loads images as BGR).
                original_image = original_image[:, :, ::-1]
            height, width = original_image.shape[:2]
            # astype() copies, which also makes the reversed view contiguous.
            image = torch.as_tensor(original_image.astype("float32").transpose(2, 0, 1))
            inputs_list.append({"image": image, "height": height, "width": width})
        return predictor.model(inputs_list)


def csv_create(new_image_path, image_id):
    """Write ``sample_submission.csv`` and ``test_meta.csv`` next to the image.

    Args:
        new_image_path: Path of the image whose size is recorded.
        image_id: Identifier written to both CSV files.

    Returns:
        Tuple ``(sample_submission_path, test_meta_path)``.
    """
    # Context manager so the underlying file handle is released promptly.
    with PILImage.open(new_image_path) as image:
        width, height = image.size
    directory = os.path.dirname(new_image_path)

    sample_submission_df = pd.DataFrame(
        {'image_id': [image_id], 'PredictionString': ['14 1 0 0 1 1']}
    )
    sample_submission_path = os.path.join(directory, 'sample_submission.csv')
    sample_submission_df.to_csv(sample_submission_path, index=False)

    test_meta_df = pd.DataFrame(
        {'image_id': [image_id], 'dim0': [width], 'dim1': [height]}
    )
    test_meta_path = os.path.join(directory, 'test_meta.csv')
    test_meta_df.to_csv(test_meta_path, index=False)

    print("CSV files have been generated successfully.")
    return sample_submission_path, test_meta_path


@lru_cache(maxsize=2)
def _load_predictor(model_path, output_dir, num_classes):
    """Build (once per distinct argument set) the Faster R-CNN predictor."""
    cfg = get_cfg()
    cfg.OUTPUT_DIR = output_dir
    cfg.merge_from_file(model_zoo.get_config_file("COCO-Detection/faster_rcnn_R_50_FPN_3x.yaml"))
    cfg.DATASETS.TEST = ()
    cfg.DATALOADER.NUM_WORKERS = 2
    cfg.MODEL.WEIGHTS = model_path
    cfg.SOLVER.IMS_PER_BATCH = 2
    cfg.SOLVER.BASE_LR = 0.001
    cfg.SOLVER.MAX_ITER = 30000
    cfg.MODEL.ROI_HEADS.BATCH_SIZE_PER_IMAGE = 512
    cfg.MODEL.ROI_HEADS.NUM_CLASSES = num_classes
    # Keep every detection; no score filtering is applied here.
    cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.0
    return DefaultPredictor(cfg)


def prediction(image_id_main, local_image_path, model_path):
    """Run detection on one image and write the result CSV.

    The image is copied to ``processed_images_{image_id_main}/{image_id_main}.png``
    (the name the dataset builder looks for), metadata CSVs are generated next
    to it, and the predictions are written to ``result_images/submission_det.csv``.

    Args:
        image_id_main: Identifier reported in the output CSV; also used to
            name the working directory, so it must not contain path separators.
        local_image_path: Path of the image to analyse.
        model_path: Path of the model weights file.

    Returns:
        Path of the written ``submission_det.csv``.

    Raises:
        FileNotFoundError: If an image cannot be read by OpenCV.
    """
    thing_classes = list(_THING_CLASSES)
    debug = False

    outdir = 'result_images'
    os.makedirs(outdir, exist_ok=True)
    imgdir = f'processed_images_{image_id_main}'
    os.makedirs(imgdir, exist_ok=True)

    # Store the copy under the name get_vinbigdata_dicts_test expects;
    # keeping the upload's own file name made the later imread fail whenever
    # it differed from "<image_id>.png".
    new_image_path = os.path.join(imgdir, f"{image_id_main}.png")
    shutil.copy(local_image_path, new_image_path)
    _, test_meta = csv_create(new_image_path, image_id_main)

    predictor = _load_predictor(model_path, outdir, len(thing_classes))

    # dtype/keep_default_na stop pandas from rewriting ids such as "007" or
    # "NA"; use_cache=False because the on-disk cache is shared by every
    # request and would otherwise return a previous request's image.
    meta_df = pd.read_csv(test_meta, dtype={"image_id": str}, keep_default_na=False)
    dataset_dicts = get_vinbigdata_dicts_test(imgdir, meta_df, use_cache=False, debug=debug)
    if debug:
        dataset_dicts = dataset_dicts[:100]

    results_list = []
    batch_size = 4
    for batch_index in tqdm(range(ceil(len(dataset_dicts) / batch_size))):
        start = batch_size * batch_index
        dataset_dicts_batch = dataset_dicts[start:start + batch_size]

        im_list = []
        for d in dataset_dicts_batch:
            im = cv2.imread(d["file_name"])
            if im is None:
                raise FileNotFoundError(f"Could not read image: {d['file_name']}")
            im_list.append(im)

        outputs_list = predict_batch(predictor, im_list)

        for im, outputs, d in zip(im_list, outputs_list, dataset_dicts_batch):
            resized_height, resized_width = im.shape[:2]
            prediction_string = ""
            if outputs["instances"].has("pred_classes"):
                fields = outputs["instances"].get_fields()
                pred_classes = fields["pred_classes"]
                pred_scores = fields["scores"]
                pred_boxes = fields["pred_boxes"].tensor

                # Rescale boxes from the loaded image size to the recorded size.
                h_ratio = d["height"] / resized_height
                w_ratio = d["width"] / resized_width
                pred_boxes[:, [0, 2]] *= w_ratio
                pred_boxes[:, [1, 3]] *= h_ratio

                prediction_string = format_pred(
                    pred_classes.cpu().numpy(),
                    pred_boxes.cpu().numpy(),
                    pred_scores.cpu().numpy(),
                )
            # Zero detections would give an empty string; use the same
            # placeholder as the "no predictions" case instead.
            results_list.append(
                {
                    "image_id": d["image_id"],
                    "PredictionString": prediction_string or _NO_DETECTION_PREDICTION,
                }
            )

    submission_det = pd.DataFrame(results_list, columns=['image_id', 'PredictionString'])
    submission_det_path = os.path.join(outdir, "submission_det.csv")
    submission_det.to_csv(submission_det_path, index=False)
    return submission_det_path


def _has_path_syntax(value):
    """Return True if *value* is empty or could escape a directory when joined."""
    return (
        not value
        or value in {".", ".."}
        or any(ch in value for ch in ("/", "\\", "\x00"))
    )


@app.route('/', methods=['POST'])
def predict():
    """Handle an upload: save the image, run detection, return the CSV.

    Expects form field ``image_id`` and file field ``image``. Responds with
    400 when either would produce an unsafe filesystem path.
    """
    image_id = request.form['image_id']
    image_file = request.files['image']
    model_path = "model_final.pth"

    # Both values come from the client and end up in filesystem paths.
    if _has_path_syntax(image_id):
        return "Invalid image_id", 400
    # Keep only the final component of the client-supplied name.
    filename = os.path.basename((image_file.filename or "").replace("\\", "/"))
    if _has_path_syntax(filename):
        return "Invalid image filename", 400

    os.makedirs("input_images", exist_ok=True)
    local_image_path = os.path.join("input_images", filename)
    image_file.save(local_image_path)

    submission_det_path = prediction(image_id, local_image_path, model_path)
    return send_file(submission_det_path, as_attachment=True)


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8888)