# navsim_ours/navsim/agents/dm/dm_features.py
from enum import IntEnum
from typing import Any, Dict, List, Tuple
import cv2
import numpy as np
import numpy.typing as npt
import torch
from nuplan.common.actor_state.ego_state import EgoState
from nuplan.common.actor_state.oriented_box import OrientedBox
from nuplan.common.actor_state.state_representation import StateSE2, TimePoint, StateVector2D
from nuplan.common.actor_state.tracked_objects_types import TrackedObjectType
from nuplan.common.actor_state.vehicle_parameters import get_pacifica_parameters
from nuplan.common.geometry.convert import absolute_to_relative_poses
from nuplan.common.maps.abstract_map import AbstractMap, SemanticMapLayer, MapObject
from nuplan.planning.simulation.trajectory.trajectory_sampling import TrajectorySampling
from shapely import affinity
from shapely.geometry import Polygon, LineString
from torchvision import transforms
from navsim.agents.dm.dm_config import DMConfig
from navsim.agents.vadv2.vadv2_config import Vadv2Config
from navsim.common.dataclasses import AgentInput, Scene, Annotations
from navsim.common.enums import BoundingBoxIndex
from navsim.evaluate.pdm_score import transform_trajectory, get_trajectory_as_array
from navsim.planning.scenario_builder.navsim_scenario_utils import tracked_object_types
from navsim.planning.simulation.planner.pdm_planner.utils.pdm_enums import StateIndex
from navsim.planning.training.abstract_feature_target_builder import (
AbstractFeatureBuilder,
AbstractTargetBuilder,
)
class DMFeatureBuilder(AbstractFeatureBuilder):
    """Builds camera, calibration and ego-status input features for the DM agent."""
    def __init__(self, config: DMConfig):
        self._config = config
def get_unique_name(self) -> str:
"""Inherited, see superclass."""
return "dm_feature"
def compute_features(self, agent_input: AgentInput) -> Dict[str, torch.Tensor]:
"""Inherited, see superclass."""
features = {}
features["camera_feature"] = self._get_camera_feature(agent_input)
if self._config.use_back_view:
features["camera_feature_back"] = self._get_camera_feature_back(agent_input)
sensor2lidar_rotation, sensor2lidar_translation, intrinsics = [], [], []
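        # Per-camera calibration: store intrinsics and sensor-to-lidar extrinsics for every
        # camera, then stack the front-facing trio (cam_l0, cam_f0, cam_r0) in left/front/right
        # order for the most recent frame.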
        camera_timestamp = [agent_input.cameras[-1]]
for camera in camera_timestamp:
sensor2lidar_rotation_tmp, sensor2lidar_translation_tmp, intrinsics_tmp = [], [], []
flag = False
for cam_k, cam in camera.to_dict().items():
features[f"intrinsics_{cam_k}"] = cam.intrinsics
features[f"sensor2lidar_rotation_{cam_k}"] = cam.sensor2lidar_rotation
features[f"sensor2lidar_translation_{cam_k}"] = cam.sensor2lidar_translation
                if cam.intrinsics is not None and np.any(cam.intrinsics):
                    flag = True  # this frame has at least one camera with valid calibration
features[f"intrinsics_{cam_k}"] = torch.tensor(features[f"intrinsics_{cam_k}"])
features[f"sensor2lidar_rotation_{cam_k}"] = torch.tensor(
features[f"sensor2lidar_rotation_{cam_k}"])
features[f"sensor2lidar_translation_{cam_k}"] = torch.tensor(
features[f"sensor2lidar_translation_{cam_k}"])
sensor2lidar_rotation_tmp.append(features["sensor2lidar_rotation_cam_l0"])
sensor2lidar_rotation_tmp.append(features["sensor2lidar_rotation_cam_f0"])
sensor2lidar_rotation_tmp.append(features["sensor2lidar_rotation_cam_r0"])
sensor2lidar_translation_tmp.append(features["sensor2lidar_translation_cam_l0"])
sensor2lidar_translation_tmp.append(features["sensor2lidar_translation_cam_f0"])
sensor2lidar_translation_tmp.append(features["sensor2lidar_translation_cam_r0"])
intrinsics_tmp.append(features["intrinsics_cam_l0"])
intrinsics_tmp.append(features["intrinsics_cam_f0"])
intrinsics_tmp.append(features["intrinsics_cam_r0"])
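            # Keep the stacked calibration only if at least one camera had valid intrinsics;
            # otherwise store None placeholders for this frame.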
if flag:
sensor2lidar_rotation = sensor2lidar_rotation_tmp
sensor2lidar_translation = sensor2lidar_translation_tmp
intrinsics = intrinsics_tmp
else:
sensor2lidar_rotation.append(None)
sensor2lidar_translation.append(None)
intrinsics.append(None)
features["sensor2lidar_rotation"] = sensor2lidar_rotation
features["sensor2lidar_translation"] = sensor2lidar_translation
features["intrinsics"] = intrinsics
ego_status_list = []
for i in range(self._config.num_ego_status):
            # newest frame first: i=0 -> idx=-1, i=1 -> idx=-2, ...
            idx = -(i + 1)
ego_status_list += [
torch.tensor(agent_input.ego_statuses[idx].driving_command, dtype=torch.float32),
torch.tensor(agent_input.ego_statuses[idx].ego_velocity, dtype=torch.float32),
torch.tensor(agent_input.ego_statuses[idx].ego_acceleration, dtype=torch.float32),
]
features["status_feature"] = torch.concatenate(
ego_status_list
)
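        # Ego poses of the previous and the current frame, used as history waypoints.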
features["history_waypoints"] = torch.concatenate(
[torch.tensor(agent_input.ego_statuses[-2].ego_pose, dtype=torch.float32)[None],
torch.tensor(agent_input.ego_statuses[-1].ego_pose, dtype=torch.float32)[None]],
dim=0)
return features
    def _get_camera_feature(self, agent_input: AgentInput) -> List[torch.Tensor]:
        """
        Extract a stitched front-view camera image from the AgentInput.
        :param agent_input: input dataclass
        :return: list with one stitched front-view image tensor per used camera frame
            (None where the camera images are missing)
        """
        cameras = [agent_input.cameras[-1]]
image_list = []
for camera in cameras:
image = camera.cam_l0.image
if image is not None and image.size > 0 and np.any(image):
                # Crop to a 4:1 aspect ratio: trim 28 px from the top/bottom of every image
                # and 416 px from the left/right of the side cameras before stitching.
                l0 = camera.cam_l0.image[28:-28, 416:-416]
                f0 = camera.cam_f0.image[28:-28]
                r0 = camera.cam_r0.image[28:-28, 416:-416]
                # stitch l0, f0, r0 images horizontally (left / front / right)
                stitched_image = np.concatenate([l0, f0, r0], axis=1)
                resized_image = cv2.resize(stitched_image, (self._config.camera_width, self._config.camera_height))
                tensor_image = transforms.ToTensor()(resized_image)
                image_list.append(tensor_image)
            else:
                # missing or empty camera image for this frame
                image_list.append(None)
return image_list
    def _get_camera_feature_back(self, agent_input: AgentInput) -> torch.Tensor:
        """
        Extract a stitched rear-view image from the AgentInput.
        :param agent_input: input dataclass
        :return: stitched rear-view image as torch tensor
        """
cameras = agent_input.cameras[-1]
# Crop to ensure 4:1 aspect ratio
l2 = cameras.cam_l2.image[28:-28, 416:-416]
b0 = cameras.cam_b0.image[28:-28]
r2 = cameras.cam_r2.image[28:-28, 416:-416]
        # stitch l2, b0, r2 images horizontally (rear-left / back / rear-right)
stitched_image = np.concatenate([l2, b0, r2], axis=1)
resized_image = cv2.resize(stitched_image, (self._config.camera_width, self._config.camera_height))
tensor_image = transforms.ToTensor()(resized_image)
return tensor_image
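# A minimal usage sketch (hypothetical variable names; assumes navsim's feature-caching
# pipeline supplies the AgentInput / Scene objects):
#   feature_builder = DMFeatureBuilder(config)
#   target_builder = DMTargetBuilder(config)
#   features = feature_builder.compute_features(agent_input)  # dict of tensors / lists
#   targets = target_builder.compute_targets(scene)           # dict of tensors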
class DMTargetBuilder(AbstractTargetBuilder):
    """Builds trajectory and agent bounding-box targets for the DM agent."""
    def __init__(self, config: DMConfig):
        self._config = config
        self.v_params = get_pacifica_parameters()
def get_unique_name(self) -> str:
"""Inherited, see superclass."""
return "dm_target"
def compute_targets(self, scene: Scene) -> Dict[str, torch.Tensor]:
"""Inherited, see superclass."""
future_traj = scene.get_future_trajectory(
num_trajectory_frames=self._config.trajectory_sampling.num_poses
)
trajectory = torch.tensor(future_traj.poses)
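        # frame_idx points at the current frame (the last history frame)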
frame_idx = scene.scene_metadata.num_history_frames - 1
annotations = scene.frames[frame_idx].annotations
agent_states, agent_labels = self._compute_agent_targets(annotations)
ego_state = EgoState.build_from_rear_axle(
StateSE2(*scene.frames[frame_idx].ego_status.ego_pose),
tire_steering_angle=0.0,
vehicle_parameters=self.v_params,
time_point=TimePoint(scene.frames[frame_idx].timestamp),
rear_axle_velocity_2d=StateVector2D(
*scene.frames[frame_idx].ego_status.ego_velocity
),
rear_axle_acceleration_2d=StateVector2D(
*scene.frames[frame_idx].ego_status.ego_acceleration
),
)
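        # Convert the future trajectory to absolute poses anchored at the current ego state,
        # resample it to 40 poses at 0.1 s intervals (4 s horizon), then express the poses
        # relative to the current frame.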
trans_traj = transform_trajectory(
future_traj, ego_state
)
interpolated_traj = get_trajectory_as_array(
trans_traj,
TrajectorySampling(num_poses=40, interval_length=0.1),
ego_state.time_point
)
rel_poses = absolute_to_relative_poses([StateSE2(*tmp) for tmp in
interpolated_traj[:, StateIndex.STATE_SE2]])
        # drop the first interpolated pose (the current frame) so only future poses remain
final_traj = [pose.serialize() for pose in rel_poses[1:]]
final_traj = torch.tensor(final_traj)
return {
"trajectory": trajectory,
"agent_states": agent_states,
"agent_labels": agent_labels,
"interpolated_traj": final_traj
}
def _compute_agent_targets(self, annotations: Annotations) -> Tuple[torch.Tensor, torch.Tensor]:
"""
Extracts 2D agent bounding boxes in ego coordinates
:param annotations: annotation dataclass
:return: tuple of bounding box values and labels (binary)
"""
max_agents = self._config.num_bounding_boxes
agent_states_list: List[npt.NDArray[np.float32]] = []
        def _xy_in_lidar(x: float, y: float, config: DMConfig) -> bool:
return (config.lidar_min_x <= x <= config.lidar_max_x) and (
config.lidar_min_y <= y <= config.lidar_max_y
)
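        # keep only vehicle boxes whose centers lie inside the configured lidar/BEV range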
for box, name in zip(annotations.boxes, annotations.names):
box_x, box_y, box_heading, box_length, box_width = (
box[BoundingBoxIndex.X],
box[BoundingBoxIndex.Y],
box[BoundingBoxIndex.HEADING],
box[BoundingBoxIndex.LENGTH],
box[BoundingBoxIndex.WIDTH],
)
if name == "vehicle" and _xy_in_lidar(box_x, box_y, self._config):
agent_states_list.append(
np.array([box_x, box_y, box_heading, box_length, box_width], dtype=np.float32)
)
agents_states_arr = np.array(agent_states_list)
        # keep the nearest max_agents boxes; pad with zeros and mark valid rows in agent_labels
agent_states = np.zeros((max_agents, BoundingBox2DIndex.size()), dtype=np.float32)
agent_labels = np.zeros(max_agents, dtype=bool)
if len(agents_states_arr) > 0:
distances = np.linalg.norm(agents_states_arr[..., BoundingBox2DIndex.POINT], axis=-1)
argsort = np.argsort(distances)[:max_agents]
# filter detections
agents_states_arr = agents_states_arr[argsort]
agent_states[: len(agents_states_arr)] = agents_states_arr
agent_labels[: len(agents_states_arr)] = True
return torch.tensor(agent_states), torch.tensor(agent_labels)
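# Index layout of the 2D bounding-box state (x, y, heading, length, width). Members are
# underscore-prefixed so the public names can be exposed as classmethod properties, which
# also lets POINT / STATE_SE2 return slices instead of enum members.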
class BoundingBox2DIndex(IntEnum):
_X = 0
_Y = 1
_HEADING = 2
_LENGTH = 3
_WIDTH = 4
@classmethod
def size(cls):
valid_attributes = [
attribute
for attribute in dir(cls)
if attribute.startswith("_")
and not attribute.startswith("__")
and not callable(getattr(cls, attribute))
]
return len(valid_attributes)
@classmethod
@property
def X(cls):
return cls._X
@classmethod
@property
def Y(cls):
return cls._Y
@classmethod
@property
def HEADING(cls):
return cls._HEADING
@classmethod
@property
def LENGTH(cls):
return cls._LENGTH
@classmethod
@property
def WIDTH(cls):
return cls._WIDTH
@classmethod
@property
def POINT(cls):
# assumes X, Y have subsequent indices
return slice(cls._X, cls._Y + 1)
@classmethod
@property
def STATE_SE2(cls):
# assumes X, Y, HEADING have subsequent indices
return slice(cls._X, cls._HEADING + 1)