"""Report the mean squared-error distance between logged and predicted trajectories.

For every scene token selected by the configured scene filter, the logged
future trajectory is compared against the trajectory stored for that token in
the pre-computed ``subscores`` pickle, and the average of the per-scene summed
squared pose differences is printed.
"""
import io
import logging
import os
import pickle
import uuid
from pathlib import Path

import hydra
import matplotlib.pyplot as plt
import numpy as np
import torch
from PIL import Image, ImageDraw
from hydra.utils import instantiate
from matplotlib.collections import LineCollection
from nuplan.planning.utils.multithreading.worker_utils import worker_map
from omegaconf import DictConfig
from tqdm import tqdm

from navsim.common.dataclasses import AgentInput, Scene
from navsim.common.dataclasses import SensorConfig
from navsim.common.dataloader import SceneLoader, MetricCacheLoader
from navsim.planning.script.builders.worker_pool_builder import build_worker
from navsim.visualization.private import view_points

# your path to these files
# NOTE: if the environment variable is unset, os.getenv returns None and the
# formatted path starts with the literal text "None".
vocab = np.load(f'{os.getenv("NAVSIM_DEVKIT_ROOT")}/traj_final/test_8192_kmeans.npy')

# NOTE(review): pickle executes arbitrary code while loading; only point this
# at files you produced yourself or otherwise trust.
with open(f'{os.getenv("OPENSCENE_DATA_ROOT")}/subscores/sinepe.pkl', 'rb') as _subscores_file:
    subscores = pickle.load(_subscores_file)

logger = logging.getLogger(__name__)

CONFIG_PATH = "../planning/script/config/pdm_scoring"
CONFIG_NAME = "run_pdm_score_ddp"


@hydra.main(config_path=CONFIG_PATH, config_name=CONFIG_NAME)
def main(cfg: DictConfig) -> None:
    """Print the mean summed-squared pose error over all filtered scenes.

    Args:
        cfg: Hydra configuration. This function reads ``navsim_log_path``,
            ``sensor_blobs_path`` and ``scene_filter`` from it.

    The per-scene value is the sum of squared element-wise differences between
    the logged future poses and the selected predicted poses (no square root
    is taken). If the scene filter yields no tokens, a warning is logged and
    nothing is printed.
    """
    scene_loader = SceneLoader(
        data_path=Path(cfg.navsim_log_path),
        scene_filter=instantiate(cfg.scene_filter),
        sensor_blobs_path=Path(cfg.sensor_blobs_path),
        sensor_config=SensorConfig.build_no_sensors(),
    )

    # Number of future poses requested from the logged trajectory: 4 / 0.5 = 8.
    num_future_poses = int(4 / 0.5)
    # Fourth positional argument of Scene.from_scene_dict_list.
    # presumably the number of future frames to load -- TODO confirm
    num_future_frames = 10
    # Indices 4, 9, ..., 39 into the predicted poses: every fifth pose, eight
    # in total, so the selection lines up with the eight logged poses.
    # presumably the prediction is sampled five times denser than the log --
    # TODO confirm
    sampled_timepoints = [5 * k - 1 for k in range(1, 9)]

    l2_dists = []
    for token in tqdm(scene_loader.tokens):
        scene = Scene.from_scene_dict_list(
            scene_loader.scene_frames_dicts[token],
            scene_loader._sensor_blobs_path,
            scene_loader._scene_filter.num_history_frames,
            num_future_frames,
            scene_loader._sensor_config,
        )
        gt_traj = scene.get_future_trajectory(num_future_poses)
        model_traj = subscores[token]['trajectory']

        pose_error = gt_traj.poses - model_traj.poses[sampled_timepoints]
        l2_dists.append((pose_error ** 2).sum())

    if not l2_dists:
        # Avoid a ZeroDivisionError when the filter selects no scenes.
        logger.warning("No scene tokens to evaluate; skipping mean computation.")
        return

    print(sum(l2_dists) / len(l2_dists))


if __name__ == "__main__":
    with torch.no_grad():
        main()