lkllkl's picture
Upload folder using huggingface_hub
da2e2ac verified
import numpy as np
import numpy.typing as npt
from typing import List, Optional
from shapely import Point, creation
from navsim.planning.simulation.planner.pdm_planner.observation.pdm_occupancy_map import (
PDMDrivableMap,
)
from navsim.planning.simulation.planner.pdm_planner.utils.pdm_path import (
PDMPath,
)
from nuplan.planning.metrics.utils.route_extractor import get_distance_of_closest_baseline_point_to_its_start
from nuplan.common.actor_state.state_representation import Point2D
from nuplan.common.maps.abstract_map import AbstractMap
from nuplan.common.maps.abstract_map_objects import (
GraphEdgeMapObject,
Lane,
LaneConnector,
LaneGraphEdgeMapObject,
PolylineMapObject,
)
from nuplan.common.maps.maps_datatypes import SemanticMapLayer
def get_current_route_objects(map_api: AbstractMap, pose: Point2D) -> List[GraphEdgeMapObject]:
"""
Gets the list including the lane or lane_connectors the pose corresponds to if there exists one, and empty list o.w
:param map_api: map
:param pose: xy coordinates
:return the corresponding route object.
"""
curr_lane = map_api.get_one_map_object(pose, SemanticMapLayer.LANE)
if curr_lane is None:
# Get the list of lane connectors if exists, otherwise it returns and empty list
curr_lane_connectors = map_api.get_all_map_objects(pose, SemanticMapLayer.LANE_CONNECTOR)
route_objects_with_pose = curr_lane_connectors
else:
route_objects_with_pose = [curr_lane]
return route_objects_with_pose # type: ignore
def get_route_obj_with_candidates(
pose: Point2D, candidate_route_objs: List[GraphEdgeMapObject]
) -> List[GraphEdgeMapObject]:
"""
This function uses a candidate set of lane/lane-connectors and return the lane/lane-connector that correponds to the pose
by checking if pose belongs to one of the route objs in candidate_route_objs or their outgoing_edges
:param pose: ego_pose
:param candidate_route_objs: a list of route objects
:return: a list of route objects corresponding to the pose
"""
if not len(candidate_route_objs):
raise ValueError('candidate_route_objs list is empty, no candidates to start with')
# for each pose first check if pose belongs to candidate route objs
route_objects_with_pose = [
one_route_obj for one_route_obj in candidate_route_objs if one_route_obj.contains_point(pose)
]
# if it does not, and candidate set has only one element check wether it's in an outgoing_edge of the previous lane/lane_connector.
# It is expected that ego is eventually assigned to a single lane-connector when it is entering an outgoing_edge, and hence the logic:
if not route_objects_with_pose and len(candidate_route_objs) == 1:
route_objects_with_pose = [
next_route_obj
for next_route_obj in candidate_route_objs[0].outgoing_edges
if next_route_obj.contains_point(pose)
]
return route_objects_with_pose
def remove_extra_lane_connectors(route_objs: List[List[GraphEdgeMapObject]]) -> List[List[GraphEdgeMapObject]]:
"""
# This function iterate through route object and replace field with multiple lane_connectors
# with the one lane_connector ego ends up in.
:param route_objs: a list of route objects.
"""
# start from last object in the route list
last_to_first_route_list = route_objs[::-1]
enum = enumerate(last_to_first_route_list)
for ind, curr_last_obj in enum:
# skip if ind = 0 or if there's a single object in current objects list
if ind == 0 or len(curr_last_obj) <= 1:
continue
# O.w cull down the curr_last_obj using the next obj (prev obj in the reversed list) if possible
if len(curr_last_obj) > len(last_to_first_route_list[ind - 1]):
curr_route_obj_ids = [obj.id for obj in curr_last_obj]
if all([(obj.id in curr_route_obj_ids) for obj in last_to_first_route_list[ind - 1]]):
last_to_first_route_list[ind] = last_to_first_route_list[ind - 1]
# Skip the rest if there's no more than one object left
if len(curr_last_obj) <= 1:
continue
# Otherwise try to see if you can cull down lane_connectors using the lane ego ends up in and its incoming_edges
if last_to_first_route_list[ind - 1] and isinstance(last_to_first_route_list[ind - 1][0], Lane):
next_lane_incoming_edge_ids = [obj.id for obj in last_to_first_route_list[ind - 1][0].incoming_edges]
objs_to_keep = [obj for obj in curr_last_obj if obj.id in next_lane_incoming_edge_ids]
if objs_to_keep:
last_to_first_route_list[ind] = objs_to_keep
return last_to_first_route_list[::-1]
def _get_route(map_api: AbstractMap, poses: List[Point2D]) -> List[List[GraphEdgeMapObject]]:
"""
Returns and sets the sequence of lane and lane connectors corresponding to the trajectory
:param map_api: map
:param poses: a list of xy coordinates
:return list of route objects.
"""
if not len(poses):
raise ValueError('invalid poses passed to get_route()')
route_objs: List[List[GraphEdgeMapObject]] = []
# Find the lane/lane_connector ego belongs to initially
curr_route_obj: List[GraphEdgeMapObject] = []
for ind, pose in enumerate(poses):
if curr_route_obj:
# next, for each pose first check if pose belongs to previously found lane/lane_connectors,
# if it does not, check wether it's in an outgoing_egde of the previous lane/lane_connector
curr_route_obj = get_route_obj_with_candidates(pose, curr_route_obj)
# If route obj is not found using the previous step re-search the map
if not curr_route_obj:
curr_route_obj = get_current_route_objects(map_api, pose)
# Ideally, two successive lane_connectors in the list shouldn't be distinct. However in some cases
# trajectory can slightly goes outside the
# # associated lane_connector and lies inside an irrelevant lane_connector.
# Filter these cases if pose is still close to the previous lane_connector:
# if (
# ind > 1
# and route_objs[-1]
# and isinstance(route_objs[-1][0], LaneConnector)
# and (
# (curr_route_obj and isinstance(curr_route_obj[0], LaneConnector))
# or (not curr_route_obj and map_api.is_in_layer(pose, SemanticMapLayer.INTERSECTION))
# )
# ):
# previous_proximal_route_obj = [obj for obj in route_objs[-1] if
# obj.polygon.distance(Point(*pose)) < 0.5]
#
# if previous_proximal_route_obj:
# curr_route_obj = previous_proximal_route_obj
route_objs.append(curr_route_obj)
# iterate through route object and replace field with multiple lane_connectors with the one lane_connector ego ends up in.
improved_route_obj = remove_extra_lane_connectors(route_objs)
return improved_route_obj
def _extract_metric(
ego_poses: List[Point2D], ego_driven_route: List[List[GraphEdgeMapObject]], n_horizon: int
) -> List[float]:
"""Compute the movement of ego during the past n_horizon samples along the direction of baselines.
:param ego_poses: List of ego poses.
:param ego_driven_route: List of lanes/lane_connectors ego belongs to.
:param n_horizon: Number of samples to sum the movement over.
:return: A list of floats including ego's overall movements in the past n_horizon samples.
"""
progress_along_baseline = []
distance_to_start = None
prev_distance_to_start = None
prev_route_obj_id = None
# If the first pose belongs to a lane/lane_connector store the id in prev_route_obj_id
if ego_driven_route[0]:
prev_route_obj_id = ego_driven_route[0][0].id
# for each pose in the driven_trajectory compute the progress along the baseline of the corresponding lane/lane_connector in driven_route
for ego_pose, ego_route_object in zip(ego_poses, ego_driven_route):
# If pose isn't assigned a lane/lane_connector, there's no driving direction:
if not ego_route_object:
progress_along_baseline.append(0.0)
continue
# If the lane/lane_conn ego is in hasn't changed since last iteration compute the progress along its baseline
# by subtracting its current distance to baseline's starting point from its distace in the previous iteration
if prev_route_obj_id and ego_route_object[0].id == prev_route_obj_id:
distance_to_start = get_distance_of_closest_baseline_point_to_its_start(
ego_route_object[0].baseline_path, ego_pose
)
# If prev_distance_to_start is set, compute the progress by subtracting distance_to_start from it, o.w set it to use in the next iteration
progress_made = (
distance_to_start - prev_distance_to_start
if prev_distance_to_start is not None and distance_to_start
else 0.0
)
progress_along_baseline.append(progress_made)
prev_distance_to_start = distance_to_start
else:
# Reset the parameters when ego first enters a lane/lane-connector
distance_to_start = None
prev_distance_to_start = None
progress_along_baseline.append(0.0)
prev_route_obj_id = ego_route_object[0].id
# Compute progress over n_horizon last samples for each time point
progress_over_n_horizon = [
sum(progress_along_baseline[max(0, ind - n_horizon) : ind + 1])
for ind, _ in enumerate(progress_along_baseline)
]
return progress_over_n_horizon
def calc_driving_direction_compliance(map_api,
ego_coords,
BBCoordsIndex,
num_proposals,
config,
proposal_sampling):
"""
Calculate the average distance from the vehicle to the centerline.
"""
horizon = int(
config.driving_direction_horizon / proposal_sampling.interval_length
)
driving_direction_score = np.ones(num_proposals)
# Get the list of lane or lane_connectors associated to ego at each time instance, and store to use in other metrics
# Point(*ego_coords[proposal_idx, time_idx, BBCoordsIndex.CENTER])
for proposal_idx in range(num_proposals):
poses: List[Point2D] = []
for time_idx in range(ego_coords.shape[1]):
pose = Point(*ego_coords[proposal_idx, time_idx, BBCoordsIndex.CENTER])
poses.append(Point2D(pose.x, pose.y))
ego_driven_route = _get_route(map_api, poses)
# n_horizon = ego_coords.shape[1]
progress_over_interval = _extract_metric(poses, ego_driven_route, horizon)
max_negative_progress_over_interval = abs(min(progress_over_interval))
if max_negative_progress_over_interval < config.driving_direction_compliance_threshold:
driving_direction_score[proposal_idx] = 1.0
elif max_negative_progress_over_interval < config.driving_direction_violation_threshold:
driving_direction_score[proposal_idx] = 0.5
else:
driving_direction_score[proposal_idx] = 0.0
# if driving_direction_score[proposal_idx] != 1.0:
# for i in range(100000):
# print(driving_direction_score[proposal_idx])
return driving_direction_score
def check_driving_direction_compliance(future_positions: np.ndarray, lane_centers: np.ndarray) -> bool:
"""
Checks if the driving direction complies with the lane directions.
Args:
future_positions (np.ndarray): The future positions of the trajectory.
lane_centers (np.ndarray): A 3D array where each slice along the first dimension represents a lane.
Returns:
bool: True if driving direction is compliant at every timestamp, False otherwise.
"""
# Calculate direction vectors for each trajectory segment
direction_vectors = future_positions[1:] - future_positions[:-1] # Shape: (T-1, 2)
# Normalize direction vectors
direction_vector_norms = np.linalg.norm(direction_vectors, axis=1, keepdims=True)
normalized_direction_vectors = direction_vectors / np.where(direction_vector_norms == 0, 1,
direction_vector_norms) # Shape: (T-1, 2)
# Define cosine threshold for compliance
cos_threshold = np.cos(np.deg2rad(45))
# Initialize arrays to keep track of the nearest segments
min_distances = np.full(future_positions.shape[0] - 1, np.inf) # Shape: (T-1,)
best_cos_thetas = np.full(future_positions.shape[0] - 1, -np.inf) # Shape: (T-1,)
# Iterate through each lane in the 3D array
for lane in lane_centers: # Shape of lane: (M, 2), where M is the number of center points in the lane
segment_starts = lane[:-1] # Shape: (M-1, 2)
segment_ends = lane[1:] # Shape: (M-1, 2)
# Vectorized projection of trajectory points onto lane segments
projections = self.project_points_to_segments(future_positions[:-1], segment_starts,
segment_ends) # Shape: (T-1, M-1, 2)
# Calculate distances in a vectorized way
distances = np.linalg.norm(projections - future_positions[:-1, np.newaxis, :], axis=2) # Shape: (T-1, M-1)
# Determine the nearest lane segment for each trajectory point
nearest_indices = np.argmin(distances, axis=1) # Shape: (T-1,)
# Calculate the direction of the nearest lane segments
nearest_segment_starts = segment_starts[nearest_indices] # Shape: (T-1, 2)
nearest_segment_ends = segment_ends[nearest_indices] # Shape: (T-1, 2)
nearest_segment_directions = nearest_segment_ends - nearest_segment_starts # Shape: (T-1, 2)
# Normalize nearest lane segment directions
nearest_segment_norms = np.linalg.norm(nearest_segment_directions, axis=1, keepdims=True)
normalized_nearest_directions = nearest_segment_directions / np.where(nearest_segment_norms == 0, 1, nearest_segment_norms) # Shape: (T-1, 2)
# Calculate cosines of angles between trajectory directions and nearest lane segment directions
cos_thetas = np.sum(normalized_direction_vectors * normalized_nearest_directions, axis=1) # Shape: (T-1,)
# Update the minimum distances and best cosine values for compliance check
closer_segments = distances[np.arange(distances.shape[0]), nearest_indices] < min_distances
min_distances[closer_segments] = distances[np.arange(distances.shape[0]), nearest_indices][closer_segments]
best_cos_thetas[closer_segments] = cos_thetas[closer_segments]
# Check if all trajectory segments are compliant with their closest lane direction
return 1.0 * (best_cos_thetas >= cos_threshold).sum() / best_cos_thetas.shape[0]