# navsim/agents/vadv2/vadv2_features.py
from enum import IntEnum
from typing import Any, Dict, List, Tuple
import cv2
import networkx as nx
import numpy as np
import numpy.typing as npt
import torch
from nuplan.common.actor_state.ego_state import EgoState
from nuplan.common.actor_state.oriented_box import OrientedBox
from nuplan.common.actor_state.state_representation import StateSE2, TimePoint, StateVector2D
from nuplan.common.actor_state.tracked_objects_types import TrackedObjectType
from nuplan.common.actor_state.vehicle_parameters import get_pacifica_parameters
from nuplan.common.geometry.convert import absolute_to_relative_poses
from nuplan.common.maps.abstract_map import AbstractMap, SemanticMapLayer, MapObject
from nuplan.planning.simulation.trajectory.trajectory_sampling import TrajectorySampling
from shapely import affinity
from shapely.geometry import Polygon, LineString
from torchvision import transforms
from det_map.data.datasets.lidar_utils import transform_points
from navsim.agents.vadv2.vadv2_config import Vadv2Config
from navsim.common.dataclasses import AgentInput, Scene, Annotations
from navsim.common.enums import BoundingBoxIndex, LidarIndex
from navsim.evaluate.pdm_score import transform_trajectory, get_trajectory_as_array
from navsim.planning.scenario_builder.navsim_scenario_utils import tracked_object_types
from navsim.planning.simulation.planner.pdm_planner.utils.pdm_enums import StateIndex
from navsim.planning.training.abstract_feature_target_builder import (
AbstractFeatureBuilder,
AbstractTargetBuilder,
)
# todo temporal input
# todo velocity regression
class Vadv2FeatureBuilder(AbstractFeatureBuilder):
def __init__(self, config: Vadv2Config):
self._config = config
def get_unique_name(self) -> str:
"""Inherited, see superclass."""
return "transfuser_feature"
def compute_features(self, agent_input: AgentInput) -> Dict[str, torch.Tensor]:
"""Inherited, see superclass."""
features = {}
features["camera_feature"] = self._get_camera_feature(agent_input)
# todo pers bev
if self._config.use_pers_bev_embed:
features["pers_bev"] = self._get_pers_bev(agent_input)
if self._config.lidar_seq_len == 4:
features["lidar_feature"] = self._get_lidar_feature_4f(agent_input)
else:
features["lidar_feature"] = self._get_lidar_feature(agent_input)
ego_status_list = []
for i in range(self._config.num_ego_status):
# i=0: idx=-1
# i=1: idx=-2
# i=2: idx=-3
# i=3: idx=-4
idx = - (i + 1)
ego_status_list += [
torch.tensor(agent_input.ego_statuses[idx].driving_command, dtype=torch.float32),
torch.tensor(agent_input.ego_statuses[idx].ego_velocity, dtype=torch.float32),
torch.tensor(agent_input.ego_statuses[idx].ego_acceleration, dtype=torch.float32),
]
features["status_feature"] = torch.concatenate(
ego_status_list
)
# # add image_meta
# cams_all_frames = [[
# tmp.cam_f0,
# tmp.cam_l0,
# # tmp.cam_l1,
# # tmp.cam_l2,
# tmp.cam_r0,
# # tmp.cam_r1,
# # tmp.cam_r2,
# # tmp.cam_b0
# ] for tmp in agent_input.cameras]
#
# image, canvas, sensor2lidar_rotation, sensor2lidar_translation, intrinsics, distortion, post_rot, post_tran = [], [], [], [], [], [], [], []
# for cams_frame_t in cams_all_frames:
# image_t, canvas_t, sensor2lidar_rotation_t, sensor2lidar_translation_t, intrinsics_t, distortion_t, post_rot_t, post_tran_t = [], [], [], [], [], [], [], []
# for cam in cams_frame_t:
# cam_processed: Camera = img_pipeline(cam)
# image_t.append(cam_processed.image)
# canvas_t.append(cam_processed.canvas)
# sensor2lidar_rotation_t.append(cam_processed.sensor2lidar_rotation)
# sensor2lidar_translation_t.append(cam_processed.sensor2lidar_translation)
# intrinsics_t.append(cam_processed.intrinsics)
# distortion_t.append(cam_processed.distortion)
# post_rot_t.append(cam_processed.post_rot)
# post_tran_t.append(cam_processed.post_tran)
# image.append(torch.stack(image_t))
# canvas.append(torch.stack(canvas_t))
# sensor2lidar_rotation.append(torch.stack(sensor2lidar_rotation_t))
# sensor2lidar_translation.append(torch.stack(sensor2lidar_translation_t))
# intrinsics.append(torch.stack(intrinsics_t))
# distortion.append(torch.stack(distortion_t))
# post_rot.append(torch.stack(post_rot_t))
# post_tran.append(torch.stack(post_tran_t))
#
# features["sensor2lidar_rotation"] = torch.stack(sensor2lidar_rotation).to(imgs)
# features["sensor2lidar_translation"] = torch.stack(sensor2lidar_translation).to(imgs)
# features["intrinsics"] = torch.stack(intrinsics).to(imgs)
return features
def _get_pers_bev(self, agent_input: AgentInput) -> torch.Tensor:
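        # TODO placeholder: the perspective/BEV embedding is not implemented yet, so this returns None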
return None
def _get_camera_feature(self, agent_input: AgentInput) -> torch.Tensor:
"""
        Extract stitched camera image from AgentInput
:param agent_input: input dataclass
:return: stitched front view image as torch tensor
"""
cameras = agent_input.cameras[-1]
# Crop to ensure 4:1 aspect ratio
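        # NOTE: assuming the nominal 1920x1080 navsim camera images, cropping 28 px top/bottom keeps
        # 1024 rows and cropping 416 px per side of l0/r0 keeps 1088 columns, so the stitched image
        # is 1024 x (1088 + 1920 + 1088) = 1024 x 4096, i.e. a 4:1 aspect ratio before resizing.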
l0 = cameras.cam_l0.image[28:-28, 416:-416]
f0 = cameras.cam_f0.image[28:-28]
r0 = cameras.cam_r0.image[28:-28, 416:-416]
# stitch l0, f0, r0 images
stitched_image = np.concatenate([l0, f0, r0], axis=1)
resized_image = cv2.resize(stitched_image, (self._config.camera_width, self._config.camera_height))
tensor_image = transforms.ToTensor()(resized_image)
return tensor_image
def _get_lidar_feature(self, agent_input: AgentInput) -> torch.Tensor:
"""
Compute LiDAR feature as 2D histogram, according to Transfuser
:param agent_input: input dataclass
:return: LiDAR histogram as torch tensors
"""
# only consider (x,y,z) & swap axes for (N,3) numpy array
lidar_pc = agent_input.lidars[-1].lidar_pc[LidarIndex.POSITION].T
# NOTE: Code from
# https://github.com/autonomousvision/carla_garage/blob/main/team_code/data.py#L873
def splat_points(point_cloud):
# 256 x 256 grid
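            # bin edges are spaced 1 / pixels_per_meter apart; e.g. with the Transfuser-style
            # defaults assumed here ([-32, 32] m range, 4 pixels per meter) this gives 257 edges
            # per axis and therefore a 256 x 256 histogram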
xbins = np.linspace(
self._config.lidar_min_x,
self._config.lidar_max_x,
(self._config.lidar_max_x - self._config.lidar_min_x)
* int(self._config.pixels_per_meter)
+ 1,
)
ybins = np.linspace(
self._config.lidar_min_y,
self._config.lidar_max_y,
(self._config.lidar_max_y - self._config.lidar_min_y)
* int(self._config.pixels_per_meter)
+ 1,
)
hist = np.histogramdd(point_cloud[:, :2], bins=(xbins, ybins))[0]
hist[hist > self._config.hist_max_per_pixel] = self._config.hist_max_per_pixel
overhead_splat = hist / self._config.hist_max_per_pixel
return overhead_splat
# Remove points above the vehicle
lidar_pc = lidar_pc[lidar_pc[..., 2] < self._config.max_height_lidar]
below = lidar_pc[lidar_pc[..., 2] <= self._config.lidar_split_height]
above = lidar_pc[lidar_pc[..., 2] > self._config.lidar_split_height]
above_features = splat_points(above)
if self._config.use_ground_plane:
below_features = splat_points(below)
features = np.stack([below_features, above_features], axis=-1)
else:
features = np.stack([above_features], axis=-1)
features = np.transpose(features, (2, 0, 1)).astype(np.float32)
return torch.tensor(features)
def _get_lidar_feature_4f(self, agent_input: AgentInput) -> torch.Tensor:
"""
        Compute LiDAR features as 2D histograms for a sequence of sweeps warped into the
        current ego frame, following the Transfuser splatting
        :param agent_input: input dataclass
        :return: LiDAR histograms as torch tensor, concatenated along the channel dimension
"""
# only consider (x,y,z) & swap axes for (N,3) numpy array
# lidar_pc = agent_input.lidars[-1].lidar_pc[LidarIndex.POSITION].T
lidars = [np.copy(tmp.lidar_pc) for tmp in agent_input.lidars]
# timestamps_ori = agent_input.timestamps
# timestamps = [(timestamps_ori[-1] - tmp) / 1e6 for tmp in timestamps_ori]
ego2globals = [tmp for tmp in agent_input.ego2globals]
global2ego_key = np.linalg.inv(ego2globals[-1])
# ego2global, global2ego key frame
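        # warp every past sweep into the current (key) ego frame: past ego -> global via that
        # frame's ego2global, then global -> current ego via the inverse of the key ego2global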
lidars_warped = [transform_points(transform_points(pts, mat), global2ego_key)
for pts, mat in zip(lidars[:-1], ego2globals[:-1])]
lidars_warped.append(lidars[-1])
# NOTE: Code from
# https://github.com/autonomousvision/carla_garage/blob/main/team_code/data.py#L873
def splat_points(point_cloud):
# 256 x 256 grid
xbins = np.linspace(
self._config.lidar_min_x,
self._config.lidar_max_x,
(self._config.lidar_max_x - self._config.lidar_min_x)
* int(self._config.pixels_per_meter)
+ 1,
)
ybins = np.linspace(
self._config.lidar_min_y,
self._config.lidar_max_y,
(self._config.lidar_max_y - self._config.lidar_min_y)
* int(self._config.pixels_per_meter)
+ 1,
)
hist = np.histogramdd(point_cloud[:, :2], bins=(xbins, ybins))[0]
hist[hist > self._config.hist_max_per_pixel] = self._config.hist_max_per_pixel
overhead_splat = hist / self._config.hist_max_per_pixel
return overhead_splat
# Remove points above the vehicle
lidar_feats = []
for lidar_pc in lidars_warped:
lidar_pc = lidar_pc.T
lidar_pc = lidar_pc[lidar_pc[..., 2] < self._config.max_height_lidar]
below = lidar_pc[lidar_pc[..., 2] <= self._config.lidar_split_height]
above = lidar_pc[lidar_pc[..., 2] > self._config.lidar_split_height]
above_features = splat_points(above)
if self._config.use_ground_plane:
below_features = splat_points(below)
features = np.stack([below_features, above_features], axis=-1)
else:
features = np.stack([above_features], axis=-1)
features = np.transpose(features, (2, 0, 1)).astype(np.float32)
# append timestamps
lidar_feats.append(torch.tensor(features))
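        # concatenate along the channel dimension: each sweep contributes one channel
        # (two when use_ground_plane is set), so four sweeps yield 4 (or 8) channels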
return torch.cat(lidar_feats, 0).contiguous()
class Vadv2TargetBuilder(AbstractTargetBuilder):
def __init__(self, config: Vadv2Config):
self._config = config
self.v_params = get_pacifica_parameters()
# lidar_resolution_width = 256
# lidar_resolution_height = 256
# self.dense_layers: List[SemanticMapLayer] = [
# SemanticMapLayer.DRIVABLE_AREA,
# SemanticMapLayer.CROSSWALK
# ]
# self.dense_layers_labels = [
# 1, 2
# ]
# self.discrete_layers: List[SemanticMapLayer] = [
# SemanticMapLayer.LANE,
# SemanticMapLayer.LANE_CONNECTOR,
# ]
# self.radius = 32.0
# self.bev_pixel_width: int = lidar_resolution_width
# self.bev_pixel_height: int = lidar_resolution_height
# self.bev_pixel_size: float = 0.25
# self.bev_semantic_frame = (self.bev_pixel_height, self.bev_pixel_width)
# self.padding_value = -10000
# self.sample_dist = 1
# self.num_samples = 250
# self.padding = False
# self.fixed_num = 20
def get_unique_name(self) -> str:
"""Inherited, see superclass."""
return "transfuser_target"
def compute_targets(self, scene: Scene) -> Dict[str, torch.Tensor]:
"""Inherited, see superclass."""
future_traj = scene.get_future_trajectory(
num_trajectory_frames=self._config.trajectory_sampling.num_poses
)
trajectory = torch.tensor(future_traj.poses)
frame_idx = scene.scene_metadata.num_history_frames - 1
annotations = scene.frames[frame_idx].annotations
ego_pose = StateSE2(*scene.frames[frame_idx].ego_status.ego_pose)
agent_states, agent_labels = self._compute_agent_targets(annotations)
bev_semantic_map = self._compute_bev_semantic_map(annotations, scene.map_api, ego_pose)
ego_state = EgoState.build_from_rear_axle(
StateSE2(*scene.frames[frame_idx].ego_status.ego_pose),
tire_steering_angle=0.0,
vehicle_parameters=self.v_params,
time_point=TimePoint(scene.frames[frame_idx].timestamp),
rear_axle_velocity_2d=StateVector2D(
*scene.frames[frame_idx].ego_status.ego_velocity
),
rear_axle_acceleration_2d=StateVector2D(
*scene.frames[frame_idx].ego_status.ego_acceleration
),
)
trans_traj = transform_trajectory(
future_traj, ego_state
)
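        # resample the transformed future trajectory at 10 Hz (40 poses x 0.1 s = 4 s horizon),
        # starting from the current ego state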
interpolated_traj = get_trajectory_as_array(
trans_traj,
TrajectorySampling(num_poses=40, interval_length=0.1),
ego_state.time_point
)
rel_poses = absolute_to_relative_poses([StateSE2(*tmp) for tmp in
interpolated_traj[:, StateIndex.STATE_SE2]])
# skip the curr frame
final_traj = [pose.serialize() for pose in rel_poses[1:]]
final_traj = torch.tensor(final_traj)
#TODO:map
# map_api = scene.map_api
# ego_statuses = [frame.ego_status for frame in scene.frames]
# ego2globals = [frame.ego2global for frame in scene.frames]
# # Last one is the current frame
# ego_status_curr = StateSE2(*ego_statuses[-1].ego_pose)
# # dense
# # dense_semantic_map = np.zeros(self.bev_semantic_frame, dtype=np.int64)
# # for layer, label in zip(self.dense_layers, self.dense_layers_labels):
# # entity_mask = self._compute_map_polygon_mask(map_api, ego_status_curr, [layer])
# # dense_semantic_map[entity_mask] = label
# # discrete
# # centerline_list
# map_dict = {'centerline': []}
# line_strings, incoming_line_strings, outcoming_line_strings = self._compute_map_linestrings(map_api,
# ego_status_curr,
# list(
# self.discrete_layers))
# centerline_list = self.union_centerline(line_strings, incoming_line_strings, outcoming_line_strings)
# for instance in centerline_list:
# map_dict['centerline'].append(np.array(instance.coords))
# vectors = []
# gt_labels = []
# gt_instance = []
# instance_list = map_dict['centerline']
# for instance in instance_list:
# vectors.append(LineString(np.array(instance)))
# for instance in vectors:
# gt_instance.append(instance)
# gt_labels.append(0)
# gt_semantic_mask = None
# gt_pv_semantic_mask = None
# gt_instance = LiDARInstanceLines(gt_instance, self.sample_dist, self.num_samples,
# self.padding, self.fixed_num, self.padding_value, patch_size=self.radius * 2)
return {
#"gt_depth":?????????????
# "gt_bboxes_3d": gt_instance,
# "gt_labels_3d": gt_labels,
"trajectory": trajectory,
"agent_states": agent_states,
"agent_labels": agent_labels,
"bev_semantic_map": bev_semantic_map,
"interpolated_traj": final_traj
}
def _compute_agent_targets(self, annotations: Annotations) -> Tuple[torch.Tensor, torch.Tensor]:
"""
Extracts 2D agent bounding boxes in ego coordinates
:param annotations: annotation dataclass
:return: tuple of bounding box values and labels (binary)
"""
max_agents = self._config.num_bounding_boxes
agent_states_list: List[npt.NDArray[np.float32]] = []
def _xy_in_lidar(x: float, y: float, config: Vadv2Config) -> bool:
return (config.lidar_min_x <= x <= config.lidar_max_x) and (
config.lidar_min_y <= y <= config.lidar_max_y
)
for box, name in zip(annotations.boxes, annotations.names):
box_x, box_y, box_heading, box_length, box_width = (
box[BoundingBoxIndex.X],
box[BoundingBoxIndex.Y],
box[BoundingBoxIndex.HEADING],
box[BoundingBoxIndex.LENGTH],
box[BoundingBoxIndex.WIDTH],
)
if name == "vehicle" and _xy_in_lidar(box_x, box_y, self._config):
agent_states_list.append(
np.array([box_x, box_y, box_heading, box_length, box_width], dtype=np.float32)
)
agents_states_arr = np.array(agent_states_list)
# filter num_instances nearest
agent_states = np.zeros((max_agents, BoundingBox2DIndex.size()), dtype=np.float32)
agent_labels = np.zeros(max_agents, dtype=bool)
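        # keep at most max_agents detections, preferring those closest to the ego (by center
        # distance); agent_labels marks which of the fixed-size slots are valid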
if len(agents_states_arr) > 0:
distances = np.linalg.norm(agents_states_arr[..., BoundingBox2DIndex.POINT], axis=-1)
argsort = np.argsort(distances)[:max_agents]
# filter detections
agents_states_arr = agents_states_arr[argsort]
agent_states[: len(agents_states_arr)] = agents_states_arr
agent_labels[: len(agents_states_arr)] = True
return torch.tensor(agent_states), torch.tensor(agent_labels)
def _compute_bev_semantic_map(
self, annotations: Annotations, map_api: AbstractMap, ego_pose: StateSE2
) -> torch.Tensor:
"""
        Creates semantic map in BEV
:param annotations: annotation dataclass
:param map_api: map interface of nuPlan
:param ego_pose: ego pose in global frame
:return: 2D torch tensor of semantic labels
"""
bev_semantic_map = np.zeros(self._config.bev_semantic_frame, dtype=np.int64)
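        # classes are rasterized in config order, so later labels overwrite earlier ones
        # wherever their masks overlap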
for label, (entity_type, layers) in self._config.bev_semantic_classes.items():
if entity_type == "polygon":
entity_mask = self._compute_map_polygon_mask(map_api, ego_pose, layers)
elif entity_type == "linestring":
entity_mask = self._compute_map_linestring_mask(map_api, ego_pose, layers)
else:
entity_mask = self._compute_box_mask(annotations, layers)
bev_semantic_map[entity_mask] = label
return torch.Tensor(bev_semantic_map)
    def _compute_map_linestrings(
        self, map_api: AbstractMap, ego_pose: StateSE2, layers: List[SemanticMapLayer]
    ) -> Tuple[List[LineString], List[LineString], List[LineString]]:
        """
        Collect baseline-path linestrings and their incoming/outgoing edges in local coordinates
        :param map_api: map interface of nuPlan
        :param ego_pose: ego pose in global frame
        :param layers: map layers
        :return: tuple of (linestrings, incoming linestrings, outgoing linestrings)
        """
        map_object_dict = map_api.get_proximal_map_objects(
            point=ego_pose.point, radius=self._config.bev_radius, layers=layers
        )
        line_strings = []
        incoming_line_strings = []
        outgoing_line_strings = []
        for layer in layers:
            for map_object in map_object_dict[layer]:
                linestring: LineString = self._geometry_local_coords(
                    map_object.baseline_path.linestring, ego_pose
                )
                line_strings.append(linestring)
                for incoming_edge in map_object.incoming_edges:
                    incoming_linestring: LineString = self._geometry_local_coords(
                        incoming_edge.baseline_path.linestring, ego_pose
                    )
                    incoming_line_strings.append(incoming_linestring)
                for outgoing_edge in map_object.outgoing_edges:
                    outgoing_linestring: LineString = self._geometry_local_coords(
                        outgoing_edge.baseline_path.linestring, ego_pose
                    )
                    outgoing_line_strings.append(outgoing_linestring)
        return line_strings, incoming_line_strings, outgoing_line_strings
def union_centerline(self, centerline_list, incoming_list, outcoming_list):
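        """
        Merge centerline segments with their incoming/outgoing edges into continuous polylines.
        Segments are added as edges of a directed graph over rounded points; every simple path
        from a root (in-degree 0) to a leaf (out-degree 0) becomes one merged, simplified line.
        """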
pts_G = nx.DiGraph()
junction_pts_list = []
start_pt = np.array(centerline_list[0].coords).round(3)[0]
end_pt = np.array(centerline_list[-1].coords).round(3)[-1]
for centerline_geom in centerline_list:
centerline_pts = np.array(centerline_geom.coords).round(3)
start_pt = centerline_pts[0]
end_pt = centerline_pts[-1]
for idx, pts in enumerate(centerline_pts[:-1]):
pts_G.add_edge(tuple(centerline_pts[idx]), tuple(centerline_pts[idx + 1]))
valid_incoming_num = 0
for pred_geom in incoming_list:
valid_incoming_num += 1
pred_pt = np.array(pred_geom.coords).round(3)[-1]
pts_G.add_edge(tuple(pred_pt), tuple(start_pt))
valid_outgoing_num = 0
for succ_geom in outcoming_list:
valid_outgoing_num += 1
succ_pt = np.array(succ_geom.coords).round(3)[0]
pts_G.add_edge(tuple(end_pt), tuple(succ_pt))
roots = (v for v, d in pts_G.in_degree() if d == 0)
leaves = [v for v, d in pts_G.out_degree() if d == 0]
all_paths = []
for root in roots:
paths = nx.all_simple_paths(pts_G, root, leaves)
all_paths.extend(paths)
final_centerline_paths = []
for path in all_paths:
merged_line = LineString(path)
merged_line = merged_line.simplify(0.2, preserve_topology=True)
final_centerline_paths.append(merged_line)
return final_centerline_paths
# def compute_targets(self, scene: Scene) -> Dict[str, torch.Tensor]:
# map_api = scene.map_api
# ego_statuses = [frame.ego_status for frame in scene.frames]
# ego2globals = [frame.ego2global for frame in scene.frames]
# # Last one is the current frame
# ego_status_curr = StateSE2(*ego_statuses[-1].ego_pose)
#
# # dense
# # dense_semantic_map = np.zeros(self.bev_semantic_frame, dtype=np.int64)
# # for layer, label in zip(self.dense_layers, self.dense_layers_labels):
# # entity_mask = self._compute_map_polygon_mask(map_api, ego_status_curr, [layer])
# # dense_semantic_map[entity_mask] = label
#
# # discrete
# # centerline_list
# map_dict = {'centerline': []}
# line_strings, incoming_line_strings, outcoming_line_strings = self._compute_map_linestrings(map_api,
# ego_status_curr,
# list(
# self.discrete_layers))
# centerline_list = self.union_centerline(line_strings, incoming_line_strings, outcoming_line_strings)
# for instance in centerline_list:
# map_dict['centerline'].append(np.array(instance.coords))
#
# vectors = []
# gt_labels = []
# gt_instance = []
# instance_list = map_dict['centerline']
# for instance in instance_list:
# vectors.append(LineString(np.array(instance)))
# for instance in vectors:
# gt_instance.append(instance)
# gt_labels.append(0)
# gt_semantic_mask = None
# gt_pv_semantic_mask = None
# gt_instance = LiDARInstanceLines(gt_instance, self.sample_dist, self.num_samples,
# self.padding, self.fixed_num, self.padding_value, patch_size=self.radius * 2)
#
# return {"dense_el": None,
# "gt_bboxes_3d": gt_instance,
# "gt_labels_3d": gt_labels}
def _compute_map_polygon_mask(
self, map_api: AbstractMap, ego_pose: StateSE2, layers: List[SemanticMapLayer]
) -> npt.NDArray[np.bool_]:
"""
Compute binary mask given a map layer class
:param map_api: map interface of nuPlan
:param ego_pose: ego pose in global frame
:param layers: map layers
:return: binary mask as numpy array
"""
map_object_dict = map_api.get_proximal_map_objects(
point=ego_pose.point, radius=self._config.bev_radius, layers=layers
)
map_polygon_mask = np.zeros(self._config.bev_semantic_frame[::-1], dtype=np.uint8)
for layer in layers:
for map_object in map_object_dict[layer]:
polygon: Polygon = self._geometry_local_coords(map_object.polygon, ego_pose)
exterior = np.array(polygon.exterior.coords).reshape((-1, 1, 2))
exterior = self._coords_to_pixel(exterior)
cv2.fillPoly(map_polygon_mask, [exterior], color=255)
# OpenCV has origin on top-left corner
map_polygon_mask = np.rot90(map_polygon_mask)[::-1]
return map_polygon_mask > 0
def _compute_map_linestring_mask(
self, map_api: AbstractMap, ego_pose: StateSE2, layers: List[SemanticMapLayer]
) -> npt.NDArray[np.bool_]:
"""
        Compute binary mask of linestrings given map layer classes
:param map_api: map interface of nuPlan
:param ego_pose: ego pose in global frame
:param layers: map layers
:return: binary mask as numpy array
"""
map_object_dict = map_api.get_proximal_map_objects(
point=ego_pose.point, radius=self._config.bev_radius, layers=layers
)
map_linestring_mask = np.zeros(self._config.bev_semantic_frame[::-1], dtype=np.uint8)
for layer in layers:
for map_object in map_object_dict[layer]:
linestring: LineString = self._geometry_local_coords(
map_object.baseline_path.linestring, ego_pose
)
points = np.array(linestring.coords).reshape((-1, 1, 2))
points = self._coords_to_pixel(points)
cv2.polylines(map_linestring_mask, [points], isClosed=False, color=255, thickness=2)
# OpenCV has origin on top-left corner
map_linestring_mask = np.rot90(map_linestring_mask)[::-1]
return map_linestring_mask > 0
def _compute_box_mask(
self, annotations: Annotations, layers: TrackedObjectType
) -> npt.NDArray[np.bool_]:
"""
        Compute binary mask of bounding boxes in BEV space
:param annotations: annotation dataclass
:param layers: bounding box labels to include
:return: binary mask as numpy array
"""
box_polygon_mask = np.zeros(self._config.bev_semantic_frame[::-1], dtype=np.uint8)
for name_value, box_value in zip(annotations.names, annotations.boxes):
agent_type = tracked_object_types[name_value]
if agent_type in layers:
# box_value = (x, y, z, length, width, height, yaw) TODO: add intenum
x, y, heading = box_value[0], box_value[1], box_value[-1]
box_length, box_width, box_height = box_value[3], box_value[4], box_value[5]
agent_box = OrientedBox(StateSE2(x, y, heading), box_length, box_width, box_height)
exterior = np.array(agent_box.geometry.exterior.coords).reshape((-1, 1, 2))
exterior = self._coords_to_pixel(exterior)
cv2.fillPoly(box_polygon_mask, [exterior], color=255)
# OpenCV has origin on top-left corner
box_polygon_mask = np.rot90(box_polygon_mask)[::-1]
return box_polygon_mask > 0
    def _query_map_objects(
        self, map_api: AbstractMap, ego_pose: StateSE2, layers: List[SemanticMapLayer]
    ) -> List[MapObject]:
"""
Queries map objects
:param map_api: map interface of nuPlan
:param ego_pose: ego pose in global frame
:param layers: map layers
:return: list of map objects
"""
# query map api with interesting layers
map_object_dict = map_api.get_proximal_map_objects(
            point=ego_pose.point, radius=self._config.bev_radius, layers=layers
)
map_objects: List[MapObject] = []
for layer in layers:
map_objects += map_object_dict[layer]
return map_objects
@staticmethod
def _geometry_local_coords(geometry: Any, origin: StateSE2) -> Any:
"""
        Transform shapely geometry into the local coordinates of origin.
:param geometry: shapely geometry
:param origin: pose dataclass
:return: shapely geometry
"""
a = np.cos(origin.heading)
b = np.sin(origin.heading)
d = -np.sin(origin.heading)
e = np.cos(origin.heading)
xoff = -origin.x
yoff = -origin.y
translated_geometry = affinity.affine_transform(geometry, [1, 0, 0, 1, xoff, yoff])
rotated_geometry = affinity.affine_transform(translated_geometry, [a, b, d, e, 0, 0])
return rotated_geometry
def _coords_to_pixel(self, coords):
"""
        Transform local coordinates into pixel indices of the BEV map
        :param coords: array of local (x, y) coordinates in meters, last dimension of size 2
        :return: array of pixel indices as int32
"""
# NOTE: remove half in backward direction
pixel_center = np.array([[0, self._config.bev_pixel_width / 2.0]])
coords_idcs = (coords / self._config.bev_pixel_size) + pixel_center
return coords_idcs.astype(np.int32)
class BoundingBox2DIndex(IntEnum):
_X = 0
_Y = 1
_HEADING = 2
_LENGTH = 3
_WIDTH = 4
@classmethod
def size(cls):
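        # count the value fields (_X, _Y, _HEADING, _LENGTH, _WIDTH) by filtering out dunder
        # attributes and callables, i.e. the number of entries in an agent state vector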
valid_attributes = [
attribute
for attribute in dir(cls)
if attribute.startswith("_")
and not attribute.startswith("__")
and not callable(getattr(cls, attribute))
]
return len(valid_attributes)
@classmethod
@property
def X(cls):
return cls._X
@classmethod
@property
def Y(cls):
return cls._Y
@classmethod
@property
def HEADING(cls):
return cls._HEADING
@classmethod
@property
def LENGTH(cls):
return cls._LENGTH
@classmethod
@property
def WIDTH(cls):
return cls._WIDTH
@classmethod
@property
def POINT(cls):
# assumes X, Y have subsequent indices
return slice(cls._X, cls._Y + 1)
@classmethod
@property
def STATE_SE2(cls):
# assumes X, Y, HEADING have subsequent indices
return slice(cls._X, cls._HEADING + 1)