# Copyright (c) Github URL
# Copied from
# https://github.com/youtubevos/cocoapi/blob/master/PythonAPI/pycocotools/ytvos.py

__author__ = 'ychfan'
# Interface for accessing the YouTubeVIS dataset.

# The following API functions are defined:
#  YTVIS      - YTVIS api class that loads YouTubeVIS annotation file
#               and prepare data structures.
#  decodeMask - Decode binary mask M encoded via run-length encoding.
#  encodeMask - Encode binary mask M using run-length encoding.
#  getAnnIds  - Get ann ids that satisfy given filter conditions.
#  getCatIds  - Get cat ids that satisfy given filter conditions.
#  getVidIds  - Get vid ids that satisfy given filter conditions.
#  loadAnns   - Load anns with the specified ids.
#  loadCats   - Load cats with the specified ids.
#  loadVids   - Load vids with the specified ids.
#  annToRLE   - Convert segmentation in an annotation to RLE.
#  annToMask  - Convert segmentation in an annotation to binary mask.
#  loadRes    - Load algorithm results and create API for accessing them.

# Microsoft COCO Toolbox.      version 2.0
# Data, paper, and tutorials available at:  http://mscoco.org/
# Code written by Piotr Dollar and Tsung-Yi Lin, 2014.
# Licensed under the Simplified BSD License [see bsd.txt]

import copy
import itertools
import json
import sys
import time
from collections import defaultdict

import numpy as np
from pycocotools import mask as maskUtils

PYTHON_VERSION = sys.version_info[0]


def _isArrayLike(obj):
    """Return True if ``obj`` exposes both ``__iter__`` and ``__len__``."""
    return hasattr(obj, '__iter__') and hasattr(obj, '__len__')


