import os
import json
import torch
import torchvision.transforms as transforms
import os.path
import numpy as np
import cv2
from import Dataset
import random
import mono.utils.transform as img_transform
import copy
from mono.utils.comm import get_func
import pickle
import logging
import multiprocessing as mp
import ctypes
Dataset annotations are saved in a Json file. All data, including rgb, depth, pose, and so on, captured within the same frame are saved in the same dict.
All frames are organized in a list. In each frame, it may contains the some or all of following data format.
# Annotations for the current central RGB/depth cameras.
'rgb': rgb image in the current frame.
'depth': depth map in the current frame.
'sem': semantic mask in the current frame.
'cam_in': camera intrinsic parameters of the current rgb camera.
'cam_ex': camera extrinsic parameters of the current rgb camera.
'cam_ex_path': path to the extrinsic parameters.
'pose': pose in current frame.
'timestamp_rgb': time stamp of current rgb image.
# Annotations for the left hand RGB/depth cameras.
'rgb_l': rgb image of the left hand camera in the current frame.
'depth_l': depth map of the left hand camera in the current frame.
'sem_l': semantic mask of the left hand camera in the current frame.
'cam_in_l': camera intrinsic parameters of the left hand rgb camera in the current frame.
'cam_ex_l': camera extrinsic parameters of the left hand rgb camera in the current frame.
'cam_ex_path': path to the extrinsic parameters.
'pose_l': pose of the left hand camera in the incurrent frame.
'timestamp_rgb_l': time stamp of the rgb img captured by the left hand camera.
# Annotations for the right RGB/depth cameras, which is on the left hand of the current central cameras.
'rgb_r': rgb image of the right hand camera in the current frame.
'depth_r': depth map of the right hand camera in the current frame.
'sem_r': semantic mask of the right hand camera in the current frame.
'cam_in_r': camera intrinsic parameters of the right hand rgb camera in the current frame.
'cam_ex_r': camera extrinsic parameters of the right hand rgb camera in the current frame.
'cam_ex_path_r': path to the extrinsic parameters.
'pose_r': pose of the right hand camera in the incurrent frame.
'timestamp_rgb_r': time stamp of the rgb img captured by the right hand camera.
# Annotations for the central RGB/depth cameras in the last frame.
'rgb_pre': rgb image of the central camera in the last frame.
'depth_pre': depth map of the central camera in the last frame.
'sem_pre': semantic mask of the central camera in the last frame.
'cam_in_pre': camera intrinsic parameters of the central rgb camera in the last frame.
'cam_ex_pre': camera extrinsic parameters of the central rgb camera in the last frame.
'cam_ex_path_pre': path to the extrinsic parameters.
'pose_pre': pose of the central camera in the last frame.
'timestamp_rgb_pre': time stamp of the rgb img captured by the central camera.
# Annotations for the central RGB/depth cameras in the next frame.
'rgb_next': rgb image of the central camera in the next frame.
'depth_next': depth map of the central camera in the next frame.
'sem_next': semantic mask of the central camera in the next frame.
'cam_in_next': camera intrinsic parameters of the central rgb camera in the next frame.
'cam_ex_next': camera extrinsic parameters of the central rgb camera in the next frame.
'cam_ex_path_next': path to the extrinsic parameters.
'pose_next': pose of the central camera in the next frame.
'timestamp_rgb_next': time stamp of the rgb img captured by the central camera.
class BaseDataset(Dataset):
def __init__(self, cfg, phase, **kwargs):
super(BaseDataset, self).__init__()
self.cfg = cfg
self.phase = phase
self.db_info = kwargs['db_info']
# root dir for data
self.data_root = os.path.join(self.db_info['db_root'], self.db_info['data_root'])
# depth/disp data root
disp_root = self.db_info['disp_root'] if 'disp_root' in self.db_info else None
self.disp_root = os.path.join(self.db_info['db_root'], disp_root) if disp_root is not None else None
depth_root = self.db_info['depth_root'] if 'depth_root' in self.db_info else None
self.depth_root = os.path.join(self.db_info['db_root'], depth_root) if depth_root is not None \
else self.data_root
# meta data root
meta_data_root = self.db_info['meta_data_root'] if 'meta_data_root' in self.db_info else None
self.meta_data_root = os.path.join(self.db_info['db_root'], meta_data_root) if meta_data_root is not None \
else None
# semantic segmentation labels root
sem_root = self.db_info['semantic_root'] if 'semantic_root' in self.db_info else None
self.sem_root = os.path.join(self.db_info['db_root'], sem_root) if sem_root is not None \
else None
# depth valid mask labels root
depth_mask_root = self.db_info['depth_mask_root'] if 'depth_mask_root' in self.db_info else None
self.depth_mask_root = os.path.join(self.db_info['db_root'], depth_mask_root) if depth_mask_root is not None \
else None
# surface normal labels root
norm_root = self.db_info['normal_root'] if 'normal_root' in self.db_info else None
self.norm_root = os.path.join(self.db_info['db_root'], norm_root) if norm_root is not None \
else None
# data annotations path
self.data_annos_path = os.path.join(self.db_info['db_root'], self.db_info['%s_annotations_path' % phase])
# load annotations
self.data_info = self.load_annotations()
whole_data_size = len(self.data_info['files'])
# sample a subset for training/validation/testing
# such method is deprecated, each training may get different sample list
cfg_sample_ratio =[phase].sample_ratio
cfg_sample_size = int([phase].sample_size)
self.sample_size = int(whole_data_size * cfg_sample_ratio) if cfg_sample_size == -1 \
else (cfg_sample_size if cfg_sample_size < whole_data_size else whole_data_size)
random.seed(100) # set the random seed
sample_list_of_whole_data = random.sample(list(range(whole_data_size)), self.sample_size)
self.data_size = self.sample_size
self.annotations = {'files': [self.data_info['files'][i] for i in sample_list_of_whole_data]}
self.sample_list = list(range(self.data_size))
# config transforms for the input and label
self.transforms_cfg =[phase]['pipeline']
self.transforms_lib = 'mono.utils.transform.'
self.img_file_type = ['.png', '.jpg', '.jpeg', '.bmp', '.tif']
self.np_file_type = ['.npz', '.npy']
# update canonical sparce information
self.data_basic = copy.deepcopy(kwargs)
canonical = self.data_basic.pop('canonical_space')
self.disp_scale = 10.0
self.depth_range = kwargs['depth_range'] # predefined depth range for the network
self.clip_depth_range = kwargs['clip_depth_range'] # predefined depth range for data processing
self.depth_normalize = kwargs['depth_normalize']
self.img_transforms = img_transform.Compose(self.build_data_transforms())
self.EPS = 1e-6
# dataset info
self.data_name = cfg.data_name
self.data_type = cfg.data_type # there are mainly four types, i.e. ['rel', 'sfm', 'stereo', 'lidar']
self.logger = logging.getLogger()'{self.data_name} in {self.phase} whole data size: {whole_data_size}')
# random crop size for training
crop_size = kwargs['crop_size']
shared_array_base = mp.Array(ctypes.c_int32, 2)
shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
shared_array[0] = crop_size[0]
shared_array[1] = crop_size[1]
self.random_crop_size = torch.from_numpy(shared_array)
def __name__(self):
return self.data_name
def __len__(self):
return self.data_size
def load_annotations(self):
if not os.path.exists(self.data_annos_path):
raise RuntimeError(f'Cannot find {self.data_annos_path} annotations.')
with open(self.data_annos_path, 'r') as f:
annos = json.load(f)
return annos
def build_data_transforms(self):
transforms_list = []
for transform in self.transforms_cfg:
args = copy.deepcopy(transform)
# insert the canonical space configs
obj_name = args.pop('type')
obj_path = self.transforms_lib + obj_name
obj_cls = get_func(obj_path)
obj = obj_cls(**args)
return transforms_list
def load_data(self, path: str, is_rgb_img: bool=False):
if not os.path.exists(path):'>>>>{path} does not exist.')
# raise RuntimeError(f'{path} does not exist.')
data_type = os.path.splitext(path)[-1]
if data_type in self.img_file_type:
if is_rgb_img:
data = cv2.imread(path)
data = cv2.imread(path, -1)
elif data_type in self.np_file_type:
data = np.load(path)
raise RuntimeError(f'{data_type} is not supported in current version.')
return data.squeeze()
raise RuntimeError(f'{path} is not successfully loaded.')
def __getitem__(self, idx: int) -> dict:
if self.phase == 'test':
return self.get_data_for_test(idx)
return self.get_data_for_trainval(idx)
def get_data_for_trainval(self, idx: int):
anno = self.annotations['files'][idx]
meta_data = self.load_meta_data(anno)
data_path = self.load_data_path(meta_data)
data_batch = self.load_batch(meta_data, data_path)
curr_rgb, curr_depth, curr_normal, curr_sem, curr_cam_model = data_batch['curr_rgb'], data_batch['curr_depth'], data_batch['curr_normal'], data_batch['curr_sem'], data_batch['curr_cam_model']
# A patch for stereo depth dataloader (no need to modify specific datasets)
if 'curr_stereo_depth' in data_batch.keys():
curr_stereo_depth = data_batch['curr_stereo_depth']
curr_stereo_depth = self.load_stereo_depth_label(None, H=curr_rgb.shape[0], W=curr_rgb.shape[1])
curr_intrinsic = meta_data['cam_in']
# data augmentation
transform_paras = dict(random_crop_size = self.random_crop_size) # dict()
assert curr_rgb.shape[:2] == curr_depth.shape == curr_normal.shape[:2] == curr_sem.shape
rgbs, depths, intrinsics, cam_models, normals, other_labels, transform_paras = self.img_transforms(
images=[curr_rgb, ],
labels=[curr_depth, ],
cam_models=[curr_cam_model, ],
normals = [curr_normal, ],
other_labels=[curr_sem, curr_stereo_depth],
# process sky masks
sem_mask = other_labels[0].int()
# clip depth map
depth_out = self.normalize_depth(depths[0])
# set the depth of sky region to the invalid
depth_out[sem_mask==142] = -1 # self.depth_normalize[1] - 1e-6
# get inverse depth
inv_depth = self.depth2invdepth(depth_out, sem_mask==142)
filename = os.path.basename(meta_data['rgb'])[:-4] + '.jpg'
curr_intrinsic_mat = self.intrinsics_list2mat(intrinsics[0])
cam_models_stacks = [
torch.nn.functional.interpolate(cam_models[0][None, :, :, :], size=(cam_models[0].shape[1]//i, cam_models[0].shape[2]//i), mode='bilinear', align_corners=False).squeeze()
for i in [2, 4, 8, 16, 32]
# stereo_depth
if 'label_scale_factor' not in transform_paras.keys():
transform_paras['label_scale_factor'] = 1
stereo_depth_pre_trans = other_labels[1] * (other_labels[1] > 0.3) * (other_labels[1] < 200)
stereo_depth = stereo_depth_pre_trans * transform_paras['label_scale_factor']
stereo_depth = self.normalize_depth(stereo_depth)
pad = transform_paras['pad'] if 'pad' in transform_paras else [0,0,0,0]
data = dict(input=rgbs[0],
data_type=[self.data_type, ],,
stereo_depth= stereo_depth,
return data
def get_data_for_test(self, idx: int):
anno = self.annotations['files'][idx]
meta_data = self.load_meta_data(anno)
data_path = self.load_data_path(meta_data)
data_batch = self.load_batch(meta_data, data_path)
# load data
curr_rgb, curr_depth, curr_normal, curr_cam_model = data_batch['curr_rgb'], data_batch['curr_depth'], data_batch['curr_normal'], data_batch['curr_cam_model']
ori_curr_intrinsic = meta_data['cam_in']
# get crop size
transform_paras = dict()
rgbs, depths, intrinsics, cam_models, _, other_labels, transform_paras = self.img_transforms(
images=[curr_rgb,], #+ tmpl_rgbs,
labels=[curr_depth, ],
intrinsics=[ori_curr_intrinsic, ], # * (len(tmpl_rgbs) + 1),
cam_models=[curr_cam_model, ],
# depth in original size and orignial metric***
depth_out = self.clip_depth(curr_depth) * self.depth_range[1] # self.clip_depth(depths[0]) #
inv_depth = self.depth2invdepth(depth_out, np.zeros_like(depth_out, dtype=np.bool))
filename = os.path.basename(meta_data['rgb'])[:-4] + '.jpg'
curr_intrinsic_mat = self.intrinsics_list2mat(intrinsics[0])
ori_curr_intrinsic_mat = self.intrinsics_list2mat(ori_curr_intrinsic)
pad = transform_paras['pad'] if 'pad' in transform_paras else [0,0,0,0]
scale_ratio = transform_paras['label_scale_factor'] if 'label_scale_factor' in transform_paras else 1.0
cam_models_stacks = [
torch.nn.functional.interpolate(cam_models[0][None, :, :, :], size=(cam_models[0].shape[1]//i, cam_models[0].shape[2]//i), mode='bilinear', align_corners=False).squeeze()
for i in [2, 4, 8, 16, 32]
raw_rgb = torch.from_numpy(curr_rgb)
curr_normal = torch.from_numpy(curr_normal.transpose((2,0,1)))
data = dict(input=rgbs[0],
return data
def load_data_path(self, meta_data):
curr_rgb_path = os.path.join(self.data_root, meta_data['rgb'])
curr_depth_path = os.path.join(self.depth_root, meta_data['depth'])
curr_sem_path = os.path.join(self.sem_root, meta_data['sem']) \
if self.sem_root is not None and ('sem' in meta_data) and (meta_data['sem'] is not None) \
else None
# matterport3d separates xyz into three images
if ('normal' in meta_data) and (meta_data['normal'] is not None) and (self.norm_root is not None):
if isinstance(meta_data['normal'], dict):
curr_norm_path = {}
for k,v in meta_data['normal'].items():
curr_norm_path[k] = os.path.join(self.norm_root, v)
curr_norm_path = os.path.join(self.norm_root, meta_data['normal'])
curr_norm_path = None
curr_depth_mask_path = os.path.join(self.depth_mask_root, meta_data['depth_mask']) \
if self.depth_mask_root is not None and ('depth_mask' in meta_data) and (meta_data['depth_mask'] is not None) \
else None
if ('disp' in meta_data) and (meta_data['disp'] is not None) and (self.disp_root is not None):
if isinstance(meta_data['disp'], dict):
curr_disp_path = {}
for k,v in meta_data['disp'].items():
curr_disp_path[k] = os.path.join(self.disp_root, v)
curr_disp_path = os.path.join(self.disp_root, meta_data['disp'])
curr_disp_path = None
return data_path
def load_batch(self, meta_data, data_path):
curr_intrinsic = meta_data['cam_in']
# load rgb/depth
curr_rgb, curr_depth = self.load_rgb_depth(data_path['rgb_path'], data_path['depth_path'])
# get semantic labels
curr_sem = self.load_sem_label(data_path['sem_path'], curr_depth)
# create camera model
curr_cam_model = self.create_cam_model(curr_rgb.shape[0], curr_rgb.shape[1], curr_intrinsic)
# get normal labels
curr_normal = self.load_norm_label(data_path['normal_path'], H=curr_rgb.shape[0], W=curr_rgb.shape[1])
# get depth mask
depth_mask = self.load_depth_valid_mask(data_path['depth_mask_path'])
curr_depth[~depth_mask] = -1
# get stereo depth
curr_stereo_depth = self.load_stereo_depth_label(data_path['disp_path'], H=curr_rgb.shape[0], W=curr_rgb.shape[1])
data_batch = dict(
curr_rgb = curr_rgb,
curr_depth = curr_depth,
curr_sem = curr_sem,
curr_normal = curr_normal,
return data_batch
def clip_depth(self, depth: np.array) -> np.array:
depth[(depth>self.clip_depth_range[1]) | (depth<self.clip_depth_range[0])] = -1
depth /= self.depth_range[1]
depth[depth<self.EPS] = -1
return depth
def normalize_depth(self, depth: np.array) -> np.array:
depth /= self.depth_range[1]
depth[depth<self.EPS] = -1
return depth
def process_depth(self, depth: np.array, rgb:np.array=None):
return depth
def create_cam_model(self, H : int, W : int, intrinsics : list) -> np.array:
Encode the camera model (focal length and principle point) to a 4-channel map.
fx, fy, u0, v0 = intrinsics
f = (fx + fy) / 2.0
# principle point location
x_row = np.arange(0, W).astype(np.float32)
x_row_center_norm = (x_row - u0) / W
x_center = np.tile(x_row_center_norm, (H, 1)) # [H, W]
y_col = np.arange(0, H).astype(np.float32)
y_col_center_norm = (y_col - v0) / H
y_center = np.tile(y_col_center_norm, (W, 1)).T
# FoV
fov_x = np.arctan(x_center / (f / W))
fov_y = np.arctan(y_center/ (f / H))
cam_model = np.stack([x_center, y_center, fov_x, fov_y], axis=2)
return cam_model
def check_data(self, data_dict : dict):
for k, v in data_dict.items():
if v is None:
# print(f'{self.data_name}, {k} cannot be read!')'{self.data_name}, {k} cannot be read!')
def intrinsics_list2mat(self, intrinsics: torch.tensor) -> torch.tensor:
Create camera intrinsic matrix.
intrinsics (torch.tensor, [4,]): list of camera intrinsic parameters.
intrinsics_mat (torch.tensor, [3x3]): camera intrinsic parameters matrix.
intrinsics_mat = torch.zeros((3,3)).float()
intrinsics_mat[0, 0] = intrinsics[0]
intrinsics_mat[1, 1] = intrinsics[1]
intrinsics_mat[0, 2] = intrinsics[2]
intrinsics_mat[1, 2] = intrinsics[3]
intrinsics_mat[2, 2] = 1.0
return intrinsics_mat
def load_meta_data(self, anno: dict) -> dict:
Load meta data information.
if self.meta_data_root is not None and ('meta_data' in anno or 'meta' in anno):
meta_data_path = os.path.join(self.meta_data_root, anno['meta_data']) if 'meta_data' in anno else os.path.join(self.meta_data_root, anno['meta'])
with open(meta_data_path, 'rb') as f:
meta_data = pickle.load(f)
meta_data = anno
return meta_data
def load_rgb_depth(self, rgb_path: str, depth_path: str):
Load the rgb and depth map with the paths.
rgb = self.load_data(rgb_path, is_rgb_img=True)
if rgb is None:'>>>>{rgb_path} has errors.')
depth = self.load_data(depth_path)
if depth is None:'{depth_path} has errors.')
depth = depth.astype(np.float)
depth = self.process_depth(depth, rgb)
return rgb, depth
def load_sem_label(self, sem_path, depth=None, sky_id=142) -> np.array:
H, W = depth.shape
sem_label = cv2.imread(sem_path, 0) if sem_path is not None \
else np.ones((H, W), * -1
if sem_label is None:
sem_label = np.ones((H, W), * -1
# set dtype to int before
sem_label = sem_label.astype(
sem_label[sem_label==255] = -1
# mask invalid sky region
mask_depth_valid = depth > 1e-8
invalid_sky_region = (sem_label==142) & (mask_depth_valid)
if self.data_type in ['lidar', 'sfm', 'denselidar', 'denselidar_nometric']:
sem_label[invalid_sky_region] = -1
return sem_label
def load_depth_valid_mask(self, depth_mask_path, depth=None) -> np.array:
if depth_mask_path is None:
return np.ones_like(depth, dtype=np.bool)
data_type = os.path.splitext(depth_mask_path)[-1]
if data_type in self.img_file_type:
data = cv2.imread(depth_mask_path, -1)
elif data_type in self.np_file_type:
data = np.load(depth_mask_path)
raise RuntimeError(f'{data_type} is not supported in current version.')
data = data.astype(np.bool)
return data
def load_norm_label(self, norm_path, H, W):
norm_gt = np.zeros((H, W, 3)).astype(np.float32)
return norm_gt
def load_stereo_depth_label(self, disp_path, H, W):
stereo_depth_gt = np.zeros((H, W, 1)).astype(np.float32)
return stereo_depth_gt
def depth2invdepth(self, depth, sky_mask):
inv_depth = 1.0 / depth * self.disp_scale
inv_depth[depth<1e-6] = -1.0
inv_depth[inv_depth < 0] = -1.0
inv_depth[sky_mask] = 0
return inv_depth
def set_random_crop_size(self, random_crop_size):
self.random_crop_size[0] = random_crop_size[0]
self.random_crop_size[1] = random_crop_size[1]