File size: 14,395 Bytes
da2e2ac |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 |
from __future__ import annotations
from typing import List, Dict, Any
import cv2
import numpy as np
import numpy.typing as npt
import torch
from nuplan.common.actor_state.state_representation import StateSE2
from nuplan.common.maps.abstract_map import AbstractMap
from nuplan.common.maps.maps_datatypes import SemanticMapLayer
from shapely import affinity, LineString
from shapely.geometry import Polygon
from det_map.data.datasets.dataclasses import Scene
from navsim.planning.training.abstract_feature_target_builder import AbstractTargetBuilder
import networkx as nx
# from mmdet.datasets.pipelines import to_tensor
from mmcv.parallel import DataContainer as DC
class LiDARInstanceLines(object):
"""Line instance in LIDAR coordinates"""
def __init__(self,
instance_line_list,
instance_labels,
sample_dist=1,
num_samples=250,
padding=False,
fixed_num=-1,
padding_value=-10000,
patch_size=None):
assert isinstance(instance_line_list, list)
assert patch_size is not None
if len(instance_line_list) != 0:
assert isinstance(instance_line_list[0], LineString)
self.patch_size = patch_size
self.max_x = self.patch_size / 2
self.max_y = self.patch_size / 2
self.sample_dist = sample_dist
self.num_samples = num_samples
self.padding = padding
self.fixed_num = fixed_num
self.padding_value = padding_value
self.instance_list = instance_line_list
self.instance_labels = instance_labels
@property
def fixed_num_sampled_points(self):
"""
return torch.Tensor([N,fixed_num,2]), in xmin, ymin, xmax, ymax form
N means the num of instances
"""
assert len(self.instance_list) != 0
instance_points_list = []
for instance in self.instance_list:
distances = np.linspace(0, instance.length, self.fixed_num)
sampled_points = np.array([list(instance.interpolate(distance).coords) for distance in distances]).reshape(
-1, 2)
instance_points_list.append(sampled_points)
instance_points_array = np.array(instance_points_list)
instance_points_tensor = torch.tensor(instance_points_array)
instance_points_tensor = instance_points_tensor.to(
dtype=torch.float32)
instance_points_tensor[:, :, 0] = torch.clamp(instance_points_tensor[:, :, 0], min=-self.max_x, max=self.max_x)
instance_points_tensor[:, :, 1] = torch.clamp(instance_points_tensor[:, :, 1], min=-self.max_y, max=self.max_y)
return instance_points_tensor
@property
def shift_fixed_num_sampled_points_v2(self):
"""
return [instances_num, num_shifts, fixed_num, 2]
"""
assert len(self.instance_list) != 0
instances_list = []
for idx, instance in enumerate(self.instance_list):
# import ipdb;ipdb.set_trace()
# instance_label = self.instance_labels[idx]
distances = np.linspace(0, instance.length, self.fixed_num)
poly_pts = np.array(list(instance.coords))
start_pts = poly_pts[0]
end_pts = poly_pts[-1]
is_poly = np.equal(start_pts, end_pts)
is_poly = is_poly.all()
shift_pts_list = []
pts_num, coords_num = poly_pts.shape
shift_num = pts_num - 1
final_shift_num = self.fixed_num - 1
# if instance_label == 3:
# import ipdb;ipdb.set_trace()
# 永远是centerline
sampled_points = np.array(
[list(instance.interpolate(distance).coords) for distance in distances]).reshape(-1, 2)
shift_pts_list.append(sampled_points)
multi_shifts_pts = np.stack(shift_pts_list, axis=0)
shifts_num, _, _ = multi_shifts_pts.shape
if shifts_num > final_shift_num:
index = np.random.choice(multi_shifts_pts.shape[0], final_shift_num, replace=False)
multi_shifts_pts = multi_shifts_pts[index]
multi_shifts_pts_tensor = torch.tensor(multi_shifts_pts)
multi_shifts_pts_tensor = multi_shifts_pts_tensor.to(
dtype=torch.float32)
multi_shifts_pts_tensor[:, :, 0] = torch.clamp(multi_shifts_pts_tensor[:, :, 0], min=-self.max_x,
max=self.max_x)
multi_shifts_pts_tensor[:, :, 1] = torch.clamp(multi_shifts_pts_tensor[:, :, 1], min=-self.max_y,
max=self.max_y)
# if not is_poly:
if multi_shifts_pts_tensor.shape[0] < final_shift_num:
padding = torch.full([final_shift_num - multi_shifts_pts_tensor.shape[0], self.fixed_num, 2],
self.padding_value)
multi_shifts_pts_tensor = torch.cat([multi_shifts_pts_tensor, padding], dim=0)
instances_list.append(multi_shifts_pts_tensor)
instances_tensor = torch.stack(instances_list, dim=0)
instances_tensor = instances_tensor.to(
dtype=torch.float32)
return instances_tensor
class MapTargetBuilder(AbstractTargetBuilder):
def __init__(self):
super().__init__()
lidar_resolution_width = 256
lidar_resolution_height = 256
self.dense_layers: List[SemanticMapLayer] = [
SemanticMapLayer.DRIVABLE_AREA,
SemanticMapLayer.CROSSWALK
]
self.dense_layers_labels = [
1, 2
]
self.discrete_layers: List[SemanticMapLayer] = [
SemanticMapLayer.LANE,
SemanticMapLayer.LANE_CONNECTOR,
]
self.radius = 32.0
self.bev_pixel_width: int = lidar_resolution_width
self.bev_pixel_height: int = lidar_resolution_height
self.bev_pixel_size: float = 0.25
self.bev_semantic_frame = (self.bev_pixel_height, self.bev_pixel_width)
self.padding_value = -10000
self.sample_dist = 1
self.num_samples = 250
self.padding = False
self.fixed_num = 20
def _geometry_local_coords(self, geometry: Any, origin: StateSE2) -> Any:
"""
Transform shapely geometry in local coordinates of origin.
:param geometry: shapely geometry
:param origin: pose dataclass
:return: shapely geometry
"""
a = np.cos(origin.heading)
b = np.sin(origin.heading)
d = -np.sin(origin.heading)
e = np.cos(origin.heading)
xoff = -origin.x
yoff = -origin.y
translated_geometry = affinity.affine_transform(geometry, [1, 0, 0, 1, xoff, yoff])
rotated_geometry = affinity.affine_transform(translated_geometry, [a, b, d, e, 0, 0])
return rotated_geometry
def _coords_to_pixel(self, coords):
"""
Transform local coordinates in pixel indices of BEV map
:param coords: _description_
:return: _description_
"""
# NOTE: remove half in backward direction
pixel_center = np.array([[0, self.bev_pixel_width / 2.0]])
coords_idcs = (coords / self.bev_pixel_size) + pixel_center
return coords_idcs.astype(np.int32)
def _compute_map_polygon_mask(
self, map_api: AbstractMap, ego_pose: StateSE2, layers: List[SemanticMapLayer]
) -> npt.NDArray[np.bool_]:
"""
Compute binary mask given a map layer class
:param map_api: map interface of nuPlan
:param ego_pose: ego pose in global frame
:param layers: map layers
:return: binary mask as numpy array
"""
map_object_dict = map_api.get_proximal_map_objects(
point=ego_pose.point, radius=self.radius, layers=layers
)
map_polygon_mask = np.zeros(self.bev_semantic_frame[::-1], dtype=np.uint8)
for layer in layers:
for map_object in map_object_dict[layer]:
polygon: Polygon = self._geometry_local_coords(map_object.polygon, ego_pose)
exterior = np.array(polygon.exterior.coords).reshape((-1, 1, 2))
exterior = self._coords_to_pixel(exterior)
cv2.fillPoly(map_polygon_mask, [exterior], color=255)
# OpenCV has origin on top-left corner
map_polygon_mask = np.rot90(map_polygon_mask)[::-1]
return map_polygon_mask > 0
def _compute_map_linestrings(
self, map_api: AbstractMap, ego_pose: StateSE2, layers: List[SemanticMapLayer]
) -> npt.NDArray[np.bool_]:
"""
Compute binary of linestring given a map layer class
:param map_api: map interface of nuPlan
:param ego_pose: ego pose in global frame
:param layers: map layers
:return: binary mask as numpy array
"""
map_object_dict = map_api.get_proximal_map_objects(
point=ego_pose.point, radius=self.radius, layers=layers
)
something = []
incoming_something = []
outcoming_something = []
for layer in layers:
for map_object in map_object_dict[layer]:
linestring: LineString = self._geometry_local_coords(
map_object.baseline_path.linestring, ego_pose
)
something.append(linestring)
for incoming_edge in map_object.incoming_edges:
incomingstring: LineString = self._geometry_local_coords(
incoming_edge.baseline_path.linestring, ego_pose
)
incoming_something.append(incomingstring)
for outgoing_edge in map_object.outgoing_edges:
outcomingstring: LineString = self._geometry_local_coords(
outgoing_edge.baseline_path.linestring, ego_pose
)
outcoming_something.append(outcomingstring)
# todo
points = np.array(linestring.coords).reshape((-1, 1, 2))
return something, incoming_something, outcoming_something
def union_centerline(self, centerline_list, incoming_list, outcoming_list):
pts_G = nx.DiGraph()
junction_pts_list = []
start_pt = np.array(centerline_list[0].coords).round(3)[0]
end_pt = np.array(centerline_list[-1].coords).round(3)[-1]
for centerline_geom in centerline_list:
centerline_pts = np.array(centerline_geom.coords).round(3)
start_pt = centerline_pts[0]
end_pt = centerline_pts[-1]
for idx, pts in enumerate(centerline_pts[:-1]):
pts_G.add_edge(tuple(centerline_pts[idx]),tuple(centerline_pts[idx+1]))
valid_incoming_num = 0
for pred_geom in incoming_list:
valid_incoming_num += 1
pred_pt = np.array(pred_geom.coords).round(3)[-1]
pts_G.add_edge(tuple(pred_pt), tuple(start_pt))
valid_outgoing_num = 0
for succ_geom in outcoming_list:
valid_outgoing_num += 1
succ_pt = np.array(succ_geom.coords).round(3)[0]
pts_G.add_edge(tuple(end_pt), tuple(succ_pt))
roots = (v for v, d in pts_G.in_degree() if d == 0)
leaves = [v for v, d in pts_G.out_degree() if d == 0]
all_paths = []
for root in roots:
paths = nx.all_simple_paths(pts_G, root, leaves)
all_paths.extend(paths)
final_centerline_paths = []
for path in all_paths:
merged_line = LineString(path)
merged_line = merged_line.simplify(0.2, preserve_topology=True)
final_centerline_paths.append(merged_line)
return final_centerline_paths
def compute_targets(self, scene: Scene) -> Dict[str, torch.Tensor]:
map_api = scene.map_api
ego_statuses = [frame.ego_status for frame in scene.frames]
ego2globals = [frame.ego2global for frame in scene.frames]
# Last one is the current frame
ego_status_curr = StateSE2(*ego_statuses[-1].ego_pose)
# dense
# dense_semantic_map = np.zeros(self.bev_semantic_frame, dtype=np.int64)
# for layer, label in zip(self.dense_layers, self.dense_layers_labels):
# entity_mask = self._compute_map_polygon_mask(map_api, ego_status_curr, [layer])
# dense_semantic_map[entity_mask] = label
# discrete
# centerline_list
map_dict = {'centerline': []}
line_strings, incoming_line_strings, outcoming_line_strings = self._compute_map_linestrings(map_api, ego_status_curr, list(self.discrete_layers))
centerline_list = self.union_centerline(line_strings, incoming_line_strings, outcoming_line_strings)
for instance in centerline_list:
map_dict['centerline'].append(np.array(instance.coords))
vectors = []
gt_labels = []
gt_instance = []
instance_list = map_dict['centerline']
for instance in instance_list:
vectors.append(LineString(np.array(instance)))
for instance in vectors:
gt_instance.append(instance)
gt_labels.append(0)
gt_semantic_mask=None
gt_pv_semantic_mask=None
gt_labels = torch.tensor(gt_labels)
# print(type(gt_labels))
gt_instance = LiDARInstanceLines(gt_instance, gt_labels, self.sample_dist, self.num_samples,
self.padding, self.fixed_num, self.padding_value, patch_size=self.radius * 2)
# gt_instance = DC(gt_instance, cpu_only=True)
# gt_labels = DC(gt_labels, cpu_only=False)
return {"dense_el": None,
"gt_bboxes_3d": gt_instance,
"gt_labels_3d": gt_labels}
|