import numpy as np import numpy.typing as npt from typing import List, Optional from shapely import Point, creation from navsim.planning.simulation.planner.pdm_planner.observation.pdm_occupancy_map import ( PDMDrivableMap, ) from navsim.planning.simulation.planner.pdm_planner.utils.pdm_path import ( PDMPath, ) from nuplan.planning.metrics.utils.route_extractor import get_distance_of_closest_baseline_point_to_its_start from nuplan.common.actor_state.state_representation import Point2D from nuplan.common.maps.abstract_map import AbstractMap from nuplan.common.maps.abstract_map_objects import ( GraphEdgeMapObject, Lane, LaneConnector, LaneGraphEdgeMapObject, PolylineMapObject, ) from nuplan.common.maps.maps_datatypes import SemanticMapLayer def get_current_route_objects(map_api: AbstractMap, pose: Point2D) -> List[GraphEdgeMapObject]: """ Gets the list including the lane or lane_connectors the pose corresponds to if there exists one, and empty list o.w :param map_api: map :param pose: xy coordinates :return the corresponding route object. """ curr_lane = map_api.get_one_map_object(pose, SemanticMapLayer.LANE) if curr_lane is None: # Get the list of lane connectors if exists, otherwise it returns and empty list curr_lane_connectors = map_api.get_all_map_objects(pose, SemanticMapLayer.LANE_CONNECTOR) route_objects_with_pose = curr_lane_connectors else: route_objects_with_pose = [curr_lane] return route_objects_with_pose # type: ignore def get_route_obj_with_candidates( pose: Point2D, candidate_route_objs: List[GraphEdgeMapObject] ) -> List[GraphEdgeMapObject]: """ This function uses a candidate set of lane/lane-connectors and return the lane/lane-connector that correponds to the pose by checking if pose belongs to one of the route objs in candidate_route_objs or their outgoing_edges :param pose: ego_pose :param candidate_route_objs: a list of route objects :return: a list of route objects corresponding to the pose """ if not len(candidate_route_objs): raise ValueError('candidate_route_objs list is empty, no candidates to start with') # for each pose first check if pose belongs to candidate route objs route_objects_with_pose = [ one_route_obj for one_route_obj in candidate_route_objs if one_route_obj.contains_point(pose) ] # if it does not, and candidate set has only one element check wether it's in an outgoing_edge of the previous lane/lane_connector. # It is expected that ego is eventually assigned to a single lane-connector when it is entering an outgoing_edge, and hence the logic: if not route_objects_with_pose and len(candidate_route_objs) == 1: route_objects_with_pose = [ next_route_obj for next_route_obj in candidate_route_objs[0].outgoing_edges if next_route_obj.contains_point(pose) ] return route_objects_with_pose def remove_extra_lane_connectors(route_objs: List[List[GraphEdgeMapObject]]) -> List[List[GraphEdgeMapObject]]: """ # This function iterate through route object and replace field with multiple lane_connectors # with the one lane_connector ego ends up in. :param route_objs: a list of route objects. """ # start from last object in the route list last_to_first_route_list = route_objs[::-1] enum = enumerate(last_to_first_route_list) for ind, curr_last_obj in enum: # skip if ind = 0 or if there's a single object in current objects list if ind == 0 or len(curr_last_obj) <= 1: continue # O.w cull down the curr_last_obj using the next obj (prev obj in the reversed list) if possible if len(curr_last_obj) > len(last_to_first_route_list[ind - 1]): curr_route_obj_ids = [obj.id for obj in curr_last_obj] if all([(obj.id in curr_route_obj_ids) for obj in last_to_first_route_list[ind - 1]]): last_to_first_route_list[ind] = last_to_first_route_list[ind - 1] # Skip the rest if there's no more than one object left if len(curr_last_obj) <= 1: continue # Otherwise try to see if you can cull down lane_connectors using the lane ego ends up in and its incoming_edges if last_to_first_route_list[ind - 1] and isinstance(last_to_first_route_list[ind - 1][0], Lane): next_lane_incoming_edge_ids = [obj.id for obj in last_to_first_route_list[ind - 1][0].incoming_edges] objs_to_keep = [obj for obj in curr_last_obj if obj.id in next_lane_incoming_edge_ids] if objs_to_keep: last_to_first_route_list[ind] = objs_to_keep return last_to_first_route_list[::-1] def _get_route(map_api: AbstractMap, poses: List[Point2D]) -> List[List[GraphEdgeMapObject]]: """ Returns and sets the sequence of lane and lane connectors corresponding to the trajectory :param map_api: map :param poses: a list of xy coordinates :return list of route objects. """ if not len(poses): raise ValueError('invalid poses passed to get_route()') route_objs: List[List[GraphEdgeMapObject]] = [] # Find the lane/lane_connector ego belongs to initially curr_route_obj: List[GraphEdgeMapObject] = [] for ind, pose in enumerate(poses): if curr_route_obj: # next, for each pose first check if pose belongs to previously found lane/lane_connectors, # if it does not, check wether it's in an outgoing_egde of the previous lane/lane_connector curr_route_obj = get_route_obj_with_candidates(pose, curr_route_obj) # If route obj is not found using the previous step re-search the map if not curr_route_obj: curr_route_obj = get_current_route_objects(map_api, pose) # Ideally, two successive lane_connectors in the list shouldn't be distinct. However in some cases # trajectory can slightly goes outside the # # associated lane_connector and lies inside an irrelevant lane_connector. # Filter these cases if pose is still close to the previous lane_connector: # if ( # ind > 1 # and route_objs[-1] # and isinstance(route_objs[-1][0], LaneConnector) # and ( # (curr_route_obj and isinstance(curr_route_obj[0], LaneConnector)) # or (not curr_route_obj and map_api.is_in_layer(pose, SemanticMapLayer.INTERSECTION)) # ) # ): # previous_proximal_route_obj = [obj for obj in route_objs[-1] if # obj.polygon.distance(Point(*pose)) < 0.5] # # if previous_proximal_route_obj: # curr_route_obj = previous_proximal_route_obj route_objs.append(curr_route_obj) # iterate through route object and replace field with multiple lane_connectors with the one lane_connector ego ends up in. improved_route_obj = remove_extra_lane_connectors(route_objs) return improved_route_obj def _extract_metric( ego_poses: List[Point2D], ego_driven_route: List[List[GraphEdgeMapObject]], n_horizon: int ) -> List[float]: """Compute the movement of ego during the past n_horizon samples along the direction of baselines. :param ego_poses: List of ego poses. :param ego_driven_route: List of lanes/lane_connectors ego belongs to. :param n_horizon: Number of samples to sum the movement over. :return: A list of floats including ego's overall movements in the past n_horizon samples. """ progress_along_baseline = [] distance_to_start = None prev_distance_to_start = None prev_route_obj_id = None # If the first pose belongs to a lane/lane_connector store the id in prev_route_obj_id if ego_driven_route[0]: prev_route_obj_id = ego_driven_route[0][0].id # for each pose in the driven_trajectory compute the progress along the baseline of the corresponding lane/lane_connector in driven_route for ego_pose, ego_route_object in zip(ego_poses, ego_driven_route): # If pose isn't assigned a lane/lane_connector, there's no driving direction: if not ego_route_object: progress_along_baseline.append(0.0) continue # If the lane/lane_conn ego is in hasn't changed since last iteration compute the progress along its baseline # by subtracting its current distance to baseline's starting point from its distace in the previous iteration if prev_route_obj_id and ego_route_object[0].id == prev_route_obj_id: distance_to_start = get_distance_of_closest_baseline_point_to_its_start( ego_route_object[0].baseline_path, ego_pose ) # If prev_distance_to_start is set, compute the progress by subtracting distance_to_start from it, o.w set it to use in the next iteration progress_made = ( distance_to_start - prev_distance_to_start if prev_distance_to_start is not None and distance_to_start else 0.0 ) progress_along_baseline.append(progress_made) prev_distance_to_start = distance_to_start else: # Reset the parameters when ego first enters a lane/lane-connector distance_to_start = None prev_distance_to_start = None progress_along_baseline.append(0.0) prev_route_obj_id = ego_route_object[0].id # Compute progress over n_horizon last samples for each time point progress_over_n_horizon = [ sum(progress_along_baseline[max(0, ind - n_horizon) : ind + 1]) for ind, _ in enumerate(progress_along_baseline) ] return progress_over_n_horizon def calc_driving_direction_compliance(map_api, ego_coords, BBCoordsIndex, num_proposals, config, proposal_sampling): """ Calculate the average distance from the vehicle to the centerline. """ horizon = int( config.driving_direction_horizon / proposal_sampling.interval_length ) driving_direction_score = np.ones(num_proposals) # Get the list of lane or lane_connectors associated to ego at each time instance, and store to use in other metrics # Point(*ego_coords[proposal_idx, time_idx, BBCoordsIndex.CENTER]) for proposal_idx in range(num_proposals): poses: List[Point2D] = [] for time_idx in range(ego_coords.shape[1]): pose = Point(*ego_coords[proposal_idx, time_idx, BBCoordsIndex.CENTER]) poses.append(Point2D(pose.x, pose.y)) ego_driven_route = _get_route(map_api, poses) # n_horizon = ego_coords.shape[1] progress_over_interval = _extract_metric(poses, ego_driven_route, horizon) max_negative_progress_over_interval = abs(min(progress_over_interval)) if max_negative_progress_over_interval < config.driving_direction_compliance_threshold: driving_direction_score[proposal_idx] = 1.0 elif max_negative_progress_over_interval < config.driving_direction_violation_threshold: driving_direction_score[proposal_idx] = 0.5 else: driving_direction_score[proposal_idx] = 0.0 # if driving_direction_score[proposal_idx] != 1.0: # for i in range(100000): # print(driving_direction_score[proposal_idx]) return driving_direction_score def check_driving_direction_compliance(future_positions: np.ndarray, lane_centers: np.ndarray) -> bool: """ Checks if the driving direction complies with the lane directions. Args: future_positions (np.ndarray): The future positions of the trajectory. lane_centers (np.ndarray): A 3D array where each slice along the first dimension represents a lane. Returns: bool: True if driving direction is compliant at every timestamp, False otherwise. """ # Calculate direction vectors for each trajectory segment direction_vectors = future_positions[1:] - future_positions[:-1] # Shape: (T-1, 2) # Normalize direction vectors direction_vector_norms = np.linalg.norm(direction_vectors, axis=1, keepdims=True) normalized_direction_vectors = direction_vectors / np.where(direction_vector_norms == 0, 1, direction_vector_norms) # Shape: (T-1, 2) # Define cosine threshold for compliance cos_threshold = np.cos(np.deg2rad(45)) # Initialize arrays to keep track of the nearest segments min_distances = np.full(future_positions.shape[0] - 1, np.inf) # Shape: (T-1,) best_cos_thetas = np.full(future_positions.shape[0] - 1, -np.inf) # Shape: (T-1,) # Iterate through each lane in the 3D array for lane in lane_centers: # Shape of lane: (M, 2), where M is the number of center points in the lane segment_starts = lane[:-1] # Shape: (M-1, 2) segment_ends = lane[1:] # Shape: (M-1, 2) # Vectorized projection of trajectory points onto lane segments projections = self.project_points_to_segments(future_positions[:-1], segment_starts, segment_ends) # Shape: (T-1, M-1, 2) # Calculate distances in a vectorized way distances = np.linalg.norm(projections - future_positions[:-1, np.newaxis, :], axis=2) # Shape: (T-1, M-1) # Determine the nearest lane segment for each trajectory point nearest_indices = np.argmin(distances, axis=1) # Shape: (T-1,) # Calculate the direction of the nearest lane segments nearest_segment_starts = segment_starts[nearest_indices] # Shape: (T-1, 2) nearest_segment_ends = segment_ends[nearest_indices] # Shape: (T-1, 2) nearest_segment_directions = nearest_segment_ends - nearest_segment_starts # Shape: (T-1, 2) # Normalize nearest lane segment directions nearest_segment_norms = np.linalg.norm(nearest_segment_directions, axis=1, keepdims=True) normalized_nearest_directions = nearest_segment_directions / np.where(nearest_segment_norms == 0, 1, nearest_segment_norms) # Shape: (T-1, 2) # Calculate cosines of angles between trajectory directions and nearest lane segment directions cos_thetas = np.sum(normalized_direction_vectors * normalized_nearest_directions, axis=1) # Shape: (T-1,) # Update the minimum distances and best cosine values for compliance check closer_segments = distances[np.arange(distances.shape[0]), nearest_indices] < min_distances min_distances[closer_segments] = distances[np.arange(distances.shape[0]), nearest_indices][closer_segments] best_cos_thetas[closer_segments] = cos_thetas[closer_segments] # Check if all trajectory segments are compliant with their closest lane direction return 1.0 * (best_cos_thetas >= cos_threshold).sum() / best_cos_thetas.shape[0]