import os
import tqdm
import traceback
import numpy as np

from . import utils
from . import _timing
from .metrics import Count
from .utils import TrackEvalException
from .metrics import compute_av_loc, combine_av_loc_sequences


class Evaluator:
    """Evaluator class for evaluating different metrics for different datasets"""

    @staticmethod
    def get_default_eval_config():
        """Returns the default config values for evaluation"""
        code_path = utils.get_code_path()
        default_config = {
            'USE_PARALLEL': False,
            'NUM_PARALLEL_CORES': 8,
            'BREAK_ON_ERROR': True,  # Raises exception and exits with error
            'RETURN_ON_ERROR': False,  # if not BREAK_ON_ERROR, then returns from function on error
            'LOG_ON_ERROR': os.path.join(code_path, 'error_log.txt'),  # if not None, save any errors into a log file.

            'PRINT_RESULTS': False,
            'PRINT_ONLY_COMBINED': False,
            'PRINT_CONFIG': False,
            'TIME_PROGRESS': False,
            'DISPLAY_LESS_PROGRESS': True,

            'OUTPUT_SUMMARY': False,
            'OUTPUT_EMPTY_CLASSES': False,
            'OUTPUT_DETAILED': False,
            'PLOT_CURVES': False,
        }
        return default_config

    def __init__(self, config=None):
        """Initialise the evaluator with a config file"""
        self.config = utils.init_config(config, self.get_default_eval_config(), 'Eval')
        # Only run timing analysis if not run in parallel.
        if self.config['TIME_PROGRESS'] and not self.config['USE_PARALLEL']:
            _timing.DO_TIMING = True
            if self.config['DISPLAY_LESS_PROGRESS']:
                _timing.DISPLAY_LESS_PROGRESS = True

    @_timing.time
    def evaluate(self, dataset_list, metrics_list):
        """Evaluate a set of metrics on a set of datasets"""
        config = self.config
        metrics_list = metrics_list + [Count()]  # Count metrics are always run
        metric_names = utils.validate_metrics_list(metrics_list)
        dataset_names = [dataset.get_name() for dataset in dataset_list]
        output_res = {}
        output_msg = {}

        for dataset, dataset_name in zip(dataset_list, dataset_names):
            # Get dataset info about what to evaluate
            output_res[dataset_name] = {}
            output_msg[dataset_name] = {}
            tracker_list, seq_list, class_list = dataset.get_eval_info()

            # Evaluate each tracker
            for tracker in tracker_list:
                # if not config['BREAK_ON_ERROR'] then go to next tracker without breaking
                try:
                    print('\nEvaluating model ......\n')
                    res = {}
                    res_av_loc = {}
                    seq_list_sorted = sorted(seq_list)
                    for curr_seq in tqdm.tqdm(seq_list_sorted):
                        res[curr_seq] = eval_sequence(curr_seq, dataset, tracker, class_list, metrics_list,
                                                      metric_names)
                        res_av_loc[curr_seq] = eval_av_loc_sequence(curr_seq, dataset, tracker)

                    # Combine results over all sequences and then over all classes
                    res_av_loc_all = combine_av_loc_sequences(res_av_loc)

                    # collecting combined cls keys (cls averaged, det averaged, super classes)
                    combined_cls_keys = []
                    res['COMBINED_SEQ'] = {}
                    # combine sequences for each class
                    for c_cls in class_list:
                        res['COMBINED_SEQ'][c_cls] = {}
                        for metric, metric_name in zip(metrics_list, metric_names):
                            curr_res = {seq_key: seq_value[c_cls][metric_name] for seq_key, seq_value in res.items()
                                        if seq_key != 'COMBINED_SEQ'}
                            res['COMBINED_SEQ'][c_cls][metric_name] = metric.combine_sequences(curr_res)
                    # combine classes
                    if dataset.should_classes_combine:
                        combined_cls_keys += ['cls_comb_cls_av', 'cls_comb_det_av', 'all']
                        res['COMBINED_SEQ']['cls_comb_cls_av'] = {}
                        res['COMBINED_SEQ']['cls_comb_det_av'] = {}
                        for metric, metric_name in zip(metrics_list, metric_names):
                            cls_res = {cls_key: cls_value[metric_name] for cls_key, cls_value in
                                       res['COMBINED_SEQ'].items() if cls_key not in combined_cls_keys}
                            res['COMBINED_SEQ']['cls_comb_cls_av'][metric_name] = \
                                metric.combine_classes_class_averaged(cls_res)
                            res['COMBINED_SEQ']['cls_comb_det_av'][metric_name] = \
                                metric.combine_classes_det_averaged(cls_res)
                    # combine classes to super classes
                    if dataset.use_super_categories:
                        for cat, sub_cats in dataset.super_categories.items():
                            combined_cls_keys.append(cat)
                            res['COMBINED_SEQ'][cat] = {}
                            for metric, metric_name in zip(metrics_list, metric_names):
                                cat_res = {cls_key: cls_value[metric_name] for cls_key, cls_value in
                                           res['COMBINED_SEQ'].items() if cls_key in sub_cats}
                                res['COMBINED_SEQ'][cat][metric_name] = metric.combine_classes_det_averaged(cat_res)

                    # Print and output results in various formats
                    output_fol = dataset.get_output_fol(tracker)
                    tracker_display_name = dataset.get_display_name(tracker)
                    for c_cls in res['COMBINED_SEQ'].keys():  # class_list + combined classes if calculated
                        summaries = []
                        details = []
                        num_dets = res['COMBINED_SEQ'][c_cls]['Count']['Dets']
                        if config['OUTPUT_EMPTY_CLASSES'] or num_dets > 0:
                            for metric, metric_name in zip(metrics_list, metric_names):
                                # for combined classes there is no per sequence evaluation
                                if c_cls in combined_cls_keys:
                                    table_res = {'COMBINED_SEQ': res['COMBINED_SEQ'][c_cls][metric_name]}
                                else:
                                    table_res = {seq_key: seq_value[c_cls][metric_name]
                                                 for seq_key, seq_value in res.items()}
                                if config['PLOT_CURVES']:
                                    metric.plot_single_tracker_results(table_res, tracker_display_name, c_cls,
                                                                       output_fol)
                            if config['OUTPUT_SUMMARY']:
                                utils.write_summary_results(summaries, c_cls, output_fol)
                            if config['OUTPUT_DETAILED']:
                                utils.write_detailed_results(details, c_cls, output_fol)

                    # Output for returning from function
                    res_output = {}
                    res_output['AP_all'] = round(
                        100 * np.mean(res['COMBINED_SEQ']['cls_comb_cls_av']['TrackMAP']['AP_all']), 2)
                    res_output['AP_s'] = round(
                        100 * np.mean(res['COMBINED_SEQ']['cls_comb_cls_av']['TrackMAP']['AP_area_s']), 2)
                    res_output['AP_m'] = round(
                        100 * np.mean(res['COMBINED_SEQ']['cls_comb_cls_av']['TrackMAP']['AP_area_m']), 2)
                    res_output['AP_l'] = round(
                        100 * np.mean(res['COMBINED_SEQ']['cls_comb_cls_av']['TrackMAP']['AP_area_l']), 2)
                    res_output['AR_all'] = round(
                        100 * np.mean(res['COMBINED_SEQ']['cls_comb_cls_av']['TrackMAP']['AR_all']), 2)
                    res_output['HOTA'] = round(
                        100 * np.mean(res['COMBINED_SEQ']['cls_comb_cls_av']['HOTA']['HOTA']), 2)
                    res_output['DetA'] = round(
                        100 * np.mean(res['COMBINED_SEQ']['cls_comb_cls_av']['HOTA']['DetA']), 2)
                    res_output['DetRe'] = round(
                        100 * np.mean(res['COMBINED_SEQ']['cls_comb_cls_av']['HOTA']['DetRe']), 2)
                    res_output['DetPr'] = round(
                        100 * np.mean(res['COMBINED_SEQ']['cls_comb_cls_av']['HOTA']['DetPr']), 2)
                    res_output['AssA'] = round(
                        100 * np.mean(res['COMBINED_SEQ']['cls_comb_cls_av']['HOTA']['AssA']), 2)
                    res_output['AssRe'] = round(
                        100 * np.mean(res['COMBINED_SEQ']['cls_comb_cls_av']['HOTA']['AssRe']), 2)
                    res_output['AssPr'] = round(
                        100 * np.mean(res['COMBINED_SEQ']['cls_comb_cls_av']['HOTA']['AssPr']), 2)
                    res_output['LocA'] = round(
                        100 * np.mean(res['COMBINED_SEQ']['cls_comb_cls_av']['HOTA']['LocA']), 2)
                    res_output['FA'] = round(100 * np.mean(res_av_loc_all['FA']), 2)
                    res_output['FAn'] = round(100 * np.mean(res_av_loc_all['FAn']), 2)
                    res_output['FAn_count'] = int(np.mean(res_av_loc_all['FAn_count']))
                    res_output['FAn_all'] = int(np.mean(res_av_loc_all['FAn_all']))
                    res_output['FAs'] = round(100 * np.mean(res_av_loc_all['FAs']), 2)
                    res_output['FAs_count'] = int(np.mean(res_av_loc_all['FAs_count']))
                    res_output['FAs_all'] = int(np.mean(res_av_loc_all['FAs_all']))
                    res_output['FAm'] = round(100 * np.mean(res_av_loc_all['FAm']), 2)
                    res_output['FAm_count'] = int(np.mean(res_av_loc_all['FAm_count']))
                    res_output['FAm_all'] = int(np.mean(res_av_loc_all['FAm_all']))

                    output_res[dataset_name][tracker] = res_output
                    output_msg[dataset_name][tracker] = 'Success'

                except Exception as err:
                    output_res[dataset_name][tracker] = None
                    if type(err) == TrackEvalException:
                        output_msg[dataset_name][tracker] = str(err)
                    else:
                        output_msg[dataset_name][tracker] = 'Unknown error occurred.'
                    print('Tracker %s was unable to be evaluated.' % tracker)
                    print(err)
                    traceback.print_exc()
                    if config['LOG_ON_ERROR'] is not None:
                        with open(config['LOG_ON_ERROR'], 'a') as f:
                            print(dataset_name, file=f)
                            print(tracker, file=f)
                            print(traceback.format_exc(), file=f)
                            print('\n\n\n', file=f)
                    if config['BREAK_ON_ERROR']:
                        raise err
                    elif config['RETURN_ON_ERROR']:
                        return output_res, output_msg

        return output_res, output_msg


@_timing.time
def eval_sequence(seq, dataset, tracker, class_list, metrics_list, metric_names):
    """Function for evaluating a single sequence"""
    raw_data = dataset.get_raw_seq_data(tracker, seq)
    seq_res = {}
    for cls in class_list:
        seq_res[cls] = {}
        data = dataset.get_preprocessed_seq_data(raw_data, cls)
        for metric, met_name in zip(metrics_list, metric_names):
            seq_res[cls][met_name] = metric.eval_sequence(data)
    return seq_res


def eval_av_loc_sequence(seq, dataset, tracker):
    """Function for evaluating the AV localization metrics for a single sequence"""
    raw_data = dataset.get_raw_seq_data(tracker, seq)
    av_loc_res = compute_av_loc(raw_data)
    return av_loc_res
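
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, kept as comments because this module uses
# relative imports and is meant to be imported, not run as a script). The
# dataset class name below is a hypothetical placeholder; the HOTA and
# TrackMAP metric classes are assumed to be available from .metrics, since
# evaluate() reads the 'HOTA' and 'TrackMAP' entries of the combined results,
# and the dataset is expected to set should_classes_combine so that the
# 'cls_comb_cls_av' entry exists.
#
#   # import Evaluator from this module, plus your dataset/metric classes
#   from .datasets import MyAVDataset       # hypothetical dataset class name
#   from .metrics import HOTA, TrackMAP     # assumed to exist in .metrics
#
#   eval_config = Evaluator.get_default_eval_config()
#   evaluator = Evaluator(eval_config)
#   dataset_list = [MyAVDataset(dataset_config)]   # dataset_config: your dataset settings
#   metrics_list = [HOTA(), TrackMAP()]            # Count() is appended automatically
#   output_res, output_msg = evaluator.evaluate(dataset_list, metrics_list)
# ---------------------------------------------------------------------------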