from enum import IntEnum
from typing import Any, Dict, List, Tuple

import cv2
import networkx as nx
import numpy as np
import numpy.typing as npt
import torch
from nuplan.common.actor_state.ego_state import EgoState
from nuplan.common.actor_state.oriented_box import OrientedBox
from nuplan.common.actor_state.state_representation import StateSE2, TimePoint, StateVector2D
from nuplan.common.actor_state.tracked_objects_types import TrackedObjectType
from nuplan.common.actor_state.vehicle_parameters import get_pacifica_parameters
from nuplan.common.geometry.convert import absolute_to_relative_poses
from nuplan.common.maps.abstract_map import AbstractMap, SemanticMapLayer, MapObject
from nuplan.planning.simulation.trajectory.trajectory_sampling import TrajectorySampling
from shapely import affinity
from shapely.geometry import Polygon, LineString
from torchvision import transforms

from det_map.data.datasets.lidar_utils import transform_points
from navsim.agents.vadv2.vadv2_config import Vadv2Config
from navsim.common.dataclasses import AgentInput, Scene, Annotations
from navsim.common.enums import BoundingBoxIndex, LidarIndex
from navsim.evaluate.pdm_score import transform_trajectory, get_trajectory_as_array
from navsim.planning.scenario_builder.navsim_scenario_utils import tracked_object_types
from navsim.planning.simulation.planner.pdm_planner.utils.pdm_enums import StateIndex
from navsim.planning.training.abstract_feature_target_builder import (
    AbstractFeatureBuilder,
    AbstractTargetBuilder,
)


# todo temporal input
# todo velocity regression
class Vadv2FeatureBuilder(AbstractFeatureBuilder):
    def __init__(self, config: Vadv2Config):
        self._config = config

    def get_unique_name(self) -> str:
        """Inherited, see superclass."""
        return "transfuser_feature"

    def compute_features(self, agent_input: AgentInput) -> Dict[str, torch.Tensor]:
        """Inherited, see superclass."""
        features = {}

        features["camera_feature"] = self._get_camera_feature(agent_input)
        # todo pers bev
        if self._config.use_pers_bev_embed:
            features["pers_bev"] = self._get_pers_bev(agent_input)
        if self._config.lidar_seq_len == 4:
            features["lidar_feature"] = self._get_lidar_feature_4f(agent_input)
        else:
            features["lidar_feature"] = self._get_lidar_feature(agent_input)

        # Concatenate the ego status (driving command, velocity, acceleration) of the
        # `num_ego_status` most recent frames, newest first.
        ego_status_list = []
        for i in range(self._config.num_ego_status):
            # i=0: idx=-1, i=1: idx=-2, i=2: idx=-3, i=3: idx=-4
            idx = -(i + 1)
            ego_status_list += [
                torch.tensor(agent_input.ego_statuses[idx].driving_command, dtype=torch.float32),
                torch.tensor(agent_input.ego_statuses[idx].ego_velocity, dtype=torch.float32),
                torch.tensor(agent_input.ego_statuses[idx].ego_acceleration, dtype=torch.float32),
            ]
        features["status_feature"] = torch.concatenate(ego_status_list)

        # # add image_meta
        # cams_all_frames = [[
        #     tmp.cam_f0,
        #     tmp.cam_l0,
        #     # tmp.cam_l1,
        #     # tmp.cam_l2,
        #     tmp.cam_r0,
        #     # tmp.cam_r1,
        #     # tmp.cam_r2,
        #     # tmp.cam_b0
        # ] for tmp in agent_input.cameras]
        #
        # image, canvas, sensor2lidar_rotation, sensor2lidar_translation, intrinsics, distortion, post_rot, post_tran = [], [], [], [], [], [], [], []
        # for cams_frame_t in cams_all_frames:
        #     image_t, canvas_t, sensor2lidar_rotation_t, sensor2lidar_translation_t, intrinsics_t, distortion_t, post_rot_t, post_tran_t = [], [], [], [], [], [], [], []
        #     for cam in cams_frame_t:
        #         cam_processed: Camera = img_pipeline(cam)
        #         image_t.append(cam_processed.image)
        #         canvas_t.append(cam_processed.canvas)
        #         sensor2lidar_rotation_t.append(cam_processed.sensor2lidar_rotation)
        #         sensor2lidar_translation_t.append(cam_processed.sensor2lidar_translation)
        #         intrinsics_t.append(cam_processed.intrinsics)
        #         distortion_t.append(cam_processed.distortion)
        #         post_rot_t.append(cam_processed.post_rot)
        #         post_tran_t.append(cam_processed.post_tran)
        #     image.append(torch.stack(image_t))
        #     canvas.append(torch.stack(canvas_t))
        #     sensor2lidar_rotation.append(torch.stack(sensor2lidar_rotation_t))
        #     sensor2lidar_translation.append(torch.stack(sensor2lidar_translation_t))
        #     intrinsics.append(torch.stack(intrinsics_t))
        #     distortion.append(torch.stack(distortion_t))
        #     post_rot.append(torch.stack(post_rot_t))
        #     post_tran.append(torch.stack(post_tran_t))
        #
        # features["sensor2lidar_rotation"] = torch.stack(sensor2lidar_rotation).to(imgs)
        # features["sensor2lidar_translation"] = torch.stack(sensor2lidar_translation).to(imgs)
        # features["intrinsics"] = torch.stack(intrinsics).to(imgs)

        return features

    def _get_pers_bev(self, agent_input: AgentInput) -> torch.Tensor:
        # todo pers bev embedding not implemented yet
        return None

    def _get_camera_feature(self, agent_input: AgentInput) -> torch.Tensor:
        """
        Extract stitched camera from AgentInput
        :param agent_input: input dataclass
        :return: stitched front view image as torch tensor
        """
        cameras = agent_input.cameras[-1]

        # Crop to ensure 4:1 aspect ratio
        l0 = cameras.cam_l0.image[28:-28, 416:-416]
        f0 = cameras.cam_f0.image[28:-28]
        r0 = cameras.cam_r0.image[28:-28, 416:-416]

        # stitch l0, f0, r0 images
        stitched_image = np.concatenate([l0, f0, r0], axis=1)
        resized_image = cv2.resize(stitched_image, (self._config.camera_width, self._config.camera_height))
        tensor_image = transforms.ToTensor()(resized_image)

        return tensor_image

    def _get_lidar_feature(self, agent_input: AgentInput) -> torch.Tensor:
        """
        Compute LiDAR feature as 2D histogram, according to Transfuser
        :param agent_input: input dataclass
        :return: LiDAR histogram as torch tensors
        """
        # only consider (x,y,z) & swap axes for (N,3) numpy array
        lidar_pc = agent_input.lidars[-1].lidar_pc[LidarIndex.POSITION].T

        # NOTE: Code from
        # https://github.com/autonomousvision/carla_garage/blob/main/team_code/data.py#L873
        def splat_points(point_cloud):
            # 256 x 256 grid
            xbins = np.linspace(
                self._config.lidar_min_x,
                self._config.lidar_max_x,
                (self._config.lidar_max_x - self._config.lidar_min_x) * int(self._config.pixels_per_meter) + 1,
            )
            ybins = np.linspace(
                self._config.lidar_min_y,
                self._config.lidar_max_y,
                (self._config.lidar_max_y - self._config.lidar_min_y) * int(self._config.pixels_per_meter) + 1,
            )
            hist = np.histogramdd(point_cloud[:, :2], bins=(xbins, ybins))[0]
            hist[hist > self._config.hist_max_per_pixel] = self._config.hist_max_per_pixel
            overhead_splat = hist / self._config.hist_max_per_pixel
            return overhead_splat

        # Remove points above the vehicle
        lidar_pc = lidar_pc[lidar_pc[..., 2] < self._config.max_height_lidar]
        below = lidar_pc[lidar_pc[..., 2] <= self._config.lidar_split_height]
        above = lidar_pc[lidar_pc[..., 2] > self._config.lidar_split_height]
        above_features = splat_points(above)
        if self._config.use_ground_plane:
            below_features = splat_points(below)
            features = np.stack([below_features, above_features], axis=-1)
        else:
            features = np.stack([above_features], axis=-1)
        features = np.transpose(features, (2, 0, 1)).astype(np.float32)

        return torch.tensor(features)

    def _get_lidar_feature_4f(self, agent_input: AgentInput) -> torch.Tensor:
        """
        Compute LiDAR feature as 2D histogram, according to Transfuser
        :param agent_input: input dataclass
        :return: LiDAR histogram as torch tensors
        """
        # only consider (x,y,z) & swap axes for (N,3) numpy array
        # lidar_pc = agent_input.lidars[-1].lidar_pc[LidarIndex.POSITION].T
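        # Accumulate all input sweeps into the current ego frame: each past sweep is
        # first lifted to the global frame with its own ego2global matrix and then
        # mapped back with the inverse ego2global of the key (current) frame, so that
        # every sweep shares one BEV grid before splatting.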
        lidars = [np.copy(tmp.lidar_pc) for tmp in agent_input.lidars]
        # timestamps_ori = agent_input.timestamps
        # timestamps = [(timestamps_ori[-1] - tmp) / 1e6 for tmp in timestamps_ori]
        ego2globals = [tmp for tmp in agent_input.ego2globals]
        global2ego_key = np.linalg.inv(ego2globals[-1])
        # ego2global, global2ego key frame
        lidars_warped = [transform_points(transform_points(pts, mat), global2ego_key)
                         for pts, mat in zip(lidars[:-1], ego2globals[:-1])]
        lidars_warped.append(lidars[-1])

        # NOTE: Code from
        # https://github.com/autonomousvision/carla_garage/blob/main/team_code/data.py#L873
        def splat_points(point_cloud):
            # 256 x 256 grid
            xbins = np.linspace(
                self._config.lidar_min_x,
                self._config.lidar_max_x,
                (self._config.lidar_max_x - self._config.lidar_min_x) * int(self._config.pixels_per_meter) + 1,
            )
            ybins = np.linspace(
                self._config.lidar_min_y,
                self._config.lidar_max_y,
                (self._config.lidar_max_y - self._config.lidar_min_y) * int(self._config.pixels_per_meter) + 1,
            )
            hist = np.histogramdd(point_cloud[:, :2], bins=(xbins, ybins))[0]
            hist[hist > self._config.hist_max_per_pixel] = self._config.hist_max_per_pixel
            overhead_splat = hist / self._config.hist_max_per_pixel
            return overhead_splat

        # Remove points above the vehicle
        lidar_feats = []
        for lidar_pc in lidars_warped:
            lidar_pc = lidar_pc.T
            lidar_pc = lidar_pc[lidar_pc[..., 2] < self._config.max_height_lidar]
            below = lidar_pc[lidar_pc[..., 2] <= self._config.lidar_split_height]
            above = lidar_pc[lidar_pc[..., 2] > self._config.lidar_split_height]
            above_features = splat_points(above)
            if self._config.use_ground_plane:
                below_features = splat_points(below)
                features = np.stack([below_features, above_features], axis=-1)
            else:
                features = np.stack([above_features], axis=-1)
            features = np.transpose(features, (2, 0, 1)).astype(np.float32)
            # append timestamps
            lidar_feats.append(torch.tensor(features))
        return torch.cat(lidar_feats, 0).contiguous()


class Vadv2TargetBuilder(AbstractTargetBuilder):
    def __init__(self, config: Vadv2Config):
        self._config = config
        self.v_params = get_pacifica_parameters()
        # lidar_resolution_width = 256
        # lidar_resolution_height = 256
        # self.dense_layers: List[SemanticMapLayer] = [
        #     SemanticMapLayer.DRIVABLE_AREA,
        #     SemanticMapLayer.CROSSWALK
        # ]
        # self.dense_layers_labels = [
        #     1, 2
        # ]
        # self.discrete_layers: List[SemanticMapLayer] = [
        #     SemanticMapLayer.LANE,
        #     SemanticMapLayer.LANE_CONNECTOR,
        # ]
        # self.radius = 32.0
        # self.bev_pixel_width: int = lidar_resolution_width
        # self.bev_pixel_height: int = lidar_resolution_height
        # self.bev_pixel_size: float = 0.25
        # self.bev_semantic_frame = (self.bev_pixel_height, self.bev_pixel_width)
        # self.padding_value = -10000
        # self.sample_dist = 1
        # self.num_samples = 250
        # self.padding = False
        # self.fixed_num = 20

    def get_unique_name(self) -> str:
        """Inherited, see superclass."""
        return "transfuser_target"

    def compute_targets(self, scene: Scene) -> Dict[str, torch.Tensor]:
        """Inherited, see superclass."""
        future_traj = scene.get_future_trajectory(
            num_trajectory_frames=self._config.trajectory_sampling.num_poses
        )
        trajectory = torch.tensor(future_traj.poses)

        frame_idx = scene.scene_metadata.num_history_frames - 1
        annotations = scene.frames[frame_idx].annotations
        ego_pose = StateSE2(*scene.frames[frame_idx].ego_status.ego_pose)

        agent_states, agent_labels = self._compute_agent_targets(annotations)
        bev_semantic_map = self._compute_bev_semantic_map(annotations, scene.map_api, ego_pose)
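        # Rebuild a kinematic EgoState at the current frame (rear-axle pose with zero
        # steering as an approximation) so the ground-truth future trajectory can be
        # transformed to absolute coordinates and re-sampled at 10 Hz (40 poses, 0.1 s).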
        ego_state = EgoState.build_from_rear_axle(
            StateSE2(*scene.frames[frame_idx].ego_status.ego_pose),
            tire_steering_angle=0.0,
            vehicle_parameters=self.v_params,
            time_point=TimePoint(scene.frames[frame_idx].timestamp),
            rear_axle_velocity_2d=StateVector2D(
                *scene.frames[frame_idx].ego_status.ego_velocity
            ),
            rear_axle_acceleration_2d=StateVector2D(
                *scene.frames[frame_idx].ego_status.ego_acceleration
            ),
        )
        trans_traj = transform_trajectory(
            future_traj, ego_state
        )
        interpolated_traj = get_trajectory_as_array(
            trans_traj,
            TrajectorySampling(num_poses=40, interval_length=0.1),
            ego_state.time_point
        )
        rel_poses = absolute_to_relative_poses(
            [StateSE2(*tmp) for tmp in interpolated_traj[:, StateIndex.STATE_SE2]]
        )
        # skip the curr frame
        final_traj = [pose.serialize() for pose in rel_poses[1:]]
        final_traj = torch.tensor(final_traj)

        # TODO: map
        # map_api = scene.map_api
        # ego_statuses = [frame.ego_status for frame in scene.frames]
        # ego2globals = [frame.ego2global for frame in scene.frames]
        # # Last one is the current frame
        # ego_status_curr = StateSE2(*ego_statuses[-1].ego_pose)
        # # dense
        # # dense_semantic_map = np.zeros(self.bev_semantic_frame, dtype=np.int64)
        # # for layer, label in zip(self.dense_layers, self.dense_layers_labels):
        # #     entity_mask = self._compute_map_polygon_mask(map_api, ego_status_curr, [layer])
        # #     dense_semantic_map[entity_mask] = label
        # # discrete
        # # centerline_list
        # map_dict = {'centerline': []}
        # line_strings, incoming_line_strings, outcoming_line_strings = self._compute_map_linestrings(
        #     map_api, ego_status_curr, list(self.discrete_layers))
        # centerline_list = self.union_centerline(line_strings, incoming_line_strings, outcoming_line_strings)
        # for instance in centerline_list:
        #     map_dict['centerline'].append(np.array(instance.coords))
        # vectors = []
        # gt_labels = []
        # gt_instance = []
        # instance_list = map_dict['centerline']
        # for instance in instance_list:
        #     vectors.append(LineString(np.array(instance)))
        # for instance in vectors:
        #     gt_instance.append(instance)
        #     gt_labels.append(0)
        # gt_semantic_mask = None
        # gt_pv_semantic_mask = None
        # gt_instance = LiDARInstanceLines(gt_instance, self.sample_dist, self.num_samples,
        #                                  self.padding, self.fixed_num, self.padding_value,
        #                                  patch_size=self.radius * 2)

        return {
            # "gt_depth": ?????????????
            # "gt_bboxes_3d": gt_instance,
            # "gt_labels_3d": gt_labels,
            "trajectory": trajectory,
            "agent_states": agent_states,
            "agent_labels": agent_labels,
            "bev_semantic_map": bev_semantic_map,
            "interpolated_traj": final_traj,
        }

    def _compute_agent_targets(self, annotations: Annotations) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Extracts 2D agent bounding boxes in ego coordinates
        :param annotations: annotation dataclass
        :return: tuple of bounding box values and labels (binary)
        """
        max_agents = self._config.num_bounding_boxes
        agent_states_list: List[npt.NDArray[np.float32]] = []

        def _xy_in_lidar(x: float, y: float, config: Vadv2Config) -> bool:
            return (config.lidar_min_x <= x <= config.lidar_max_x) and (
                config.lidar_min_y <= y <= config.lidar_max_y
            )

        for box, name in zip(annotations.boxes, annotations.names):
            box_x, box_y, box_heading, box_length, box_width = (
                box[BoundingBoxIndex.X],
                box[BoundingBoxIndex.Y],
                box[BoundingBoxIndex.HEADING],
                box[BoundingBoxIndex.LENGTH],
                box[BoundingBoxIndex.WIDTH],
            )
            if name == "vehicle" and _xy_in_lidar(box_x, box_y, self._config):
                agent_states_list.append(
                    np.array([box_x, box_y, box_heading, box_length, box_width], dtype=np.float32)
                )

        agents_states_arr = np.array(agent_states_list)

        # filter num_instances nearest
        agent_states = np.zeros((max_agents, BoundingBox2DIndex.size()), dtype=np.float32)
        agent_labels = np.zeros(max_agents, dtype=bool)

        if len(agents_states_arr) > 0:
            distances = np.linalg.norm(agents_states_arr[..., BoundingBox2DIndex.POINT], axis=-1)
            argsort = np.argsort(distances)[:max_agents]

            # filter detections
            agents_states_arr = agents_states_arr[argsort]
            agent_states[: len(agents_states_arr)] = agents_states_arr
            agent_labels[: len(agents_states_arr)] = True

        return torch.tensor(agent_states), torch.tensor(agent_labels)

    def _compute_bev_semantic_map(
        self, annotations: Annotations, map_api: AbstractMap, ego_pose: StateSE2
    ) -> torch.Tensor:
        """
        Creates semantic map in BEV
        :param annotations: annotation dataclass
        :param map_api: map interface of nuPlan
        :param ego_pose: ego pose in global frame
        :return: 2D torch tensor of semantic labels
        """
        bev_semantic_map = np.zeros(self._config.bev_semantic_frame, dtype=np.int64)
        # Classes are rasterised in config order; later labels overwrite earlier ones.
        for label, (entity_type, layers) in self._config.bev_semantic_classes.items():
            if entity_type == "polygon":
                entity_mask = self._compute_map_polygon_mask(map_api, ego_pose, layers)
            elif entity_type == "linestring":
                entity_mask = self._compute_map_linestring_mask(map_api, ego_pose, layers)
            else:
                entity_mask = self._compute_box_mask(annotations, layers)
            bev_semantic_map[entity_mask] = label

        return torch.Tensor(bev_semantic_map)

    def _geometry_local_coords(self, geometry: Any, origin: StateSE2) -> Any:
        """
        Transform shapely geometry in local coordinates of origin.
        :param geometry: shapely geometry
        :param origin: pose dataclass
        :return: shapely geometry
        """
        a = np.cos(origin.heading)
        b = np.sin(origin.heading)
        d = -np.sin(origin.heading)
        e = np.cos(origin.heading)
        xoff = -origin.x
        yoff = -origin.y
        translated_geometry = affinity.affine_transform(geometry, [1, 0, 0, 1, xoff, yoff])
        rotated_geometry = affinity.affine_transform(translated_geometry, [a, b, d, e, 0, 0])
        return rotated_geometry

    def _coords_to_pixel(self, coords):
        """
        Transform local coordinates in pixel indices of BEV map
        :param coords: local (x, y) coordinates
        :return: pixel indices as int32 array
        """
        # NOTE: remove half in backward direction
        # NOTE: relies on self.bev_pixel_width / self.bev_pixel_size, which are currently
        # commented out in __init__; the config-based definition further below shadows this one.
        pixel_center = np.array([[0, self.bev_pixel_width / 2.0]])
        coords_idcs = (coords / self.bev_pixel_size) + pixel_center
        return coords_idcs.astype(np.int32)

    def _compute_map_linestrings(
        self, map_api: AbstractMap, ego_pose: StateSE2, layers: List[SemanticMapLayer]
    ) -> Tuple[List[LineString], List[LineString], List[LineString]]:
        """
        Collect baseline-path linestrings (plus incoming/outgoing edges) in local coordinates
        :param map_api: map interface of nuPlan
        :param ego_pose: ego pose in global frame
        :param layers: map layers
        :return: tuple of lane, incoming-edge and outgoing-edge linestrings
        """
        # NOTE: relies on self.radius, which is currently commented out in __init__;
        # only used by the commented-out centerline targets below.
        map_object_dict = map_api.get_proximal_map_objects(
            point=ego_pose.point, radius=self.radius, layers=layers
        )
        something = []
        incoming_something = []
        outcoming_something = []
        for layer in layers:
            for map_object in map_object_dict[layer]:
                linestring: LineString = self._geometry_local_coords(
                    map_object.baseline_path.linestring, ego_pose
                )
                something.append(linestring)
                for incoming_edge in map_object.incoming_edges:
                    incomingstring: LineString = self._geometry_local_coords(
                        incoming_edge.baseline_path.linestring, ego_pose
                    )
                    incoming_something.append(incomingstring)
                for outgoing_edge in map_object.outgoing_edges:
                    outcomingstring: LineString = self._geometry_local_coords(
                        outgoing_edge.baseline_path.linestring, ego_pose
                    )
                    outcoming_something.append(outcomingstring)
                # todo points = np.array(linestring.coords).reshape((-1, 1, 2))
        return something, incoming_something, outcoming_something

    def union_centerline(self, centerline_list, incoming_list, outcoming_list):
        pts_G = nx.DiGraph()
        junction_pts_list = []
        start_pt = np.array(centerline_list[0].coords).round(3)[0]
        end_pt = np.array(centerline_list[-1].coords).round(3)[-1]
        for centerline_geom in centerline_list:
            centerline_pts = np.array(centerline_geom.coords).round(3)
            start_pt = centerline_pts[0]
            end_pt = centerline_pts[-1]
            for idx, pts in enumerate(centerline_pts[:-1]):
                pts_G.add_edge(tuple(centerline_pts[idx]), tuple(centerline_pts[idx + 1]))

        valid_incoming_num = 0
        for pred_geom in incoming_list:
            valid_incoming_num += 1
            pred_pt = np.array(pred_geom.coords).round(3)[-1]
            pts_G.add_edge(tuple(pred_pt), tuple(start_pt))
        valid_outgoing_num = 0
        for succ_geom in outcoming_list:
            valid_outgoing_num += 1
            succ_pt = np.array(succ_geom.coords).round(3)[0]
            pts_G.add_edge(tuple(end_pt), tuple(succ_pt))

        roots = (v for v, d in pts_G.in_degree() if d == 0)
        leaves = [v for v, d in pts_G.out_degree() if d == 0]
        all_paths = []
        for root in roots:
            paths = nx.all_simple_paths(pts_G, root, leaves)
            all_paths.extend(paths)

        final_centerline_paths = []
        for path in all_paths:
            merged_line = LineString(path)
            merged_line = merged_line.simplify(0.2, preserve_topology=True)
            final_centerline_paths.append(merged_line)
        return final_centerline_paths

    # def compute_targets(self, scene: Scene) -> Dict[str, torch.Tensor]:
    #     map_api = scene.map_api
    #     ego_statuses = [frame.ego_status for frame in scene.frames]
    #     ego2globals = [frame.ego2global for frame in scene.frames]
    #     # Last one is the current frame
    #     ego_status_curr = StateSE2(*ego_statuses[-1].ego_pose)
    #
    #     # dense
    #     # dense_semantic_map = np.zeros(self.bev_semantic_frame, dtype=np.int64)
    #     # for layer, label in zip(self.dense_layers, self.dense_layers_labels):
    #     #     entity_mask = self._compute_map_polygon_mask(map_api, ego_status_curr, [layer])
    #     #     dense_semantic_map[entity_mask] = label
    #
    #     # discrete
    #     # centerline_list
    #     map_dict = {'centerline': []}
    #     line_strings, incoming_line_strings, outcoming_line_strings = self._compute_map_linestrings(
    #         map_api, ego_status_curr, list(self.discrete_layers))
    #     centerline_list = self.union_centerline(line_strings, incoming_line_strings, outcoming_line_strings)
    #     for instance in centerline_list:
    #         map_dict['centerline'].append(np.array(instance.coords))
    #
    #     vectors = []
    #     gt_labels = []
    #     gt_instance = []
    #     instance_list = map_dict['centerline']
    #     for instance in instance_list:
    #         vectors.append(LineString(np.array(instance)))
    #     for instance in vectors:
    #         gt_instance.append(instance)
    #         gt_labels.append(0)
    #     gt_semantic_mask = None
    #     gt_pv_semantic_mask = None
    #     gt_instance = LiDARInstanceLines(gt_instance, self.sample_dist, self.num_samples,
    #                                      self.padding, self.fixed_num, self.padding_value,
    #                                      patch_size=self.radius * 2)
    #
    #     return {"dense_el": None,
    #             "gt_bboxes_3d": gt_instance,
    #             "gt_labels_3d": gt_labels}

    def _compute_map_polygon_mask(
        self, map_api: AbstractMap, ego_pose: StateSE2, layers: List[SemanticMapLayer]
    ) -> npt.NDArray[np.bool_]:
        """
        Compute binary mask given a map layer class
        :param map_api: map interface of nuPlan
        :param ego_pose: ego pose in global frame
        :param layers: map layers
        :return: binary mask as numpy array
        """
        map_object_dict = map_api.get_proximal_map_objects(
            point=ego_pose.point, radius=self._config.bev_radius, layers=layers
        )
        map_polygon_mask = np.zeros(self._config.bev_semantic_frame[::-1], dtype=np.uint8)
        for layer in layers:
            for map_object in map_object_dict[layer]:
                polygon: Polygon = self._geometry_local_coords(map_object.polygon, ego_pose)
                exterior = np.array(polygon.exterior.coords).reshape((-1, 1, 2))
                exterior = self._coords_to_pixel(exterior)
                cv2.fillPoly(map_polygon_mask, [exterior], color=255)
        # OpenCV has origin on top-left corner
        map_polygon_mask = np.rot90(map_polygon_mask)[::-1]
        return map_polygon_mask > 0

    def _compute_map_linestring_mask(
        self, map_api: AbstractMap, ego_pose: StateSE2, layers: List[SemanticMapLayer]
    ) -> npt.NDArray[np.bool_]:
        """
        Compute binary mask of linestrings given a map layer class
        :param map_api: map interface of nuPlan
        :param ego_pose: ego pose in global frame
        :param layers: map layers
        :return: binary mask as numpy array
        """
        map_object_dict = map_api.get_proximal_map_objects(
            point=ego_pose.point, radius=self._config.bev_radius, layers=layers
        )
        map_linestring_mask = np.zeros(self._config.bev_semantic_frame[::-1], dtype=np.uint8)
        for layer in layers:
            for map_object in map_object_dict[layer]:
                linestring: LineString = self._geometry_local_coords(
                    map_object.baseline_path.linestring, ego_pose
                )
                points = np.array(linestring.coords).reshape((-1, 1, 2))
                points = self._coords_to_pixel(points)
                cv2.polylines(map_linestring_mask, [points], isClosed=False, color=255, thickness=2)
        # OpenCV has origin on top-left corner
        map_linestring_mask = np.rot90(map_linestring_mask)[::-1]
        return map_linestring_mask > 0

    def _compute_box_mask(
        self, annotations: Annotations, layers: List[TrackedObjectType]
    ) -> npt.NDArray[np.bool_]:
        """
        Compute binary mask of bounding boxes in BEV space
        :param annotations: annotation dataclass
        :param layers: bounding box labels to include
        :return: binary mask as numpy array
        """
        box_polygon_mask = np.zeros(self._config.bev_semantic_frame[::-1], dtype=np.uint8)

        for name_value, box_value in zip(annotations.names, annotations.boxes):
            agent_type = tracked_object_types[name_value]
            if agent_type in layers:
                # box_value = (x, y, z, length, width, height, yaw) TODO: add intenum
                x, y, heading = box_value[0], box_value[1], box_value[-1]
                box_length, box_width, box_height = box_value[3], box_value[4], box_value[5]
                agent_box = OrientedBox(StateSE2(x, y, heading), box_length, box_width, box_height)
                exterior = np.array(agent_box.geometry.exterior.coords).reshape((-1, 1, 2))
                exterior = self._coords_to_pixel(exterior)
                cv2.fillPoly(box_polygon_mask, [exterior], color=255)
        # OpenCV has origin on top-left corner
        box_polygon_mask = np.rot90(box_polygon_mask)[::-1]
        return box_polygon_mask > 0

    def _query_map_objects(
        self, map_api: AbstractMap, ego_pose: StateSE2, layers: List[SemanticMapLayer]
    ) -> List[MapObject]:
        """
        Queries map objects
        :param map_api: map interface of nuPlan
        :param ego_pose: ego pose in global frame
        :param layers: map layers
        :return: list of map objects
        """
        # query map api with interesting layers
        map_object_dict = map_api.get_proximal_map_objects(
            point=ego_pose.point, radius=self._config.bev_radius, layers=layers
        )
        map_objects: List[MapObject] = []
        for layer in layers:
            map_objects += map_object_dict[layer]
        return map_objects

    @staticmethod
    def _geometry_local_coords(geometry: Any, origin: StateSE2) -> Any:
        """
        Transform shapely geometry in local coordinates of origin.
        :param geometry: shapely geometry
        :param origin: pose dataclass
        :return: shapely geometry
        """
        a = np.cos(origin.heading)
        b = np.sin(origin.heading)
        d = -np.sin(origin.heading)
        e = np.cos(origin.heading)
        xoff = -origin.x
        yoff = -origin.y
        translated_geometry = affinity.affine_transform(geometry, [1, 0, 0, 1, xoff, yoff])
        rotated_geometry = affinity.affine_transform(translated_geometry, [a, b, d, e, 0, 0])
        return rotated_geometry

    def _coords_to_pixel(self, coords):
        """
        Transform local coordinates in pixel indices of BEV map
        :param coords: local (x, y) coordinates
        :return: pixel indices as int32 array
        """
        # NOTE: remove half in backward direction
        pixel_center = np.array([[0, self._config.bev_pixel_width / 2.0]])
        coords_idcs = (coords / self._config.bev_pixel_size) + pixel_center
        return coords_idcs.astype(np.int32)


class BoundingBox2DIndex(IntEnum):
    _X = 0
    _Y = 1
    _HEADING = 2
    _LENGTH = 3
    _WIDTH = 4

    @classmethod
    def size(cls):
        valid_attributes = [
            attribute
            for attribute in dir(cls)
            if attribute.startswith("_")
            and not attribute.startswith("__")
            and not callable(getattr(cls, attribute))
        ]
        return len(valid_attributes)

    # NOTE: chaining @classmethod and @property works on Python 3.9-3.10, is deprecated
    # in 3.11 and removed in 3.13.
    @classmethod
    @property
    def X(cls):
        return cls._X

    @classmethod
    @property
    def Y(cls):
        return cls._Y

    @classmethod
    @property
    def HEADING(cls):
        return cls._HEADING

    @classmethod
    @property
    def LENGTH(cls):
        return cls._LENGTH

    @classmethod
    @property
    def WIDTH(cls):
        return cls._WIDTH

    @classmethod
    @property
    def POINT(cls):
        # assumes X, Y have subsequent indices
        return slice(cls._X, cls._Y + 1)

    @classmethod
    @property
    def STATE_SE2(cls):
        # assumes X, Y, HEADING have subsequent indices
        return slice(cls._X, cls._HEADING + 1)
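

if __name__ == "__main__":
    # Minimal smoke-test sketch (not part of the training pipeline): rasterise a synthetic
    # point cloud into a BEV occupancy histogram the same way `splat_points` does above.
    # The grid extents, resolution and clipping value below are illustrative assumptions;
    # the real values come from Vadv2Config.
    rng = np.random.default_rng(0)
    demo_points = rng.uniform(low=-32.0, high=32.0, size=(10_000, 3))  # synthetic (x, y, z) points

    lidar_min, lidar_max = -32.0, 32.0   # assumed metric extent of the BEV grid [m]
    pixels_per_meter = 4                 # assumed resolution -> 256 x 256 grid
    hist_max_per_pixel = 5               # assumed per-cell clipping value

    n_bins = int((lidar_max - lidar_min) * pixels_per_meter)
    edges = np.linspace(lidar_min, lidar_max, n_bins + 1)

    hist = np.histogramdd(demo_points[:, :2], bins=(edges, edges))[0]
    hist = np.clip(hist, a_min=None, a_max=hist_max_per_pixel) / hist_max_per_pixel
    print("BEV histogram shape:", hist.shape, "value range:", (hist.min(), hist.max()))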