from __future__ import annotations

from typing import Any, Dict, List, Tuple

import cv2
import networkx as nx
import numpy as np
import numpy.typing as npt
import torch
from nuplan.common.actor_state.state_representation import StateSE2
from nuplan.common.maps.abstract_map import AbstractMap
from nuplan.common.maps.maps_datatypes import SemanticMapLayer
from shapely import affinity
from shapely.geometry import LineString, Polygon

from det_map.data.datasets.dataclasses import Scene
from navsim.planning.training.abstract_feature_target_builder import AbstractTargetBuilder


class LiDARInstanceLines(object):
    """Line instances in LiDAR coordinates."""

    def __init__(self,
                 instance_line_list,
                 instance_labels,
                 sample_dist=1,
                 num_samples=250,
                 padding=False,
                 fixed_num=-1,
                 padding_value=-10000,
                 patch_size=None):
        assert isinstance(instance_line_list, list)
        assert patch_size is not None
        if len(instance_line_list) != 0:
            assert isinstance(instance_line_list[0], LineString)
        self.patch_size = patch_size
        # Points are clipped to a square patch of side `patch_size` centred on the ego.
        self.max_x = self.patch_size / 2
        self.max_y = self.patch_size / 2
        self.sample_dist = sample_dist
        self.num_samples = num_samples
        self.padding = padding
        self.fixed_num = fixed_num
        self.padding_value = padding_value

        self.instance_list = instance_line_list
        self.instance_labels = instance_labels

    @property
    def fixed_num_sampled_points(self):
        """
        Return torch.Tensor of shape [N, fixed_num, 2], where N is the number of
        instances; each instance is resampled to `fixed_num` equidistant points
        and clipped to the patch bounds.
        """
        assert len(self.instance_list) != 0
        instance_points_list = []
        for instance in self.instance_list:
            distances = np.linspace(0, instance.length, self.fixed_num)
            sampled_points = np.array(
                [list(instance.interpolate(distance).coords) for distance in distances]).reshape(-1, 2)
            instance_points_list.append(sampled_points)
        instance_points_array = np.array(instance_points_list)
        instance_points_tensor = torch.tensor(instance_points_array).to(dtype=torch.float32)
        instance_points_tensor[:, :, 0] = torch.clamp(instance_points_tensor[:, :, 0], min=-self.max_x, max=self.max_x)
        instance_points_tensor[:, :, 1] = torch.clamp(instance_points_tensor[:, :, 1], min=-self.max_y, max=self.max_y)
        return instance_points_tensor

    @property
    def shift_fixed_num_sampled_points_v2(self):
        """
        Return torch.Tensor of shape [num_instances, num_shifts, fixed_num, 2].
        Each instance is resampled to `fixed_num` equidistant points; the shift
        dimension is padded with `padding_value` up to `fixed_num - 1` entries.
        """
        assert len(self.instance_list) != 0
        instances_list = []
        for instance in self.instance_list:
            distances = np.linspace(0, instance.length, self.fixed_num)
            final_shift_num = self.fixed_num - 1

            # Only a single forward sampling is produced here (no start-point
            # re-rolling for closed polylines), so there is exactly one shift
            # before padding.
            sampled_points = np.array(
                [list(instance.interpolate(distance).coords) for distance in distances]).reshape(-1, 2)
            shift_pts_list = [sampled_points]

            multi_shifts_pts = np.stack(shift_pts_list, axis=0)
            shifts_num, _, _ = multi_shifts_pts.shape

            if shifts_num > final_shift_num:
                index = np.random.choice(multi_shifts_pts.shape[0], final_shift_num, replace=False)
                multi_shifts_pts = multi_shifts_pts[index]

            multi_shifts_pts_tensor = torch.tensor(multi_shifts_pts).to(dtype=torch.float32)

            multi_shifts_pts_tensor[:, :, 0] = torch.clamp(multi_shifts_pts_tensor[:, :, 0], min=-self.max_x,
                                                           max=self.max_x)
            multi_shifts_pts_tensor[:, :, 1] = torch.clamp(multi_shifts_pts_tensor[:, :, 1], min=-self.max_y,
                                                           max=self.max_y)

            if multi_shifts_pts_tensor.shape[0] < final_shift_num:
                padding = torch.full([final_shift_num - multi_shifts_pts_tensor.shape[0], self.fixed_num, 2],
                                     self.padding_value, dtype=torch.float32)
                multi_shifts_pts_tensor = torch.cat([multi_shifts_pts_tensor, padding], dim=0)
            instances_list.append(multi_shifts_pts_tensor)
        instances_tensor = torch.stack(instances_list, dim=0).to(dtype=torch.float32)
        return instances_tensor
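
    # Minimal usage sketch (illustrative only; toy inputs, not pipeline data):
    #   lines = [LineString([(0.0, 0.0), (10.0, 0.0)]),
    #            LineString([(0.0, 5.0), (10.0, 5.0)])]
    #   instances = LiDARInstanceLines(lines, torch.tensor([0, 0]),
    #                                  fixed_num=20, patch_size=64.0)
    #   instances.fixed_num_sampled_points.shape  # torch.Size([2, 20, 2])

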
class MapTargetBuilder(AbstractTargetBuilder):
    def __init__(self):
        super().__init__()
        lidar_resolution_width = 256
        lidar_resolution_height = 256
        # Dense layers are rasterised into BEV semantic masks.
        self.dense_layers: List[SemanticMapLayer] = [
            SemanticMapLayer.DRIVABLE_AREA,
            SemanticMapLayer.CROSSWALK,
        ]
        self.dense_layers_labels = [1, 2]

        # Discrete layers are vectorised into centerline instances.
        self.discrete_layers: List[SemanticMapLayer] = [
            SemanticMapLayer.LANE,
            SemanticMapLayer.LANE_CONNECTOR,
        ]

        self.radius = 32.0  # [m] map query radius around the ego pose
        self.bev_pixel_width: int = lidar_resolution_width
        self.bev_pixel_height: int = lidar_resolution_height
        self.bev_pixel_size: float = 0.25  # [m / pixel]
        self.bev_semantic_frame = (self.bev_pixel_height, self.bev_pixel_width)
        self.padding_value = -10000
        self.sample_dist = 1
        self.num_samples = 250
        self.padding = False
        self.fixed_num = 20

    def _geometry_local_coords(self, geometry: Any, origin: StateSE2) -> Any:
        """
        Transform a shapely geometry into the local coordinate frame of origin.
        :param geometry: shapely geometry in global coordinates
        :param origin: pose dataclass
        :return: shapely geometry in local coordinates
        """
        a = np.cos(origin.heading)
        b = np.sin(origin.heading)
        d = -np.sin(origin.heading)
        e = np.cos(origin.heading)
        xoff = -origin.x
        yoff = -origin.y

        # Translate to the origin first, then rotate by the inverse heading.
        translated_geometry = affinity.affine_transform(geometry, [1, 0, 0, 1, xoff, yoff])
        rotated_geometry = affinity.affine_transform(translated_geometry, [a, b, d, e, 0, 0])

        return rotated_geometry
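
    # Note: shapely's affine_transform expects [a, b, d, e, xoff, yoff] with
    #   x' = a * x + b * y + xoff
    #   y' = d * x + e * y + yoff
    # so the two calls above compute R(-heading) @ (p - origin). For example,
    # an origin at (1, 0) with heading 0 maps the world point (2, 0) to local (1, 0).
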
    def _coords_to_pixel(self, coords):
        """
        Transform local coordinates into pixel indices of the BEV map.
        :param coords: array of local (x, y) coordinates in meters
        :return: array of integer pixel indices
        """
        # Offset the lateral axis so the ego is centred along the image width.
        pixel_center = np.array([[0, self.bev_pixel_width / 2.0]])
        coords_idcs = (coords / self.bev_pixel_size) + pixel_center

        return coords_idcs.astype(np.int32)
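
    # E.g. with bev_pixel_size = 0.25 m and bev_pixel_width = 256, the local
    # point (x=1.0, y=0.0) maps to pixel (4, 128), before the rot90/flip in
    # _compute_map_polygon_mask reorients the frame (illustrative numbers).
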
    def _compute_map_polygon_mask(
            self, map_api: AbstractMap, ego_pose: StateSE2, layers: List[SemanticMapLayer]
    ) -> npt.NDArray[np.bool_]:
        """
        Compute a binary mask for the given map layer classes.
        :param map_api: map interface of nuPlan
        :param ego_pose: ego pose in global frame
        :param layers: map layers
        :return: binary mask as numpy array
        """
        map_object_dict = map_api.get_proximal_map_objects(
            point=ego_pose.point, radius=self.radius, layers=layers
        )
        map_polygon_mask = np.zeros(self.bev_semantic_frame[::-1], dtype=np.uint8)
        for layer in layers:
            for map_object in map_object_dict[layer]:
                polygon: Polygon = self._geometry_local_coords(map_object.polygon, ego_pose)
                exterior = np.array(polygon.exterior.coords).reshape((-1, 1, 2))
                exterior = self._coords_to_pixel(exterior)
                cv2.fillPoly(map_polygon_mask, [exterior], color=255)

        # Rotate and flip so the mask matches the BEV image orientation.
        map_polygon_mask = np.rot90(map_polygon_mask)[::-1]
        return map_polygon_mask > 0

    def _compute_map_linestrings(
            self, map_api: AbstractMap, ego_pose: StateSE2, layers: List[SemanticMapLayer]
    ) -> Tuple[List[LineString], List[LineString], List[LineString]]:
        """
        Compute baseline-path linestrings for the given map layer classes.
        :param map_api: map interface of nuPlan
        :param ego_pose: ego pose in global frame
        :param layers: map layers
        :return: baseline, incoming-edge and outgoing-edge linestrings in local coordinates
        """
        map_object_dict = map_api.get_proximal_map_objects(
            point=ego_pose.point, radius=self.radius, layers=layers
        )
        linestrings = []
        incoming_linestrings = []
        outgoing_linestrings = []
        for layer in layers:
            for map_object in map_object_dict[layer]:
                linestring: LineString = self._geometry_local_coords(
                    map_object.baseline_path.linestring, ego_pose
                )
                linestrings.append(linestring)
                for incoming_edge in map_object.incoming_edges:
                    incoming_linestring: LineString = self._geometry_local_coords(
                        incoming_edge.baseline_path.linestring, ego_pose
                    )
                    incoming_linestrings.append(incoming_linestring)

                for outgoing_edge in map_object.outgoing_edges:
                    outgoing_linestring: LineString = self._geometry_local_coords(
                        outgoing_edge.baseline_path.linestring, ego_pose
                    )
                    outgoing_linestrings.append(outgoing_linestring)

        return linestrings, incoming_linestrings, outgoing_linestrings

    def union_centerline(self, centerline_list, incoming_list, outgoing_list):
        """Merge per-lane centerlines into continuous paths via a directed point graph."""
        pts_G = nx.DiGraph()
        for centerline_geom in centerline_list:
            centerline_pts = np.array(centerline_geom.coords).round(3)
            start_pt = centerline_pts[0]
            end_pt = centerline_pts[-1]
            for idx in range(len(centerline_pts) - 1):
                pts_G.add_edge(tuple(centerline_pts[idx]), tuple(centerline_pts[idx + 1]))

        # Predecessor ends are connected to `start_pt` and successor starts to
        # `end_pt`, both taken from the last processed centerline.
        for pred_geom in incoming_list:
            pred_pt = np.array(pred_geom.coords).round(3)[-1]
            pts_G.add_edge(tuple(pred_pt), tuple(start_pt))

        for succ_geom in outgoing_list:
            succ_pt = np.array(succ_geom.coords).round(3)[0]
            pts_G.add_edge(tuple(end_pt), tuple(succ_pt))

        # Enumerate all simple root-to-leaf paths and simplify each into a linestring.
        roots = (v for v, d in pts_G.in_degree() if d == 0)
        leaves = [v for v, d in pts_G.out_degree() if d == 0]
        all_paths = []
        for root in roots:
            paths = nx.all_simple_paths(pts_G, root, leaves)
            all_paths.extend(paths)
        final_centerline_paths = []
        for path in all_paths:
            merged_line = LineString(path)
            merged_line = merged_line.simplify(0.2, preserve_topology=True)
            final_centerline_paths.append(merged_line)
        return final_centerline_paths
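
    # E.g. two abutting segments merge into a single path (illustrative):
    #   a = LineString([(0.0, 0.0), (1.0, 1.0)])
    #   b = LineString([(1.0, 1.0), (2.0, 0.0)])
    #   builder.union_centerline([a, b], [], [])
    #   # -> [LINESTRING (0 0, 1 1, 2 0)]
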
    def compute_targets(self, scene: Scene) -> Dict[str, Any]:
        map_api = scene.map_api
        ego_statuses = [frame.ego_status for frame in scene.frames]
        ego_status_curr = StateSE2(*ego_statuses[-1].ego_pose)

        # Vectorise lane / lane-connector centerlines around the current ego pose.
        map_dict = {'centerline': []}
        line_strings, incoming_line_strings, outgoing_line_strings = self._compute_map_linestrings(
            map_api, ego_status_curr, list(self.discrete_layers))
        centerline_list = self.union_centerline(line_strings, incoming_line_strings, outgoing_line_strings)
        for instance in centerline_list:
            map_dict['centerline'].append(np.array(instance.coords))

        gt_labels = []
        gt_instance = []
        for instance in map_dict['centerline']:
            gt_instance.append(LineString(instance))
            gt_labels.append(0)  # single class: centerline
        gt_labels = torch.tensor(gt_labels)

        gt_instance = LiDARInstanceLines(gt_instance, gt_labels, self.sample_dist, self.num_samples,
                                         self.padding, self.fixed_num, self.padding_value,
                                         patch_size=self.radius * 2)

        return {"dense_el": None,
                "gt_bboxes_3d": gt_instance,
                "gt_labels_3d": gt_labels}
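
# Sketch of typical consumption (assumed workflow; `scene` loading lives in
# the surrounding dataset code):
#   builder = MapTargetBuilder()
#   targets = builder.compute_targets(scene)
#   targets["gt_bboxes_3d"]  # LiDARInstanceLines holding centerline instances
#   targets["gt_labels_3d"]  # per-instance class labels (all 0: centerline)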