"""ckpt -> pkl + valid score"""
import io
import logging
import os
import pickle
import uuid
from pathlib import Path

import hydra
import matplotlib.pyplot as plt
import numpy as np
import torch
from hydra.utils import instantiate
from matplotlib.collections import LineCollection
from nuplan.planning.utils.multithreading.worker_utils import worker_map
from omegaconf import DictConfig
from PIL import Image, ImageDraw
from tqdm import tqdm

from navsim.common.dataclasses import AgentInput, Scene, SensorConfig
from navsim.common.dataloader import SceneLoader
from navsim.planning.script.builders.worker_pool_builder import build_worker
from navsim.visualization.private import view_points

logger = logging.getLogger(__name__)

CONFIG_PATH = "../../navsim/planning/script/config/pdm_scoring"
CONFIG_NAME = "run_pdm_score_ddp"

# your paths to these files
vocab = np.load(f'{os.getenv("NAVSIM_DEVKIT_ROOT")}/traj_final/test_8192_kmeans.npy')
gt_scores = pickle.load(open(f'{os.getenv("NAVSIM_TRAJPDM_ROOT")}/vocab_score_full_8192_navtest/navtest.pkl', 'rb'))
subscores = pickle.load(open(f'{os.getenv("NAVSIM_EXP_ROOT")}/hydra_offset_vov_fixedpading_bs8x8_ckpt/epoch09.pkl', 'rb'))
output_dir = f'{os.getenv("NAVSIM_EXP_ROOT")}/offset_vis'
os.makedirs(output_dir, exist_ok=True)

norm = plt.Normalize(vmin=0.0, vmax=1.0)
cmap = plt.get_cmap('viridis')


def get_overlay(poses, cam2lidar_rot, cam2lidar_tran, cam_intrin, color=(255, 0, 0, 255)):
    """Project BEV poses (x, y, z=0) into the camera and draw them on a transparent overlay."""
    coordinates = np.zeros((3, poses.shape[0]))
    coordinates[0] = poses[:, 0]
    coordinates[1] = poses[:, 1]
    coordinates[2] = 0.0

    # lidar frame -> camera frame -> image plane
    lidar2cam_rot = np.linalg.inv(cam2lidar_rot)
    coordinates -= cam2lidar_tran.reshape(-1, 1)
    coordinates = np.dot(lidar2cam_rot, coordinates)
    coordinates = np.dot(cam_intrin, coordinates)

    heights = coordinates[2, :]
    points = view_points(coordinates[:3, :], np.eye(3), normalize=True)
    points[2, :] = heights

    # keep only points that land inside the canvas
    mask = np.ones(points.shape[1], dtype=bool)  # type: ignore
    canvas_size = (1080, 1920)
    mask = np.logical_and(mask, points[0, :] < canvas_size[1] - 1)
    mask = np.logical_and(mask, points[0, :] > 0)
    mask = np.logical_and(mask, points[1, :] < canvas_size[0] - 1)
    mask = np.logical_and(mask, points[1, :] > 0)
    points = points[:, mask]
    depth = heights[mask]

    points = np.int16(np.round(points[:2, :]))
    depth = np.int16(np.round(depth))

    overlay_img = Image.new("RGBA", (canvas_size[1], canvas_size[0]), (255, 255, 255, 0))
    draw = ImageDraw.Draw(overlay_img)

    # Populate canvas, use maximum color_value for each bin
    depth_canvas = np.zeros(canvas_size, dtype=np.int16)
    for (col, row), d in zip(points.T, depth):
        depth_canvas[row, col] = d

    # draw a filled circle at every pixel with positive depth
    depth_canvas = torch.from_numpy(depth_canvas)
    inds = (depth_canvas > 0).nonzero()
    for ind in inds:
        y, x = ind
        x, y = x.item(), y.item()
        r = 5
        draw.ellipse((x - r, y - r, x + r, y + r), fill=color)
    return overlay_img
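
# Minimal sanity-check sketch for `get_overlay` (hypothetical values, not from the
# dataset): a straight-ahead BEV trajectory, a camera mounted 1.5 m above the lidar
# origin looking along lidar +x, and a plausible 1920x1080 pinhole intrinsic.
# Not called anywhere; invoke it manually to eyeball the projection.
def _overlay_sanity_check(save_path='overlay_check.png'):
    poses = np.stack([np.linspace(5.0, 40.0, 8), np.zeros(8)], axis=1)  # (x, y) in meters
    # columns are the camera axes in the lidar frame: right = -y, down = -z, forward = +x
    cam2lidar_rot = np.array([[0.0, 0.0, 1.0],
                              [-1.0, 0.0, 0.0],
                              [0.0, -1.0, 0.0]])
    cam2lidar_tran = np.array([0.0, 0.0, 1.5])  # camera 1.5 m above the ground plane
    cam_intrin = np.array([[1545.0, 0.0, 960.0],
                           [0.0, 1545.0, 540.0],
                           [0.0, 0.0, 1.0]])
    get_overlay(poses, cam2lidar_rot, cam2lidar_tran, cam_intrin).save(save_path)
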
def get_distribution(scores, vocab, gt_traj):
    """Render per-metric score distributions over the trajectory vocabulary as one image."""
    metrics = ['imi', 'noc', 'da', 'comfort', 'progress']
    # Define the figure size in inches (1620 x 1080 pixels / 100 dpi = 16.2 x 10.8 inches)
    fig, axes = plt.subplots(2, 3, figsize=(16.2, 10.8))  # 3 plots in a row, 2 rows
    for i, ax in enumerate(axes.flat):
        # only five metrics for six axes; blank out the spare subplot
        if i >= len(metrics):
            ax.axis('off')
            continue
        metric = metrics[i]
        vocab_scores = scores[metric].exp().cpu().numpy()
        # scale imitation scores by 10
        if metric == 'imi':
            vocab_scores *= 10
        line_collection = LineCollection(
            vocab[..., :2],
            colors=[cmap(norm(score)) for score in vocab_scores],
            alpha=[1.0 if score > 0.1 else 0.001 for score in vocab_scores],
        )
        ax.set_xlim(-5, 65)
        ax.set_ylim(-25, 25)
        ax.add_collection(line_collection)
        # red line in the imi plot is the gt trajectory
        if metric == 'imi':
            ax.plot(gt_traj[:, 0], gt_traj[:, 1], c='r', alpha=1.0)
        ax.set_title(f"Metric {metric}")

    fig.colorbar(plt.cm.ScalarMappable(norm=norm, cmap=cmap), cax=fig.add_axes([0.92, 0.15, 0.02, 0.7]))
    plt.tight_layout(rect=[0, 0, 0.9, 1])
    buf = io.BytesIO()
    plt.savefig(buf, format='png')
    plt.close(fig)  # release the figure; this runs once per scene
    buf.seek(0)
    image = Image.open(buf)
    return image


def worker_task(args):
    node_id = int(os.environ.get("NODE_RANK", 0))
    thread_id = str(uuid.uuid4())
    logger.info(f"Starting worker in thread_id={thread_id}, node_id={node_id}")

    for arg in tqdm(args, desc="Running visualization"):
        token, gt_scores, subscores, vocab = arg['token'], arg['gt_scores'], arg['subscores'], arg['vocab']
        scene_loader = arg['scene_loader']
        agent_input = AgentInput.from_scene_dict_list(
            scene_loader.scene_frames_dicts[token],
            scene_loader._sensor_blobs_path,
            scene_loader._scene_filter.num_history_frames,
            scene_loader._sensor_config,
        )
        # ground-truth future trajectory: 4 s horizon at 2 Hz -> 8 poses
        gt_traj = Scene.from_scene_dict_list(
            scene_loader.scene_frames_dicts[token],
            scene_loader._sensor_blobs_path,
            scene_loader._scene_filter.num_history_frames,
            10,
            scene_loader._sensor_config,
        ).get_future_trajectory(int(4 / 0.5))

        gt_score = gt_scores[token]
        subscore = subscores[token]
        for k, v in subscore.items():
            if k != 'trajectory':
                subscore[k] = torch.from_numpy(v)

        # inference
        # selected_index = subscore['total'].argmax(-1)
        # curr_score_noc = gt_score['noc'][selected_index]
        # curr_score_da = gt_score['da'][selected_index]
        # curr_score_ttc = gt_score['ttc'][selected_index]
        # curr_score_ep = gt_score['progress'][selected_index]
        # curr_score_pdm = gt_score['total'][selected_index]
        # model_traj = vocab[selected_index]
        model_traj = subscore['trajectory']
        gt_traj = gt_traj.poses

        # file_name = f'{token}_noc{curr_score_noc}_da{curr_score_da}_ttc{curr_score_ttc}_ep{curr_score_ep}_pdm{curr_score_pdm}'
        file_name = f'{token}'
        save_path = f'{output_dir}/{file_name}.png'
        if os.path.exists(save_path):
            continue

        # inf traj (red) + gt traj (green) projected onto the front camera
        cam = agent_input.cameras[-1].cam_f0
        img, cam2lidar_rot, cam2lidar_tran, cam_intrin = cam.image, cam.sensor2lidar_rotation, cam.sensor2lidar_translation, cam.intrinsics
        img = Image.fromarray(img.astype('uint8'), 'RGB').convert('RGBA')
        img = Image.alpha_composite(img, get_overlay(model_traj, cam2lidar_rot, cam2lidar_tran, cam_intrin, color=(255, 0, 0, 255)))
        img = Image.alpha_composite(img, get_overlay(gt_traj, cam2lidar_rot, cam2lidar_tran, cam_intrin, color=(0, 255, 0, 255)))
        img = img.convert('RGB')

        # distributions of vocab
        figs = get_distribution(subscore, vocab, gt_traj)

        # concat camera view and distribution plots side by side
        total_width = img.width + figs.width
        max_height = max(img.height, figs.height)
        new_image = Image.new('RGB', (total_width, max_height))
        new_image.paste(img, (0, 0))
        new_image.paste(figs, (img.width, 0))
        new_image.save(save_path)

    return []


@hydra.main(config_path=CONFIG_PATH, config_name=CONFIG_NAME)
def main(cfg: DictConfig) -> None:
    data_path = Path(cfg.navsim_log_path)
    sensor_blobs_path = Path(cfg.sensor_blobs_path)
    scene_filter = instantiate(cfg.scene_filter)
    scene_loader = SceneLoader(
        data_path=data_path,
        scene_filter=scene_filter,
        sensor_blobs_path=sensor_blobs_path,
        sensor_config=SensorConfig(
            cam_f0=True,
            cam_l0=True,
            cam_l1=True,
            cam_l2=True,
            cam_r0=True,
            cam_r1=True,
            cam_r2=True,
            cam_b0=True,
            lidar_pc=False,
        ),
    )
    worker = build_worker(cfg)

    data_points = []
    for token in tqdm(scene_loader.tokens):
        data_points.append({
            'token': token,
            'scene_loader': scene_loader,
            'vocab': vocab,
            'gt_scores': gt_scores,
            'subscores': subscores,
        })
    worker_map(worker, worker_task, data_points[cfg.start_idx:cfg.end_idx])

if __name__ == "__main__":
    with torch.no_grad():
        main()
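
# Example invocation (a sketch, assuming `start_idx`/`end_idx` are keys in the
# `run_pdm_score_ddp` hydra config and the NAVSIM_* environment variables are
# exported before the module is imported):
#   python <this_script>.py start_idx=0 end_idx=1000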