import project_path
import os
import re
import json
import cv2
import numpy as np
from threading import Lock
from contextlib import contextmanager
from datetime import datetime

import torch
from torch.utils.data import Dataset
from torchvision.io import read_image
from PIL import Image

# assumes yolov5 is on sys.path
from lib.yolov5.utils.general import xyxy2xywh
from lib.yolov5.utils.augmentations import letterbox
from lib.yolov5.utils.dataloaders import create_dataloader as create_yolo_dataloader

from backend.pyDIDSON import pyDIDSON
from backend.aris import BEAM_WIDTH_DIR

# Use this flag to test the difference between direct ARIS dataloading and
# using the jpeg-compressed version. Very slow; not much difference observed.
TEST_JPG_COMPRESSION = False

# # # # # # #
# Factory(ish) methods for DataLoader creation. Easy entry points to this module.
# # # # # # #

def create_dataloader_aris(aris_filepath, beam_width_dir=BEAM_WIDTH_DIR, annotations_file=None,
                           batch_size=32, stride=64, pad=0.5, img_size=896, rank=-1, world_size=1,
                           workers=0, disable_output=False, cache_bg_frames=False,
                           num_frames_bg_subtract=1000):
    """
    Get a PyTorch Dataset and DataLoader for ARIS files with (optional) associated
    fisheye-formatted labels.
    """
    print('dataset', datetime.now())
    # Make sure only the first process in DDP processes the dataset first, so the others
    # can use its cache; this is a no-op on a single-GPU machine.
    with torch_distributed_zero_first(rank):
        dataset = YOLOARISBatchedDataset(aris_filepath, beam_width_dir, annotations_file,
                                         stride, pad, img_size,
                                         disable_output=disable_output,
                                         cache_bg_frames=cache_bg_frames,
                                         num_frames_bg_subtract=num_frames_bg_subtract)

    batch_size = min(batch_size, len(dataset))
    nw = min([os.cpu_count() // world_size, batch_size if batch_size > 1 else 0, workers])  # number of workers
    print('dataloader', datetime.now())
    if not disable_output:
        print("dataset size", len(dataset))
        print("dataset shape", dataset.shape)
        print("Num workers", nw)

    # sampler = torch.utils.data.distributed.DistributedSampler(dataset) if rank != -1 else None
    # if extending to multi-GPU inference, will need this
    dataloader = torch.utils.data.DataLoader(dataset,
                                             batch_size=None,  # batching is done by the dataset itself
                                             sampler=OnePerBatchSampler(data_source=dataset, batch_size=batch_size),
                                             num_workers=nw,
                                             pin_memory=True,
                                             collate_fn=collate_fn)
    print('done', datetime.now())
    return dataloader, dataset

def create_dataloader_frames(frames_path, batch_size=32, model_stride_max=32, pad=0.5, img_size=896,
                             rank=-1, world_size=1, workers=0, disable_output=False):
    """
    Create a DataLoader for a directory of frames without labels.

    Args:
        model_stride_max: use model.stride.max()
    """
    gs = max(int(model_stride_max), 32)  # grid size (max stride)
    return create_yolo_dataloader(frames_path, img_size, batch_size, gs,
                                  single_cls=False, augment=False, hyp=None, cache=None,
                                  rect=True, rank=rank, workers=workers, pad=pad)[0]
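# Example entry-point usage (a minimal sketch; "clip.aris" and "labels.json" are
# hypothetical paths, not files shipped with this repo):
#
#   dataloader, dataset = create_dataloader_aris("clip.aris",
#                                                annotations_file="labels.json",
#                                                batch_size=32)
#   for imgs, labels, shapes in dataloader:
#       # imgs: uint8 tensor of shape (batch, 3, H, W); labels: (n, 6) rows of
#       # [image_index, class, x_center, y_center, w, h], normalized
#       ...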
def create_dataloader_frames_only(frames_path, batch_size=32, img_size=896, workers=0):
    """
    Create a DataLoader for a directory of frames without labels.
    """
    return YOLOFrameDataset(frames_path, img_size=img_size, batch_size=batch_size)

# # # # # # #
# End factory(ish) methods
# # # # # # #

class YOLOFrameDataset(Dataset):
    def __init__(self, img_dir, img_size=896, batch_size=32, stride=64, pad=0.5):
        self.img_dir = img_dir
        self.img_size = img_size
        self.files = [f for f in os.listdir(img_dir) if f.endswith(".jpg")]
        # sort numerically by the digits in each filename
        self.files.sort(key=lambda f: int(re.sub(r'\D', '', f)))

        temp_img = read_image(os.path.join(self.img_dir, self.files[0]))
        size = temp_img.shape  # (C, H, W)
        self.ydim = size[1]
        self.xdim = size[2]

        n = len(self.files)

        aspect_ratio = self.ydim / self.xdim
        if aspect_ratio < 1:
            shape = [aspect_ratio, 1]
        elif aspect_ratio > 1:
            shape = [1, 1 / aspect_ratio]
        else:
            shape = [1, 1]  # square frames; the original code left this case undefined
        self.original_shape = (self.ydim, self.xdim)
        self.shape = np.ceil(np.array(shape) * img_size / stride + pad).astype(int) * stride

        self.batch_indices = []
        for i in range(0, n, batch_size):
            self.batch_indices.append((i, min(n, i + batch_size)))

    @classmethod
    def load_image(cls, img, img_size=896):
        """Loads and resizes 1 image from dataset, returns img, original hw, resized hw.
        Modified from ScaledYOLOv4.datasets.load_image()
        """
        h0, w0 = img.shape[:2]  # original hw
        r = img_size / max(h0, w0)  # resize image to img_size
        if r != 1:  # always resize down, only resize up if training with augmentation
            interp = cv2.INTER_AREA if r < 1 else cv2.INTER_LINEAR
            img = cv2.resize(img, (int(w0 * r), int(h0 * r)), interpolation=interp)
        return img, (h0, w0), img.shape[:2]  # img, hw_original, hw_resized

    def __len__(self):
        return len(self.batch_indices)

    def __iter__(self):
        for (start, end) in self.batch_indices:
            batch = []
            labels = None  # this dataset carries no labels
            shapes = []
            for i in range(start, end):
                img_name = self.files[i]
                img_path = os.path.join(self.img_dir, img_name)
                image = np.asarray(Image.open(img_path))

                img, (h0, w0), (h, w) = self.load_image(image, img_size=self.img_size)

                # Letterbox
                img, ratio, pad = letterbox(img, self.shape, auto=False, scaleup=False)
                shape = (h0, w0), ((h / h0, w / w0), pad)  # for COCO mAP rescaling

                img = img.transpose(2, 0, 1)  # HWC -> CHW
                img = np.ascontiguousarray(img)
                img = torch.from_numpy(img)
                shapes.append(shape)
                batch.append(img)

            images = torch.stack(batch)
            yield (images, labels, shapes)
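# Example usage of the frames-only path (a sketch; "frames_dir" is a hypothetical
# directory of numbered .jpg frames):
#
#   dataset = create_dataloader_frames_only("frames_dir", img_size=896, batch_size=32)
#   for imgs, labels, shapes in dataset:  # YOLOFrameDataset yields whole batches
#       ...  # labels is always None here; shapes carries the letterbox metadata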
class ARISBatchedDataset(Dataset):
    def __init__(self, aris_filepath, beam_width_dir, annotations_file, batch_size,
                 num_frames_bg_subtract=1000, disable_output=False, cache_bg_frames=False):
        """
        A PyTorch Dataset for loading an ARIS file and (optional) associated fisheye-format
        labels. This class handles ARIS frame extraction and 3-channel representation
        generation. It is called a "BatchedDataset" because it loads contiguous frames in
        self.batch_size chunks.

        ** The PyTorch sampler must be aware of this! Use the OnePerBatchSampler in this
        module with this Dataset. **

        Args:
            cache_bg_frames: keep the frames used for bg subtraction stored in memory.
                Be careful of memory issues; only recommended for small values of
                num_frames_bg_subtract.
        """
        # open ARIS data stream - TODO: make sure this is one per worker
        self.data = open(aris_filepath, 'rb')
        self.data_lock = Lock()
        self.beam_width_dir = beam_width_dir
        self.disable_output = disable_output
        self.aris_filepath = aris_filepath
        self.cache_bg_frames = cache_bg_frames

        # get header info
        self.didson = pyDIDSON(self.aris_filepath, beam_width_dir=beam_width_dir)
        self.xdim = self.didson.info['xdim']
        self.ydim = self.didson.info['ydim']

        # automatic batching is disabled - we do it ourselves, reading batch_size
        # frames from the ARIS file at a time
        self.batch_size = batch_size

        # load fisheye annotations, if given
        if annotations_file is None:
            if not self.disable_output:
                print("Loading file with no labels.")
            self.start_frame = self.didson.info['startframe']
            self.end_frame = self.didson.info['endframe'] or self.didson.info['numframes']
            self.labels = None
        else:
            self._load_labels(annotations_file)

        # initialize the background subtraction
        self.num_frames_bg_subtract = num_frames_bg_subtract
        self._init_bg_frame()
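    # The 3-channel representation built in _init_bg_frame() and __getitem__() packs,
    # for each frame i:
    #   channel 0: the raw ARIS frame i
    #   channel 1: blur(i) * 255, where blur(i) = ((GaussianBlur(frame_i, 5x5)
    #              - mean_blurred_frame) / mean_normalization_value + 1) / 2, i.e. the
    #              background-subtracted frame rescaled to roughly [0, 1]
    #   channel 2: |blur(i+1) - blur(i)| * 255, a frame-difference stand-in for optical
    #              flow (the reason one extra frame is always loaded per batch).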
    def _init_bg_frame(self):
        """
        Initialize the mean frame for background subtraction, using
        min(self.num_frames_bg_subtract, total frames) frames for the mean.
        Optionally caches the processed frames in self.extracted_frames for reuse.
        """
        # ensure the number of frames used is a multiple of self.batch_size so we can
        # cache them and retrieve full batches; add 1 extra frame for the optical
        # flow calculation
        num_frames_bg = min(self.end_frame - self.start_frame,
                            self.num_frames_bg_subtract // self.batch_size * self.batch_size + 1)
        if not self.disable_output:
            print("Initializing mean frame for background subtraction using", num_frames_bg, "frames...")
        frames_for_bg_subtract = self.didson.load_frames(start_frame=self.start_frame,
                                                         end_frame=self.start_frame + num_frames_bg)

        # save memory (and time?) by computing these in a streaming fashion rather
        # than in a big batch
        self.mean_blurred_frame = np.zeros([self.ydim, self.xdim], dtype=np.float32)
        max_blurred_frame = np.zeros([self.ydim, self.xdim], dtype=np.float32)
        for i in range(frames_for_bg_subtract.shape[0]):
            blurred = cv2.GaussianBlur(frames_for_bg_subtract[i], (5, 5), 0)
            self.mean_blurred_frame += blurred
            max_blurred_frame = np.maximum(max_blurred_frame, np.abs(blurred))

        self.mean_blurred_frame /= frames_for_bg_subtract.shape[0]
        max_blurred_frame -= self.mean_blurred_frame
        self.mean_normalization_value = np.max(max_blurred_frame)

        # cache these for later
        self.extracted_frames = []
        # Because of the optical flow computation, we only go to end_frame - 1
        next_blur = None
        for i in range(len(frames_for_bg_subtract) - 1):
            if next_blur is None:
                this_blur = ((cv2.GaussianBlur(frames_for_bg_subtract[i], (5, 5), 0)
                              - self.mean_blurred_frame) / self.mean_normalization_value + 1) / 2
            else:
                this_blur = next_blur
            next_blur = ((cv2.GaussianBlur(frames_for_bg_subtract[i + 1], (5, 5), 0)
                          - self.mean_blurred_frame) / self.mean_normalization_value + 1) / 2

            frame_image = np.dstack([frames_for_bg_subtract[i],
                                     this_blur * 255,
                                     np.abs(next_blur - this_blur) * 255]).astype(np.uint8, copy=False)

            if TEST_JPG_COMPRESSION:
                Image.fromarray(frame_image).save(f"tmp{i}.jpg", quality=95)
                frame_image = cv2.imread(f"tmp{i}.jpg")[:, :, ::-1]  # BGR to RGB
                os.remove(f"tmp{i}.jpg")

            if self.cache_bg_frames:
                self.extracted_frames.append(frame_image)

        if not self.disable_output:
            print("Done initializing background frame.")

    def _load_labels(self, fisheye_json):
        """Load labels from a fisheye-formatted json file into self.labels in
        normalized xywh format."""
        with open(fisheye_json, 'r') as f:
            js = json.load(f)
        labels = []
        for frame in js['frames']:
            l = []
            for fish in frame['fish']:
                x, y, w, h = xyxy2xywh(fish['bbox'])
                cx = x + w / 2.0
                cy = y + h / 2.0
                # Each row is `class x_center y_center width height` format, normalized.
                l.append([0, cx, cy, w, h])
            l = np.array(l, dtype=np.float32)
            if len(l) == 0:
                l = np.zeros((0, 5), dtype=np.float32)
            labels.append(l)
        self.labels = labels
        self.start_frame = js['start_frame']
        self.end_frame = js['end_frame']

    def __len__(self):
        # account for optical flow - we can't do the last frame
        return self.end_frame - self.start_frame - 1

    def _postprocess(self, frame_images, frame_labels):
        raise NotImplementedError
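    # Batch semantics (illustrative): with batch_size=32, OnePerBatchSampler yields
    # idx = 0, 32, 64, ..., and __getitem__(32) returns postprocessed frames 32..63,
    # reading one extra raw frame (frame 64) for the frame-difference channel.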
""" final_idx = min(idx+self.batch_size, len(self)) frame_labels = self.labels[idx:final_idx] if self.labels else None # see if we have already cached this from bg subtraction # assumes len(self.extracted_frames) is a multiple of self.batch_size if idx+1 < len(self.extracted_frames): return self._postprocess(self.extracted_frames[idx:final_idx], frame_labels) else: frames = self.didson.load_frames(start_frame=self.start_frame+idx, end_frame=self.start_frame + final_idx + 1) blurred_frames = frames.astype(np.float32) for i in range(frames.shape[0]): blurred_frames[i] = cv2.GaussianBlur( blurred_frames[i], (5,5), 0 ) blurred_frames -= self.mean_blurred_frame blurred_frames /= self.mean_normalization_value blurred_frames += 1 blurred_frames /= 2 frame_images = np.stack([ frames[:-1], blurred_frames[:-1] * 255, np.abs(blurred_frames[1:] - blurred_frames[:-1]) * 255 ], axis=-1).astype(np.uint8, copy=False) if TEST_JPG_COMPRESSION: from PIL import Image import os new_frame_images = [] for image in frame_images: Image.fromarray(image).save(f"tmp{idx}.jpg", quality=95) image = cv2.imread(f"tmp{idx}.jpg")[:, :, ::-1] # BGR to RGB os.remove(f"tmp{idx}.jpg") new_frame_images.append(image) frame_images = new_frame_images return self._postprocess(frame_images, frame_labels) class YOLOARISBatchedDataset(ARISBatchedDataset): """An ARIS Dataset that works with YOLOv5 inference.""" def __init__(self, aris_filepath, beam_width_dir, annotations_file, stride=64, pad=0.5, img_size=896, batch_size=32, disable_output=False, cache_bg_frames=False, num_frames_bg_subtract=1000): super().__init__(aris_filepath, beam_width_dir, annotations_file, batch_size, disable_output=disable_output, cache_bg_frames=cache_bg_frames, num_frames_bg_subtract=num_frames_bg_subtract) # compute shapes for letterbox aspect_ratio = self.ydim / self.xdim if aspect_ratio < 1: shape = [aspect_ratio, 1] elif aspect_ratio > 1: shape = [1, 1 / aspect_ratio] self.original_shape = (self.ydim, self.xdim) self.shape = np.ceil(np.array(shape) * img_size / stride + pad).astype(int) * stride @classmethod def load_image(cls, img, img_size=896): """Loads and resizes 1 image from dataset, returns img, original hw, resized hw. Modified from ScaledYOLOv4.datasets.load_image() """ h0, w0 = img.shape[:2] # orig hw r = img_size / max(h0, w0) # resize image to img_size if r != 1: # always resize down, only resize up if training with augmentation interp = cv2.INTER_AREA if r < 1 else cv2.INTER_LINEAR img = cv2.resize(img, (int(w0 * r), int(h0 * r)), interpolation=interp) return img, (h0, w0), img.shape[:2] # img, hw_original, hw_resized def _postprocess(self, frame_images, frame_labels): """ Return a batch of data in the format used by ScaledYOLOv4. That is, a list of tuples, on tuple per image in the batch: [ (img ->torch.Tensor, labels ->torch.Tensor, shapes ->tuple describing image original dimensions and scaled/padded dimensions ), ... 
] """ outputs = [] frame_labels = frame_labels or [ None for _ in frame_images ] for image, x in zip(frame_images, frame_labels): img, (h0, w0), (h, w) = self.load_image(image) # Letterbox img, ratio, pad = letterbox(img, self.shape, auto=False, scaleup=False) shapes = (h0, w0), ((h / h0, w / w0), pad) # for COCO mAP rescaling img = img.transpose(2, 0, 1) # to -> C x H x W img = np.ascontiguousarray(img) # Load labels # Convert from normalized xywh to pixel xyxy format in order to add padding from letterbox labels = [] if x is not None and x.size > 0: labels = x.copy() labels[:, 1] = ratio[0] * w * (x[:, 1] - x[:, 3] / 2) + pad[0] # pad width labels[:, 2] = ratio[1] * h * (x[:, 2] - x[:, 4] / 2) + pad[1] # pad height labels[:, 3] = ratio[0] * w * (x[:, 1] + x[:, 3] / 2) + pad[0] labels[:, 4] = ratio[1] * h * (x[:, 2] + x[:, 4] / 2) + pad[1] # convert back to normalized xywh with padding nL = len(labels) # number of labels labels_out = torch.zeros((nL, 6)) if nL: labels[:, 1:5] = xyxy2xywh(labels[:, 1:5]) # convert xyxy to xywh labels[:, [2, 4]] /= img.shape[1] # normalized height 0-1 labels[:, [1, 3]] /= img.shape[2] # normalized width 0-1 labels_out[:, 1:] = torch.from_numpy(labels) outputs.append( (torch.from_numpy(img), labels_out, shapes) ) return outputs @contextmanager def torch_distributed_zero_first(local_rank: int): """ Decorator to make all processes in distributed training wait for each local_master to do something. """ if local_rank not in [-1, 0]: torch.distributed.barrier() yield if local_rank == 0: torch.distributed.barrier() class OnePerBatchSampler(torch.utils.data.Sampler): """Yields the first index of each batch, given a batch size. In other words, returns multiples of self.batch_size up to the size of the Dataset. This is a workaround for Pytorch's standard batch creation that allows us to manually select contiguous segments of an ARIS clip for each batch. """ def __init__(self, data_source, batch_size): self.data_source = data_source self.batch_size = batch_size def __iter__(self): idxs = [i*self.batch_size for i in range(len(self))] return iter(idxs) def __len__(self): return len(self.data_source) // self.batch_size def collate_fn(batch): """See YOLOv5.utils.datasets.collate_fn""" if not len(batch): print("help!") print(batch) img, label, shapes = zip(*batch) # transposed for i, l in enumerate(label): l[:, 0] = i # add target image index for build_targets() return torch.stack(img, 0), torch.cat(label, 0), shapes