import os
import io
import copy
import json
import logging
import contextlib
from collections import OrderedDict

import numpy as np
import torch

import pycocotools.mask as mask_util
from multiprocessing import freeze_support
from detectron2.data import MetadataCatalog
from detectron2.utils.file_io import PathManager
from detectron2.evaluation import DatasetEvaluator

from .datasets.avis_api.avos import AVOS

import sys
sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))
import aviseval


def eval_track(out_dir, gt_file):
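    """
    Run AVIS tracking evaluation (TrackMAP and HOTA) through `aviseval` on the
    tracker results stored under `out_dir`, using `gt_file` as the ground-truth
    annotation file. Returns the raw results dict from `aviseval.Evaluator.evaluate`.
    """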
|
    freeze_support()

    # Start from the aviseval defaults, then point the dataset config at the
    # tracker output folder and the ground-truth file for this run.
    default_eval_config = aviseval.Evaluator.get_default_eval_config()
    default_dataset_config = aviseval.datasets.AVIS.get_default_dataset_config()
    default_dataset_config['TRACKERS_FOLDER'] = out_dir
    default_dataset_config['GT_File'] = gt_file
    default_metrics_config = {'METRICS': ['TrackMAP', 'HOTA']}
    config = {**default_eval_config, **default_dataset_config, **default_metrics_config}
    # Split the merged config back into the pieces each component expects.
    eval_config = {k: v for k, v in config.items() if k in default_eval_config.keys()}
    dataset_config = {k: v for k, v in config.items() if k in default_dataset_config.keys()}
    metrics_config = {k: v for k, v in config.items() if k in default_metrics_config.keys()}

    evaluator = aviseval.Evaluator(eval_config)
    dataset_list = [aviseval.datasets.AVIS(dataset_config)]
    metrics_list = []
|
    # Build the metric objects requested in METRICS: TrackMAP gets a custom
    # area-range configuration, HOTA uses its defaults.
    for metric in [aviseval.metrics.TrackMAP, aviseval.metrics.HOTA]:
        if metric.get_name() in metrics_config['METRICS']:
            if metric == aviseval.metrics.TrackMAP:
                default_track_map_config = metric.get_default_metric_config()
                default_track_map_config['USE_TIME_RANGES'] = False
                default_track_map_config['AREA_RANGES'] = [[0 ** 2, 128 ** 2],
                                                           [128 ** 2, 256 ** 2],
                                                           [256 ** 2, 1e5 ** 2]]
                metrics_list.append(metric(default_track_map_config))
            else:
                metrics_list.append(metric())
    if len(metrics_list) == 0:
        raise Exception('No metrics selected for evaluation')

    output_res, output_msg = evaluator.evaluate(dataset_list, metrics_list)

    return output_res
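# NOTE: `eval_track` is called from `AVISEvaluator._eval_predictions` below; the
# returned dict is indexed there as output_res['AVIS'][tracker_name] ('model_final').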
|
|
|
|
|
def instances_to_coco_json_video(inputs, outputs):
    """
    Convert the per-video model outputs to COCO-format json annotations used
    for evaluation.

    Args:
        inputs (list[dict]): a single-element batch; the dict provides the
            "video_id" and "length" of the video.
        outputs (dict): per-video predictions with keys "pred_scores",
            "pred_labels" and "pred_masks".

    Returns:
        list[dict]: list of json annotations in COCO format.
    """
    assert len(inputs) == 1, "More than one input is loaded for inference!"
|
    video_id = inputs[0]["video_id"]
    video_length = inputs[0]["length"]

    scores = outputs["pred_scores"]
    labels = outputs["pred_labels"]
    masks = outputs["pred_masks"]

    avis_results = []
|
    for instance_id, (s, l, m) in enumerate(zip(scores, labels, masks)):
        # Encode each per-frame binary mask as COCO RLE.
        segms = [
            mask_util.encode(np.array(_mask[:, :, None], order="F", dtype="uint8"))[0]
            for _mask in m
        ]
        for rle in segms:
            # json cannot serialize bytes, so decode the RLE "counts" to str.
            rle["counts"] = rle["counts"].decode("utf-8")

        res = {
            "video_id": video_id,
            "score": s,
            "category_id": l,
            "segmentations": segms,
        }
        avis_results.append(res)

    return avis_results
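# Example of a single record produced above (illustrative values):
#   {"video_id": 3, "score": 0.87, "category_id": 12,
#    "segmentations": [<per-frame RLE dict>, ...]}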
|
|
|
|
|
|
|
class AVISEvaluator(DatasetEvaluator):
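    """
    Evaluator for the AVIS dataset: accumulates per-video instance segmentation
    predictions, writes them to a COCO-style results json, and scores them with
    TrackMAP and HOTA via `aviseval`.
    """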
|
    def __init__(
        self,
        dataset_name,
        tasks=None,
        distributed=True,
        output_dir=None,
        *,
        use_fast_impl=True,
    ):
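        """
        Args:
            dataset_name (str): name of a dataset registered in MetadataCatalog;
                its metadata must provide `json_file` pointing to the AVIS
                annotation file.
            tasks: unused; kept for interface compatibility.
            distributed (bool): stored on the evaluator; no cross-process
                gathering is performed here.
            output_dir (str): directory under which
                "results/model_final/results.json" is written.
            use_fast_impl (bool): stored on the evaluator; unused here.
        """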
|
        self._logger = logging.getLogger(__name__)
        self._distributed = distributed
        self._output_dir = output_dir
        self._use_fast_impl = use_fast_impl

        self._cpu_device = torch.device("cpu")

        self.dataset_name = dataset_name
        self._metadata = MetadataCatalog.get(dataset_name)

        # Load the AVIS ground-truth API, suppressing its stdout output.
        json_file = PathManager.get_local_path(self._metadata.json_file)
        with contextlib.redirect_stdout(io.StringIO()):
            self._avis_api = AVOS(json_file)

        # Scores can only be computed if the json actually contains annotations.
        self._do_evaluation = "annotations" in self._avis_api.dataset
|
    def reset(self):
        self._predictions = []
|
    def process(self, inputs, outputs):
        """
        Args:
            inputs: a single-element list of dicts describing the input video,
                with keys such as "video_id" and "length".
            outputs: the model predictions for that video, a dict with keys
                "pred_scores", "pred_labels" and "pred_masks".
        """
        prediction = instances_to_coco_json_video(inputs, outputs)
        self._predictions.extend(prediction)
|
    def evaluate(self):
        """
        Evaluate all predictions accumulated so far and return a copy of the
        results dict.
        """
        predictions = self._predictions

        self._results = OrderedDict()
        self._eval_predictions(predictions)

        return copy.deepcopy(self._results)
|
    def _eval_predictions(self, predictions):
        """
        Evaluate predictions. Fill self._results with the metrics of the tasks.
        """
        self._logger.info("Preparing results for AVIS format ...")

        # Map contiguous training class ids back to the original dataset ids.
        if hasattr(self._metadata, "thing_dataset_id_to_contiguous_id"):
            dataset_id_to_contiguous_id = self._metadata.thing_dataset_id_to_contiguous_id

            all_contiguous_ids = list(dataset_id_to_contiguous_id.values())
            num_classes = len(all_contiguous_ids)
            assert min(all_contiguous_ids) == 0 and max(all_contiguous_ids) == num_classes - 1

            reverse_id_mapping = {v: k for k, v in dataset_id_to_contiguous_id.items()}
            for result in predictions:
                category_id = result["category_id"]
                assert category_id < num_classes, (
                    f"A prediction has class={category_id}, "
                    f"but the dataset only has {num_classes} classes and "
                    f"predicted class id should be in [0, {num_classes - 1}]."
                )
                result["category_id"] = reverse_id_mapping[category_id]

        o_d = None
        if self._output_dir:
            # Write predictions as tracker "model_final" under "<output_dir>/results";
            # eval_track points aviseval's TRACKERS_FOLDER at this directory.
            o_d = os.path.join(self._output_dir, "results")
            os.makedirs(os.path.join(o_d, "model_final"), exist_ok=True)
            file_path = os.path.join(o_d, "model_final", "results.json")

            self._logger.info("Saving results to {}".format(file_path))
            with PathManager.open(file_path, "w") as f:
                f.write(json.dumps(predictions))
                f.flush()

        if not self._do_evaluation:
            self._logger.info("Annotations are not available for evaluation.")
            return

        assert o_d is not None
        # The ground-truth file name passed to the tracker evaluation is hard-coded.
        output_res = eval_track(o_d, "test.json")
        self._results["segm"] = output_res['AVIS']['model_final']
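# Usage sketch (assumption: a standard Detectron2-style evaluation loop; the exact
# dataloader/model wiring is outside this module, and "avis_val", "./output" and
# `per_video_predictions` are placeholders):
#
#   evaluator = AVISEvaluator("avis_val", output_dir="./output")
#   evaluator.reset()
#   for inputs, outputs in per_video_predictions:  # one (inputs, outputs) pair per video
#       evaluator.process(inputs, outputs)
#   results = evaluator.evaluate()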
|
|