|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import numpy as np |
|
import mmcv |
|
from mmdet.datasets.builder import PIPELINES |
|
import torch |
|
from PIL import Image |
|
from math import factorial |
|
import cv2 |
|
import random |
|
import copy |
|
from transformers import AutoTokenizer |
|
import json |
|
import re |
|
import os |
|
from nuscenes.utils.geometry_utils import view_points |
|
from typing import List, Tuple, Union |
|
from shapely.geometry import MultiPoint, Polygon, LineString, Point |
|
from shapely.geometry import box as canvas_box |
|
from ..utils.data_utils import preprocess |
|
from ..utils.constants import DEFAULT_IMAGE_TOKEN |
|
import math |
|
import pickle |
|
|
|
def post_process_coords(corner_coords, imsize=(1600, 900)): |
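    """Return the axis-aligned 2D bounds (min_x, min_y, max_x, max_y) of the
    convex hull of the projected box corners, clipped to the image canvas.
    Returns None if the hull does not intersect the canvas or the intersection
    degenerates to something other than a polygon."""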
|
polygon_from_2d_box = MultiPoint(corner_coords).convex_hull |
|
img_canvas = canvas_box(0, 0, imsize[0], imsize[1]) |
|
|
|
if polygon_from_2d_box.intersects(img_canvas): |
|
img_intersection = polygon_from_2d_box.intersection(img_canvas) |
|
|
|
if isinstance(img_intersection, Polygon): |
|
intersection_coords = np.array([coord for coord in img_intersection.exterior.coords]) |
|
|
|
|
|
min_x = min(intersection_coords[:, 0]) |
|
min_y = min(intersection_coords[:, 1]) |
|
max_x = max(intersection_coords[:, 0]) |
|
max_y = max(intersection_coords[:, 1]) |
|
|
|
return min_x, min_y, max_x, max_y |
|
else: |
|
return None |
|
else: |
|
return None |
|
|
|
def analyze_position(x, y, angle_deg): |
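    """Describe an object's pose relative to the ego vehicle: front/back from x,
    left/right from y (with a 2.5 m dead zone around the ego), and a coarse
    heading phrase from the yaw angle given in degrees."""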
|
direction = '' |
|
if x > 0: |
|
direction += 'front' |
|
elif x < 0: |
|
direction += 'back' |
|
|
|
if y > 2.5: |
|
direction += ' left' |
|
elif y < -2.5: |
|
direction += ' right' |
|
|
|
|
|
if abs(angle_deg) < 45: |
|
direction += ", same direction as you, " |
|
elif abs(abs(angle_deg) - 180) < 45: |
|
direction += ", opposite direction from you, " |
|
elif abs(angle_deg - 90) < 45: |
|
direction += ", heading from right to left, " |
|
elif abs(angle_deg + 90) < 45: |
|
direction += ", heading from left to right, " |
|
|
|
return direction.strip() |
|
|
|
|
|
@PIPELINES.register_module() |
|
class ResizeMultiview3D: |
|
"""Resize images & bbox & mask. |
|
This transform resizes the input image to some scale. Bboxes and masks are |
|
then resized with the same scale factor. If the input dict contains the key |
|
"scale", then the scale in the input dict is used, otherwise the specified |
|
scale in the init method is used. If the input dict contains the key |
|
"scale_factor" (if MultiScaleFlipAug does not give img_scale but |
|
scale_factor), the actual scale will be computed by image shape and |
|
scale_factor. |
|
    `img_scale` can either be a tuple (single-scale) or a list of tuples
|
(multi-scale). There are 3 multiscale modes: |
|
- ``ratio_range is not None``: randomly sample a ratio from the ratio \ |
|
range and multiply it with the image scale. |
|
- ``ratio_range is None`` and ``multiscale_mode == "range"``: randomly \ |
|
sample a scale from the multiscale range. |
|
- ``ratio_range is None`` and ``multiscale_mode == "value"``: randomly \ |
|
sample a scale from multiple scales. |
|
Args: |
|
        img_scale (tuple or list[tuple]): Image scales for resizing.
|
multiscale_mode (str): Either "range" or "value". |
|
ratio_range (tuple[float]): (min_ratio, max_ratio) |
|
keep_ratio (bool): Whether to keep the aspect ratio when resizing the |
|
image. |
|
bbox_clip_border (bool, optional): Whether to clip the objects outside |
|
the border of the image. In some dataset like MOT17, the gt bboxes |
|
are allowed to cross the border of images. Therefore, we don't |
|
need to clip the gt bboxes in these cases. Defaults to True. |
|
backend (str): Image resize backend, choices are 'cv2' and 'pillow'. |
|
            These two backends generate slightly different results. Defaults
|
to 'cv2'. |
|
override (bool, optional): Whether to override `scale` and |
|
`scale_factor` so as to call resize twice. Default False. If True, |
|
            after the first resizing, the existing `scale` and `scale_factor`

            will be ignored, so a second resize can be applied.

            This option is a workaround for the repeated resize in DETR.
|
Defaults to False. |
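
    Example (illustrative pipeline entry; the values are placeholders, not
    repository defaults):
        >>> resize = dict(
        ...     type='ResizeMultiview3D',
        ...     img_scale=[(1600, 640)],
        ...     keep_ratio=True)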
|
""" |
|
|
|
def __init__(self, |
|
img_scale=None, |
|
multiscale_mode='range', |
|
ratio_range=None, |
|
keep_ratio=True, |
|
bbox_clip_border=True, |
|
backend='cv2', |
|
override=False): |
|
if img_scale is None: |
|
self.img_scale = None |
|
else: |
|
if isinstance(img_scale, list): |
|
self.img_scale = img_scale |
|
else: |
|
self.img_scale = [img_scale] |
|
assert mmcv.is_list_of(self.img_scale, tuple) |
|
|
|
if ratio_range is not None: |
|
|
|
assert len(self.img_scale) == 1 |
|
else: |
|
|
|
assert multiscale_mode in ['value', 'range'] |
|
|
|
self.backend = backend |
|
self.multiscale_mode = multiscale_mode |
|
self.ratio_range = ratio_range |
|
self.keep_ratio = keep_ratio |
|
|
|
self.override = override |
|
self.bbox_clip_border = bbox_clip_border |
|
|
|
@staticmethod |
|
def random_select(img_scales): |
|
"""Randomly select an img_scale from given candidates. |
|
Args: |
|
            img_scales (list[tuple]): Image scales for selection.
|
Returns: |
|
            (tuple, int): Returns a tuple ``(img_scale, scale_idx)``, \
|
where ``img_scale`` is the selected image scale and \ |
|
``scale_idx`` is the selected index in the given candidates. |
|
""" |
|
|
|
assert mmcv.is_list_of(img_scales, tuple) |
|
scale_idx = np.random.randint(len(img_scales)) |
|
img_scale = img_scales[scale_idx] |
|
return img_scale, scale_idx |
|
|
|
@staticmethod |
|
def random_sample(img_scales): |
|
"""Randomly sample an img_scale when ``multiscale_mode=='range'``. |
|
Args: |
|
            img_scales (list[tuple]): Image scale range for sampling.
|
There must be two tuples in img_scales, which specify the lower |
|
and upper bound of image scales. |
|
Returns: |
|
(tuple, None): Returns a tuple ``(img_scale, None)``, where \ |
|
``img_scale`` is sampled scale and None is just a placeholder \ |
|
to be consistent with :func:`random_select`. |
|
""" |
|
|
|
assert mmcv.is_list_of(img_scales, tuple) and len(img_scales) == 2 |
|
img_scale_long = [max(s) for s in img_scales] |
|
img_scale_short = [min(s) for s in img_scales] |
|
long_edge = np.random.randint( |
|
min(img_scale_long), |
|
max(img_scale_long) + 1) |
|
short_edge = np.random.randint( |
|
min(img_scale_short), |
|
max(img_scale_short) + 1) |
|
img_scale = (long_edge, short_edge) |
|
return img_scale, None |
|
|
|
@staticmethod |
|
def random_sample_ratio(img_scale, ratio_range): |
|
"""Randomly sample an img_scale when ``ratio_range`` is specified. |
|
A ratio will be randomly sampled from the range specified by |
|
``ratio_range``. Then it would be multiplied with ``img_scale`` to |
|
generate sampled scale. |
|
Args: |
|
            img_scale (tuple): Image scale base to multiply with ratio.
|
ratio_range (tuple[float]): The minimum and maximum ratio to scale |
|
the ``img_scale``. |
|
Returns: |
|
(tuple, None): Returns a tuple ``(scale, None)``, where \ |
|
``scale`` is sampled ratio multiplied with ``img_scale`` and \ |
|
None is just a placeholder to be consistent with \ |
|
:func:`random_select`. |
|
""" |
|
|
|
assert isinstance(img_scale, tuple) and len(img_scale) == 2 |
|
min_ratio, max_ratio = ratio_range |
|
assert min_ratio <= max_ratio |
|
ratio = np.random.random_sample() * (max_ratio - min_ratio) + min_ratio |
|
scale = int(img_scale[0] * ratio), int(img_scale[1] * ratio) |
|
return scale, None |
|
|
|
def _random_scale(self, results): |
|
"""Randomly sample an img_scale according to ``ratio_range`` and |
|
``multiscale_mode``. |
|
If ``ratio_range`` is specified, a ratio will be sampled and be |
|
multiplied with ``img_scale``. |
|
If multiple scales are specified by ``img_scale``, a scale will be |
|
sampled according to ``multiscale_mode``. |
|
Otherwise, single scale will be used. |
|
Args: |
|
results (dict): Result dict from :obj:`dataset`. |
|
Returns: |
|
            dict: Two new keys ``scale`` and ``scale_idx`` are added into \
|
``results``, which would be used by subsequent pipelines. |
|
""" |
|
|
|
if self.ratio_range is not None: |
|
scale, scale_idx = self.random_sample_ratio( |
|
self.img_scale[0], self.ratio_range) |
|
elif len(self.img_scale) == 1: |
|
scale, scale_idx = self.img_scale[0], 0 |
|
elif self.multiscale_mode == 'range': |
|
scale, scale_idx = self.random_sample(self.img_scale) |
|
elif self.multiscale_mode == 'value': |
|
scale, scale_idx = self.random_select(self.img_scale) |
|
else: |
|
raise NotImplementedError |
|
|
|
results['scale'] = scale |
|
results['scale_idx'] = scale_idx |
|
|
|
def _resize_img(self, results): |
|
"""Resize images with ``results['scale']``.""" |
|
|
|
img_shapes = [] |
|
pad_shapes = [] |
|
scale_factors = [] |
|
keep_ratios = [] |
|
new_gt_bboxes = [] |
|
new_centers2d = [] |
|
for i in range(len(results['img'])): |
|
if self.keep_ratio: |
|
img, scale_factor = mmcv.imrescale( |
|
results['img'][i], |
|
results['scale'], |
|
return_scale=True, |
|
backend=self.backend) |
|
|
|
|
|
new_h, new_w = img.shape[:2] |
|
h, w = results['img'][i].shape[:2] |
|
w_scale = new_w / w |
|
h_scale = new_h / h |
|
else: |
|
img, w_scale, h_scale = mmcv.imresize( |
|
results['img'][i], |
|
results['scale'], |
|
return_scale=True, |
|
backend=self.backend) |
|
results['img'][i] = img |
|
scale_factor = np.array([w_scale, h_scale, w_scale, h_scale], |
|
dtype=np.float32) |
|
img_shapes.append(img.shape) |
|
pad_shapes.append(img.shape) |
|
scale_factors.append(scale_factor) |
|
keep_ratios.append(self.keep_ratio) |
|
|
|
results['intrinsics'][i][0, 0] *= w_scale |
|
results['intrinsics'][i][0, 2] *= w_scale |
|
results['intrinsics'][i][1, 1] *= h_scale |
|
results['intrinsics'][i][1, 2] *= h_scale |
|
|
|
if 'gt_bboxes' in results.keys() and len(results['gt_bboxes']) > 0: |
|
gt_bboxes = results['gt_bboxes'][i] |
|
if len(gt_bboxes) > 0: |
|
gt_bboxes[:, 0] *= w_scale |
|
gt_bboxes[:, 1] *= h_scale |
|
gt_bboxes[:, 2] *= w_scale |
|
gt_bboxes[:, 3] *= h_scale |
|
new_gt_bboxes.append(gt_bboxes) |
|
|
|
if 'centers2d' in results.keys() and len(results['centers2d']) > 0: |
|
centers2d = results['centers2d'][i] |
|
                if len(centers2d) > 0:
|
centers2d[:, 0] *= w_scale |
|
centers2d[:, 1] *= h_scale |
|
new_centers2d.append(centers2d) |
|
|
|
results['gt_bboxes'] = new_gt_bboxes |
|
results['centers2d'] = new_centers2d |
|
results['img_shape'] = img_shapes |
|
results['pad_shape'] = pad_shapes |
|
results['scale_factor'] = scale_factors |
|
results['keep_ratio'] = keep_ratios |
|
|
|
results['lidar2img'] = [results['intrinsics'][i] @ results['extrinsics'][i] for i in range(len(results['extrinsics']))] |
|
|
|
def __call__(self, results): |
|
"""Call function to resize images, bounding boxes, masks, semantic |
|
segmentation map. |
|
Args: |
|
results (dict): Result dict from loading pipeline. |
|
Returns: |
|
dict: Resized results, 'img_shape', 'pad_shape', 'scale_factor', \ |
|
'keep_ratio' keys are added into result dict. |
|
""" |
|
|
|
if 'scale' not in results: |
|
self._random_scale(results) |
|
else: |
|
if not self.override: |
|
assert 'scale_factor' not in results, ( |
|
'scale and scale_factor cannot be both set.') |
|
else: |
|
results.pop('scale') |
|
if 'scale_factor' in results: |
|
results.pop('scale_factor') |
|
self._random_scale(results) |
|
|
|
self._resize_img(results) |
|
|
|
return results |
|
|
|
def __repr__(self): |
|
repr_str = self.__class__.__name__ |
|
repr_str += f'(img_scale={self.img_scale}, ' |
|
repr_str += f'multiscale_mode={self.multiscale_mode}, ' |
|
repr_str += f'ratio_range={self.ratio_range}, ' |
|
        repr_str += f'keep_ratio={self.keep_ratio})'
|
return repr_str |
|
|
|
@PIPELINES.register_module() |
|
class PadMultiViewImage(): |
|
"""Pad the multi-view image. |
|
There are two padding modes: (1) pad to a fixed size and (2) pad to the |
|
minimum size that is divisible by some number. |
|
    Added keys are "img_shape", "pad_shape", "pad_fixed_size" and "pad_size_divisor".
|
Args: |
|
size (tuple, optional): Fixed padding size. |
|
size_divisor (int, optional): The divisor of padded size. |
|
pad_val (float, optional): Padding value, 0 by default. |
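
    Example (illustrative; pad every view up to a multiple of 32):
        >>> pad = dict(type='PadMultiViewImage', size_divisor=32)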
|
""" |
|
def __init__(self, size=None, size_divisor=None, pad_val=0): |
|
self.size = size |
|
self.size_divisor = size_divisor |
|
self.pad_val = pad_val |
|
assert size is not None or size_divisor is not None |
|
assert size_divisor is None or size is None |
|
|
|
def _pad_img(self, results): |
|
"""Pad images according to ``self.size``.""" |
|
if self.size is not None: |
|
padded_img = [mmcv.impad(img, |
|
shape = self.size, pad_val=self.pad_val) for img in results['img']] |
|
elif self.size_divisor is not None: |
|
padded_img = [mmcv.impad_to_multiple(img, |
|
self.size_divisor, pad_val=self.pad_val) for img in results['img']] |
|
results['img_shape'] = [img.shape for img in results['img']] |
|
results['img'] = padded_img |
|
results['pad_shape'] = [img.shape for img in padded_img] |
|
        results['pad_fixed_size'] = self.size
|
results['pad_size_divisor'] = self.size_divisor |
|
|
|
def __call__(self, results): |
|
"""Call function to pad images, masks, semantic segmentation maps. |
|
Args: |
|
results (dict): Result dict from loading pipeline. |
|
Returns: |
|
dict: Updated result dict. |
|
""" |
|
self._pad_img(results) |
|
return results |
|
|
|
|
|
def __repr__(self): |
|
repr_str = self.__class__.__name__ |
|
repr_str += f'(size={self.size}, ' |
|
repr_str += f'size_divisor={self.size_divisor}, ' |
|
repr_str += f'pad_val={self.pad_val})' |
|
return repr_str |
|
|
|
def format_number(n, decimal_places=1): |
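    """Format a scalar with an explicit sign (e.g. '+3.4', '-0.5'). Values whose
    rounded magnitude is (near) zero are returned as 0.0 so the generated text
    never contains '-0.0'."""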
|
if abs(round(n, decimal_places)) <= 1e-2: |
|
return 0.0 |
|
else: |
|
        return f"{n:+.{decimal_places}f}"
|
|
|
|
|
@PIPELINES.register_module() |
|
class LoadAnnoatationVQA(): |
|
def __init__( |
|
self, |
|
base_vqa_path, |
|
base_desc_path, |
|
base_conv_path, |
|
base_key_path, |
|
tokenizer, |
|
max_length, |
|
n_gen=2, |
|
ignore_type=["v1", "v2", "v3"], |
|
lane_objs_info=None): |
|
self.tokenizer = AutoTokenizer.from_pretrained(tokenizer, |
|
model_max_length=max_length, |
|
padding_side="right", |
|
use_fast=False, |
|
) |
|
self.n_gen = n_gen |
|
self.ignore_type = ignore_type |
|
self.tokenizer.pad_token = self.tokenizer.unk_token |
|
self.base_vqa_path = base_vqa_path |
|
self.base_desc_path = base_desc_path |
|
self.base_conv_path = base_conv_path |
|
self.base_key_path = base_key_path |
|
self.lane_objs_info = pickle.load(open(lane_objs_info, 'rb')) |
|
CLASSES = ('car', 'truck', 'trailer', 'bus', 'construction_vehicle', |
|
'bicycle', 'motorcycle', 'pedestrian', 'traffic_cone', |
|
'barrier') |
|
self.id2cat = {i: name for i, name in enumerate(CLASSES)} |
|
self.side = { |
|
'singapore': 'left', |
|
'boston': 'right', |
|
} |
|
self.template = [ |
|
"What can you tell about the current driving conditions from the images?", |
|
"What can be observed in the panoramic images provided?", |
|
"Can you provide a summary of the current driving scenario based on the input images?", |
|
"What can you observe from the provided images regarding the driving conditions?", |
|
"Please describe the current driving conditions based on the images provided.", |
|
"Can you describe the current weather conditions and the general environment depicted in the images?", |
|
"Please describe the current driving conditions based on the input images.", |
|
"Could you summarize the current driving conditions based on the input images?", |
|
"Please provide an overview of the current driving conditions based on the images.", |
|
"Can you summarize what the panoramic images show?", |
|
"Can you describe the overall conditions and environment based on the images?", |
|
"Could you describe the overall environment and objects captured in the images provided?" |
|
] |
|
|
|
def preprocess_vqa(self, results, traj): |
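        """Collect the offline annotations available for this sample (driving
        action, scene description, QA pairs and multi-turn conversation) as
        human/gpt turns."""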
|
sources = [] |
|
if os.path.exists(self.base_key_path+results['sample_idx']+".json"): |
|
with open(self.base_key_path+results['sample_idx']+".json", 'r') as f: |
|
action = json.load(f) |
|
|
|
sources.append( |
|
[ |
|
{"from": 'human', |
|
"value": "Please shortly describe your driving action."}, |
|
{"from": 'gpt', |
|
"value": action} |
|
] |
|
) |
|
if os.path.exists(self.base_desc_path+results['sample_idx']+".json"): |
|
with open(self.base_desc_path+results['sample_idx']+".json", 'r') as f: |
|
desc = json.load(f) |
|
question = random.sample(self.template, 1)[0] |
|
sources.append( |
|
[ |
|
{"from": 'human', |
|
"value": question}, |
|
{"from": 'gpt', |
|
"value": desc["description"]} |
|
] |
|
) |
|
if os.path.exists(self.base_vqa_path+results['sample_idx']+".json"): |
|
with open(self.base_vqa_path+results['sample_idx']+".json", 'r') as f: |
|
data_qa = json.load(f) |
|
for i, pair in enumerate(data_qa): |
|
sources.append( |
|
[ |
|
{"from": 'human', |
|
"value": pair["question"]}, |
|
{"from": 'gpt', |
|
"value": pair["answer"]} |
|
] |
|
) |
|
|
|
if os.path.exists(self.base_conv_path+results['sample_idx']+".json"): |
|
with open(self.base_conv_path+results['sample_idx']+".json", 'r') as f: |
|
data_qa = json.load(f) |
|
for pair in data_qa: |
|
sources.append( |
|
[ |
|
{"from": 'human', |
|
"value": pair["question"]}, |
|
{"from": 'gpt', |
|
"value": pair["answer"]} |
|
] |
|
) |
|
return sources |
|
|
|
def online_vqa(self, results): |
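        """Generate QA pairs on the fly: grounding of projected 2D boxes ("v1"),
        objects near a queried BEV position ("v2") and objects on sampled lanes
        ("v3"); each type can be switched off via ``ignore_type``."""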
|
sources = [] |
|
|
|
gt_bboxes_2d = [] |
|
gt_bboxes_3d = copy.deepcopy(results['gt_bboxes_3d']) |
|
gt_bboxes_3d_points = gt_bboxes_3d.corners |
|
gt_bboxes_points = gt_bboxes_3d_points.view(-1, 3) |
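        # Lift the corner points to homogeneous coordinates so they can be mapped
        # into each camera frame with the 4x4 extrinsic matrices below.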
|
gt_bboxes_points = np.concatenate((gt_bboxes_points[:, :3], np.ones(gt_bboxes_points.shape[0])[:, None]), axis=1) |
|
if "v1" not in self.ignore_type: |
|
for i, (cam_type, cam_info) in enumerate(results['cam_infos'].items()): |
|
gt_bboxes_points_cam = np.matmul(gt_bboxes_points, results['extrinsics'][i].T) |
|
bboxes = gt_bboxes_points_cam.reshape(-1, 8, 4) |
|
|
|
|
|
for j, box in enumerate(bboxes): |
|
box = box.transpose(1, 0) |
|
in_front = np.argwhere(box[2, :] > 0).flatten() |
|
corners_3d = box[:, in_front] |
|
|
|
corner_coords = view_points(corners_3d[:3, :], results['intrinsics'][i], True).T[:, :2].tolist() |
|
final_coords = post_process_coords(corner_coords) |
|
if final_coords is None: |
|
continue |
|
else: |
|
min_x, min_y, max_x, max_y = final_coords |
|
(height, width, _) = results['pad_shape'][0] |
|
|
|
min_x = np.clip(min_x, 0, width) |
|
min_y = np.clip(min_y, 0, height) |
|
max_x = np.clip(max_x, 0, width) |
|
max_y = np.clip(max_y, 0, height) |
|
w, h = max_x - min_x, max_y - min_y |
|
inter_w = max(0, min(min_x + w, width) - max(min_x, 0)) |
|
inter_h = max(0, min(min_y + h, height) - max(min_y, 0)) |
|
area = w * h |
|
if inter_w * inter_h == 0: |
|
continue |
|
if area <= 0 or w < 16 or h < 16: |
|
continue |
|
|
|
gt_bboxes_2d.append([round(min_x/width, 3), round(min_y/height, 3), round(max_x/width, 3), round(max_y/height, 3), j, cam_type]) |
|
|
|
|
|
if len(gt_bboxes_2d) >= 1: |
|
selected_objs = random.sample(gt_bboxes_2d, min(self.n_gen, len(gt_bboxes_2d))) |
|
for obj in selected_objs: |
|
answer = self.format_det_answer(obj[4], gt_bboxes_3d, results) |
|
sources.append( |
|
[ |
|
{"from": 'human', |
|
"value": f"Please Identity the object in the <{obj[5]}, {obj[0]}, {obj[1]}, {obj[2]}, {obj[3]}> and describe its 3D information."}, |
|
{"from": 'gpt', |
|
"value": f"The object is a {answer}",} |
|
] |
|
) |
|
|
|
if len(gt_bboxes_3d) >= 1 and "v2" not in self.ignore_type: |
|
centers = torch.FloatTensor(max(self.n_gen, len(gt_bboxes_3d)), 2).uniform_(-50, 50) |
|
bbox_center = gt_bboxes_3d.center[:, :2] + 5 * (torch.rand_like(gt_bboxes_3d.center[:, :2]) * 2 - 1) |
|
centers = torch.cat([bbox_center, centers], dim=0) |
|
indices = torch.randperm(centers.size(0))[:self.n_gen] |
|
centers = centers[indices] |
|
|
|
for center in centers: |
|
objs_near = [] |
|
for i in range(len(gt_bboxes_3d)): |
|
gt_box = gt_bboxes_3d[i] |
|
dis = torch.norm(gt_box.center[0, :2] - center) |
|
if dis < 10: |
|
objs_near.append(self.format_det_answer(i, gt_bboxes_3d, results)) |
|
if len(objs_near) == 0: |
|
answer = f"There are no objects nearby." |
|
else: |
|
answer = "There are the following objects nearby:\n" |
|
answer += '\n'.join(objs_near) |
|
sources.append( |
|
[ |
|
{"from": 'human', |
|
"value": f"What objects are there near the position ({format_number(center[0].item())}, {format_number(center[1].item())})?"}, |
|
{"from": 'gpt', |
|
"value": f"{answer}",} |
|
] |
|
) |
|
|
|
lane_objs = self.lane_objs_info[results['sample_idx']] |
|
if "lane_objects" in lane_objs.keys(): |
|
if "v3" not in self.ignore_type: |
|
index_list = [i for i in range(len(lane_objs['all_lane_pts']))] |
|
index_list = random.sample(index_list, min(self.n_gen, len(index_list))) |
|
for idx in index_list: |
|
if idx not in lane_objs['lane_objects'].keys(): |
|
sources.append( |
|
[ |
|
{"from": 'human', |
|
"value": f"What objects are there on the lane {self.describe_lane([lane_objs['all_lane_pts'][idx]])}?"}, |
|
{"from": 'gpt', |
|
"value": f"There are no objects on this lane.",} |
|
] |
|
) |
|
else: |
|
objs = [] |
|
for obj in lane_objs['lane_objects'][idx]: |
|
name, bbox, vel = obj |
|
objs.append(self.format_lane_answer(bbox, vel, name)) |
|
answer = '\n'.join(objs) |
|
sources.append( |
|
[ |
|
{"from": 'human', |
|
"value": f"What objects are there on the lane {self.describe_lane([lane_objs['all_lane_pts'][idx]])}?"}, |
|
{"from": 'gpt', |
|
"value": f"The objects on this lane include:\n{answer}",} |
|
] |
|
) |
|
|
|
return sources |
|
|
|
def describe_lane(self, bezier_lane): |
|
formatted_points = ", ".join(f"({format_number(point[0])}, {format_number(point[1])})" for point in bezier_lane[0]) |
|
result = f"[{formatted_points}]" |
|
return result |
|
|
|
def format_lane_answer(self, bbox, vel, name): |
|
x = bbox[0] |
|
y = bbox[1] |
|
z = bbox[2] |
|
l = bbox[3] |
|
w = bbox[4] |
|
h = bbox[5] |
|
yaw = bbox[6] |
|
yaw = math.degrees(yaw) |
|
vx = vel[0] |
|
        vy = vel[1]
|
|
|
position = analyze_position(x, y, yaw) |
|
|
|
answer = f"{name} in the {position} " |
|
answer += f"location: ({format_number(x)}, {format_number(y)}), " |
|
answer += f"length: {l:.1f}, width: {w:.1f}, height: {h:.1f}, " |
|
answer += f"angles in degrees: {format_number(yaw)}" |
|
if np.sqrt(vx**2 + vy**2) > 0.2: |
|
answer += f", velocity: ({format_number(vx)}, {format_number(vy)}). " |
|
else: |
|
answer += "." |
|
|
|
return answer |
|
|
|
def format_det_answer(self, index, gt_bboxes_3d, results): |
|
x = gt_bboxes_3d.tensor[index][0].item() |
|
y = gt_bboxes_3d.tensor[index][1].item() |
|
z = gt_bboxes_3d.tensor[index][2].item() |
|
l = gt_bboxes_3d.tensor[index][3].item() |
|
w = gt_bboxes_3d.tensor[index][4].item() |
|
h = gt_bboxes_3d.tensor[index][5].item() |
|
yaw = gt_bboxes_3d.tensor[index][6].item() |
|
vx = gt_bboxes_3d.tensor[index][7].item() |
|
vy = gt_bboxes_3d.tensor[index][8].item() |
|
yaw = math.degrees(yaw) |
|
position = analyze_position(x, y, yaw) |
|
|
|
answer = f"{self.id2cat[results['gt_labels_3d'][index]]} in the {position} " |
|
answer += f"location: ({format_number(x)}, {format_number(y)}), " |
|
answer += f"length: {l:.1f}, width: {w:.1f}, height: {h:.1f}, " |
|
answer += f"angles in degrees: {format_number(yaw)}" |
|
if np.sqrt(vx**2 + vy**2) > 0.2: |
|
answer += f", velocity: ({format_number(vx)}, {format_number(vy)}). " |
|
else: |
|
answer += "." |
|
|
|
return answer |
|
|
|
def __call__(self, results): |
|
traj = None |
|
if 'gt_planning' in results.keys(): |
|
            planning_traj = results['gt_planning'][0, :, :2]
|
mask = results['gt_planning_mask'][0].any(axis=1) |
|
planning_traj = planning_traj[mask] |
|
if len(planning_traj) == 6: |
|
formatted_points = ', '.join(f"({format_number(point[0], 2)}, {format_number(point[1], 2)})" for point in planning_traj) |
|
traj = f"Here is the planning trajectory [PT, {formatted_points}]." |
|
|
|
sources = self.preprocess_vqa(results, traj) |
|
prompt = f"You are driving in {results['location']}. " |
|
|
|
online_sources = self.online_vqa(results) |
|
sources += online_sources |
|
|
|
random.shuffle(sources) |
|
if 'gt_planning' in results.keys() and len(planning_traj) == 6: |
|
sources = [ |
|
[{"from": 'human', |
|
"value": "Please provide the planning trajectory for the ego car without reasons."}, |
|
{"from": 'gpt', |
|
"value": traj}] |
|
] + sources |
|
|
|
vqa_anno = [item for pair in sources for item in pair] |
|
vqa_anno[0]['value'] = DEFAULT_IMAGE_TOKEN + '\n' + prompt + vqa_anno[0]['value'] |
|
vqa_converted = preprocess([vqa_anno], self.tokenizer, True) |
|
input_ids = vqa_converted['input_ids'][0] |
|
vlm_labels = vqa_converted['labels'][0] |
|
|
|
results['input_ids'] = input_ids |
|
results['vlm_labels'] = vlm_labels |
|
|
|
return results |
|
|
|
def __repr__(self): |
|
repr_str = self.__class__.__name__ |
|
return repr_str |
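
# Illustrative config entry for a training pipeline (paths and tokenizer name are
# placeholders, not the repository's actual defaults):
#   dict(type='LoadAnnoatationVQA',
#        base_vqa_path='data/vqa/', base_desc_path='data/desc/',
#        base_conv_path='data/conv/', base_key_path='data/keywords/',
#        tokenizer='path/to/llm_tokenizer', max_length=2048,
#        n_gen=2, lane_objs_info='data/lane_obj_info.pkl')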
|
|
|
|
|
@PIPELINES.register_module() |
|
class LoadAnnoatationVQATest(): |
|
def __init__( |
|
self, |
|
base_conv_path, |
|
base_vqa_path, |
|
tokenizer, |
|
max_length, |
|
base_counter_path=None, |
|
load_type=["conv", "planning", "counter"], |
|
): |
|
self.tokenizer = AutoTokenizer.from_pretrained(tokenizer, |
|
model_max_length=max_length, |
|
padding_side="right", |
|
use_fast=False, |
|
) |
|
self.tokenizer.pad_token = self.tokenizer.unk_token |
|
self.base_conv_path = base_conv_path |
|
self.base_vqa_path = base_vqa_path |
|
self.base_counter_path = base_counter_path |
|
self.load_type = load_type |
|
self.side = { |
|
'singapore': 'left', |
|
'boston': 'right', |
|
} |
|
self.template = [ |
|
"What can you tell about the current driving conditions from the images?", |
|
"What can be observed in the panoramic images provided?", |
|
"Can you provide a summary of the current driving scenario based on the input images?", |
|
"What can you observe from the provided images regarding the driving conditions?", |
|
"Please describe the current driving conditions based on the images provided.", |
|
"Can you describe the current weather conditions and the general environment depicted in the images?", |
|
"Please describe the current driving conditions based on the input images.", |
|
"Could you summarize the current driving conditions based on the input images?", |
|
"Please provide an overview of the current driving conditions based on the images.", |
|
"Can you summarize what the panoramic images show?", |
|
"Can you describe the overall conditions and environment based on the images?", |
|
"Could you describe the overall environment and objects captured in the images provided?" |
|
] |
|
|
|
def preprocess_vqa(self, results): |
|
sources = [] |
|
if "planning" in self.load_type: |
|
sources.append( |
|
[ |
|
{"from": 'human', |
|
"value": "Please provide the planning trajectory for the ego car without reasons."}, |
|
{"from": 'gpt', |
|
"value": ""} |
|
] |
|
) |
|
if "short" in self.load_type: |
|
sources.append( |
|
[ |
|
{"from": 'human', |
|
"value": "Please shortly describe your driving action."}, |
|
{"from": 'gpt', |
|
"value": ""} |
|
] |
|
) |
|
if "conv" in self.load_type: |
|
question = random.sample(self.template, 1)[0] |
|
sources.append( |
|
[ |
|
{"from": 'human', |
|
"value": question}, |
|
{"from": 'gpt', |
|
"value": ""} |
|
] |
|
) |
|
if os.path.exists(self.base_conv_path+results['sample_idx']+".json"): |
|
with open(self.base_conv_path+results['sample_idx']+".json", 'r') as f: |
|
data_qa = json.load(f) |
|
|
|
for pair in data_qa: |
|
sources.append( |
|
[ |
|
{"from": 'human', |
|
"value": pair["question"]}, |
|
{"from": 'gpt', |
|
"value": ""} |
|
] |
|
) |
|
if os.path.exists(self.base_vqa_path+results['sample_idx']+".json"): |
|
with open(self.base_vqa_path+results['sample_idx']+".json", 'r') as f: |
|
data_qa = json.load(f) |
|
|
|
for pair in data_qa: |
|
sources.append( |
|
[ |
|
{"from": 'human', |
|
"value": pair["question"]}, |
|
{"from": 'gpt', |
|
"value": ""} |
|
] |
|
) |
|
if "counter" in self.load_type: |
|
all_counters = pickle.load(open(os.path.join(self.base_counter_path + results['sample_idx']+'.pkl'), 'rb')) |
|
for data in all_counters: |
|
sources.append( |
|
[ |
|
{"from": 'human', |
|
"value": f"If you follow the trajectory {data['traj']}, what would happen?"}, |
|
{"from": 'gpt', |
|
"value": ""} |
|
] |
|
) |
|
return sources |
|
|
|
|
|
def __call__(self, results): |
|
sources = self.preprocess_vqa(results) |
|
prompt = f"You are driving in {results['location']}. " |
|
|
|
vlm_labels = [anno[0]['value'] for anno in sources] |
|
|
|
for anno in sources: |
|
anno[0]['value'] = DEFAULT_IMAGE_TOKEN + '\n' + prompt + anno[0]['value'] |
|
anno[1]['value'] = '' |
|
vqa_converted = preprocess(sources, self.tokenizer, True, False) |
|
input_ids = vqa_converted['input_ids'] |
|
results['input_ids'] = input_ids |
|
results['vlm_labels'] = vlm_labels |
|
|
|
return results |
|
|
|
def __repr__(self): |
|
repr_str = self.__class__.__name__ |
|
return repr_str |
|
|
|
|
|
@PIPELINES.register_module() |
|
class NormalizeMultiviewImage(object): |
|
"""Normalize the image. |
|
Added key is "img_norm_cfg". |
|
Args: |
|
mean (sequence): Mean values of 3 channels. |
|
std (sequence): Std values of 3 channels. |
|
to_rgb (bool): Whether to convert the image from BGR to RGB, |
|
default is true. |
|
""" |
|
|
|
def __init__(self, mean, std, to_rgb=True): |
|
self.mean = np.array(mean, dtype=np.float32) |
|
self.std = np.array(std, dtype=np.float32) |
|
self.to_rgb = to_rgb |
|
|
|
def __call__(self, results): |
|
"""Call function to normalize images. |
|
Args: |
|
results (dict): Result dict from loading pipeline. |
|
Returns: |
|
dict: Normalized results, 'img_norm_cfg' key is added into |
|
result dict. |
|
""" |
|
results['img'] = [mmcv.imnormalize( |
|
img, self.mean, self.std, self.to_rgb) for img in results['img']] |
|
results['img_norm_cfg'] = dict( |
|
mean=self.mean, std=self.std, to_rgb=self.to_rgb) |
|
return results |
|
|
|
def __repr__(self): |
|
repr_str = self.__class__.__name__ |
|
repr_str += f'(mean={self.mean}, std={self.std}, to_rgb={self.to_rgb})' |
|
return repr_str |
|
|
|
|
|
@PIPELINES.register_module() |
|
class ResizeCropFlipRotImage(): |
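    """Per-view resize/crop/flip(/rotate) augmentation that folds the same 2D
    transform into the camera intrinsics and, during training, also transforms
    the 2D boxes, centers2d, labels and depths."""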
|
def __init__(self, data_aug_conf=None, with_2d=True, filter_invisible=True, training=True): |
|
self.data_aug_conf = data_aug_conf |
|
self.training = training |
|
self.min_size = 2.0 |
|
self.with_2d = with_2d |
|
self.filter_invisible = filter_invisible |
|
|
|
def __call__(self, results): |
|
|
|
imgs = results['img'] |
|
N = len(imgs) |
|
new_imgs = [] |
|
new_gt_bboxes = [] |
|
new_centers2d = [] |
|
new_gt_labels = [] |
|
new_depths = [] |
|
assert self.data_aug_conf['rot_lim'] == (0.0, 0.0), "Rotation is not currently supported" |
|
|
|
resize, resize_dims, crop, flip, rotate = self._sample_augmentation() |
|
|
|
|
|
for i in range(N): |
|
img = Image.fromarray(np.uint8(imgs[i])) |
|
img, ida_mat = self._img_transform( |
|
img, |
|
resize=resize, |
|
resize_dims=resize_dims, |
|
crop=crop, |
|
flip=flip, |
|
rotate=rotate, |
|
) |
|
if self.training and self.with_2d: |
|
gt_bboxes = results['gt_bboxes'][i] |
|
centers2d = results['centers2d'][i] |
|
gt_labels = results['gt_labels'][i] |
|
depths = results['depths'][i] |
|
if len(gt_bboxes) != 0: |
|
gt_bboxes, centers2d, gt_labels, depths = self._bboxes_transform( |
|
gt_bboxes, |
|
centers2d, |
|
gt_labels, |
|
depths, |
|
resize=resize, |
|
crop=crop, |
|
flip=flip, |
|
) |
|
if len(gt_bboxes) != 0 and self.filter_invisible: |
|
gt_bboxes, centers2d, gt_labels, depths = self._filter_invisible(gt_bboxes, centers2d, gt_labels, depths) |
|
|
|
new_gt_bboxes.append(gt_bboxes) |
|
new_centers2d.append(centers2d) |
|
new_gt_labels.append(gt_labels) |
|
new_depths.append(depths) |
|
|
|
new_imgs.append(np.array(img).astype(np.float32)) |
|
results['intrinsics'][i][:3, :3] = ida_mat @ results['intrinsics'][i][:3, :3] |
|
results['gt_bboxes'] = new_gt_bboxes |
|
results['centers2d'] = new_centers2d |
|
results['gt_labels'] = new_gt_labels |
|
results['depths'] = new_depths |
|
results['img'] = new_imgs |
|
results['lidar2img'] = [results['intrinsics'][i] @ results['extrinsics'][i] for i in range(len(results['extrinsics']))] |
|
|
|
return results |
|
|
|
    def _bboxes_transform(self, bboxes, centers2d, gt_labels, depths, resize, crop, flip):
|
assert len(bboxes) == len(centers2d) == len(gt_labels) == len(depths) |
|
fH, fW = self.data_aug_conf["final_dim"] |
|
bboxes = bboxes * resize |
|
bboxes[:, 0] = bboxes[:, 0] - crop[0] |
|
bboxes[:, 1] = bboxes[:, 1] - crop[1] |
|
bboxes[:, 2] = bboxes[:, 2] - crop[0] |
|
bboxes[:, 3] = bboxes[:, 3] - crop[1] |
|
bboxes[:, 0] = np.clip(bboxes[:, 0], 0, fW) |
|
bboxes[:, 2] = np.clip(bboxes[:, 2], 0, fW) |
|
bboxes[:, 1] = np.clip(bboxes[:, 1], 0, fH) |
|
bboxes[:, 3] = np.clip(bboxes[:, 3], 0, fH) |
|
keep = ((bboxes[:, 2] - bboxes[:, 0]) >= self.min_size) & ((bboxes[:, 3] - bboxes[:, 1]) >= self.min_size) |
|
|
|
|
|
if flip: |
|
x0 = bboxes[:, 0].copy() |
|
x1 = bboxes[:, 2].copy() |
|
bboxes[:, 2] = fW - x0 |
|
bboxes[:, 0] = fW - x1 |
|
bboxes = bboxes[keep] |
|
|
|
centers2d = centers2d * resize |
|
centers2d[:, 0] = centers2d[:, 0] - crop[0] |
|
centers2d[:, 1] = centers2d[:, 1] - crop[1] |
|
centers2d[:, 0] = np.clip(centers2d[:, 0], 0, fW) |
|
centers2d[:, 1] = np.clip(centers2d[:, 1], 0, fH) |
|
if flip: |
|
centers2d[:, 0] = fW - centers2d[:, 0] |
|
|
|
centers2d = centers2d[keep] |
|
gt_labels = gt_labels[keep] |
|
depths = depths[keep] |
|
|
|
return bboxes, centers2d, gt_labels, depths |
|
|
|
|
|
def _filter_invisible(self, bboxes, centers2d, gt_labels, depths): |
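        """Drop fully occluded 2D boxes: box indices are painted into an index map
        from farthest to nearest, and only boxes whose index still appears in the
        map after all nearer boxes are drawn are kept."""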
|
|
|
assert len(bboxes) == len(centers2d) == len(gt_labels) == len(depths) |
|
fH, fW = self.data_aug_conf["final_dim"] |
|
indices_maps = np.zeros((fH,fW)) |
|
tmp_bboxes = np.zeros_like(bboxes) |
|
tmp_bboxes[:, :2] = np.ceil(bboxes[:, :2]) |
|
tmp_bboxes[:, 2:] = np.floor(bboxes[:, 2:]) |
|
tmp_bboxes = tmp_bboxes.astype(np.int64) |
|
sort_idx = np.argsort(-depths, axis=0, kind='stable') |
|
tmp_bboxes = tmp_bboxes[sort_idx] |
|
bboxes = bboxes[sort_idx] |
|
depths = depths[sort_idx] |
|
centers2d = centers2d[sort_idx] |
|
gt_labels = gt_labels[sort_idx] |
|
for i in range(bboxes.shape[0]): |
|
u1, v1, u2, v2 = tmp_bboxes[i] |
|
indices_maps[v1:v2, u1:u2] = i |
|
indices_res = np.unique(indices_maps).astype(np.int64) |
|
bboxes = bboxes[indices_res] |
|
depths = depths[indices_res] |
|
centers2d = centers2d[indices_res] |
|
gt_labels = gt_labels[indices_res] |
|
|
|
return bboxes, centers2d, gt_labels, depths |
|
|
|
|
|
|
|
def _get_rot(self, h): |
|
return torch.Tensor( |
|
[ |
|
[np.cos(h), np.sin(h)], |
|
[-np.sin(h), np.cos(h)], |
|
] |
|
) |
|
|
|
def _img_transform(self, img, resize, resize_dims, crop, flip, rotate): |
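        """Apply resize/crop/flip/rotate to a PIL image and accumulate the same 2D
        affine transform into a 3x3 matrix (``ida_mat``) that is later folded into
        the camera intrinsics."""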
|
ida_rot = torch.eye(2) |
|
ida_tran = torch.zeros(2) |
|
|
|
img = img.resize(resize_dims) |
|
img = img.crop(crop) |
|
if flip: |
|
img = img.transpose(method=Image.FLIP_LEFT_RIGHT) |
|
img = img.rotate(rotate) |
|
|
|
|
|
ida_rot *= resize |
|
ida_tran -= torch.Tensor(crop[:2]) |
|
if flip: |
|
A = torch.Tensor([[-1, 0], [0, 1]]) |
|
b = torch.Tensor([crop[2] - crop[0], 0]) |
|
ida_rot = A.matmul(ida_rot) |
|
ida_tran = A.matmul(ida_tran) + b |
|
A = self._get_rot(rotate / 180 * np.pi) |
|
b = torch.Tensor([crop[2] - crop[0], crop[3] - crop[1]]) / 2 |
|
b = A.matmul(-b) + b |
|
ida_rot = A.matmul(ida_rot) |
|
ida_tran = A.matmul(ida_tran) + b |
|
ida_mat = torch.eye(3) |
|
ida_mat[:2, :2] = ida_rot |
|
ida_mat[:2, 2] = ida_tran |
|
return img, ida_mat |
|
|
|
def _sample_augmentation(self): |
|
H, W = self.data_aug_conf["H"], self.data_aug_conf["W"] |
|
fH, fW = self.data_aug_conf["final_dim"] |
|
if self.training: |
|
resize = np.random.uniform(*self.data_aug_conf["resize_lim"]) |
|
resize_dims = (int(W * resize), int(H * resize)) |
|
newW, newH = resize_dims |
|
crop_h = int((1 - np.random.uniform(*self.data_aug_conf["bot_pct_lim"])) * newH) - fH |
|
crop_w = int(np.random.uniform(0, max(0, newW - fW))) |
|
crop = (crop_w, crop_h, crop_w + fW, crop_h + fH) |
|
flip = False |
|
if self.data_aug_conf["rand_flip"] and np.random.choice([0, 1]): |
|
flip = True |
|
rotate = np.random.uniform(*self.data_aug_conf["rot_lim"]) |
|
else: |
|
resize = max(fH / H, fW / W) |
|
resize_dims = (int(W * resize), int(H * resize)) |
|
newW, newH = resize_dims |
|
crop_h = int((1 - np.mean(self.data_aug_conf["bot_pct_lim"])) * newH) - fH |
|
crop_w = int(max(0, newW - fW) / 2) |
|
crop = (crop_w, crop_h, crop_w + fW, crop_h + fH) |
|
flip = False |
|
rotate = 0 |
|
return resize, resize_dims, crop, flip, rotate |
|
|
|
@PIPELINES.register_module() |
|
class GlobalRotScaleTransImage(): |
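    """Global BEV augmentation: randomly rotate around z, scale and translate the
    scene, apply the same transform to the 3D boxes, and compose ``lidar2img`` /
    ``ego_pose`` with the inverse transform so projections stay consistent."""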
|
def __init__( |
|
self, |
|
rot_range=[-0.3925, 0.3925], |
|
scale_ratio_range=[0.95, 1.05], |
|
translation_std=[0, 0, 0], |
|
reverse_angle=False, |
|
training=True, |
|
): |
|
|
|
self.rot_range = rot_range |
|
self.scale_ratio_range = scale_ratio_range |
|
self.translation_std = translation_std |
|
|
|
self.reverse_angle = reverse_angle |
|
self.training = training |
|
|
|
def __call__(self, results): |
|
|
|
translation_std = np.array(self.translation_std, dtype=np.float32) |
|
|
|
rot_angle = np.random.uniform(*self.rot_range) |
|
scale_ratio = np.random.uniform(*self.scale_ratio_range) |
|
trans = np.random.normal(scale=translation_std, size=3).T |
|
|
|
self._rotate_bev_along_z(results, rot_angle) |
|
if self.reverse_angle: |
|
rot_angle = rot_angle * -1 |
|
results["gt_bboxes_3d"].rotate( |
|
np.array(rot_angle) |
|
) |
|
|
|
|
|
self._scale_xyz(results, scale_ratio) |
|
results["gt_bboxes_3d"].scale(scale_ratio) |
|
|
|
|
|
self._trans_xyz(results, trans) |
|
results["gt_bboxes_3d"].translate(trans) |
|
|
|
return results |
|
|
|
def _trans_xyz(self, results, trans): |
|
trans_mat = torch.eye(4, 4) |
|
trans_mat[:3, -1] = torch.from_numpy(trans).reshape(1, 3) |
|
trans_mat_inv = torch.inverse(trans_mat) |
|
num_view = len(results["lidar2img"]) |
|
results['ego_pose'] = (torch.tensor(results["ego_pose"]).float() @ trans_mat_inv).numpy() |
|
results['ego_pose_inv'] = (trans_mat.float() @ torch.tensor(results["ego_pose_inv"])).numpy() |
|
|
|
for view in range(num_view): |
|
results["lidar2img"][view] = (torch.tensor(results["lidar2img"][view]).float() @ trans_mat_inv).numpy() |
|
|
|
|
|
def _rotate_bev_along_z(self, results, angle): |
|
rot_cos = torch.cos(torch.tensor(angle)) |
|
rot_sin = torch.sin(torch.tensor(angle)) |
|
|
|
rot_mat = torch.tensor([[rot_cos, rot_sin, 0, 0], [-rot_sin, rot_cos, 0, 0], [0, 0, 1, 0], [0, 0, 0, 1]]) |
|
rot_mat_inv = torch.inverse(rot_mat) |
|
|
|
results['ego_pose'] = (torch.tensor(results["ego_pose"]).float() @ rot_mat_inv).numpy() |
|
results['ego_pose_inv'] = (rot_mat.float() @ torch.tensor(results["ego_pose_inv"])).numpy() |
|
num_view = len(results["lidar2img"]) |
|
for view in range(num_view): |
|
results["lidar2img"][view] = (torch.tensor(results["lidar2img"][view]).float() @ rot_mat_inv).numpy() |
|
|
|
def _scale_xyz(self, results, scale_ratio): |
|
scale_mat = torch.tensor( |
|
[ |
|
[scale_ratio, 0, 0, 0], |
|
[0, scale_ratio, 0, 0], |
|
[0, 0, scale_ratio, 0], |
|
[0, 0, 0, 1], |
|
] |
|
) |
|
|
|
scale_mat_inv = torch.inverse(scale_mat) |
|
|
|
results['ego_pose'] = (torch.tensor(results["ego_pose"]).float() @ scale_mat_inv).numpy() |
|
results['ego_pose_inv'] = (scale_mat @ torch.tensor(results["ego_pose_inv"]).float()).numpy() |
|
|
|
num_view = len(results["lidar2img"]) |
|
for view in range(num_view): |
|
results["lidar2img"][view] = (torch.tensor(results["lidar2img"][view]).float() @ scale_mat_inv).numpy() |
|
|
|
@PIPELINES.register_module() |
|
class CustomPadMultiViewImage: |
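    """Pad every view to the largest (H, W) in the sample and, if
    ``size_divisor`` is set, further to a multiple of it."""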
|
|
|
def __init__(self, size_divisor=None, pad_val=0): |
|
self.size_divisor = size_divisor |
|
self.pad_val = pad_val |
|
|
|
def __call__(self, results): |
|
max_h = max([img.shape[0] for img in results['img']]) |
|
max_w = max([img.shape[1] for img in results['img']]) |
|
padded_img = [mmcv.impad(img, shape=(max_h, max_w), pad_val=self.pad_val) for img in results['img']] |
|
if self.size_divisor is not None: |
|
padded_img = [mmcv.impad_to_multiple( |
|
img, self.size_divisor, pad_val=self.pad_val) for img in padded_img] |
|
|
|
results['img'] = padded_img |
|
results['pad_shape'] = [img.shape for img in padded_img] |
|
results['pad_fixed_size'] = None |
|
results['pad_size_divisor'] = self.size_divisor |
|
|
|
return results |
|
|
|
def __repr__(self): |
|
repr_str = self.__class__.__name__ |
|
        repr_str += f'(size_divisor={self.size_divisor}, '
|
repr_str += f'pad_val={self.pad_val})' |
|
return repr_str |
|
|
|
@PIPELINES.register_module() |
|
class CustomParameterizeLane: |
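    """Parameterize lane centerlines: ``method`` names one of the fitting helpers
    below (e.g. ``bezier_Endpointfixed``) and ``n_control`` sets the number of
    Bezier control points per lane."""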
|
|
|
def __init__(self, method, n_control): |
|
self.method = method |
|
self.n_control = n_control |
|
|
|
def __call__(self, results): |
|
centerlines = results['ann_info']['lane_pts'] |
|
para_centerlines = getattr(self, self.method)(centerlines, self.n_control) |
|
results['lane_pts'] = para_centerlines |
|
return results |
|
|
|
def comb(self, n, k): |
|
return factorial(n) // (factorial(k) * factorial(n - k)) |
|
|
|
def fit_bezier(self, points, n_control): |
|
n_points = len(points) |
|
A = np.zeros((n_points, n_control)) |
|
t = np.arange(n_points) / (n_points - 1) |
|
for i in range(n_points): |
|
for j in range(n_control): |
|
A[i, j] = self.comb(n_control - 1, j) * np.power(1 - t[i], n_control - 1 - j) * np.power(t[i], j) |
|
conts = np.linalg.lstsq(A, points, rcond=None) |
|
return conts |
|
|
|
def fit_bezier_Endpointfixed(self, points, n_control): |
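        """Least-squares fit of an ``n_control``-point Bezier curve whose first and
        last control points are pinned to the first and last input points."""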
|
n_points = len(points) |
|
A = np.zeros((n_points, n_control)) |
|
t = np.arange(n_points) / (n_points - 1) |
|
for i in range(n_points): |
|
for j in range(n_control): |
|
A[i, j] = self.comb(n_control - 1, j) * np.power(1 - t[i], n_control - 1 - j) * np.power(t[i], j) |
|
A_BE = A[1:-1, 1:-1] |
|
_points = points[1:-1] |
|
_points = _points - A[1:-1, 0].reshape(-1, 1) @ points[0].reshape(1, -1) - A[1:-1, -1].reshape(-1, 1) @ points[-1].reshape(1, -1) |
|
|
|
conts = np.linalg.lstsq(A_BE, _points, rcond=None) |
|
|
|
control_points = np.zeros((n_control, points.shape[1])) |
|
control_points[0] = points[0] |
|
control_points[-1] = points[-1] |
|
control_points[1:-1] = conts[0] |
|
|
|
return control_points |
|
|
|
def bezier_Endpointfixed(self, input_data, n_control=4): |
|
coeffs_list = [] |
|
for idx, centerline in enumerate(input_data): |
|
res = self.fit_bezier_Endpointfixed(centerline, n_control) |
|
coeffs = res.flatten() |
|
coeffs_list.append(coeffs) |
|
return np.array(coeffs_list, dtype=np.float32) |
|
|
|
@PIPELINES.register_module() |
|
class PhotoMetricDistortionMultiViewImage: |
|
r""" |
|
Notes |
|
----- |
|
Adapted from https://github.com/fundamentalvision/BEVFormer/blob/master/projects/mmdet3d_plugin/datasets/pipelines/transform_3d.py#L99. |
|
|
|
Apply photometric distortion to image sequentially, every transformation |
|
is applied with a probability of 0.5. The position of random contrast is in |
|
second or second to last. |
|
1. random brightness |
|
2. random contrast (mode 0) |
|
3. convert color from BGR to HSV |
|
4. random saturation |
|
5. random hue |
|
6. convert color from HSV to BGR |
|
7. random contrast (mode 1) |
|
8. randomly swap channels |
|
Args: |
|
brightness_delta (int): delta of brightness. |
|
contrast_range (tuple): range of contrast. |
|
saturation_range (tuple): range of saturation. |
|
hue_delta (int): delta of hue. |
|
""" |
|
|
|
def __init__(self, |
|
brightness_delta=32, |
|
contrast_range=(0.5, 1.5), |
|
saturation_range=(0.5, 1.5), |
|
hue_delta=18): |
|
self.brightness_delta = brightness_delta |
|
self.contrast_lower, self.contrast_upper = contrast_range |
|
self.saturation_lower, self.saturation_upper = saturation_range |
|
self.hue_delta = hue_delta |
|
|
|
def __call__(self, results): |
|
"""Call function to perform photometric distortion on images. |
|
Args: |
|
results (dict): Result dict from loading pipeline. |
|
Returns: |
|
dict: Result dict with images distorted. |
|
""" |
|
imgs = results['img'] |
|
new_imgs = [] |
|
for img in imgs: |
|
assert img.dtype == np.float32, \ |
|
'PhotoMetricDistortion needs the input image of dtype np.float32,'\ |
|
' please set "to_float32=True" in "LoadImageFromFile" pipeline' |
|
|
|
if np.random.randint(2): |
|
delta = random.uniform(-self.brightness_delta, |
|
self.brightness_delta) |
|
img += delta |
|
|
|
|
|
|
|
mode = np.random.randint(2) |
|
if mode == 1: |
|
if np.random.randint(2): |
|
alpha = np.random.uniform(self.contrast_lower, |
|
self.contrast_upper) |
|
img *= alpha |
|
|
|
|
|
img = mmcv.bgr2hsv(img) |
|
|
|
|
|
if np.random.randint(2): |
|
img[..., 1] *= np.random.uniform(self.saturation_lower, |
|
self.saturation_upper) |
|
|
|
|
|
if np.random.randint(2): |
|
img[..., 0] += np.random.uniform(-self.hue_delta, self.hue_delta) |
|
img[..., 0][img[..., 0] > 360] -= 360 |
|
img[..., 0][img[..., 0] < 0] += 360 |
|
|
|
|
|
img = mmcv.hsv2bgr(img) |
|
|
|
|
|
if mode == 0: |
|
if np.random.randint(2): |
|
alpha = np.random.uniform(self.contrast_lower, |
|
self.contrast_upper) |
|
img *= alpha |
|
|
|
|
|
if np.random.randint(2): |
|
img = img[..., np.random.permutation(3)] |
|
new_imgs.append(img) |
|
results['img'] = new_imgs |
|
return results |
|
|
|
def __repr__(self): |
|
repr_str = self.__class__.__name__ |
|
repr_str += f'(\nbrightness_delta={self.brightness_delta},\n' |
|
repr_str += 'contrast_range=' |
|
repr_str += f'{(self.contrast_lower, self.contrast_upper)},\n' |
|
repr_str += 'saturation_range=' |
|
repr_str += f'{(self.saturation_lower, self.saturation_upper)},\n' |
|
repr_str += f'hue_delta={self.hue_delta})' |
|
return repr_str |
|
|