|
import numpy as np |
|
import numpy.typing as npt |
|
from typing import List, Optional |
|
from shapely import Point, creation |
|
from navsim.planning.simulation.planner.pdm_planner.observation.pdm_occupancy_map import ( |
|
PDMDrivableMap, |
|
) |
|
from navsim.planning.simulation.planner.pdm_planner.utils.pdm_path import ( |
|
PDMPath, |
|
) |
|
from nuplan.planning.metrics.utils.route_extractor import get_distance_of_closest_baseline_point_to_its_start |
|
from nuplan.common.actor_state.state_representation import Point2D |
|
from nuplan.common.maps.abstract_map import AbstractMap |
|
from nuplan.common.maps.abstract_map_objects import ( |
|
GraphEdgeMapObject, |
|
Lane, |
|
LaneConnector, |
|
LaneGraphEdgeMapObject, |
|
PolylineMapObject, |
|
) |
|
from nuplan.common.maps.maps_datatypes import SemanticMapLayer |
|
|
|
|
|
def get_current_route_objects(map_api: AbstractMap, pose: Point2D) -> List[GraphEdgeMapObject]: |
|
""" |
|
Gets the list including the lane or lane_connectors the pose corresponds to if there exists one, and empty list o.w |
|
:param map_api: map |
|
:param pose: xy coordinates |
|
:return the corresponding route object. |
|
""" |
|
curr_lane = map_api.get_one_map_object(pose, SemanticMapLayer.LANE) |
|
if curr_lane is None: |
|
|
|
curr_lane_connectors = map_api.get_all_map_objects(pose, SemanticMapLayer.LANE_CONNECTOR) |
|
route_objects_with_pose = curr_lane_connectors |
|
else: |
|
route_objects_with_pose = [curr_lane] |
|
|
|
return route_objects_with_pose |
|
|
|
|
|
def get_route_obj_with_candidates( |
|
pose: Point2D, candidate_route_objs: List[GraphEdgeMapObject] |
|
) -> List[GraphEdgeMapObject]: |
|
""" |
|
This function uses a candidate set of lane/lane-connectors and return the lane/lane-connector that correponds to the pose |
|
by checking if pose belongs to one of the route objs in candidate_route_objs or their outgoing_edges |
|
:param pose: ego_pose |
|
:param candidate_route_objs: a list of route objects |
|
:return: a list of route objects corresponding to the pose |
|
""" |
|
if not len(candidate_route_objs): |
|
raise ValueError('candidate_route_objs list is empty, no candidates to start with') |
|
|
|
|
|
route_objects_with_pose = [ |
|
one_route_obj for one_route_obj in candidate_route_objs if one_route_obj.contains_point(pose) |
|
] |
|
|
|
|
|
|
|
if not route_objects_with_pose and len(candidate_route_objs) == 1: |
|
route_objects_with_pose = [ |
|
next_route_obj |
|
for next_route_obj in candidate_route_objs[0].outgoing_edges |
|
if next_route_obj.contains_point(pose) |
|
] |
|
return route_objects_with_pose |
|
|
|
|
|
def remove_extra_lane_connectors(route_objs: List[List[GraphEdgeMapObject]]) -> List[List[GraphEdgeMapObject]]: |
|
""" |
|
# This function iterate through route object and replace field with multiple lane_connectors |
|
# with the one lane_connector ego ends up in. |
|
:param route_objs: a list of route objects. |
|
""" |
|
|
|
last_to_first_route_list = route_objs[::-1] |
|
enum = enumerate(last_to_first_route_list) |
|
for ind, curr_last_obj in enum: |
|
|
|
if ind == 0 or len(curr_last_obj) <= 1: |
|
continue |
|
|
|
if len(curr_last_obj) > len(last_to_first_route_list[ind - 1]): |
|
curr_route_obj_ids = [obj.id for obj in curr_last_obj] |
|
if all([(obj.id in curr_route_obj_ids) for obj in last_to_first_route_list[ind - 1]]): |
|
last_to_first_route_list[ind] = last_to_first_route_list[ind - 1] |
|
|
|
if len(curr_last_obj) <= 1: |
|
continue |
|
|
|
if last_to_first_route_list[ind - 1] and isinstance(last_to_first_route_list[ind - 1][0], Lane): |
|
next_lane_incoming_edge_ids = [obj.id for obj in last_to_first_route_list[ind - 1][0].incoming_edges] |
|
objs_to_keep = [obj for obj in curr_last_obj if obj.id in next_lane_incoming_edge_ids] |
|
if objs_to_keep: |
|
last_to_first_route_list[ind] = objs_to_keep |
|
|
|
return last_to_first_route_list[::-1] |
|
|
|
|
|
def _get_route(map_api: AbstractMap, poses: List[Point2D]) -> List[List[GraphEdgeMapObject]]: |
|
""" |
|
Returns and sets the sequence of lane and lane connectors corresponding to the trajectory |
|
:param map_api: map |
|
:param poses: a list of xy coordinates |
|
:return list of route objects. |
|
""" |
|
if not len(poses): |
|
raise ValueError('invalid poses passed to get_route()') |
|
|
|
route_objs: List[List[GraphEdgeMapObject]] = [] |
|
|
|
|
|
curr_route_obj: List[GraphEdgeMapObject] = [] |
|
|
|
for ind, pose in enumerate(poses): |
|
if curr_route_obj: |
|
|
|
|
|
curr_route_obj = get_route_obj_with_candidates(pose, curr_route_obj) |
|
|
|
|
|
if not curr_route_obj: |
|
curr_route_obj = get_current_route_objects(map_api, pose) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
route_objs.append(curr_route_obj) |
|
|
|
|
|
improved_route_obj = remove_extra_lane_connectors(route_objs) |
|
return improved_route_obj |
|
|
|
def _extract_metric( |
|
ego_poses: List[Point2D], ego_driven_route: List[List[GraphEdgeMapObject]], n_horizon: int |
|
) -> List[float]: |
|
"""Compute the movement of ego during the past n_horizon samples along the direction of baselines. |
|
:param ego_poses: List of ego poses. |
|
:param ego_driven_route: List of lanes/lane_connectors ego belongs to. |
|
:param n_horizon: Number of samples to sum the movement over. |
|
:return: A list of floats including ego's overall movements in the past n_horizon samples. |
|
""" |
|
progress_along_baseline = [] |
|
distance_to_start = None |
|
prev_distance_to_start = None |
|
prev_route_obj_id = None |
|
|
|
if ego_driven_route[0]: |
|
prev_route_obj_id = ego_driven_route[0][0].id |
|
|
|
|
|
for ego_pose, ego_route_object in zip(ego_poses, ego_driven_route): |
|
|
|
if not ego_route_object: |
|
progress_along_baseline.append(0.0) |
|
continue |
|
|
|
|
|
if prev_route_obj_id and ego_route_object[0].id == prev_route_obj_id: |
|
distance_to_start = get_distance_of_closest_baseline_point_to_its_start( |
|
ego_route_object[0].baseline_path, ego_pose |
|
) |
|
|
|
progress_made = ( |
|
distance_to_start - prev_distance_to_start |
|
if prev_distance_to_start is not None and distance_to_start |
|
else 0.0 |
|
) |
|
progress_along_baseline.append(progress_made) |
|
prev_distance_to_start = distance_to_start |
|
else: |
|
|
|
distance_to_start = None |
|
prev_distance_to_start = None |
|
progress_along_baseline.append(0.0) |
|
prev_route_obj_id = ego_route_object[0].id |
|
|
|
|
|
progress_over_n_horizon = [ |
|
sum(progress_along_baseline[max(0, ind - n_horizon) : ind + 1]) |
|
for ind, _ in enumerate(progress_along_baseline) |
|
] |
|
return progress_over_n_horizon |
|
|
|
def calc_driving_direction_compliance(map_api, |
|
ego_coords, |
|
BBCoordsIndex, |
|
num_proposals, |
|
config, |
|
proposal_sampling): |
|
""" |
|
Calculate the average distance from the vehicle to the centerline. |
|
""" |
|
|
|
horizon = int( |
|
config.driving_direction_horizon / proposal_sampling.interval_length |
|
) |
|
driving_direction_score = np.ones(num_proposals) |
|
|
|
|
|
|
|
for proposal_idx in range(num_proposals): |
|
poses: List[Point2D] = [] |
|
for time_idx in range(ego_coords.shape[1]): |
|
pose = Point(*ego_coords[proposal_idx, time_idx, BBCoordsIndex.CENTER]) |
|
poses.append(Point2D(pose.x, pose.y)) |
|
|
|
ego_driven_route = _get_route(map_api, poses) |
|
|
|
progress_over_interval = _extract_metric(poses, ego_driven_route, horizon) |
|
max_negative_progress_over_interval = abs(min(progress_over_interval)) |
|
if max_negative_progress_over_interval < config.driving_direction_compliance_threshold: |
|
driving_direction_score[proposal_idx] = 1.0 |
|
elif max_negative_progress_over_interval < config.driving_direction_violation_threshold: |
|
driving_direction_score[proposal_idx] = 0.5 |
|
else: |
|
driving_direction_score[proposal_idx] = 0.0 |
|
|
|
|
|
|
|
return driving_direction_score |
|
|
|
def check_driving_direction_compliance(future_positions: np.ndarray, lane_centers: np.ndarray) -> bool: |
|
""" |
|
Checks if the driving direction complies with the lane directions. |
|
|
|
Args: |
|
future_positions (np.ndarray): The future positions of the trajectory. |
|
lane_centers (np.ndarray): A 3D array where each slice along the first dimension represents a lane. |
|
Returns: |
|
bool: True if driving direction is compliant at every timestamp, False otherwise. |
|
""" |
|
|
|
direction_vectors = future_positions[1:] - future_positions[:-1] |
|
|
|
direction_vector_norms = np.linalg.norm(direction_vectors, axis=1, keepdims=True) |
|
normalized_direction_vectors = direction_vectors / np.where(direction_vector_norms == 0, 1, |
|
direction_vector_norms) |
|
|
|
cos_threshold = np.cos(np.deg2rad(45)) |
|
|
|
min_distances = np.full(future_positions.shape[0] - 1, np.inf) |
|
best_cos_thetas = np.full(future_positions.shape[0] - 1, -np.inf) |
|
|
|
for lane in lane_centers: |
|
segment_starts = lane[:-1] |
|
segment_ends = lane[1:] |
|
|
|
projections = self.project_points_to_segments(future_positions[:-1], segment_starts, |
|
segment_ends) |
|
|
|
distances = np.linalg.norm(projections - future_positions[:-1, np.newaxis, :], axis=2) |
|
|
|
nearest_indices = np.argmin(distances, axis=1) |
|
|
|
nearest_segment_starts = segment_starts[nearest_indices] |
|
nearest_segment_ends = segment_ends[nearest_indices] |
|
nearest_segment_directions = nearest_segment_ends - nearest_segment_starts |
|
|
|
nearest_segment_norms = np.linalg.norm(nearest_segment_directions, axis=1, keepdims=True) |
|
normalized_nearest_directions = nearest_segment_directions / np.where(nearest_segment_norms == 0, 1, nearest_segment_norms) |
|
|
|
cos_thetas = np.sum(normalized_direction_vectors * normalized_nearest_directions, axis=1) |
|
|
|
closer_segments = distances[np.arange(distances.shape[0]), nearest_indices] < min_distances |
|
min_distances[closer_segments] = distances[np.arange(distances.shape[0]), nearest_indices][closer_segments] |
|
best_cos_thetas[closer_segments] = cos_thetas[closer_segments] |
|
|
|
return 1.0 * (best_cos_thetas >= cos_threshold).sum() / best_cos_thetas.shape[0] |