from __future__ import annotations from typing import List, Dict, Any import cv2 import numpy as np import numpy.typing as npt import torch from nuplan.common.actor_state.state_representation import StateSE2 from nuplan.common.maps.abstract_map import AbstractMap from nuplan.common.maps.maps_datatypes import SemanticMapLayer from shapely import affinity, LineString from shapely.geometry import Polygon from det_map.data.datasets.dataclasses import Scene from navsim.planning.training.abstract_feature_target_builder import AbstractTargetBuilder import networkx as nx # from mmdet.datasets.pipelines import to_tensor from mmcv.parallel import DataContainer as DC class LiDARInstanceLines(object): """Line instance in LIDAR coordinates""" def __init__(self, instance_line_list, instance_labels, sample_dist=1, num_samples=250, padding=False, fixed_num=-1, padding_value=-10000, patch_size=None): assert isinstance(instance_line_list, list) assert patch_size is not None if len(instance_line_list) != 0: assert isinstance(instance_line_list[0], LineString) self.patch_size = patch_size self.max_x = self.patch_size / 2 self.max_y = self.patch_size / 2 self.sample_dist = sample_dist self.num_samples = num_samples self.padding = padding self.fixed_num = fixed_num self.padding_value = padding_value self.instance_list = instance_line_list self.instance_labels = instance_labels @property def fixed_num_sampled_points(self): """ return torch.Tensor([N,fixed_num,2]), in xmin, ymin, xmax, ymax form N means the num of instances """ assert len(self.instance_list) != 0 instance_points_list = [] for instance in self.instance_list: distances = np.linspace(0, instance.length, self.fixed_num) sampled_points = np.array([list(instance.interpolate(distance).coords) for distance in distances]).reshape( -1, 2) instance_points_list.append(sampled_points) instance_points_array = np.array(instance_points_list) instance_points_tensor = torch.tensor(instance_points_array) instance_points_tensor = instance_points_tensor.to( dtype=torch.float32) instance_points_tensor[:, :, 0] = torch.clamp(instance_points_tensor[:, :, 0], min=-self.max_x, max=self.max_x) instance_points_tensor[:, :, 1] = torch.clamp(instance_points_tensor[:, :, 1], min=-self.max_y, max=self.max_y) return instance_points_tensor @property def shift_fixed_num_sampled_points_v2(self): """ return [instances_num, num_shifts, fixed_num, 2] """ assert len(self.instance_list) != 0 instances_list = [] for idx, instance in enumerate(self.instance_list): # import ipdb;ipdb.set_trace() # instance_label = self.instance_labels[idx] distances = np.linspace(0, instance.length, self.fixed_num) poly_pts = np.array(list(instance.coords)) start_pts = poly_pts[0] end_pts = poly_pts[-1] is_poly = np.equal(start_pts, end_pts) is_poly = is_poly.all() shift_pts_list = [] pts_num, coords_num = poly_pts.shape shift_num = pts_num - 1 final_shift_num = self.fixed_num - 1 # if instance_label == 3: # import ipdb;ipdb.set_trace() # 永远是centerline sampled_points = np.array( [list(instance.interpolate(distance).coords) for distance in distances]).reshape(-1, 2) shift_pts_list.append(sampled_points) multi_shifts_pts = np.stack(shift_pts_list, axis=0) shifts_num, _, _ = multi_shifts_pts.shape if shifts_num > final_shift_num: index = np.random.choice(multi_shifts_pts.shape[0], final_shift_num, replace=False) multi_shifts_pts = multi_shifts_pts[index] multi_shifts_pts_tensor = torch.tensor(multi_shifts_pts) multi_shifts_pts_tensor = multi_shifts_pts_tensor.to( dtype=torch.float32) multi_shifts_pts_tensor[:, :, 0] = torch.clamp(multi_shifts_pts_tensor[:, :, 0], min=-self.max_x, max=self.max_x) multi_shifts_pts_tensor[:, :, 1] = torch.clamp(multi_shifts_pts_tensor[:, :, 1], min=-self.max_y, max=self.max_y) # if not is_poly: if multi_shifts_pts_tensor.shape[0] < final_shift_num: padding = torch.full([final_shift_num - multi_shifts_pts_tensor.shape[0], self.fixed_num, 2], self.padding_value) multi_shifts_pts_tensor = torch.cat([multi_shifts_pts_tensor, padding], dim=0) instances_list.append(multi_shifts_pts_tensor) instances_tensor = torch.stack(instances_list, dim=0) instances_tensor = instances_tensor.to( dtype=torch.float32) return instances_tensor class MapTargetBuilder(AbstractTargetBuilder): def __init__(self): super().__init__() lidar_resolution_width = 256 lidar_resolution_height = 256 self.dense_layers: List[SemanticMapLayer] = [ SemanticMapLayer.DRIVABLE_AREA, SemanticMapLayer.CROSSWALK ] self.dense_layers_labels = [ 1, 2 ] self.discrete_layers: List[SemanticMapLayer] = [ SemanticMapLayer.LANE, SemanticMapLayer.LANE_CONNECTOR, ] self.radius = 32.0 self.bev_pixel_width: int = lidar_resolution_width self.bev_pixel_height: int = lidar_resolution_height self.bev_pixel_size: float = 0.25 self.bev_semantic_frame = (self.bev_pixel_height, self.bev_pixel_width) self.padding_value = -10000 self.sample_dist = 1 self.num_samples = 250 self.padding = False self.fixed_num = 20 def _geometry_local_coords(self, geometry: Any, origin: StateSE2) -> Any: """ Transform shapely geometry in local coordinates of origin. :param geometry: shapely geometry :param origin: pose dataclass :return: shapely geometry """ a = np.cos(origin.heading) b = np.sin(origin.heading) d = -np.sin(origin.heading) e = np.cos(origin.heading) xoff = -origin.x yoff = -origin.y translated_geometry = affinity.affine_transform(geometry, [1, 0, 0, 1, xoff, yoff]) rotated_geometry = affinity.affine_transform(translated_geometry, [a, b, d, e, 0, 0]) return rotated_geometry def _coords_to_pixel(self, coords): """ Transform local coordinates in pixel indices of BEV map :param coords: _description_ :return: _description_ """ # NOTE: remove half in backward direction pixel_center = np.array([[0, self.bev_pixel_width / 2.0]]) coords_idcs = (coords / self.bev_pixel_size) + pixel_center return coords_idcs.astype(np.int32) def _compute_map_polygon_mask( self, map_api: AbstractMap, ego_pose: StateSE2, layers: List[SemanticMapLayer] ) -> npt.NDArray[np.bool_]: """ Compute binary mask given a map layer class :param map_api: map interface of nuPlan :param ego_pose: ego pose in global frame :param layers: map layers :return: binary mask as numpy array """ map_object_dict = map_api.get_proximal_map_objects( point=ego_pose.point, radius=self.radius, layers=layers ) map_polygon_mask = np.zeros(self.bev_semantic_frame[::-1], dtype=np.uint8) for layer in layers: for map_object in map_object_dict[layer]: polygon: Polygon = self._geometry_local_coords(map_object.polygon, ego_pose) exterior = np.array(polygon.exterior.coords).reshape((-1, 1, 2)) exterior = self._coords_to_pixel(exterior) cv2.fillPoly(map_polygon_mask, [exterior], color=255) # OpenCV has origin on top-left corner map_polygon_mask = np.rot90(map_polygon_mask)[::-1] return map_polygon_mask > 0 def _compute_map_linestrings( self, map_api: AbstractMap, ego_pose: StateSE2, layers: List[SemanticMapLayer] ) -> npt.NDArray[np.bool_]: """ Compute binary of linestring given a map layer class :param map_api: map interface of nuPlan :param ego_pose: ego pose in global frame :param layers: map layers :return: binary mask as numpy array """ map_object_dict = map_api.get_proximal_map_objects( point=ego_pose.point, radius=self.radius, layers=layers ) something = [] incoming_something = [] outcoming_something = [] for layer in layers: for map_object in map_object_dict[layer]: linestring: LineString = self._geometry_local_coords( map_object.baseline_path.linestring, ego_pose ) something.append(linestring) for incoming_edge in map_object.incoming_edges: incomingstring: LineString = self._geometry_local_coords( incoming_edge.baseline_path.linestring, ego_pose ) incoming_something.append(incomingstring) for outgoing_edge in map_object.outgoing_edges: outcomingstring: LineString = self._geometry_local_coords( outgoing_edge.baseline_path.linestring, ego_pose ) outcoming_something.append(outcomingstring) # todo points = np.array(linestring.coords).reshape((-1, 1, 2)) return something, incoming_something, outcoming_something def union_centerline(self, centerline_list, incoming_list, outcoming_list): pts_G = nx.DiGraph() junction_pts_list = [] start_pt = np.array(centerline_list[0].coords).round(3)[0] end_pt = np.array(centerline_list[-1].coords).round(3)[-1] for centerline_geom in centerline_list: centerline_pts = np.array(centerline_geom.coords).round(3) start_pt = centerline_pts[0] end_pt = centerline_pts[-1] for idx, pts in enumerate(centerline_pts[:-1]): pts_G.add_edge(tuple(centerline_pts[idx]),tuple(centerline_pts[idx+1])) valid_incoming_num = 0 for pred_geom in incoming_list: valid_incoming_num += 1 pred_pt = np.array(pred_geom.coords).round(3)[-1] pts_G.add_edge(tuple(pred_pt), tuple(start_pt)) valid_outgoing_num = 0 for succ_geom in outcoming_list: valid_outgoing_num += 1 succ_pt = np.array(succ_geom.coords).round(3)[0] pts_G.add_edge(tuple(end_pt), tuple(succ_pt)) roots = (v for v, d in pts_G.in_degree() if d == 0) leaves = [v for v, d in pts_G.out_degree() if d == 0] all_paths = [] for root in roots: paths = nx.all_simple_paths(pts_G, root, leaves) all_paths.extend(paths) final_centerline_paths = [] for path in all_paths: merged_line = LineString(path) merged_line = merged_line.simplify(0.2, preserve_topology=True) final_centerline_paths.append(merged_line) return final_centerline_paths def compute_targets(self, scene: Scene) -> Dict[str, torch.Tensor]: map_api = scene.map_api ego_statuses = [frame.ego_status for frame in scene.frames] ego2globals = [frame.ego2global for frame in scene.frames] # Last one is the current frame ego_status_curr = StateSE2(*ego_statuses[-1].ego_pose) # dense # dense_semantic_map = np.zeros(self.bev_semantic_frame, dtype=np.int64) # for layer, label in zip(self.dense_layers, self.dense_layers_labels): # entity_mask = self._compute_map_polygon_mask(map_api, ego_status_curr, [layer]) # dense_semantic_map[entity_mask] = label # discrete # centerline_list map_dict = {'centerline': []} line_strings, incoming_line_strings, outcoming_line_strings = self._compute_map_linestrings(map_api, ego_status_curr, list(self.discrete_layers)) centerline_list = self.union_centerline(line_strings, incoming_line_strings, outcoming_line_strings) for instance in centerline_list: map_dict['centerline'].append(np.array(instance.coords)) vectors = [] gt_labels = [] gt_instance = [] instance_list = map_dict['centerline'] for instance in instance_list: vectors.append(LineString(np.array(instance))) for instance in vectors: gt_instance.append(instance) gt_labels.append(0) gt_semantic_mask=None gt_pv_semantic_mask=None gt_labels = torch.tensor(gt_labels) # print(type(gt_labels)) gt_instance = LiDARInstanceLines(gt_instance, gt_labels, self.sample_dist, self.num_samples, self.padding, self.fixed_num, self.padding_value, patch_size=self.radius * 2) # gt_instance = DC(gt_instance, cpu_only=True) # gt_labels = DC(gt_labels, cpu_only=False) return {"dense_el": None, "gt_bboxes_3d": gt_instance, "gt_labels_3d": gt_labels}