class YTVIS:
    """Helper class for reading and querying YouTubeVIS style annotations.

    After construction the following lookup tables are available:

    - ``anns``: annotation id -> annotation dict
    - ``vids``: video id -> video dict
    - ``cats``: category id -> category dict
    - ``vidToAnns``: video id -> list of annotation dicts
    - ``catToVids``: category id -> list of video ids (one entry per
      annotation, so a video id may appear several times)
    """

    def __init__(self, annotation_file=None):
        """Constructor of the YTVIS helper class.

        :param annotation_file (str | dict): location of a JSON annotation
            file, or an already loaded annotation dict. When ``None`` an
            empty API object is created.
        :return:
        """
        self.dataset, self.anns, self.cats, self.vids = {}, {}, {}, {}
        self.vidToAnns, self.catToVids = defaultdict(list), defaultdict(list)
        if annotation_file is not None:
            print('loading annotations into memory...')
            tic = time.time()
            if isinstance(annotation_file, str):
                # Use a context manager so the file handle is always closed.
                with open(annotation_file, 'r') as f:
                    dataset = json.load(f)
            else:
                dataset = annotation_file
            assert isinstance(
                dataset, dict
            ), 'annotation file format {} not supported'.format(
                type(dataset))
            print('Done (t={:0.2f}s)'.format(time.time() - tic))
            self.dataset = dataset
            self.createIndex()

    def createIndex(self):
        """Build the id lookup tables from ``self.dataset``.

        Only the sections present in the dataset ('annotations', 'videos',
        'categories') are indexed. ``catToVids`` is filled only when both
        'annotations' and 'categories' are present.
        """
        print('creating index...')
        anns, cats, vids = {}, {}, {}
        vidToAnns, catToVids = defaultdict(list), defaultdict(list)

        if 'annotations' in self.dataset:
            index_cats = 'categories' in self.dataset
            # Single pass over the annotations fills all annotation-derived
            # tables at once.
            for ann in self.dataset['annotations']:
                vidToAnns[ann['video_id']].append(ann)
                anns[ann['id']] = ann
                if index_cats:
                    catToVids[ann['category_id']].append(ann['video_id'])

        if 'videos' in self.dataset:
            for vid in self.dataset['videos']:
                vids[vid['id']] = vid

        if 'categories' in self.dataset:
            for cat in self.dataset['categories']:
                cats[cat['id']] = cat

        print('index created!')

        # create class members
        self.anns = anns
        self.vidToAnns = vidToAnns
        self.catToVids = catToVids
        self.vids = vids
        self.cats = cats

    # NOTE: the list defaults of the query methods below are kept for
    # signature compatibility; they are only read, never mutated.
    def getAnnIds(self, vidIds=[], catIds=[], areaRng=[], iscrowd=None):
        """Get ann ids that satisfy given filter conditions.

        An empty filter (the default) skips that filter.

        :param vidIds (int array): get anns for given vids
        :param catIds (int array): get anns for given cats
        :param areaRng (float array): get anns whose 'avg_area' lies
            strictly inside ``(areaRng[0], areaRng[1])``
        :param iscrowd (boolean): get anns for given crowd label
        :return: ids (int array): integer array of ann ids
        """
        vidIds = vidIds if _isArrayLike(vidIds) else [vidIds]
        catIds = catIds if _isArrayLike(catIds) else [catIds]

        # len() is used instead of truthiness so that array-like inputs
        # without a well-defined truth value keep working.
        if len(vidIds) > 0:
            anns = list(
                itertools.chain.from_iterable(
                    self.vidToAnns[vidId] for vidId in vidIds
                    if vidId in self.vidToAnns))
        else:
            anns = self.dataset['annotations']

        if len(catIds) > 0:
            anns = [ann for ann in anns if ann['category_id'] in catIds]
        if len(areaRng) > 0:
            anns = [
                ann for ann in anns
                if areaRng[0] < ann['avg_area'] < areaRng[1]
            ]

        if iscrowd is not None:
            return [ann['id'] for ann in anns if ann['iscrowd'] == iscrowd]
        return [ann['id'] for ann in anns]

    def getCatIds(self, catNms=[], supNms=[], catIds=[]):
        """Get cat ids that satisfy given filter conditions.

        An empty filter (the default) skips that filter.

        :param catNms (str array): get cats for given cat names
        :param supNms (str array): get cats for given supercategory names
        :param catIds (int array): get cats for given cat ids
        :return: ids (int array): integer array of cat ids
        """
        catNms = catNms if _isArrayLike(catNms) else [catNms]
        supNms = supNms if _isArrayLike(supNms) else [supNms]
        catIds = catIds if _isArrayLike(catIds) else [catIds]

        cats = self.dataset['categories']
        if len(catNms) > 0:
            cats = [cat for cat in cats if cat['name'] in catNms]
        if len(supNms) > 0:
            cats = [cat for cat in cats if cat['supercategory'] in supNms]
        if len(catIds) > 0:
            cats = [cat for cat in cats if cat['id'] in catIds]
        return [cat['id'] for cat in cats]

    def getVidIds(self, vidIds=[], catIds=[]):
        """Get vid ids that satisfy given filter conditions.

        :param vidIds (int array): get vids for given ids
        :param catIds (int array): get vids with all given cats
        :return: ids (int array): integer array of vid ids
        """
        vidIds = vidIds if _isArrayLike(vidIds) else [vidIds]
        catIds = catIds if _isArrayLike(catIds) else [catIds]

        if len(vidIds) == len(catIds) == 0:
            return list(self.vids.keys())

        ids = set(vidIds)
        for i, catId in enumerate(catIds):
            # .get() avoids inserting empty entries into the defaultdict
            # index when an unknown category id is queried.
            cat_vids = set(self.catToVids.get(catId, ()))
            if i == 0 and len(ids) == 0:
                # No explicit video filter: start from the first category.
                ids = cat_vids
            else:
                ids &= cat_vids
        return list(ids)

    @staticmethod
    def _loadByIds(table, ids):
        """Look up one id or a collection of ids in ``table``.

        Returns a list of the matching objects. Returns ``None`` when
        ``ids`` is neither array-like nor an integer.
        """
        if _isArrayLike(ids):
            return [table[key] for key in ids]
        # NumPy integer scalars are accepted in addition to plain ints.
        if isinstance(ids, (int, np.integer)):
            return [table[ids]]
        return None

    def loadAnns(self, ids=[]):
        """Load anns with the specified ids.

        :param ids (int array): integer ids specifying anns
        :return: anns (object array): loaded ann objects
        """
        return self._loadByIds(self.anns, ids)

    def loadCats(self, ids=[]):
        """Load cats with the specified ids.

        :param ids (int array): integer ids specifying cats
        :return: cats (object array): loaded cat objects
        """
        return self._loadByIds(self.cats, ids)

    def loadVids(self, ids=[]):
        """Load vids with the specified ids.

        :param ids (int array): integer ids specifying vids
        :return: vids (object array): loaded vid objects
        """
        return self._loadByIds(self.vids, ids)

    def loadRes(self, resFile):
        """Load result file and return a result api object.

        When the first result contains 'segmentations', every result dict
        is updated in place with 'areas', 'bboxes' (only missing entries are
        appended), 'id' (1-based position), 'avg_area' and 'iscrowd'.

        :param resFile (str | np.ndarray | list): file name of a JSON result
            file, or the results themselves
        :return: res (obj): result api object
        """
        res = YTVIS()
        res.dataset['videos'] = list(self.dataset['videos'])

        print('Loading and preparing results...')
        tic = time.time()
        if isinstance(resFile, str):
            with open(resFile) as f:
                anns = json.load(f)
        elif isinstance(resFile, np.ndarray):
            # NOTE(review): loadNumpyAnnotations is not defined in the
            # visible part of this class -- confirm it exists elsewhere,
            # otherwise this branch raises AttributeError.
            anns = self.loadNumpyAnnotations(resFile)
        else:
            anns = resFile
        assert isinstance(anns, list), 'results in not an array of objects'

        annsVidIds = [ann['video_id'] for ann in anns]
        assert set(annsVidIds).issubset(self.getVidIds()), \
            'Results do not correspond to current coco set'

        # Guard against an empty result list before peeking at anns[0].
        if anns and 'segmentations' in anns[0]:
            res.dataset['categories'] = copy.deepcopy(
                self.dataset['categories'])
            for ann_id, ann in enumerate(anns, start=1):
                ann['areas'] = []
                if 'bboxes' not in ann:
                    ann['bboxes'] = []
                for seg in ann['segmentations']:
                    # now only support compressed RLE format
                    # as segmentation results
                    ann['areas'].append(maskUtils.area(seg) if seg else None)
                    # Only fill in boxes that were not supplied with the
                    # results.
                    if len(ann['bboxes']) < len(ann['areas']):
                        ann['bboxes'].append(
                            maskUtils.toBbox(seg) if seg else None)
                ann['id'] = ann_id
                # Average only over frames with a non-zero area.
                valid_areas = [a for a in ann['areas'] if a]
                if len(valid_areas) == 0:
                    ann['avg_area'] = 0
                else:
                    ann['avg_area'] = np.array(valid_areas).mean()
                ann['iscrowd'] = 0
        print('DONE (t={:0.2f}s)'.format(time.time() - tic))

        res.dataset['annotations'] = anns
        res.createIndex()
        return res

    def annToRLE(self, ann, frameId):
        """Convert the segmentation of one frame of an annotation to RLE.

        The segmentation can be polygons, uncompressed RLE or RLE.

        :param ann (dict): annotation with 'video_id' and 'segmentations'
        :param frameId (int): index into ``ann['segmentations']``
        :return: rle (dict): run-length encoding of the frame mask
        """
        video = self.vids[ann['video_id']]
        h, w = video['height'], video['width']
        segm = ann['segmentations'][frameId]
        if isinstance(segm, list):
            # polygon -- a single object might consist of multiple parts
            # we merge all parts into one mask rle code
            rles = maskUtils.frPyObjects(segm, h, w)
            return maskUtils.merge(rles)
        if isinstance(segm['counts'], list):
            # uncompressed RLE
            return maskUtils.frPyObjects(segm, h, w)
        # already RLE
        return segm

    def annToMask(self, ann, frameId):
        """Convert the segmentation of one frame of an annotation to a mask.

        The segmentation can be polygons, uncompressed RLE or RLE.

        :param ann (dict): annotation with 'video_id' and 'segmentations'
        :param frameId (int): index into ``ann['segmentations']``
        :return: binary mask (numpy 2D array)
        """
        rle = self.annToRLE(ann, frameId)
        return maskUtils.decode(rle)