from __future__ import annotations
from typing import List, Dict, Any
import cv2
import numpy as np
import numpy.typing as npt
import torch
from nuplan.common.actor_state.state_representation import StateSE2
from nuplan.common.maps.abstract_map import AbstractMap
from nuplan.common.maps.maps_datatypes import SemanticMapLayer
from shapely import affinity, LineString
from shapely.geometry import Polygon
from det_map.data.datasets.dataclasses import Scene
from navsim.planning.training.abstract_feature_target_builder import AbstractTargetBuilder
import networkx as nx
from mmcv.parallel import DataContainer as DC
class LiDARInstanceLines(object):
"""Line instance in LIDAR coordinates"""
def __init__(self,
assert isinstance(instance_line_list, list)
assert patch_size is not None
if len(instance_line_list) != 0:
assert isinstance(instance_line_list[0], LineString)
self.patch_size = patch_size
self.max_x = self.patch_size / 2
self.max_y = self.patch_size / 2
self.sample_dist = sample_dist
self.num_samples = num_samples
self.padding = padding
self.fixed_num = fixed_num
self.padding_value = padding_value
self.instance_list = instance_line_list
self.instance_labels = instance_labels
def fixed_num_sampled_points(self):
return torch.Tensor([N,fixed_num,2]), in xmin, ymin, xmax, ymax form
N means the num of instances
assert len(self.instance_list) != 0
instance_points_list = []
for instance in self.instance_list:
distances = np.linspace(0, instance.length, self.fixed_num)
sampled_points = np.array([list(instance.interpolate(distance).coords) for distance in distances]).reshape(
-1, 2)
instance_points_array = np.array(instance_points_list)
instance_points_tensor = torch.tensor(instance_points_array)
instance_points_tensor = instance_points_tensor.to(
instance_points_tensor[:, :, 0] = torch.clamp(instance_points_tensor[:, :, 0], min=-self.max_x, max=self.max_x)
instance_points_tensor[:, :, 1] = torch.clamp(instance_points_tensor[:, :, 1], min=-self.max_y, max=self.max_y)
return instance_points_tensor
def shift_fixed_num_sampled_points_v2(self):
return [instances_num, num_shifts, fixed_num, 2]
assert len(self.instance_list) != 0
instances_list = []
for idx, instance in enumerate(self.instance_list):
distances = np.linspace(0, instance.length, self.fixed_num)
poly_pts = np.array(list(instance.coords))
start_pts = poly_pts[0]
end_pts = poly_pts[-1]
is_poly = np.equal(start_pts, end_pts)
is_poly = is_poly.all()
shift_pts_list = []
pts_num, coords_num = poly_pts.shape
shift_num = pts_num - 1
final_shift_num = self.fixed_num - 1
sampled_points = np.array(
[list(instance.interpolate(distance).coords) for distance in distances]).reshape(-1, 2)
multi_shifts_pts = np.stack(shift_pts_list, axis=0)
shifts_num, _, _ = multi_shifts_pts.shape
if shifts_num > final_shift_num:
index = np.random.choice(multi_shifts_pts.shape[0], final_shift_num, replace=False)
multi_shifts_pts = multi_shifts_pts[index]
multi_shifts_pts_tensor = torch.tensor(multi_shifts_pts)
multi_shifts_pts_tensor = multi_shifts_pts_tensor.to(
multi_shifts_pts_tensor[:, :, 0] = torch.clamp(multi_shifts_pts_tensor[:, :, 0], min=-self.max_x,
multi_shifts_pts_tensor[:, :, 1] = torch.clamp(multi_shifts_pts_tensor[:, :, 1], min=-self.max_y,
if multi_shifts_pts_tensor.shape[0] < final_shift_num:
padding = torch.full([final_shift_num - multi_shifts_pts_tensor.shape[0], self.fixed_num, 2],
multi_shifts_pts_tensor = torch.cat([multi_shifts_pts_tensor, padding], dim=0)
instances_tensor = torch.stack(instances_list, dim=0)
instances_tensor = instances_tensor.to(
return instances_tensor
class MapTargetBuilder(AbstractTargetBuilder):
def __init__(self):
lidar_resolution_width = 256
lidar_resolution_height = 256
self.dense_layers: List[SemanticMapLayer] = [
self.dense_layers_labels = [
1, 2
self.discrete_layers: List[SemanticMapLayer] = [
self.radius = 32.0
self.bev_pixel_width: int = lidar_resolution_width
self.bev_pixel_height: int = lidar_resolution_height
self.bev_pixel_size: float = 0.25
self.bev_semantic_frame = (self.bev_pixel_height, self.bev_pixel_width)
self.padding_value = -10000
self.sample_dist = 1
self.num_samples = 250
self.padding = False
self.fixed_num = 20
def _geometry_local_coords(self, geometry: Any, origin: StateSE2) -> Any:
Transform shapely geometry in local coordinates of origin.
:param geometry: shapely geometry
:param origin: pose dataclass
:return: shapely geometry
a = np.cos(origin.heading)
b = np.sin(origin.heading)
d = -np.sin(origin.heading)
e = np.cos(origin.heading)
xoff = -origin.x
yoff = -origin.y
translated_geometry = affinity.affine_transform(geometry, [1, 0, 0, 1, xoff, yoff])
rotated_geometry = affinity.affine_transform(translated_geometry, [a, b, d, e, 0, 0])
return rotated_geometry
def _coords_to_pixel(self, coords):
Transform local coordinates in pixel indices of BEV map
:param coords: _description_
:return: _description_
pixel_center = np.array([[0, self.bev_pixel_width / 2.0]])
coords_idcs = (coords / self.bev_pixel_size) + pixel_center
return coords_idcs.astype(np.int32)
def _compute_map_polygon_mask(
self, map_api: AbstractMap, ego_pose: StateSE2, layers: List[SemanticMapLayer]
) -> npt.NDArray[np.bool_]:
Compute binary mask given a map layer class
:param map_api: map interface of nuPlan
:param ego_pose: ego pose in global frame
:param layers: map layers
:return: binary mask as numpy array
map_object_dict = map_api.get_proximal_map_objects(
point=ego_pose.point, radius=self.radius, layers=layers
map_polygon_mask = np.zeros(self.bev_semantic_frame[::-1], dtype=np.uint8)
for layer in layers:
for map_object in map_object_dict[layer]:
polygon: Polygon = self._geometry_local_coords(map_object.polygon, ego_pose)
exterior = np.array(polygon.exterior.coords).reshape((-1, 1, 2))
exterior = self._coords_to_pixel(exterior)
cv2.fillPoly(map_polygon_mask, [exterior], color=255)
map_polygon_mask = np.rot90(map_polygon_mask)[::-1]
return map_polygon_mask > 0
def _compute_map_linestrings(
self, map_api: AbstractMap, ego_pose: StateSE2, layers: List[SemanticMapLayer]
) -> npt.NDArray[np.bool_]:
Compute binary of linestring given a map layer class
:param map_api: map interface of nuPlan
:param ego_pose: ego pose in global frame
:param layers: map layers
:return: binary mask as numpy array
map_object_dict = map_api.get_proximal_map_objects(
point=ego_pose.point, radius=self.radius, layers=layers
something = []
incoming_something = []
outcoming_something = []
for layer in layers:
for map_object in map_object_dict[layer]:
linestring: LineString = self._geometry_local_coords(
map_object.baseline_path.linestring, ego_pose
for incoming_edge in map_object.incoming_edges:
incomingstring: LineString = self._geometry_local_coords(
incoming_edge.baseline_path.linestring, ego_pose
for outgoing_edge in map_object.outgoing_edges:
outcomingstring: LineString = self._geometry_local_coords(
outgoing_edge.baseline_path.linestring, ego_pose
points = np.array(linestring.coords).reshape((-1, 1, 2))
return something, incoming_something, outcoming_something
def union_centerline(self, centerline_list, incoming_list, outcoming_list):
pts_G = nx.DiGraph()
junction_pts_list = []
start_pt = np.array(centerline_list[0].coords).round(3)[0]
end_pt = np.array(centerline_list[-1].coords).round(3)[-1]
for centerline_geom in centerline_list:
centerline_pts = np.array(centerline_geom.coords).round(3)
start_pt = centerline_pts[0]
end_pt = centerline_pts[-1]
for idx, pts in enumerate(centerline_pts[:-1]):
valid_incoming_num = 0
for pred_geom in incoming_list:
valid_incoming_num += 1
pred_pt = np.array(pred_geom.coords).round(3)[-1]
pts_G.add_edge(tuple(pred_pt), tuple(start_pt))
valid_outgoing_num = 0
for succ_geom in outcoming_list:
valid_outgoing_num += 1
succ_pt = np.array(succ_geom.coords).round(3)[0]
pts_G.add_edge(tuple(end_pt), tuple(succ_pt))
roots = (v for v, d in pts_G.in_degree() if d == 0)
leaves = [v for v, d in pts_G.out_degree() if d == 0]
all_paths = []
for root in roots:
paths = nx.all_simple_paths(pts_G, root, leaves)
final_centerline_paths = []
for path in all_paths:
merged_line = LineString(path)
merged_line = merged_line.simplify(0.2, preserve_topology=True)
return final_centerline_paths
def compute_targets(self, scene: Scene) -> Dict[str, torch.Tensor]:
map_api = scene.map_api
ego_statuses = [frame.ego_status for frame in scene.frames]
ego2globals = [frame.ego2global for frame in scene.frames]
ego_status_curr = StateSE2(*ego_statuses[-1].ego_pose)
map_dict = {'centerline': []}
line_strings, incoming_line_strings, outcoming_line_strings = self._compute_map_linestrings(map_api, ego_status_curr, list(self.discrete_layers))
centerline_list = self.union_centerline(line_strings, incoming_line_strings, outcoming_line_strings)
for instance in centerline_list:
vectors = []
gt_labels = []
gt_instance = []
instance_list = map_dict['centerline']
for instance in instance_list:
for instance in vectors:
gt_labels = torch.tensor(gt_labels)
gt_instance = LiDARInstanceLines(gt_instance, gt_labels, self.sample_dist, self.num_samples,
self.padding, self.fixed_num, self.padding_value, patch_size=self.radius * 2)
return {"dense_el": None,
"gt_bboxes_3d": gt_instance,
"gt_labels_3d": gt_labels}