from typing import Dict, List

import numpy as np
import numpy.typing as npt
import shapely
import torch
from nuplan.common.actor_state.ego_state import EgoState
from nuplan.common.actor_state.state_representation import StateSE2, TimePoint, StateVector2D
from nuplan.common.actor_state.vehicle_parameters import get_pacifica_parameters
from nuplan.common.geometry.convert import absolute_to_relative_poses
from nuplan.common.maps.abstract_map import PolygonMapObject
from nuplan.common.maps.maps_datatypes import SemanticMapLayer
from nuplan.planning.simulation.trajectory.trajectory_sampling import TrajectorySampling
from shapely import affinity, LineString

from navsim.agents.hydra.hydra_features import BoundingBox2DIndex
from navsim.agents.hydra_plantf.hydra_plantf_config import HydraPlantfConfig
from navsim.common.dataclasses import AgentInput, Annotations, Scene
from navsim.common.enums import BoundingBoxIndex
from navsim.evaluate.pdm_score import transform_trajectory, get_trajectory_as_array
from navsim.planning.simulation.planner.pdm_planner.utils.pdm_enums import StateIndex
from navsim.planning.training.abstract_feature_target_builder import (
    AbstractFeatureBuilder,
    AbstractTargetBuilder,
)


def interpolate_polyline(points: np.ndarray, t: int) -> np.ndarray:
    """copy from av2-api"""
    if points.ndim != 2:
        raise ValueError("Input array must be (N,2) or (N,3) in shape.")

    # the number of points on the curve itself
    n, _ = points.shape

    # equally spaced in arclength -- the number of points that will be uniformly interpolated
    eq_spaced_points = np.linspace(0, 1, t)

    # Compute the chordal arclength of each segment.
    # Compute differences between each x coord, to get the dx's.
    # Do the same to get dy's. Then the hypotenuse length is computed as a norm.
    chordlen: np.ndarray = np.linalg.norm(np.diff(points, axis=0), axis=1)  # type: ignore
    # Normalize the arclengths to a unit total
    chordlen = chordlen / np.sum(chordlen)
    # cumulative arclength
    cumarc: np.ndarray = np.zeros(len(chordlen) + 1)
    cumarc[1:] = np.cumsum(chordlen)

    # which interval did each point fall in, in terms of eq_spaced_points? (bin index)
    tbins: np.ndarray = np.digitize(eq_spaced_points, bins=cumarc).astype(int)  # type: ignore

    # catch any problems at the ends
    tbins[np.where((tbins <= 0) | (eq_spaced_points <= 0))] = 1  # type: ignore
    tbins[np.where((tbins >= n) | (eq_spaced_points >= 1))] = n - 1
    chordlen[tbins - 1] = np.where(
        chordlen[tbins - 1] == 0, chordlen[tbins - 1] + 1e-6, chordlen[tbins - 1]
    )

    s = np.divide((eq_spaced_points - cumarc[tbins - 1]), chordlen[tbins - 1])
    anchors = points[tbins - 1, :]
    # broadcast to scale each row of `points` by a different row of s
    offsets = (points[tbins, :] - points[tbins - 1, :]) * s.reshape(-1, 1)
    points_interp: np.ndarray = anchors + offsets

    return points_interp
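
# Illustrative example (shapes only): resampling a 3-point polyline to 5 points
# equally spaced in arc length.
#
#   >>> pts = np.array([[0.0, 0.0], [1.0, 0.0], [2.0, 1.0]])
#   >>> interpolate_polyline(pts, 5).shape
#   (5, 2)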


class HydraPlantfFeatureBuilder(AbstractFeatureBuilder):
    def __init__(self, config: HydraPlantfConfig):
        super().__init__()
        self._config = config
        self.max_map_objs = 50
        self.polygon_types = [
            SemanticMapLayer.LANE,
            SemanticMapLayer.LANE_CONNECTOR,
            SemanticMapLayer.CROSSWALK,
        ]
        self.agent_types = ["vehicle", "pedestrian", "bicycle"]

    def get_unique_name(self) -> str:
        """Inherited, see superclass."""
        return "hydraplantf_feature"

    def _compute_agent_features(self, annotations: Annotations):
        max_agents = self._config.num_bounding_boxes
        agent_states_list: List[npt.NDArray[np.float32]] = []
        agent_category_list = []

        def _xy_in_lidar(x: float, y: float, config: HydraPlantfConfig) -> bool:
            return (config.lidar_min_x <= x <= config.lidar_max_x) and (
                config.lidar_min_y <= y <= config.lidar_max_y
            )

        for box, name, velo in zip(annotations.boxes, annotations.names, annotations.velocity_3d):
            box_x, box_y, box_heading, box_length, box_width = (
                box[BoundingBoxIndex.X],
                box[BoundingBoxIndex.Y],
                box[BoundingBoxIndex.HEADING],
                box[BoundingBoxIndex.LENGTH],
                box[BoundingBoxIndex.WIDTH],
            )
            velo_x, velo_y = velo[0], velo[1]
            if name in self.agent_types and _xy_in_lidar(box_x, box_y, self._config):
                agent_states_list.append(
                    np.array(
                        [box_x, box_y, np.cos(box_heading), np.sin(box_heading),
                         box_length, box_width, velo_x, velo_y],
                        dtype=np.float32,
                    )
                )
                agent_category_list.append(self.agent_types.index(name))

        agents_states_arr = np.array(agent_states_list)
        agents_category_arr = np.array(agent_category_list)

        # keep only the max_agents nearest detections
        agent_states = np.zeros((max_agents, 8), dtype=np.float32)
        agent_category = np.zeros(max_agents, dtype=np.int8)
        valid_mask = np.zeros(max_agents, dtype=bool)

        if len(agents_states_arr) > 0:
            distances = np.linalg.norm(agents_states_arr[..., BoundingBox2DIndex.POINT], axis=-1)
            argsort = np.argsort(distances)[:max_agents]

            # filter detections
            agents_states_arr = agents_states_arr[argsort]
            agents_category_arr = agents_category_arr[argsort]
            valid_len = len(agents_states_arr)
            agent_states[:valid_len] = agents_states_arr
            agent_category[:valid_len] = agents_category_arr
            valid_mask[:valid_len] = True

        return {
            "states": torch.tensor(agent_states),
            "categories": torch.tensor(agent_category),
            "valid_mask": torch.tensor(valid_mask),
        }
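
    # Layout of the agent features returned above (summary of the code):
    #   states: (num_bounding_boxes, 8) -- [x, y, cos(heading), sin(heading), length, width, vx, vy]
    #   categories: (num_bounding_boxes,) -- index into self.agent_types ("vehicle", "pedestrian", "bicycle")
    #   valid_mask: (num_bounding_boxes,) -- True for rows filled with the nearest detections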

    def compute_features(self, agent_input: AgentInput, scene: Scene) -> Dict[str, torch.Tensor]:
        """Inherited, see superclass."""
        features = {"agent": {}, "map": {}}
        annotations = scene.frames[-1].annotations

        ego_status_list = []
        for i in range(self._config.num_ego_status):
            # i=0: idx=-1, i=1: idx=-2, i=2: idx=-3, i=3: idx=-4
            idx = -(i + 1)
            ego_status_list += [
                torch.tensor(agent_input.ego_statuses[idx].driving_command, dtype=torch.float32),
                torch.tensor(agent_input.ego_statuses[idx].ego_velocity, dtype=torch.float32),
                torch.tensor(agent_input.ego_statuses[idx].ego_acceleration, dtype=torch.float32),
            ]
        features["status_feature"] = torch.concatenate(ego_status_list)

        agent_features = self._compute_agent_features(annotations)
        map_features = self._compute_map_features(
            scene.map_api, StateSE2(*scene.frames[-1].ego_status.ego_pose)
        )
        features["agent"].update(agent_features)
        features["map"].update(map_features)
        return features

    def _sample_discrete_path(self, linestring: LineString, num_points: int):
        xs, ys = linestring.xy
        path = np.stack([np.array([x, y]) for x, y in zip(xs, ys)], axis=0)
        return interpolate_polyline(path, num_points)

    def _geometry_local_coords(self, geometry, origin: StateSE2):
        # Transform geometry from the global frame into the ego frame:
        # translate by -origin, then rotate by -heading.
        a = np.cos(origin.heading)
        b = np.sin(origin.heading)
        d = -np.sin(origin.heading)
        e = np.cos(origin.heading)
        xoff = -origin.x
        yoff = -origin.y
        # shapely's 2D affine matrix is [a, b, d, e, xoff, yoff]:
        # x' = a * x + b * y + xoff, y' = d * x + e * y + yoff
        translated_geometry = affinity.affine_transform(geometry, [1, 0, 0, 1, xoff, yoff])
        rotated_geometry = affinity.affine_transform(translated_geometry, [a, b, d, e, 0, 0])
        return rotated_geometry
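
    # Arrays built by _compute_map_features below, after cropping to the +/-32 m window
    # and padding/truncating to self.max_map_objs objects (M objects, P=20 points each):
    #   point_position / point_vector: (M, 3, P, 2) -- per-side points and segment vectors,
    #       side order is (centerline, left boundary, right boundary)
    #   point_orientation: (M, 3, P) -- segment headings (arctan2 of the vectors)
    #   point_side: (M, 3) -- side indices 0/1/2
    #   polygon_center: (M, 3) -- (x, y, heading) at the middle sample
    #   polygon_position: (M, 2), polygon_orientation: (M,)
    #   polygon_type: (M,) -- index into self.polygon_types
    #   valid_mask: (M, P) -- centerline points inside the crop window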

    def _compute_map_features(self, map_api, ego_pose):
        radius = 32
        # 20 sample points per map object
        sample_points = 20
        map_objects = map_api.get_proximal_map_objects(
            ego_pose.point,
            radius,
            [
                SemanticMapLayer.LANE,
                SemanticMapLayer.LANE_CONNECTOR,
                SemanticMapLayer.CROSSWALK,
            ],
        )
        lane_objects = (
            map_objects[SemanticMapLayer.LANE] + map_objects[SemanticMapLayer.LANE_CONNECTOR]
        )
        crosswalk_objects = map_objects[SemanticMapLayer.CROSSWALK]
        object_ids = [int(obj.id) for obj in lane_objects + crosswalk_objects]
        object_types = (
            [SemanticMapLayer.LANE] * len(map_objects[SemanticMapLayer.LANE])
            + [SemanticMapLayer.LANE_CONNECTOR] * len(map_objects[SemanticMapLayer.LANE_CONNECTOR])
            + [SemanticMapLayer.CROSSWALK] * len(map_objects[SemanticMapLayer.CROSSWALK])
        )

        M = len(lane_objects) + len(crosswalk_objects)
        P = sample_points
        point_position = np.zeros((M, 3, P, 2), dtype=np.float64)
        point_vector = np.zeros((M, 3, P, 2), dtype=np.float64)
        point_side = np.zeros((M, 3), dtype=np.int8)
        point_orientation = np.zeros((M, 3, P), dtype=np.float64)
        polygon_center = np.zeros((M, 3), dtype=np.float64)
        polygon_position = np.zeros((M, 2), dtype=np.float64)
        polygon_orientation = np.zeros(M, dtype=np.float64)
        polygon_type = np.zeros(M, dtype=np.int8)

        for lane in lane_objects:
            object_id = int(lane.id)
            idx = object_ids.index(object_id)
            transformed_baseline_path = self._geometry_local_coords(
                lane.baseline_path.linestring, ego_pose
            )
            transformed_left_path = self._geometry_local_coords(
                lane.left_boundary.linestring, ego_pose
            )
            transformed_right_path = self._geometry_local_coords(
                lane.right_boundary.linestring, ego_pose
            )
            centerline = self._sample_discrete_path(transformed_baseline_path, sample_points + 1)
            left_bound = self._sample_discrete_path(transformed_left_path, sample_points + 1)
            right_bound = self._sample_discrete_path(transformed_right_path, sample_points + 1)
            edges = np.stack([centerline, left_bound, right_bound], axis=0)

            point_vector[idx] = edges[:, 1:] - edges[:, :-1]
            point_position[idx] = edges[:, :-1]
            point_orientation[idx] = np.arctan2(
                point_vector[idx, :, :, 1], point_vector[idx, :, :, 0]
            )
            point_side[idx] = np.arange(3)

            polygon_center[idx] = np.concatenate(
                [
                    centerline[int(sample_points / 2)],
                    [point_orientation[idx, 0, int(sample_points / 2)]],
                ],
                axis=-1,
            )
            polygon_position[idx] = centerline[0]
            polygon_orientation[idx] = point_orientation[idx, 0, 0]
            polygon_type[idx] = self.polygon_types.index(object_types[idx])

        for crosswalk in crosswalk_objects:
            idx = object_ids.index(int(crosswalk.id))
            edges = self._get_crosswalk_edges(crosswalk, ego_pose)

            point_vector[idx] = edges[:, 1:] - edges[:, :-1]
            point_position[idx] = edges[:, :-1]
            point_orientation[idx] = np.arctan2(
                point_vector[idx, :, :, 1], point_vector[idx, :, :, 0]
            )
            point_side[idx] = np.arange(3)

            polygon_center[idx] = np.concatenate(
                [
                    edges[0, int(sample_points / 2)],
                    [point_orientation[idx, 0, int(sample_points / 2)]],
                ],
                axis=-1,
            )
            polygon_position[idx] = edges[0, 0]
            polygon_orientation[idx] = point_orientation[idx, 0, 0]
            polygon_type[idx] = self.polygon_types.index(object_types[idx])

        features = {
            "point_position": point_position,
            "point_vector": point_vector,
            "point_orientation": point_orientation,
            "point_side": point_side,
            "polygon_center": polygon_center,
            "polygon_position": polygon_position,
            "polygon_orientation": polygon_orientation,
            "polygon_type": polygon_type,
        }

        # keep only polygons with at least one centerline point inside the +/-32 m crop
        point_position = features["point_position"]
        x_max, x_min = 32, -32
        y_max, y_min = 32, -32
        valid_mask = (
            (point_position[:, 0, :, 0] < x_max)
            & (point_position[:, 0, :, 0] > x_min)
            & (point_position[:, 0, :, 1] < y_max)
            & (point_position[:, 0, :, 1] > y_min)
        )
        valid_polygon = valid_mask.any(-1)
        features["valid_mask"] = valid_mask

        # truncate or zero-pad every feature to exactly max_map_objs map objects
        for k, v in features.items():
            valid_v = v[valid_polygon]
            obj_cnt = valid_v.shape[0]
            if obj_cnt >= self.max_map_objs:
                features[k] = valid_v[: self.max_map_objs]
            else:
                pads = np.zeros(
                    (self.max_map_objs - obj_cnt, *valid_v.shape[1:]), dtype=valid_v.dtype
                )
                features[k] = np.concatenate([valid_v, pads], axis=0)

        return features

    def _get_crosswalk_edges(
        self, crosswalk: PolygonMapObject, ego_pose, sample_points: int = 21
    ):
        transformed_poly = self._geometry_local_coords(crosswalk.polygon, ego_pose)
        bbox = shapely.minimum_rotated_rectangle(transformed_poly)
        coords = np.stack(bbox.exterior.coords.xy, axis=-1)
        edge1 = coords[[3, 0]]  # right boundary
        edge2 = coords[[2, 1]]  # left boundary
        edges = np.stack([(edge1 + edge2) * 0.5, edge2, edge1], axis=0)  # [3, 2, 2]

        # sample each edge uniformly between its two endpoints
        vector = edges[:, 1] - edges[:, 0]  # [3, 2]
        steps = np.linspace(0, 1, sample_points, endpoint=True)[None, :]
        points = edges[:, 0][:, None, :] + vector[:, None, :] * steps[:, :, None]

        return points


class HydraPlantfTargetBuilder(AbstractTargetBuilder):
    def __init__(self, config: HydraPlantfConfig):
        super().__init__()
        self._config = config
        self.v_params = get_pacifica_parameters()

    def get_unique_name(self) -> str:
        """Inherited, see superclass."""
        return "hydraplantf_target"

    def compute_targets(self, scene: Scene) -> Dict[str, torch.Tensor]:
        """Inherited, see superclass."""
        future_traj = scene.get_future_trajectory(
            num_trajectory_frames=self._config.trajectory_sampling.num_poses
        )
        trajectory = torch.tensor(future_traj.poses)

        # ego state at the current (last history) frame, used as the interpolation origin
        frame_idx = scene.scene_metadata.num_history_frames - 1
        ego_state = EgoState.build_from_rear_axle(
            StateSE2(*scene.frames[frame_idx].ego_status.ego_pose),
            tire_steering_angle=0.0,
            vehicle_parameters=self.v_params,
            time_point=TimePoint(scene.frames[frame_idx].timestamp),
            rear_axle_velocity_2d=StateVector2D(*scene.frames[frame_idx].ego_status.ego_velocity),
            rear_axle_acceleration_2d=StateVector2D(
                *scene.frames[frame_idx].ego_status.ego_acceleration
            ),
        )

        # resample the future trajectory at 10 Hz (40 poses over 4 s) and express it
        # relative to the current ego pose
        trans_traj = transform_trajectory(future_traj, ego_state)
        interpolated_traj = get_trajectory_as_array(
            trans_traj,
            TrajectorySampling(num_poses=40, interval_length=0.1),
            ego_state.time_point,
        )
        rel_poses = absolute_to_relative_poses(
            [StateSE2(*tmp) for tmp in interpolated_traj[:, StateIndex.STATE_SE2]]
        )
        # skip the current frame
        final_traj = torch.tensor([pose.serialize() for pose in rel_poses[1:]])

        return {"trajectory": trajectory, "interpolated_traj": final_traj}
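

# Illustrative wiring (hypothetical, not part of this module): an agent would typically
# expose these builders through its feature/target builder hooks, e.g.
#
#     def get_feature_builders(self) -> List[AbstractFeatureBuilder]:
#         return [HydraPlantfFeatureBuilder(self._config)]
#
#     def get_target_builders(self) -> List[AbstractTargetBuilder]:
#         return [HydraPlantfTargetBuilder(self._config)]
#
# The method names above follow the navsim agent convention and are assumptions here;
# adapt them to the actual agent interface if it differs.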