import os import io import copy import json import logging import contextlib from collections import OrderedDict import numpy as np import torch import pycocotools.mask as mask_util from multiprocessing import freeze_support from fvcore.common.file_io import PathManager from detectron2.data import MetadataCatalog from detectron2.utils.file_io import PathManager from detectron2.evaluation import DatasetEvaluator from .datasets.avis_api.avos import AVOS import sys sys.path.insert(0, os.path.abspath(os.path.dirname(__file__))) import aviseval def eval_track(out_dir, gt_file): freeze_support() # Command line interface: default_eval_config = aviseval.Evaluator.get_default_eval_config() default_dataset_config = aviseval.datasets.AVIS.get_default_dataset_config() default_dataset_config['TRACKERS_FOLDER'] = out_dir default_dataset_config['GT_File'] = gt_file default_metrics_config = {'METRICS': ['TrackMAP', 'HOTA']} # 'CLEAR', 'Identity' config = {**default_eval_config, **default_dataset_config, **default_metrics_config} # Merge default configs eval_config = {k: v for k, v in config.items() if k in default_eval_config.keys()} dataset_config = {k: v for k, v in config.items() if k in default_dataset_config.keys()} metrics_config = {k: v for k, v in config.items() if k in default_metrics_config.keys()} # Run code evaluator = aviseval.Evaluator(eval_config) dataset_list = [aviseval.datasets.AVIS(dataset_config)] metrics_list = [] for metric in [aviseval.metrics.TrackMAP, aviseval.metrics.HOTA]: if metric.get_name() in metrics_config['METRICS']: if metric == aviseval.metrics.TrackMAP: default_track_map_config = metric.get_default_metric_config() default_track_map_config['USE_TIME_RANGES'] = False default_track_map_config['AREA_RANGES'] = [[0 ** 2, 128 ** 2], [128 ** 2, 256 ** 2], [256 ** 2, 1e5 ** 2]] metrics_list.append(metric(default_track_map_config)) else: metrics_list.append(metric()) if len(metrics_list) == 0: raise Exception('No metrics selected for evaluation') output_res, output_msg = evaluator.evaluate(dataset_list, metrics_list) return output_res def instances_to_coco_json_video(inputs, outputs): """ Dump an "Instances" object to a COCO-format json that's used for evaluation. Args: instances (Instances): video_id (int): the image id Returns: list[dict]: list of json annotations in COCO format. """ assert len(inputs) == 1, "More than one inputs are loaded for inference!" video_id = inputs[0]["video_id"] video_length = inputs[0]["length"] scores = outputs["pred_scores"] labels = outputs["pred_labels"] masks = outputs["pred_masks"] avis_results = [] for instance_id, (s, l, m) in enumerate(zip(scores, labels, masks)): segms = [ mask_util.encode(np.array(_mask[:, :, None], order="F", dtype="uint8"))[0] for _mask in m ] for rle in segms: rle["counts"] = rle["counts"].decode("utf-8") res = { "video_id": video_id, "score": s, "category_id": l, "segmentations": segms, } avis_results.append(res) return avis_results class AVISEvaluator(DatasetEvaluator): def __init__( self, dataset_name, tasks=None, distributed=True, output_dir=None, *, use_fast_impl=True, ): self._logger = logging.getLogger(__name__) self._distributed = distributed self._output_dir = output_dir self._use_fast_impl = use_fast_impl self._cpu_device = torch.device("cpu") self.dataset_name = dataset_name self._metadata = MetadataCatalog.get(dataset_name) json_file = PathManager.get_local_path(self._metadata.json_file) with contextlib.redirect_stdout(io.StringIO()): self._avis_api = AVOS(json_file) self._do_evaluation = "annotations" in self._avis_api.dataset def reset(self): self._predictions = [] def process(self, inputs, outputs): """ Args: inputs: the inputs to a COCO model (e.g., GeneralizedRCNN). It is a list of dict. Each dict corresponds to an image and contains keys like "height", "width", "file_name", "image_id". outputs: the outputs of a COCO model. It is a list of dicts with key "instances" that contains :class:`Instances`. """ prediction = instances_to_coco_json_video(inputs, outputs) self._predictions.extend(prediction) def evaluate(self): """ Args: img_ids: a list of image IDs to evaluate on. Default to None for the whole dataset """ predictions = self._predictions self._results = OrderedDict() self._eval_predictions(predictions) # Copy so the caller can do whatever with results return copy.deepcopy(self._results) def _eval_predictions(self, predictions): """ Evaluate predictions. Fill self._results with the metrics of the tasks. """ self._logger.info("Preparing results for AVIS format ...") # unmap the category ids for COCO if hasattr(self._metadata, "thing_dataset_id_to_contiguous_id"): dataset_id_to_contiguous_id = self._metadata.thing_dataset_id_to_contiguous_id all_contiguous_ids = list(dataset_id_to_contiguous_id.values()) num_classes = len(all_contiguous_ids) assert min(all_contiguous_ids) == 0 and max(all_contiguous_ids) == num_classes - 1 reverse_id_mapping = {v: k for k, v in dataset_id_to_contiguous_id.items()} for result in predictions: category_id = result["category_id"] assert category_id < num_classes, ( f"A prediction has class={category_id}, " f"but the dataset only has {num_classes} classes and " f"predicted class id should be in [0, {num_classes - 1}]." ) result["category_id"] = reverse_id_mapping[category_id] o_d = None if self._output_dir: o_d = os.path.join(self._output_dir, "results") os.makedirs(os.path.join(o_d, "model_final"), exist_ok=True) file_path = os.path.join(o_d, "model_final", "results.json") self._logger.info("Saving results to {}".format(file_path)) with PathManager.open(file_path, "w") as f: f.write(json.dumps(predictions)) f.flush() if not self._do_evaluation: self._logger.info("Annotations are not available for evaluation.") return assert o_d != None output_res = eval_track(o_d, "test.json") self._results["segm"] = output_res['AVIS']['model_final']