"""Visualize model sub-scores exported from a checkpoint (ckpt -> pkl + valid score).

For each test token, overlay the model trajectory (red) and the ground-truth
trajectory (green) on the front camera image, and render the per-metric score
distribution over the trajectory vocabulary next to it.
"""

import io
import logging
import os
import pickle
import uuid
from pathlib import Path

import hydra
import matplotlib.pyplot as plt
import numpy as np
import torch
from hydra.utils import instantiate
from matplotlib.collections import LineCollection
from nuplan.planning.utils.multithreading.worker_utils import worker_map
from omegaconf import DictConfig
from PIL import Image, ImageDraw
from tqdm import tqdm

from navsim.common.dataclasses import AgentInput, Scene, SensorConfig
from navsim.common.dataloader import SceneLoader
from navsim.planning.script.builders.worker_pool_builder import build_worker
from navsim.visualization.private import view_points

logger = logging.getLogger(__name__)

CONFIG_PATH = "../../navsim/planning/script/config/pdm_scoring"
CONFIG_NAME = "run_pdm_score_ddp"
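
# Inputs resolved from environment variables: the trajectory vocabulary, the
# precomputed ground-truth PDM scores, and the model's per-token sub-scores.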
vocab = np.load(f'{os.getenv("NAVSIM_DEVKIT_ROOT")}/traj_final/test_8192_kmeans.npy')
gt_scores = pickle.load(open(f'{os.getenv("NAVSIM_TRAJPDM_ROOT")}/vocab_score_full_8192_navtest/navtest.pkl', 'rb'))
subscores = pickle.load(open(f'{os.getenv("NAVSIM_EXP_ROOT")}/hydra_offset_vov_fixedpading_bs8x8_ckpt/epoch09.pkl', 'rb'))
output_dir = f'{os.getenv("NAVSIM_EXP_ROOT")}/offset_vis'
os.makedirs(output_dir, exist_ok=True)

norm = plt.Normalize(vmin=0.0, vmax=1.0)
cmap = plt.get_cmap('viridis')


def get_overlay(poses, cam2lidar_rot, cam2lidar_tran, cam_intrin, color=(255, 0, 0, 255)):
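    """Project BEV trajectory poses into the camera and draw them as dots on a transparent overlay."""
    # Lift the (x, y) waypoints to 3D points on the ground plane (z = 0).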
    coordinates = np.zeros((3, poses.shape[0]))
    coordinates[0] = poses[:, 0]
    coordinates[1] = poses[:, 1]
    coordinates[2] = 0.0
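
    # Transform lidar-frame points into the camera frame, then apply intrinsics.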
    lidar2cam_rot = np.linalg.inv(cam2lidar_rot)
    coordinates -= cam2lidar_tran.reshape(-1, 1)
    coordinates = np.dot(lidar2cam_rot, coordinates)
    coordinates = np.dot(cam_intrin, coordinates)
    heights = coordinates[2, :]
    points = view_points(coordinates[:3, :], np.eye(3), normalize=True)
    points[2, :] = heights

    # Keep points inside the canvas and in front of the camera; points behind
    # the camera would otherwise be mirrored into the image plane.
    canvas_size = (1080, 1920)
    mask = heights > 0
    mask = np.logical_and(mask, points[0, :] < canvas_size[1] - 1)
    mask = np.logical_and(mask, points[0, :] > 0)
    mask = np.logical_and(mask, points[1, :] < canvas_size[0] - 1)
    mask = np.logical_and(mask, points[1, :] > 0)

    points = points[:, mask]
    depth = heights[mask]

    points = np.int16(np.round(points[:2, :]))
    depth = np.int16(np.round(depth))
    overlay_img = Image.new("RGBA", (canvas_size[1], canvas_size[0]), (255, 255, 255, 0))
    draw = ImageDraw.Draw(overlay_img)
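
    # Rasterize depths onto an integer canvas so duplicate pixels collapse to one dot.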
    depth_canvas = np.zeros(canvas_size, dtype=np.int16)
    for (col, row), d in zip(points.T, depth):
        depth_canvas[row, col] = d
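
    # Draw a filled dot at every pixel the trajectory hits.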
    r = 5
    for row, col in np.argwhere(depth_canvas > 0):
        x, y = int(col), int(row)
        draw.ellipse((x - r, y - r, x + r, y + r), fill=color)

    return overlay_img


def get_distribution(scores, vocab, gt_traj):
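    """Plot each metric's score distribution over the trajectory vocabulary and return it as a PIL image."""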
    metrics = ['imi', 'noc', 'da', 'comfort', 'progress']

    fig, axes = plt.subplots(2, 3, figsize=(16.2, 10.8))

    # Five metrics on a 2x3 grid: hide the unused sixth axis rather than
    # indexing past the end of the metrics list.
    axes.flat[-1].axis('off')
    for i, metric in enumerate(metrics):
        ax = axes.flat[i]
        vocab_scores = scores[metric].exp().cpu().numpy()

        # Imitation scores are tiny after exponentiation; rescale for visibility.
        if metric == 'imi':
            vocab_scores *= 10

        # One polyline per vocabulary trajectory, colored by its score;
        # near-zero scores are drawn almost fully transparent.
        line_collection = LineCollection(
            vocab[..., :2],
            colors=[cmap(norm(score)) for score in vocab_scores],
            alpha=[1.0 if score > 0.1 else 0.001 for score in vocab_scores],
        )
        ax.set_xlim(-5, 65)
        ax.set_ylim(-25, 25)
        ax.add_collection(line_collection)

        # Overlay the ground-truth trajectory on the imitation panel.
        if metric == 'imi':
            ax.plot(gt_traj[:, 0], gt_traj[:, 1], c='r', alpha=1.0)

        ax.set_title(f"Metric {metric}")
    fig.colorbar(plt.cm.ScalarMappable(norm=norm, cmap=cmap), cax=fig.add_axes([0.92, 0.15, 0.02, 0.7]))
    plt.tight_layout(rect=[0, 0, 0.9, 1])

    # Render to an in-memory PNG and close the figure so figures do not
    # accumulate across tokens.
    buf = io.BytesIO()
    plt.savefig(buf, format='png')
    buf.seek(0)
    image = Image.open(buf)
    plt.close(fig)

    return image


def worker_task(args):
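    """Render one composite image (camera overlay + score plots) per token."""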
    node_id = int(os.environ.get("NODE_RANK", 0))
    thread_id = str(uuid.uuid4())
    logger.info(f"Starting worker in thread_id={thread_id}, node_id={node_id}")

    for arg in tqdm(args, desc="Running visualization"):
        token, gt_scores, subscores, vocab = arg['token'], arg['gt_scores'], arg['subscores'], arg['vocab']
        scene_loader = arg['scene_loader']
        agent_input = AgentInput.from_scene_dict_list(
            scene_loader.scene_frames_dicts[token],
            scene_loader._sensor_blobs_path,
            scene_loader._scene_filter.num_history_frames,
            scene_loader._sensor_config
        )
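        # Ground-truth future trajectory: 4 s horizon at 0.5 s steps (8 poses).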
        gt_traj = Scene.from_scene_dict_list(
            scene_loader.scene_frames_dicts[token],
            scene_loader._sensor_blobs_path,
            scene_loader._scene_filter.num_history_frames,
            10,
            scene_loader._sensor_config
        ).get_future_trajectory(int(4 / 0.5))

        gt_score = gt_scores[token]
        subscore = subscores[token]
        # Sub-scores are stored as numpy arrays; convert to tensors for plotting.
        for k, v in subscore.items():
            if k != 'trajectory':
                subscore[k] = torch.from_numpy(v)

        model_traj = subscore['trajectory']
        gt_traj = gt_traj.poses

        # Skip tokens that were already rendered.
        file_name = f'{token}'
        save_path = f'{output_dir}/{file_name}.png'
        if os.path.exists(save_path):
            continue

        # Use the most recent front-camera frame.
        cam = agent_input.cameras[-1].cam_f0
        img, cam2lidar_rot, cam2lidar_tran, cam_intrin = cam.image, cam.sensor2lidar_rotation, cam.sensor2lidar_translation, cam.intrinsics

        img = Image.fromarray(img.astype('uint8'), 'RGB').convert('RGBA')
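
        # Model trajectory in red, ground truth in green.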
        img = Image.alpha_composite(img, get_overlay(model_traj, cam2lidar_rot, cam2lidar_tran, cam_intrin,
                                                     color=(255, 0, 0, 255)))
        img = Image.alpha_composite(img, get_overlay(gt_traj, cam2lidar_rot, cam2lidar_tran, cam_intrin,
                                                     color=(0, 255, 0, 255)))
        img = img.convert('RGB')

        # Per-metric score-distribution figure, pasted beside the camera view.
        figs = get_distribution(subscore, vocab, gt_traj)

        total_width = img.width + figs.width
        max_height = max(img.height, figs.height)
        new_image = Image.new('RGB', (total_width, max_height))
        new_image.paste(img, (0, 0))
        new_image.paste(figs, (img.width, 0))
        new_image.save(save_path)

    return []


@hydra.main(config_path=CONFIG_PATH, config_name=CONFIG_NAME)
def main(cfg: DictConfig) -> None:
    data_path = Path(cfg.navsim_log_path)
    sensor_blobs_path = Path(cfg.sensor_blobs_path)
    scene_filter = instantiate(cfg.scene_filter)
    scene_loader = SceneLoader(
        data_path=data_path,
        scene_filter=scene_filter,
        sensor_blobs_path=sensor_blobs_path,
        sensor_config=SensorConfig(
            cam_f0=True,
            cam_l0=True,
            cam_l1=True,
            cam_l2=True,
            cam_r0=True,
            cam_r1=True,
            cam_r2=True,
            cam_b0=True,
            lidar_pc=False,
        )
    )
    worker = build_worker(cfg)
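
    # Build one work item per token, carrying references to the shared inputs.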
    data_points = []
    for token in tqdm(scene_loader.tokens):
        data_points.append({
            'token': token,
            'scene_loader': scene_loader,
            'vocab': vocab,
            'gt_scores': gt_scores,
            'subscores': subscores
        })

    worker_map(worker, worker_task, data_points[cfg.start_idx:cfg.end_idx])


if __name__ == "__main__":
    with torch.no_grad():
        main()