|
from enum import IntEnum |
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
import cv2 |
|
import numpy as np |
|
import numpy.typing as npt |
|
import torch |
|
from nuplan.common.actor_state.ego_state import EgoState |
|
from nuplan.common.actor_state.oriented_box import OrientedBox |
|
from nuplan.common.actor_state.state_representation import StateSE2, TimePoint, StateVector2D |
|
from nuplan.common.actor_state.tracked_objects_types import TrackedObjectType |
|
from nuplan.common.actor_state.vehicle_parameters import get_pacifica_parameters |
|
from nuplan.common.geometry.convert import absolute_to_relative_poses |
|
from nuplan.common.maps.abstract_map import AbstractMap, SemanticMapLayer, MapObject |
|
from nuplan.planning.simulation.trajectory.trajectory_sampling import TrajectorySampling |
|
from shapely import affinity |
|
from shapely.geometry import Polygon, LineString |
|
from torchvision import transforms |
|
|
|
from navsim.agents.dm.dm_config import DMConfig |
|
|
from navsim.common.dataclasses import AgentInput, Scene, Annotations |
|
from navsim.common.enums import BoundingBoxIndex |
|
from navsim.evaluate.pdm_score import transform_trajectory, get_trajectory_as_array |
|
from navsim.planning.scenario_builder.navsim_scenario_utils import tracked_object_types |
|
from navsim.planning.simulation.planner.pdm_planner.utils.pdm_enums import StateIndex |
|
from navsim.planning.training.abstract_feature_target_builder import ( |
|
AbstractFeatureBuilder, |
|
AbstractTargetBuilder, |
|
) |
|
|
|
|
|
class DMFeatureBuilder(AbstractFeatureBuilder): |
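    """Input feature builder of the DM agent."""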
|
def __init__(self, config: DMConfig): |
|
self._config = config |
|
|
|
def get_unique_name(self) -> str: |
|
"""Inherited, see superclass.""" |
|
return "dm_feature" |
|
|
|
def compute_features(self, agent_input: AgentInput) -> Dict[str, torch.Tensor]: |
|
"""Inherited, see superclass.""" |
|
features = {} |
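        # Stitched front-view camera image; a stitched back view is added when configured.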
|
|
|
features["camera_feature"] = self._get_camera_feature(agent_input) |
|
if self._config.use_back_view: |
|
features["camera_feature_back"] = self._get_camera_feature_back(agent_input) |
|
|
|
sensor2lidar_rotation, sensor2lidar_translation, intrinsics = [], [], [] |
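        # Camera calibration (intrinsics and sensor-to-lidar extrinsics) collected from the latest
        # frame only; None placeholders are stored when the camera data is unavailable.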
|
|
|
|
|
|
|
camera_timestamp = [agent_input.cameras[-1]] |
|
for camera in camera_timestamp: |
|
sensor2lidar_rotation_tmp, sensor2lidar_translation_tmp, intrinsics_tmp = [], [], [] |
|
flag = False |
|
for cam_k, cam in camera.to_dict().items(): |
|
features[f"intrinsics_{cam_k}"] = cam.intrinsics |
|
features[f"sensor2lidar_rotation_{cam_k}"] = cam.sensor2lidar_rotation |
|
features[f"sensor2lidar_translation_{cam_k}"] = cam.sensor2lidar_translation |
|
if cam.intrinsics is not None and np.any(cam.intrinsics): |
|
flag = True |
|
features[f"intrinsics_{cam_k}"] = torch.tensor(features[f"intrinsics_{cam_k}"]) |
|
features[f"sensor2lidar_rotation_{cam_k}"] = torch.tensor( |
|
features[f"sensor2lidar_rotation_{cam_k}"]) |
|
features[f"sensor2lidar_translation_{cam_k}"] = torch.tensor( |
|
features[f"sensor2lidar_translation_{cam_k}"]) |
|
|
|
sensor2lidar_rotation_tmp.append(features["sensor2lidar_rotation_cam_l0"]) |
|
sensor2lidar_rotation_tmp.append(features["sensor2lidar_rotation_cam_f0"]) |
|
sensor2lidar_rotation_tmp.append(features["sensor2lidar_rotation_cam_r0"]) |
|
|
|
sensor2lidar_translation_tmp.append(features["sensor2lidar_translation_cam_l0"]) |
|
sensor2lidar_translation_tmp.append(features["sensor2lidar_translation_cam_f0"]) |
|
sensor2lidar_translation_tmp.append(features["sensor2lidar_translation_cam_r0"]) |
|
|
|
intrinsics_tmp.append(features["intrinsics_cam_l0"]) |
|
intrinsics_tmp.append(features["intrinsics_cam_f0"]) |
|
intrinsics_tmp.append(features["intrinsics_cam_r0"]) |
|
|
|
if flag: |
|
sensor2lidar_rotation = sensor2lidar_rotation_tmp |
|
sensor2lidar_translation = sensor2lidar_translation_tmp |
|
intrinsics = intrinsics_tmp |
|
|
|
|
|
|
|
else: |
|
sensor2lidar_rotation.append(None) |
|
sensor2lidar_translation.append(None) |
|
intrinsics.append(None) |
|
features["sensor2lidar_rotation"] = sensor2lidar_rotation |
|
features["sensor2lidar_translation"] = sensor2lidar_translation |
|
features["intrinsics"] = intrinsics |
|
|
|
ego_status_list = [] |
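        # Driving command, velocity and acceleration of the last num_ego_status frames, most recent first.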
|
for i in range(self._config.num_ego_status): |
|
|
|
|
|
|
|
|
|
            # Walk backwards from the most recent ego status (i = 0 is the latest frame).
            idx = -(i + 1)
|
ego_status_list += [ |
|
torch.tensor(agent_input.ego_statuses[idx].driving_command, dtype=torch.float32), |
|
torch.tensor(agent_input.ego_statuses[idx].ego_velocity, dtype=torch.float32), |
|
torch.tensor(agent_input.ego_statuses[idx].ego_acceleration, dtype=torch.float32), |
|
] |
|
|
|
features["status_feature"] = torch.concatenate( |
|
ego_status_list |
|
) |
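        # Previous and current ego poses; assumes the history contains at least two ego statuses.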
|
features["history_waypoints"] = torch.concatenate( |
|
[torch.tensor(agent_input.ego_statuses[-2].ego_pose, dtype=torch.float32)[None], |
|
torch.tensor(agent_input.ego_statuses[-1].ego_pose, dtype=torch.float32)[None]], |
|
dim=0) |
|
|
|
return features |
|
|
|
    def _get_camera_feature(self, agent_input: AgentInput) -> List[Optional[torch.Tensor]]:
|
""" |
|
Extract stitched camera from AgentInput |
|
:param agent_input: input dataclass |
|
:return: stitched front view image as torch tensor |
|
""" |
|
|
|
|
|
cameras = [agent_input.cameras[-1]] |
|
image_list = [] |
|
for camera in cameras: |
|
image = camera.cam_l0.image |
|
if image is not None and image.size > 0 and np.any(image): |
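                # Crop each camera so the stitched image has a 4:1 aspect ratio.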
|
l0 = camera.cam_l0.image[28:-28, 416:-416] |
|
f0 = camera.cam_f0.image[28:-28] |
|
r0 = camera.cam_r0.image[28:-28, 416:-416] |
|
|
|
|
|
|
|
|
|
|
|
|
|
stitched_image = np.concatenate([l0, f0, r0], axis=1) |
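                # Resize to the configured input resolution and convert to a CHW float tensor in [0, 1].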
|
|
|
|
|
resized_image = cv2.resize(stitched_image, (self._config.camera_width, self._config.camera_height)) |
|
tensor_image = transforms.ToTensor()(resized_image) |
|
|
|
image_list.append(tensor_image) |
|
else: |
|
|
|
image_list.append(None) |
|
|
|
return image_list |
|
|
|
def _get_camera_feature_back(self, agent_input: AgentInput) -> torch.Tensor: |
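        """
        Extract stitched back view image from AgentInput
        :param agent_input: input dataclass
        :return: stitched back view image as torch tensor
        """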
|
cameras = agent_input.cameras[-1] |
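        # Same crop-and-stitch as the front view; the back cameras are assumed to be valid (no None check here).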
|
|
|
|
|
l2 = cameras.cam_l2.image[28:-28, 416:-416] |
|
b0 = cameras.cam_b0.image[28:-28] |
|
r2 = cameras.cam_r2.image[28:-28, 416:-416] |
|
|
|
|
|
stitched_image = np.concatenate([l2, b0, r2], axis=1) |
|
resized_image = cv2.resize(stitched_image, (self._config.camera_width, self._config.camera_height)) |
|
tensor_image = transforms.ToTensor()(resized_image) |
|
|
|
return tensor_image |
|
|
|
|
|
class DMTargetBuilder(AbstractTargetBuilder): |
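    """Output target builder of the DM agent."""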
|
def __init__(self, config: DMConfig): |
|
self._config = config |
|
self.v_params = get_pacifica_parameters() |
|
|
|
def get_unique_name(self) -> str: |
|
"""Inherited, see superclass.""" |
|
return "dm_target" |
|
|
|
def compute_targets(self, scene: Scene) -> Dict[str, torch.Tensor]: |
|
"""Inherited, see superclass.""" |
|
future_traj = scene.get_future_trajectory( |
|
num_trajectory_frames=self._config.trajectory_sampling.num_poses |
|
) |
|
trajectory = torch.tensor(future_traj.poses) |
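        # The current frame is the last of the history frames; its annotations provide the agent targets.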
|
frame_idx = scene.scene_metadata.num_history_frames - 1 |
|
annotations = scene.frames[frame_idx].annotations |
|
agent_states, agent_labels = self._compute_agent_targets(annotations) |
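        # Rebuild the current ego state in the rear-axle frame; the tire steering angle is unknown and set to 0.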
|
ego_state = EgoState.build_from_rear_axle( |
|
StateSE2(*scene.frames[frame_idx].ego_status.ego_pose), |
|
tire_steering_angle=0.0, |
|
vehicle_parameters=self.v_params, |
|
time_point=TimePoint(scene.frames[frame_idx].timestamp), |
|
rear_axle_velocity_2d=StateVector2D( |
|
*scene.frames[frame_idx].ego_status.ego_velocity |
|
), |
|
rear_axle_acceleration_2d=StateVector2D( |
|
*scene.frames[frame_idx].ego_status.ego_acceleration |
|
), |
|
) |
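        # Transform the future trajectory into the global frame and resample it at 10 Hz over 4 s (40 poses).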
|
trans_traj = transform_trajectory( |
|
future_traj, ego_state |
|
) |
|
interpolated_traj = get_trajectory_as_array( |
|
trans_traj, |
|
TrajectorySampling(num_poses=40, interval_length=0.1), |
|
ego_state.time_point |
|
) |
|
        rel_poses = absolute_to_relative_poses(
            [StateSE2(*tmp) for tmp in interpolated_traj[:, StateIndex.STATE_SE2]]
        )

        # Drop the first pose (the current ego pose) and keep only the future poses.
        final_traj = torch.tensor([pose.serialize() for pose in rel_poses[1:]])
|
|
|
return { |
|
"trajectory": trajectory, |
|
"agent_states": agent_states, |
|
"agent_labels": agent_labels, |
|
"interpolated_traj": final_traj |
|
} |
|
|
|
def _compute_agent_targets(self, annotations: Annotations) -> Tuple[torch.Tensor, torch.Tensor]: |
|
""" |
|
Extracts 2D agent bounding boxes in ego coordinates |
|
:param annotations: annotation dataclass |
|
:return: tuple of bounding box values and labels (binary) |
|
""" |
|
|
|
max_agents = self._config.num_bounding_boxes |
|
agent_states_list: List[npt.NDArray[np.float32]] = [] |
|
|
|
        def _xy_in_lidar(x: float, y: float, config: DMConfig) -> bool:
            return (config.lidar_min_x <= x <= config.lidar_max_x) and (
                config.lidar_min_y <= y <= config.lidar_max_y
            )
|
|
|
for box, name in zip(annotations.boxes, annotations.names): |
|
box_x, box_y, box_heading, box_length, box_width = ( |
|
box[BoundingBoxIndex.X], |
|
box[BoundingBoxIndex.Y], |
|
box[BoundingBoxIndex.HEADING], |
|
box[BoundingBoxIndex.LENGTH], |
|
box[BoundingBoxIndex.WIDTH], |
|
) |
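            # Keep only vehicles that fall inside the configured lidar / BEV range.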
|
|
|
if name == "vehicle" and _xy_in_lidar(box_x, box_y, self._config): |
|
agent_states_list.append( |
|
np.array([box_x, box_y, box_heading, box_length, box_width], dtype=np.float32) |
|
) |
|
|
|
agents_states_arr = np.array(agent_states_list) |
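        # Fixed-size targets: pad up to num_bounding_boxes agents; labels mark which slots hold real agents.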
|
|
|
|
|
agent_states = np.zeros((max_agents, BoundingBox2DIndex.size()), dtype=np.float32) |
|
agent_labels = np.zeros(max_agents, dtype=bool) |
|
|
|
if len(agents_states_arr) > 0: |
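            # Sort agents by distance of the box center to the ego and keep the closest max_agents.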
|
distances = np.linalg.norm(agents_states_arr[..., BoundingBox2DIndex.POINT], axis=-1) |
|
argsort = np.argsort(distances)[:max_agents] |
|
|
|
|
|
agents_states_arr = agents_states_arr[argsort] |
|
agent_states[: len(agents_states_arr)] = agents_states_arr |
|
agent_labels[: len(agents_states_arr)] = True |
|
|
|
return torch.tensor(agent_states), torch.tensor(agent_labels) |
|
|
|
class BoundingBox2DIndex(IntEnum): |
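    """
    Index scheme of the 2D bounding-box targets: (x, y, heading, length, width).
    Members are underscore-prefixed so the public names can be exposed as classmethod properties below.
    """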
|
_X = 0 |
|
_Y = 1 |
|
_HEADING = 2 |
|
_LENGTH = 3 |
|
_WIDTH = 4 |
|
|
|
@classmethod |
|
def size(cls): |
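        # Count the underscore-prefixed index members (_X, _Y, ...) to determine the state size.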
|
valid_attributes = [ |
|
attribute |
|
for attribute in dir(cls) |
|
if attribute.startswith("_") |
|
and not attribute.startswith("__") |
|
and not callable(getattr(cls, attribute)) |
|
] |
|
return len(valid_attributes) |
|
|
|
@classmethod |
|
@property |
|
def X(cls): |
|
return cls._X |
|
|
|
@classmethod |
|
@property |
|
def Y(cls): |
|
return cls._Y |
|
|
|
@classmethod |
|
@property |
|
def HEADING(cls): |
|
return cls._HEADING |
|
|
|
@classmethod |
|
@property |
|
def LENGTH(cls): |
|
return cls._LENGTH |
|
|
|
@classmethod |
|
@property |
|
def WIDTH(cls): |
|
return cls._WIDTH |
|
|
|
@classmethod |
|
@property |
|
def POINT(cls): |
|
|
|
return slice(cls._X, cls._Y + 1) |
|
|
|
@classmethod |
|
@property |
|
def STATE_SE2(cls): |
|
|
|
return slice(cls._X, cls._HEADING + 1) |
|
|