from __future__ import annotations

import io
import os
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, BinaryIO, Dict, List, Optional, Tuple, Union

import numpy as np
import numpy.typing as npt
from PIL import Image
from pyquaternion import Quaternion

from nuplan.common.actor_state.state_representation import StateSE2
from nuplan.common.maps.abstract_map import AbstractMap
from nuplan.common.maps.nuplan_map.map_factory import get_maps_api
from nuplan.database.maps_db.gpkg_mapsdb import MAP_LOCATIONS
from nuplan.database.utils.pointclouds.lidar import LidarPointCloud
from nuplan.planning.simulation.trajectory.trajectory_sampling import TrajectorySampling

from navsim.planning.simulation.planner.pdm_planner.utils.pdm_geometry_utils import (
    convert_absolute_to_relative_se2_array,
)

NAVSIM_INTERVAL_LENGTH: float = 0.5

OPENSCENE_DATA_ROOT = os.environ.get("OPENSCENE_DATA_ROOT")
NUPLAN_MAPS_ROOT = os.environ.get("NUPLAN_MAPS_ROOT")


@dataclass
class Camera:
    image: Optional[npt.NDArray[np.float32]] = None
    sensor2lidar_rotation: Optional[npt.NDArray[np.float32]] = None
    sensor2lidar_translation: Optional[npt.NDArray[np.float32]] = None
    intrinsics: Optional[npt.NDArray[np.float32]] = None
    distortion: Optional[npt.NDArray[np.float32]] = None


@dataclass
class Cameras:
    cam_f0: Camera
    cam_l0: Camera
    cam_l1: Camera
    cam_l2: Camera
    cam_r0: Camera
    cam_r1: Camera
    cam_r2: Camera
    cam_b0: Camera

    def to_dict(self) -> Dict[str, Camera]:
        return {
            "cam_f0": self.cam_f0,
            "cam_l0": self.cam_l0,
            "cam_l1": self.cam_l1,
            "cam_l2": self.cam_l2,
            "cam_r0": self.cam_r0,
            "cam_r1": self.cam_r1,
            "cam_r2": self.cam_r2,
            "cam_b0": self.cam_b0,
        }

    @classmethod
    def from_camera_dict(
        cls,
        sensor_blobs_path: Path,
        camera_dict: Dict[str, Any],
        sensor_names: List[str],
    ) -> Cameras:
        data_dict: Dict[str, Camera] = {}
        for camera_name in camera_dict.keys():
            camera_identifier = camera_name.lower()
            if camera_identifier in sensor_names:
                image_path = sensor_blobs_path / camera_dict[camera_name]["data_path"]
                data_dict[camera_identifier] = Camera(
                    image=np.array(Image.open(image_path)),
                    sensor2lidar_rotation=camera_dict[camera_name]["sensor2lidar_rotation"],
                    sensor2lidar_translation=camera_dict[camera_name]["sensor2lidar_translation"],
                    intrinsics=camera_dict[camera_name]["cam_intrinsic"],
                    distortion=camera_dict[camera_name]["distortion"],
                )
            else:
                data_dict[camera_identifier] = Camera()  # empty camera

        return Cameras(
            cam_f0=data_dict["cam_f0"],
            cam_l0=data_dict["cam_l0"],
            cam_l1=data_dict["cam_l1"],
            cam_l2=data_dict["cam_l2"],
            cam_r0=data_dict["cam_r0"],
            cam_r1=data_dict["cam_r1"],
            cam_r2=data_dict["cam_r2"],
            cam_b0=data_dict["cam_b0"],
        )


@dataclass
class Lidar:
    # NOTE:
    # merged lidar point cloud as (6, n) float32 array with n points
    # first axis: (x, y, z, intensity, ring, lidar_id), see LidarIndex
    lidar_pc: Optional[npt.NDArray[np.float32]] = None

    @staticmethod
    def _load_bytes(lidar_path: Path) -> BinaryIO:
        with open(lidar_path, "rb") as fp:
            return io.BytesIO(fp.read())

    @classmethod
    def from_paths(
        cls,
        sensor_blobs_path: Path,
        lidar_path: Path,
        sensor_names: List[str],
    ) -> Lidar:
        # NOTE: this could be extended to load specific LiDARs in the merged pc
        if "lidar_pc" in sensor_names:
            global_lidar_path = sensor_blobs_path / lidar_path
            lidar_pc = LidarPointCloud.from_buffer(cls._load_bytes(global_lidar_path), "pcd").points
            return Lidar(lidar_pc)
        return Lidar()  # empty lidar
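
# Usage sketch (illustrative only, kept as a comment so the module stays
# importable): loading the merged point cloud through the `Lidar` helper above.
# Both paths are hypothetical placeholders; in practice `sensor_blobs_path`
# points at the OpenScene sensor-blob root.
#
#     lidar = Lidar.from_paths(
#         sensor_blobs_path=Path("/data/openscene/sensor_blobs"),  # hypothetical root
#         lidar_path=Path("<log>/<token>.pcd"),                    # hypothetical file
#         sensor_names=["lidar_pc"],
#     )
#     if lidar.lidar_pc is not None:
#         xyz = lidar.lidar_pc[:3].T   # (n, 3) point coordinates
#         intensity = lidar.lidar_pc[3]  # per-point intensity channel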
@dataclass
class EgoStatus:
    ego_pose: npt.NDArray[np.float64]
    ego_velocity: npt.NDArray[np.float32]
    ego_acceleration: npt.NDArray[np.float32]
    driving_command: npt.NDArray[np.int64]
    in_global_frame: bool = False  # False for AgentInput


@dataclass
class AgentInput:
    ego_statuses: List[EgoStatus]
    cameras: List[Cameras]
    lidars: List[Lidar]
    timestamps: List[int]
    ego2globals: List[np.ndarray]

    @classmethod
    def from_scene_dict_list_with_gt_traj(
        cls,
        scene_dict_list: List[Dict],
        sensor_blobs_path: Path,
        num_history_frames: int,
        sensor_config: SensorConfig,
    ) -> Tuple[AgentInput, Trajectory]:
        agent_input = AgentInput.from_scene_dict_list(
            scene_dict_list, sensor_blobs_path, num_history_frames, sensor_config
        )
        scene = Scene.from_scene_dict_list(
            scene_dict_list, sensor_blobs_path, num_history_frames, 10, sensor_config
        )
        # 4 s horizon at the 0.5 s NAVSIM interval -> 8 future frames
        return agent_input, scene.get_future_trajectory(int(4.0 / NAVSIM_INTERVAL_LENGTH))

    @classmethod
    def from_scene_dict_list(
        cls,
        scene_dict_list: List[Dict],
        sensor_blobs_path: Path,
        num_history_frames: int,
        sensor_config: SensorConfig,
    ) -> AgentInput:
        assert len(scene_dict_list) > 0, "Scene list is empty!"

        global_ego_poses = []
        for frame_idx in range(num_history_frames):
            ego_translation = scene_dict_list[frame_idx]["ego2global_translation"]
            ego_quaternion = Quaternion(*scene_dict_list[frame_idx]["ego2global_rotation"])
            global_ego_pose = np.array(
                [ego_translation[0], ego_translation[1], ego_quaternion.yaw_pitch_roll[0]],
                dtype=np.float64,
            )
            global_ego_poses.append(global_ego_pose)

        # express all history poses relative to the most recent (current) frame
        local_ego_poses = convert_absolute_to_relative_se2_array(
            StateSE2(*global_ego_poses[-1]), np.array(global_ego_poses, dtype=np.float64)
        )

        ego_statuses: List[EgoStatus] = []
        cameras: List[Cameras] = []
        lidars: List[Lidar] = []
        timestamps: List[int] = []
        ego2globals: List[np.ndarray] = []

        for frame_idx in range(num_history_frames):
            ego_dynamic_state = scene_dict_list[frame_idx]["ego_dynamic_state"]
            ego_status = EgoStatus(
                ego_pose=np.array(local_ego_poses[frame_idx], dtype=np.float32),
                ego_velocity=np.array(ego_dynamic_state[:2], dtype=np.float32),
                ego_acceleration=np.array(ego_dynamic_state[2:], dtype=np.float32),
                driving_command=scene_dict_list[frame_idx]["driving_command"],
            )
            ego_statuses.append(ego_status)

            sensor_names = sensor_config.get_sensors_at_iteration(frame_idx)

            cameras.append(
                Cameras.from_camera_dict(
                    sensor_blobs_path=sensor_blobs_path,
                    camera_dict=scene_dict_list[frame_idx]["cams"],
                    sensor_names=sensor_names,
                )
            )

            lidars.append(
                Lidar.from_paths(
                    sensor_blobs_path=sensor_blobs_path,
                    lidar_path=Path(scene_dict_list[frame_idx]["lidar_path"]),
                    sensor_names=sensor_names,
                )
            )

            ego2globals.append(scene_dict_list[frame_idx]["ego2global"])
            timestamps.append(scene_dict_list[frame_idx]["timestamp"])

        return AgentInput(ego_statuses, cameras, lidars, timestamps, ego2globals)


@dataclass
class Annotations:
    boxes: npt.NDArray[np.float32]
    names: List[str]
    velocity_3d: npt.NDArray[np.float32]
    instance_tokens: List[str]
    track_tokens: List[str]

    def __post_init__(self):
        annotation_lengths: Dict[str, int] = {
            attribute_name: len(attribute) for attribute_name, attribute in vars(self).items()
        }
        assert (
            len(set(annotation_lengths.values())) == 1
        ), f"Annotations expects all attributes to have equal length, but got {annotation_lengths}"


@dataclass
class Trajectory:
    poses: npt.NDArray[np.float32]  # local coordinates
    # NOTE: default_factory avoids sharing one TrajectorySampling instance
    # across all Trajectory objects
    trajectory_sampling: TrajectorySampling = field(
        default_factory=lambda: TrajectorySampling(time_horizon=4, interval_length=0.5)
    )

    def __post_init__(self):
        assert (
            self.poses.ndim == 2
        ), "Trajectory poses should have two dimensions for samples and poses."
        assert (
            self.poses.shape[0] == self.trajectory_sampling.num_poses
        ), "Trajectory poses and sampling have unequal number of poses."
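
# Illustrative sketch: the relative-pose conversion used above maps global
# (x, y, heading) rows into the frame of a chosen origin pose. All numbers
# below are hypothetical. With the origin equal to the last history pose,
# the current frame becomes the identity:
#
#     origin = StateSE2(664430.0, 3997650.0, 0.50)  # hypothetical global pose
#     poses = np.array([[664428.0, 3997649.0, 0.45],
#                       [664430.0, 3997650.0, 0.50]])
#     local = convert_absolute_to_relative_se2_array(origin, poses)
#     # local[-1] ~ [0, 0, 0]; earlier rows are offsets in the ego frame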
        assert self.poses.shape[1] == 3, "Trajectory requires (x, y, heading) at last dim."


@dataclass
class SceneMetadata:
    log_name: str
    scene_token: str
    map_name: str
    initial_token: str

    num_history_frames: int
    num_future_frames: int


@dataclass
class Frame:
    token: str
    timestamp: int
    roadblock_ids: List[str]
    traffic_lights: List[Tuple[str, bool]]
    annotations: Annotations
    ego_status: EgoStatus
    lidar: Lidar
    cameras: Cameras
    ego2global: np.ndarray


@dataclass
class Scene:
    # Ground truth information
    scene_metadata: SceneMetadata
    map_api: AbstractMap
    frames: List[Frame]

    def get_future_trajectory(self, num_trajectory_frames: Optional[int] = None) -> Trajectory:
        # resolve the default before clamping; comparing None against an int
        # would raise a TypeError
        if num_trajectory_frames is None:
            num_trajectory_frames = self.scene_metadata.num_future_frames
        # cap at the 4 s horizon (8 frames at the 0.5 s NAVSIM interval)
        num_trajectory_frames = min(num_trajectory_frames, 8)

        start_frame_idx = self.scene_metadata.num_history_frames - 1

        global_ego_poses = []
        for frame_idx in range(start_frame_idx, start_frame_idx + num_trajectory_frames + 1):
            global_ego_poses.append(self.frames[frame_idx].ego_status.ego_pose)

        local_ego_poses = convert_absolute_to_relative_se2_array(
            StateSE2(*global_ego_poses[0]), np.array(global_ego_poses[1:], dtype=np.float64)
        )

        return Trajectory(
            local_ego_poses,
            TrajectorySampling(
                num_poses=len(local_ego_poses),
                interval_length=NAVSIM_INTERVAL_LENGTH,
            ),
        )

    def get_history_trajectory(self, num_trajectory_frames: Optional[int] = None) -> Trajectory:
        if num_trajectory_frames is None:
            num_trajectory_frames = self.scene_metadata.num_history_frames

        global_ego_poses = []
        for frame_idx in range(num_trajectory_frames):
            global_ego_poses.append(self.frames[frame_idx].ego_status.ego_pose)

        origin = StateSE2(*global_ego_poses[-1])
        local_ego_poses = convert_absolute_to_relative_se2_array(
            origin, np.array(global_ego_poses, dtype=np.float64)
        )

        return Trajectory(
            local_ego_poses,
            TrajectorySampling(
                num_poses=len(local_ego_poses),
                interval_length=NAVSIM_INTERVAL_LENGTH,
            ),
        )

    def get_agent_input(self) -> AgentInput:
        local_ego_poses = self.get_history_trajectory().poses

        ego_statuses: List[EgoStatus] = []
        cameras: List[Cameras] = []
        lidars: List[Lidar] = []
        ego2globals: List[np.ndarray] = []
        timestamps: List[int] = []

        for frame_idx in range(self.scene_metadata.num_history_frames):
            frame_ego_status = self.frames[frame_idx].ego_status
            ego_statuses.append(
                EgoStatus(
                    ego_pose=local_ego_poses[frame_idx],
                    ego_velocity=frame_ego_status.ego_velocity,
                    ego_acceleration=frame_ego_status.ego_acceleration,
                    driving_command=frame_ego_status.driving_command,
                )
            )
            cameras.append(self.frames[frame_idx].cameras)
            lidars.append(self.frames[frame_idx].lidar)
            ego2globals.append(self.frames[frame_idx].ego2global)
            timestamps.append(self.frames[frame_idx].timestamp)

        return AgentInput(ego_statuses, cameras, lidars, timestamps, ego2globals)

    @classmethod
    def _build_map_api(cls, map_name: str) -> AbstractMap:
        assert (
            map_name in MAP_LOCATIONS
        ), f"The map name {map_name} is invalid, must be in {MAP_LOCATIONS}"
        return get_maps_api(NUPLAN_MAPS_ROOT, "nuplan-maps-v1.0", map_name)

    @classmethod
    def _build_annotations(
        cls,
        scene_frame: Dict,
    ) -> Annotations:
        return Annotations(
            boxes=scene_frame["anns"]["gt_boxes"],
            names=scene_frame["anns"]["gt_names"],
            velocity_3d=scene_frame["anns"]["gt_velocity_3d"],
            instance_tokens=scene_frame["anns"]["instance_tokens"],
            track_tokens=scene_frame["anns"]["track_tokens"],
        )

    @classmethod
    def _build_ego_status(
        cls,
        scene_frame: Dict,
    ) -> EgoStatus:
        ego_translation = scene_frame["ego2global_translation"]
        ego_quaternion = Quaternion(*scene_frame["ego2global_rotation"])
        global_ego_pose = np.array(
            [ego_translation[0], ego_translation[1], ego_quaternion.yaw_pitch_roll[0]],
            dtype=np.float64,
        )
        ego_dynamic_state = scene_frame["ego_dynamic_state"]
        return EgoStatus(
            ego_pose=global_ego_pose,
            ego_velocity=np.array(ego_dynamic_state[:2], dtype=np.float32),
            ego_acceleration=np.array(ego_dynamic_state[2:], dtype=np.float32),
            driving_command=scene_frame["driving_command"],
            in_global_frame=True,
        )

    @classmethod
    def from_scene_dict_list(
        cls,
        scene_dict_list: List[Dict],
        sensor_blobs_path: Path,
        num_history_frames: int,
        num_future_frames: int,
        sensor_config: SensorConfig,
    ) -> Scene:
        assert len(scene_dict_list) > 0, "Scene list is empty!"
        scene_metadata = SceneMetadata(
            log_name=scene_dict_list[num_history_frames - 1]["log_name"],
            scene_token=scene_dict_list[num_history_frames - 1]["scene_token"],
            map_name=scene_dict_list[num_history_frames - 1]["map_location"],
            initial_token=scene_dict_list[num_history_frames - 1]["token"],
            num_history_frames=num_history_frames,
            num_future_frames=num_future_frames,
        )
        map_api = cls._build_map_api(scene_metadata.map_name)

        frames: List[Frame] = []
        for frame_idx in range(len(scene_dict_list)):
            global_ego_status = cls._build_ego_status(scene_dict_list[frame_idx])
            annotations = cls._build_annotations(scene_dict_list[frame_idx])

            sensor_names = sensor_config.get_sensors_at_iteration(frame_idx)

            cameras = Cameras.from_camera_dict(
                sensor_blobs_path=sensor_blobs_path,
                camera_dict=scene_dict_list[frame_idx]["cams"],
                sensor_names=sensor_names,
            )
            lidar = Lidar.from_paths(
                sensor_blobs_path=sensor_blobs_path,
                lidar_path=Path(scene_dict_list[frame_idx]["lidar_path"]),
                sensor_names=sensor_names,
            )

            frame = Frame(
                token=scene_dict_list[frame_idx]["token"],
                timestamp=scene_dict_list[frame_idx]["timestamp"],
                roadblock_ids=scene_dict_list[frame_idx]["roadblock_ids"],
                traffic_lights=scene_dict_list[frame_idx]["traffic_lights"],
                annotations=annotations,
                ego_status=global_ego_status,
                lidar=lidar,
                cameras=cameras,
                ego2global=scene_dict_list[frame_idx]["ego2global"],
            )
            frames.append(frame)

        return Scene(scene_metadata=scene_metadata, map_api=map_api, frames=frames)
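
# Illustrative sketch (hypothetical `scene` object): ground-truth trajectories
# are expressed in the local frame of the current (last history) pose.
#
#     future = scene.get_future_trajectory()   # defaults to num_future_frames, capped at 8
#     history = scene.get_history_trajectory()
#     # history.poses[-1] is the identity pose [0, 0, 0] of the current frame
#     assert future.poses.shape[1] == 3  # (x, y, heading) per sample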
@dataclass
class SceneFilter:
    num_history_frames: int = 4
    num_future_frames: int = 10
    frame_interval: Optional[int] = None

    has_route: bool = True

    max_scenes: Optional[int] = None
    log_names: Optional[List[str]] = None
    tokens: Optional[List[str]] = None

    # TODO: expand filter options

    def __post_init__(self):
        if self.frame_interval is None:
            self.frame_interval = self.num_frames

        assert (
            self.num_history_frames >= 1
        ), "SceneFilter: num_history_frames must be greater or equal to one."
        assert (
            self.num_future_frames >= 0
        ), "SceneFilter: num_future_frames must be greater or equal to zero."
        assert (
            self.frame_interval >= 1
        ), "SceneFilter: frame_interval must be greater or equal to one."
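
# Illustrative sketch: with the defaults above, a scene spans
# num_history_frames + num_future_frames = 4 + 10 = 14 frames, and
# frame_interval falls back to that total, so consecutive scenes do not overlap.
#
#     scene_filter = SceneFilter(num_history_frames=4, num_future_frames=10)
#     assert scene_filter.num_frames == 14
#     assert scene_filter.frame_interval == 14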
    @property
    def num_frames(self) -> int:
        return self.num_history_frames + self.num_future_frames


@dataclass
class SensorConfig:
    # Config values of sensors are either
    # - bool: whether to load the sensor for all history frames
    # - List[int]: for loading the sensor at specific history steps
    cam_f0: Union[bool, List[int]]
    cam_l0: Union[bool, List[int]]
    cam_l1: Union[bool, List[int]]
    cam_l2: Union[bool, List[int]]
    cam_r0: Union[bool, List[int]]
    cam_r1: Union[bool, List[int]]
    cam_r2: Union[bool, List[int]]
    cam_b0: Union[bool, List[int]]
    lidar_pc: Union[bool, List[int]]

    def get_sensors_at_iteration(self, iteration: int) -> List[str]:
        sensors_at_iteration: List[str] = []
        for sensor_name, sensor_include in asdict(self).items():
            if isinstance(sensor_include, bool) and sensor_include:
                sensors_at_iteration.append(sensor_name)
            elif isinstance(sensor_include, list) and iteration in sensor_include:
                sensors_at_iteration.append(sensor_name)
        return sensors_at_iteration

    @classmethod
    def build_all_sensors(cls, include: Union[bool, List[int]] = True) -> SensorConfig:
        return SensorConfig(
            cam_f0=include,
            cam_l0=include,
            cam_l1=include,
            cam_l2=include,
            cam_r0=include,
            cam_r1=include,
            cam_r2=include,
            cam_b0=include,
            lidar_pc=include,
        )

    @classmethod
    def build_cam_sensors(cls) -> SensorConfig:
        return SensorConfig(
            cam_f0=True,
            cam_l0=True,
            cam_l1=True,
            cam_l2=True,
            cam_r0=True,
            cam_r1=True,
            cam_r2=True,
            cam_b0=True,
            lidar_pc=False,
        )

    @classmethod
    def build_mm_sensors(cls) -> SensorConfig:
        return SensorConfig(
            cam_f0=[3],
            cam_l0=[3],
            cam_l1=[3],
            cam_l2=[3],
            cam_r0=[3],
            cam_r1=[3],
            cam_r2=[3],
            cam_b0=[3],
            lidar_pc=[0, 1, 2, 3],
        )

    @classmethod
    def build_no_sensors(cls) -> SensorConfig:
        return cls.build_all_sensors(include=False)


@dataclass
class PDMResults:
    no_at_fault_collisions: float
    drivable_area_compliance: float
    driving_direction_compliance: float
    ego_progress: float

    time_to_collision_within_bound: float
    comfort: float

    score: float
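
# Illustrative sketch: `build_mm_sensors` loads all cameras only at history
# iteration 3 (the current frame for a 4-frame history) while keeping the
# lidar point cloud at all four iterations.
#
#     sensor_config = SensorConfig.build_mm_sensors()
#     assert sensor_config.get_sensors_at_iteration(0) == ["lidar_pc"]
#     assert "cam_f0" in sensor_config.get_sensors_at_iteration(3)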