import os |
import json |
import torch |
import torchvision.transforms as transforms |
import os.path |
import numpy as np |
import cv2 |
from torch.utils.data import Dataset |
import random |
import mono.utils.transform as img_transform |
import copy |
from mono.utils.comm import get_func |
import pickle |
import logging |
import multiprocessing as mp |
import ctypes |
""" |
Dataset annotations are saved in a Json file. All data, including rgb, depth, pose, and so on, captured within the same frame are saved in the same dict. |
All frames are organized in a list. In each frame, it may contains the some or all of following data format. |
# Annotations for the current central RGB/depth cameras. |
'rgb': rgb image in the current frame. |
'depth': depth map in the current frame. |
'sem': semantic mask in the current frame. |
'cam_in': camera intrinsic parameters of the current rgb camera. |
'cam_ex': camera extrinsic parameters of the current rgb camera. |
'cam_ex_path': path to the extrinsic parameters. |
'pose': pose in current frame. |
'timestamp_rgb': time stamp of current rgb image. |
# Annotations for the left hand RGB/depth cameras. |
'rgb_l': rgb image of the left hand camera in the current frame. |
'depth_l': depth map of the left hand camera in the current frame. |
'sem_l': semantic mask of the left hand camera in the current frame. |
'cam_in_l': camera intrinsic parameters of the left hand rgb camera in the current frame. |
'cam_ex_l': camera extrinsic parameters of the left hand rgb camera in the current frame. |
'cam_ex_path': path to the extrinsic parameters. |
'pose_l': pose of the left hand camera in the incurrent frame. |
'timestamp_rgb_l': time stamp of the rgb img captured by the left hand camera. |
# Annotations for the right RGB/depth cameras, which is on the left hand of the current central cameras. |
'rgb_r': rgb image of the right hand camera in the current frame. |
'depth_r': depth map of the right hand camera in the current frame. |
'sem_r': semantic mask of the right hand camera in the current frame. |
'cam_in_r': camera intrinsic parameters of the right hand rgb camera in the current frame. |
'cam_ex_r': camera extrinsic parameters of the right hand rgb camera in the current frame. |
'cam_ex_path_r': path to the extrinsic parameters. |
'pose_r': pose of the right hand camera in the incurrent frame. |
'timestamp_rgb_r': time stamp of the rgb img captured by the right hand camera. |
# Annotations for the central RGB/depth cameras in the last frame. |
'rgb_pre': rgb image of the central camera in the last frame. |
'depth_pre': depth map of the central camera in the last frame. |
'sem_pre': semantic mask of the central camera in the last frame. |
'cam_in_pre': camera intrinsic parameters of the central rgb camera in the last frame. |
'cam_ex_pre': camera extrinsic parameters of the central rgb camera in the last frame. |
'cam_ex_path_pre': path to the extrinsic parameters. |
'pose_pre': pose of the central camera in the last frame. |
'timestamp_rgb_pre': time stamp of the rgb img captured by the central camera. |
# Annotations for the central RGB/depth cameras in the next frame. |
'rgb_next': rgb image of the central camera in the next frame. |
'depth_next': depth map of the central camera in the next frame. |
'sem_next': semantic mask of the central camera in the next frame. |
'cam_in_next': camera intrinsic parameters of the central rgb camera in the next frame. |
'cam_ex_next': camera extrinsic parameters of the central rgb camera in the next frame. |
'cam_ex_path_next': path to the extrinsic parameters. |
'pose_next': pose of the central camera in the next frame. |
'timestamp_rgb_next': time stamp of the rgb img captured by the central camera. |
""" |
class BaseDataset(Dataset): |
def __init__(self, cfg, phase, **kwargs): |
super(BaseDataset, self).__init__() |
self.cfg = cfg |
self.phase = phase |
self.db_info = kwargs['db_info'] |
self.data_root = os.path.join(self.db_info['db_root'], self.db_info['data_root']) |
disp_root = self.db_info['disp_root'] if 'disp_root' in self.db_info else None |
self.disp_root = os.path.join(self.db_info['db_root'], disp_root) if disp_root is not None else None |
depth_root = self.db_info['depth_root'] if 'depth_root' in self.db_info else None |
self.depth_root = os.path.join(self.db_info['db_root'], depth_root) if depth_root is not None \ |
else self.data_root |
meta_data_root = self.db_info['meta_data_root'] if 'meta_data_root' in self.db_info else None |
self.meta_data_root = os.path.join(self.db_info['db_root'], meta_data_root) if meta_data_root is not None \ |
else None |
sem_root = self.db_info['semantic_root'] if 'semantic_root' in self.db_info else None |
self.sem_root = os.path.join(self.db_info['db_root'], sem_root) if sem_root is not None \ |
else None |
depth_mask_root = self.db_info['depth_mask_root'] if 'depth_mask_root' in self.db_info else None |
self.depth_mask_root = os.path.join(self.db_info['db_root'], depth_mask_root) if depth_mask_root is not None \ |
else None |
norm_root = self.db_info['normal_root'] if 'normal_root' in self.db_info else None |
self.norm_root = os.path.join(self.db_info['db_root'], norm_root) if norm_root is not None \ |
else None |
self.data_annos_path = os.path.join(self.db_info['db_root'], self.db_info['%s_annotations_path' % phase]) |
self.data_info = self.load_annotations() |
whole_data_size = len(self.data_info['files']) |
cfg_sample_ratio = cfg.data[phase].sample_ratio |
cfg_sample_size = int(cfg.data[phase].sample_size) |
self.sample_size = int(whole_data_size * cfg_sample_ratio) if cfg_sample_size == -1 \ |
else (cfg_sample_size if cfg_sample_size < whole_data_size else whole_data_size) |
random.seed(100) |
sample_list_of_whole_data = random.sample(list(range(whole_data_size)), self.sample_size) |
self.data_size = self.sample_size |
self.annotations = {'files': [self.data_info['files'][i] for i in sample_list_of_whole_data]} |
self.sample_list = list(range(self.data_size)) |
self.transforms_cfg = cfg.data[phase]['pipeline'] |
self.transforms_lib = 'mono.utils.transform.' |
self.img_file_type = ['.png', '.jpg', '.jpeg', '.bmp', '.tif'] |
self.np_file_type = ['.npz', '.npy'] |
self.data_basic = copy.deepcopy(kwargs) |
canonical = self.data_basic.pop('canonical_space') |
self.data_basic.update(canonical) |
self.disp_scale = 10.0 |
self.depth_range = kwargs['depth_range'] |
self.clip_depth_range = kwargs['clip_depth_range'] |
self.depth_normalize = kwargs['depth_normalize'] |
self.img_transforms = img_transform.Compose(self.build_data_transforms()) |
self.EPS = 1e-6 |
self.data_name = cfg.data_name |
self.data_type = cfg.data_type |
self.logger = logging.getLogger() |
self.logger.info(f'{self.data_name} in {self.phase} whole data size: {whole_data_size}') |
crop_size = kwargs['crop_size'] |
shared_array_base = mp.Array(ctypes.c_int32, 2) |
shared_array = np.ctypeslib.as_array(shared_array_base.get_obj()) |
shared_array[0] = crop_size[0] |
shared_array[1] = crop_size[1] |
self.random_crop_size = torch.from_numpy(shared_array) |
def __name__(self): |
return self.data_name |
def __len__(self): |
return self.data_size |
def load_annotations(self): |
if not os.path.exists(self.data_annos_path): |
raise RuntimeError(f'Cannot find {self.data_annos_path} annotations.') |
with open(self.data_annos_path, 'r') as f: |
annos = json.load(f) |
return annos |
def build_data_transforms(self): |
transforms_list = [] |
for transform in self.transforms_cfg: |
args = copy.deepcopy(transform) |
args.update(self.data_basic) |
obj_name = args.pop('type') |
obj_path = self.transforms_lib + obj_name |
obj_cls = get_func(obj_path) |
obj = obj_cls(**args) |
transforms_list.append(obj) |
return transforms_list |
def load_data(self, path: str, is_rgb_img: bool=False): |
if not os.path.exists(path): |
self.logger.info(f'>>>>{path} does not exist.') |
data_type = os.path.splitext(path)[-1] |
if data_type in self.img_file_type: |
if is_rgb_img: |
data = cv2.imread(path) |
else: |
data = cv2.imread(path, -1) |
elif data_type in self.np_file_type: |
data = np.load(path) |
else: |
raise RuntimeError(f'{data_type} is not supported in current version.') |
try: |
return data.squeeze() |
except: |
temp = 1 |
raise RuntimeError(f'{path} is not successfully loaded.') |
def __getitem__(self, idx: int) -> dict: |
if self.phase == 'test': |
return self.get_data_for_test(idx) |
else: |
return self.get_data_for_trainval(idx) |
def get_data_for_trainval(self, idx: int): |
anno = self.annotations['files'][idx] |
meta_data = self.load_meta_data(anno) |
data_path = self.load_data_path(meta_data) |
data_batch = self.load_batch(meta_data, data_path) |
curr_rgb, curr_depth, curr_normal, curr_sem, curr_cam_model = data_batch['curr_rgb'], data_batch['curr_depth'], data_batch['curr_normal'], data_batch['curr_sem'], data_batch['curr_cam_model'] |
if 'curr_stereo_depth' in data_batch.keys(): |
curr_stereo_depth = data_batch['curr_stereo_depth'] |
else: |
curr_stereo_depth = self.load_stereo_depth_label(None, H=curr_rgb.shape[0], W=curr_rgb.shape[1]) |
curr_intrinsic = meta_data['cam_in'] |
transform_paras = dict(random_crop_size = self.random_crop_size) |
assert curr_rgb.shape[:2] == curr_depth.shape == curr_normal.shape[:2] == curr_sem.shape |
rgbs, depths, intrinsics, cam_models, normals, other_labels, transform_paras = self.img_transforms( |
images=[curr_rgb, ], |
labels=[curr_depth, ], |
intrinsics=[curr_intrinsic,], |
cam_models=[curr_cam_model, ], |
normals = [curr_normal, ], |
other_labels=[curr_sem, curr_stereo_depth], |
transform_paras=transform_paras) |
sem_mask = other_labels[0].int() |
depth_out = self.normalize_depth(depths[0]) |
depth_out[sem_mask==142] = -1 |
inv_depth = self.depth2invdepth(depth_out, sem_mask==142) |
filename = os.path.basename(meta_data['rgb'])[:-4] + '.jpg' |
curr_intrinsic_mat = self.intrinsics_list2mat(intrinsics[0]) |
cam_models_stacks = [ |
torch.nn.functional.interpolate(cam_models[0][None, :, :, :], size=(cam_models[0].shape[1]//i, cam_models[0].shape[2]//i), mode='bilinear', align_corners=False).squeeze() |
for i in [2, 4, 8, 16, 32] |
] |
if 'label_scale_factor' not in transform_paras.keys(): |
transform_paras['label_scale_factor'] = 1 |
stereo_depth_pre_trans = other_labels[1] * (other_labels[1] > 0.3) * (other_labels[1] < 200) |
stereo_depth = stereo_depth_pre_trans * transform_paras['label_scale_factor'] |
stereo_depth = self.normalize_depth(stereo_depth) |
pad = transform_paras['pad'] if 'pad' in transform_paras else [0,0,0,0] |
data = dict(input=rgbs[0], |
target=depth_out, |
intrinsic=curr_intrinsic_mat, |
filename=filename, |
dataset=self.data_name, |
cam_model=cam_models_stacks, |
pad=torch.tensor(pad), |
data_type=[self.data_type, ], |
sem_mask=sem_mask.int(), |
stereo_depth= stereo_depth, |
normal=normals[0], |
inv_depth=inv_depth, |
scale=transform_paras['label_scale_factor']) |
return data |
def get_data_for_test(self, idx: int): |
anno = self.annotations['files'][idx] |
meta_data = self.load_meta_data(anno) |
data_path = self.load_data_path(meta_data) |
data_batch = self.load_batch(meta_data, data_path) |
curr_rgb, curr_depth, curr_normal, curr_cam_model = data_batch['curr_rgb'], data_batch['curr_depth'], data_batch['curr_normal'], data_batch['curr_cam_model'] |
ori_curr_intrinsic = meta_data['cam_in'] |
transform_paras = dict() |
rgbs, depths, intrinsics, cam_models, _, other_labels, transform_paras = self.img_transforms( |
images=[curr_rgb,], |
labels=[curr_depth, ], |
intrinsics=[ori_curr_intrinsic, ], |
cam_models=[curr_cam_model, ], |
transform_paras=transform_paras) |
depth_out = self.clip_depth(curr_depth) * self.depth_range[1] |
inv_depth = self.depth2invdepth(depth_out, np.zeros_like(depth_out, dtype=np.bool)) |
filename = os.path.basename(meta_data['rgb'])[:-4] + '.jpg' |
curr_intrinsic_mat = self.intrinsics_list2mat(intrinsics[0]) |
ori_curr_intrinsic_mat = self.intrinsics_list2mat(ori_curr_intrinsic) |
pad = transform_paras['pad'] if 'pad' in transform_paras else [0,0,0,0] |
scale_ratio = transform_paras['label_scale_factor'] if 'label_scale_factor' in transform_paras else 1.0 |
cam_models_stacks = [ |
torch.nn.functional.interpolate(cam_models[0][None, :, :, :], size=(cam_models[0].shape[1]//i, cam_models[0].shape[2]//i), mode='bilinear', align_corners=False).squeeze() |
for i in [2, 4, 8, 16, 32] |
] |
raw_rgb = torch.from_numpy(curr_rgb) |
curr_normal = torch.from_numpy(curr_normal.transpose((2,0,1))) |
data = dict(input=rgbs[0], |
target=depth_out, |
intrinsic=curr_intrinsic_mat, |
filename=filename, |
dataset=self.data_name, |
cam_model=cam_models_stacks, |
pad=pad, |
scale=scale_ratio, |
raw_rgb=raw_rgb, |
sample_id=idx, |
data_path=meta_data['rgb'], |
inv_depth=inv_depth, |
normal=curr_normal, |
) |
return data |
def load_data_path(self, meta_data): |
curr_rgb_path = os.path.join(self.data_root, meta_data['rgb']) |
curr_depth_path = os.path.join(self.depth_root, meta_data['depth']) |
curr_sem_path = os.path.join(self.sem_root, meta_data['sem']) \ |
if self.sem_root is not None and ('sem' in meta_data) and (meta_data['sem'] is not None) \ |
else None |
if ('normal' in meta_data) and (meta_data['normal'] is not None) and (self.norm_root is not None): |
if isinstance(meta_data['normal'], dict): |
curr_norm_path = {} |
for k,v in meta_data['normal'].items(): |
curr_norm_path[k] = os.path.join(self.norm_root, v) |
else: |
curr_norm_path = os.path.join(self.norm_root, meta_data['normal']) |
else: |
curr_norm_path = None |
curr_depth_mask_path = os.path.join(self.depth_mask_root, meta_data['depth_mask']) \ |
if self.depth_mask_root is not None and ('depth_mask' in meta_data) and (meta_data['depth_mask'] is not None) \ |
else None |
if ('disp' in meta_data) and (meta_data['disp'] is not None) and (self.disp_root is not None): |
if isinstance(meta_data['disp'], dict): |
curr_disp_path = {} |
for k,v in meta_data['disp'].items(): |
curr_disp_path[k] = os.path.join(self.disp_root, v) |
else: |
curr_disp_path = os.path.join(self.disp_root, meta_data['disp']) |
else: |
curr_disp_path = None |
data_path=dict( |
rgb_path=curr_rgb_path, |
depth_path=curr_depth_path, |
sem_path=curr_sem_path, |
normal_path=curr_norm_path, |
disp_path=curr_disp_path, |
depth_mask_path=curr_depth_mask_path, |
) |
return data_path |
def load_batch(self, meta_data, data_path): |
curr_intrinsic = meta_data['cam_in'] |
curr_rgb, curr_depth = self.load_rgb_depth(data_path['rgb_path'], data_path['depth_path']) |
curr_sem = self.load_sem_label(data_path['sem_path'], curr_depth) |
curr_cam_model = self.create_cam_model(curr_rgb.shape[0], curr_rgb.shape[1], curr_intrinsic) |
curr_normal = self.load_norm_label(data_path['normal_path'], H=curr_rgb.shape[0], W=curr_rgb.shape[1]) |
depth_mask = self.load_depth_valid_mask(data_path['depth_mask_path']) |
curr_depth[~depth_mask] = -1 |
curr_stereo_depth = self.load_stereo_depth_label(data_path['disp_path'], H=curr_rgb.shape[0], W=curr_rgb.shape[1]) |
data_batch = dict( |
curr_rgb = curr_rgb, |
curr_depth = curr_depth, |
curr_sem = curr_sem, |
curr_normal = curr_normal, |
curr_cam_model=curr_cam_model, |
curr_stereo_depth=curr_stereo_depth, |
) |
return data_batch |
def clip_depth(self, depth: np.array) -> np.array: |
depth[(depth>self.clip_depth_range[1]) | (depth<self.clip_depth_range[0])] = -1 |
depth /= self.depth_range[1] |
depth[depth<self.EPS] = -1 |
return depth |
def normalize_depth(self, depth: np.array) -> np.array: |
depth /= self.depth_range[1] |
depth[depth<self.EPS] = -1 |
return depth |
def process_depth(self, depth: np.array, rgb:np.array=None): |
return depth |
def create_cam_model(self, H : int, W : int, intrinsics : list) -> np.array: |
""" |
Encode the camera model (focal length and principle point) to a 4-channel map. |
""" |
fx, fy, u0, v0 = intrinsics |
f = (fx + fy) / 2.0 |
x_row = np.arange(0, W).astype(np.float32) |
x_row_center_norm = (x_row - u0) / W |
x_center = np.tile(x_row_center_norm, (H, 1)) |
y_col = np.arange(0, H).astype(np.float32) |
y_col_center_norm = (y_col - v0) / H |
y_center = np.tile(y_col_center_norm, (W, 1)).T |
fov_x = np.arctan(x_center / (f / W)) |
fov_y = np.arctan(y_center/ (f / H)) |
cam_model = np.stack([x_center, y_center, fov_x, fov_y], axis=2) |
return cam_model |
def check_data(self, data_dict : dict): |
for k, v in data_dict.items(): |
if v is None: |
self.logger.info(f'{self.data_name}, {k} cannot be read!') |
def intrinsics_list2mat(self, intrinsics: torch.tensor) -> torch.tensor: |
""" |
Create camera intrinsic matrix. |
Args: |
intrinsics (torch.tensor, [4,]): list of camera intrinsic parameters. |
returns: |
intrinsics_mat (torch.tensor, [3x3]): camera intrinsic parameters matrix. |
""" |
intrinsics_mat = torch.zeros((3,3)).float() |
intrinsics_mat[0, 0] = intrinsics[0] |
intrinsics_mat[1, 1] = intrinsics[1] |
intrinsics_mat[0, 2] = intrinsics[2] |
intrinsics_mat[1, 2] = intrinsics[3] |
intrinsics_mat[2, 2] = 1.0 |
return intrinsics_mat |
def load_meta_data(self, anno: dict) -> dict: |
""" |
Load meta data information. |
""" |
if self.meta_data_root is not None and ('meta_data' in anno or 'meta' in anno): |
meta_data_path = os.path.join(self.meta_data_root, anno['meta_data']) if 'meta_data' in anno else os.path.join(self.meta_data_root, anno['meta']) |
with open(meta_data_path, 'rb') as f: |
meta_data = pickle.load(f) |
meta_data.update(anno) |
else: |
meta_data = anno |
return meta_data |
def load_rgb_depth(self, rgb_path: str, depth_path: str): |
""" |
Load the rgb and depth map with the paths. |
""" |
rgb = self.load_data(rgb_path, is_rgb_img=True) |
if rgb is None: |
self.logger.info(f'>>>>{rgb_path} has errors.') |
depth = self.load_data(depth_path) |
if depth is None: |
self.logger.info(f'{depth_path} has errors.') |
depth = depth.astype(np.float) |
depth = self.process_depth(depth, rgb) |
return rgb, depth |
def load_sem_label(self, sem_path, depth=None, sky_id=142) -> np.array: |
H, W = depth.shape |
sem_label = cv2.imread(sem_path, 0) if sem_path is not None \ |
else np.ones((H, W), dtype=np.int) * -1 |
if sem_label is None: |
sem_label = np.ones((H, W), dtype=np.int) * -1 |
sem_label = sem_label.astype(np.int) |
sem_label[sem_label==255] = -1 |
mask_depth_valid = depth > 1e-8 |
invalid_sky_region = (sem_label==142) & (mask_depth_valid) |
if self.data_type in ['lidar', 'sfm', 'denselidar', 'denselidar_nometric']: |
sem_label[invalid_sky_region] = -1 |
return sem_label |
def load_depth_valid_mask(self, depth_mask_path, depth=None) -> np.array: |
if depth_mask_path is None: |
return np.ones_like(depth, dtype=np.bool) |
data_type = os.path.splitext(depth_mask_path)[-1] |
if data_type in self.img_file_type: |
data = cv2.imread(depth_mask_path, -1) |
elif data_type in self.np_file_type: |
data = np.load(depth_mask_path) |
else: |
raise RuntimeError(f'{data_type} is not supported in current version.') |
data = data.astype(np.bool) |
return data |
def load_norm_label(self, norm_path, H, W): |
norm_gt = np.zeros((H, W, 3)).astype(np.float32) |
return norm_gt |
def load_stereo_depth_label(self, disp_path, H, W): |
stereo_depth_gt = np.zeros((H, W, 1)).astype(np.float32) |
return stereo_depth_gt |
def depth2invdepth(self, depth, sky_mask): |
inv_depth = 1.0 / depth * self.disp_scale |
inv_depth[depth<1e-6] = -1.0 |
inv_depth[inv_depth < 0] = -1.0 |
inv_depth[sky_mask] = 0 |
return inv_depth |
def set_random_crop_size(self, random_crop_size): |
self.random_crop_size[0] = random_crop_size[0] |
self.random_crop_size[1] = random_crop_size[1] |