"""Feature and target builders for the DM agent in NavSim."""
from enum import IntEnum
from typing import Any, Dict, List, Optional, Tuple

import cv2
import numpy as np
import numpy.typing as npt
import torch
from nuplan.common.actor_state.ego_state import EgoState
from nuplan.common.actor_state.oriented_box import OrientedBox
from nuplan.common.actor_state.state_representation import StateSE2, TimePoint, StateVector2D
from nuplan.common.actor_state.tracked_objects_types import TrackedObjectType
from nuplan.common.actor_state.vehicle_parameters import get_pacifica_parameters
from nuplan.common.geometry.convert import absolute_to_relative_poses
from nuplan.common.maps.abstract_map import AbstractMap, SemanticMapLayer, MapObject
from nuplan.planning.simulation.trajectory.trajectory_sampling import TrajectorySampling
from shapely import affinity
from shapely.geometry import Polygon, LineString
from torchvision import transforms

from navsim.agents.dm.dm_config import DMConfig
from navsim.agents.vadv2.vadv2_config import Vadv2Config
from navsim.common.dataclasses import AgentInput, Scene, Annotations
from navsim.common.enums import BoundingBoxIndex
from navsim.evaluate.pdm_score import transform_trajectory, get_trajectory_as_array
from navsim.planning.scenario_builder.navsim_scenario_utils import tracked_object_types
from navsim.planning.simulation.planner.pdm_planner.utils.pdm_enums import StateIndex
from navsim.planning.training.abstract_feature_target_builder import (
    AbstractFeatureBuilder,
    AbstractTargetBuilder,
)


class DMFeatureBuilder(AbstractFeatureBuilder):
    def __init__(self, config: DMConfig):
        self._config = config

    def get_unique_name(self) -> str:
        """Inherited, see superclass."""
        return "dm_feature"

    def compute_features(self, agent_input: AgentInput) -> Dict[str, torch.Tensor]:
        """Inherited, see superclass."""
        features = {}

        features["camera_feature"] = self._get_camera_feature(agent_input)
        if self._config.use_back_view:
            features["camera_feature_back"] = self._get_camera_feature_back(agent_input)

        sensor2lidar_rotation, sensor2lidar_translation, intrinsics = [], [], []
        # Only the latest camera frame is used; two-frame variant:
        # camera_timestamp = [agent_input.cameras[-2], agent_input.cameras[-1]]
        camera_timestamp = [agent_input.cameras[-1]]
        for camera in camera_timestamp:
            sensor2lidar_rotation_tmp, sensor2lidar_translation_tmp, intrinsics_tmp = [], [], []
            flag = False
            for cam_k, cam in camera.to_dict().items():
                features[f"intrinsics_{cam_k}"] = cam.intrinsics
                features[f"sensor2lidar_rotation_{cam_k}"] = cam.sensor2lidar_rotation
                features[f"sensor2lidar_translation_{cam_k}"] = cam.sensor2lidar_translation
                # Convert to tensors only when the camera actually carries calibration data.
                if cam.intrinsics is not None and np.any(cam.intrinsics):
                    flag = True
                    features[f"intrinsics_{cam_k}"] = torch.tensor(features[f"intrinsics_{cam_k}"])
                    features[f"sensor2lidar_rotation_{cam_k}"] = torch.tensor(
                        features[f"sensor2lidar_rotation_{cam_k}"]
                    )
                    features[f"sensor2lidar_translation_{cam_k}"] = torch.tensor(
                        features[f"sensor2lidar_translation_{cam_k}"]
                    )

            # Collect calibration of the three front-facing cameras (left, front, right).
            sensor2lidar_rotation_tmp.append(features["sensor2lidar_rotation_cam_l0"])
            sensor2lidar_rotation_tmp.append(features["sensor2lidar_rotation_cam_f0"])
            sensor2lidar_rotation_tmp.append(features["sensor2lidar_rotation_cam_r0"])
            sensor2lidar_translation_tmp.append(features["sensor2lidar_translation_cam_l0"])
            sensor2lidar_translation_tmp.append(features["sensor2lidar_translation_cam_f0"])
            sensor2lidar_translation_tmp.append(features["sensor2lidar_translation_cam_r0"])
            intrinsics_tmp.append(features["intrinsics_cam_l0"])
            intrinsics_tmp.append(features["intrinsics_cam_f0"])
            intrinsics_tmp.append(features["intrinsics_cam_r0"])

            if flag:
                # With a single camera frame, the per-frame lists hold the three tensors directly.
                sensor2lidar_rotation = sensor2lidar_rotation_tmp
                sensor2lidar_translation = sensor2lidar_translation_tmp
                intrinsics = intrinsics_tmp
                # Multi-frame variant:
                # sensor2lidar_rotation.append(torch.stack(sensor2lidar_rotation_tmp))
                # sensor2lidar_translation.append(torch.stack(sensor2lidar_translation_tmp))
                # intrinsics.append(torch.stack(intrinsics_tmp))
            else:
                sensor2lidar_rotation.append(None)
                sensor2lidar_translation.append(None)
                intrinsics.append(None)

        features["sensor2lidar_rotation"] = sensor2lidar_rotation
        features["sensor2lidar_translation"] = sensor2lidar_translation
        features["intrinsics"] = intrinsics

        ego_status_list = []
        for i in range(self._config.num_ego_status):
            # i=0: idx=-1, i=1: idx=-2, i=2: idx=-3, i=3: idx=-4
            idx = -(i + 1)
            ego_status_list += [
                torch.tensor(agent_input.ego_statuses[idx].driving_command, dtype=torch.float32),
                torch.tensor(agent_input.ego_statuses[idx].ego_velocity, dtype=torch.float32),
                torch.tensor(agent_input.ego_statuses[idx].ego_acceleration, dtype=torch.float32),
            ]
        features["status_feature"] = torch.concatenate(ego_status_list)
        features["history_waypoints"] = torch.concatenate(
            [
                torch.tensor(agent_input.ego_statuses[-2].ego_pose, dtype=torch.float32)[None],
                torch.tensor(agent_input.ego_statuses[-1].ego_pose, dtype=torch.float32)[None],
            ],
            dim=0,
        )

        return features

    def _get_camera_feature(self, agent_input: AgentInput) -> List[Optional[torch.Tensor]]:
        """
        Extract stitched front-view camera image from AgentInput.
        :param agent_input: input dataclass
        :return: list with one stitched front-view image tensor per used frame (None if the frame has no image)
        """
        cameras = [agent_input.cameras[-1]]
        image_list = []
        for camera in cameras:
            image = camera.cam_l0.image
            if image is not None and image.size > 0 and np.any(image):
                # Crop to ensure 4:1 aspect ratio
                l0 = camera.cam_l0.image[28:-28, 416:-416]
                f0 = camera.cam_f0.image[28:-28]
                r0 = camera.cam_r0.image[28:-28, 416:-416]
                # stitch l0, f0, r0 images
                stitched_image = np.concatenate([l0, f0, r0], axis=1)
                resized_image = cv2.resize(
                    stitched_image, (self._config.camera_width, self._config.camera_height)
                )
                tensor_image = transforms.ToTensor()(resized_image)
                image_list.append(tensor_image)
            else:
                image_list.append(None)
        return image_list

    def _get_camera_feature_back(self, agent_input: AgentInput) -> torch.Tensor:
        """
        Extract stitched rear-view camera image from AgentInput.
        :param agent_input: input dataclass
        :return: stitched rear view image as torch tensor
        """
        cameras = agent_input.cameras[-1]
        # Crop to ensure 4:1 aspect ratio
        l2 = cameras.cam_l2.image[28:-28, 416:-416]
        b0 = cameras.cam_b0.image[28:-28]
        r2 = cameras.cam_r2.image[28:-28, 416:-416]
        # stitch l2, b0, r2 images
        stitched_image = np.concatenate([l2, b0, r2], axis=1)
        resized_image = cv2.resize(
            stitched_image, (self._config.camera_width, self._config.camera_height)
        )
        tensor_image = transforms.ToTensor()(resized_image)
        return tensor_image


class DMTargetBuilder(AbstractTargetBuilder):
    def __init__(self, config: DMConfig):
        self._config = config
        self.v_params = get_pacifica_parameters()

    def get_unique_name(self) -> str:
        """Inherited, see superclass."""
        return "dm_target"

    def compute_targets(self, scene: Scene) -> Dict[str, torch.Tensor]:
        """Inherited, see superclass."""
        future_traj = scene.get_future_trajectory(
            num_trajectory_frames=self._config.trajectory_sampling.num_poses
        )
        trajectory = torch.tensor(future_traj.poses)

        frame_idx = scene.scene_metadata.num_history_frames - 1
        annotations = scene.frames[frame_idx].annotations
        agent_states, agent_labels = self._compute_agent_targets(annotations)

        ego_state = EgoState.build_from_rear_axle(
            StateSE2(*scene.frames[frame_idx].ego_status.ego_pose),
            tire_steering_angle=0.0,
            vehicle_parameters=self.v_params,
            time_point=TimePoint(scene.frames[frame_idx].timestamp),
            rear_axle_velocity_2d=StateVector2D(*scene.frames[frame_idx].ego_status.ego_velocity),
            rear_axle_acceleration_2d=StateVector2D(*scene.frames[frame_idx].ego_status.ego_acceleration),
        )
        trans_traj = transform_trajectory(future_traj, ego_state)
        interpolated_traj = get_trajectory_as_array(
            trans_traj,
            TrajectorySampling(num_poses=40, interval_length=0.1),
            ego_state.time_point,
        )
        rel_poses = absolute_to_relative_poses(
            [StateSE2(*tmp) for tmp in interpolated_traj[:, StateIndex.STATE_SE2]]
        )
        # skip the curr frame
        final_traj = [pose.serialize() for pose in rel_poses[1:]]
        final_traj = torch.tensor(final_traj)

        return {
            "trajectory": trajectory,
            "agent_states": agent_states,
            "agent_labels": agent_labels,
            "interpolated_traj": final_traj,
        }

    def _compute_agent_targets(self, annotations: Annotations) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Extracts 2D agent bounding boxes in ego coordinates
        :param annotations: annotation dataclass
        :return: tuple of bounding box values and labels (binary)
        """
        max_agents = self._config.num_bounding_boxes
        agent_states_list: List[npt.NDArray[np.float32]] = []

        def _xy_in_lidar(x: float, y: float, config: Vadv2Config) -> bool:
            return (config.lidar_min_x <= x <= config.lidar_max_x) and (
                config.lidar_min_y <= y <= config.lidar_max_y
            )

        for box, name in zip(annotations.boxes, annotations.names):
            box_x, box_y, box_heading, box_length, box_width = (
                box[BoundingBoxIndex.X],
                box[BoundingBoxIndex.Y],
                box[BoundingBoxIndex.HEADING],
                box[BoundingBoxIndex.LENGTH],
                box[BoundingBoxIndex.WIDTH],
            )
            if name == "vehicle" and _xy_in_lidar(box_x, box_y, self._config):
                agent_states_list.append(
                    np.array([box_x, box_y, box_heading, box_length, box_width], dtype=np.float32)
                )

        agents_states_arr = np.array(agent_states_list)

        # filter num_instances nearest
        agent_states = np.zeros((max_agents, BoundingBox2DIndex.size()), dtype=np.float32)
        agent_labels = np.zeros(max_agents, dtype=bool)

        if len(agents_states_arr) > 0:
            distances = np.linalg.norm(agents_states_arr[..., BoundingBox2DIndex.POINT], axis=-1)
            argsort = np.argsort(distances)[:max_agents]

            # filter detections
            agents_states_arr = agents_states_arr[argsort]
            agent_states[: len(agents_states_arr)] = agents_states_arr
            agent_labels[: len(agents_states_arr)] = True

        return torch.tensor(agent_states), torch.tensor(agent_labels)


class BoundingBox2DIndex(IntEnum):
    _X = 0
    _Y = 1
    _HEADING = 2
    _LENGTH = 3
    _WIDTH = 4

    @classmethod
    def size(cls):
        valid_attributes = [
            attribute
            for attribute in dir(cls)
            if attribute.startswith("_")
            and not attribute.startswith("__")
            and not callable(getattr(cls, attribute))
        ]
        return len(valid_attributes)

    @classmethod
    @property
    def X(cls):
        return cls._X

    @classmethod
    @property
    def Y(cls):
        return cls._Y

    @classmethod
    @property
    def HEADING(cls):
        return cls._HEADING

    @classmethod
    @property
    def LENGTH(cls):
        return cls._LENGTH

    @classmethod
    @property
    def WIDTH(cls):
        return cls._WIDTH

    @classmethod
    @property
    def POINT(cls):
        # assumes X, Y have subsequent indices
        return slice(cls._X, cls._Y + 1)

    @classmethod
    @property
    def STATE_SE2(cls):
        # assumes X, Y, HEADING have subsequent indices
        return slice(cls._X, cls._HEADING + 1)
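

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only; not exercised by this module). It assumes an
# `agent_input: AgentInput` and a `scene: Scene` have already been loaded by the
# surrounding navsim pipeline, and that `DMConfig()` is constructible with
# defaults; both assumptions are hypothetical and may differ in this repo.
#
#   config = DMConfig()
#   feature_builder = DMFeatureBuilder(config)
#   target_builder = DMTargetBuilder(config)
#
#   features = feature_builder.compute_features(agent_input)
#   # features["camera_feature"]: list with one stitched front-view tensor (or None)
#   # features["status_feature"]: concatenated driving command / velocity / acceleration
#
#   targets = target_builder.compute_targets(scene)
#   # targets["trajectory"]: future poses; targets["interpolated_traj"]: 40 poses at 0.1 s
# ---------------------------------------------------------------------------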