|
import io |
|
import logging |
|
import os |
|
import pickle |
|
import uuid |
|
from pathlib import Path |
|
|
|
import hydra |
|
import matplotlib.pyplot as plt |
|
import numpy as np |
|
import torch |
|
from PIL import Image, ImageDraw |
|
from hydra.utils import instantiate |
|
from matplotlib.collections import LineCollection |
|
from nuplan.planning.utils.multithreading.worker_utils import worker_map |
|
from omegaconf import DictConfig |
|
from tqdm import tqdm |
|
|
|
from navsim.common.dataclasses import AgentInput, Scene |
|
from navsim.common.dataclasses import SensorConfig |
|
from navsim.common.dataloader import SceneLoader, MetricCacheLoader |
|
from navsim.planning.script.builders.worker_pool_builder import build_worker |
|
from navsim.visualization.private import view_points |
|
|
|
|
|
|
|
|
|
vocab = np.load(f'{os.getenv("NAVSIM_DEVKIT_ROOT")}/traj_final/test_8192_kmeans.npy') |
|
subscores = pickle.load(open(f'{os.getenv("OPENSCENE_DATA_ROOT")}/subscores/sinepe.pkl', 'rb')) |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
CONFIG_PATH = "../planning/script/config/pdm_scoring" |
|
CONFIG_NAME = "run_pdm_score_ddp" |
|
|
|
|
|
@hydra.main(config_path=CONFIG_PATH, config_name=CONFIG_NAME) |
|
def main(cfg: DictConfig) -> None: |
|
data_path = Path(cfg.navsim_log_path) |
|
sensor_blobs_path = Path(cfg.sensor_blobs_path) |
|
scene_filter = instantiate(cfg.scene_filter) |
|
scene_loader = SceneLoader( |
|
data_path=data_path, |
|
scene_filter=scene_filter, |
|
sensor_blobs_path=sensor_blobs_path, |
|
sensor_config=SensorConfig.build_no_sensors() |
|
) |
|
l2_dists = [] |
|
for token in tqdm(scene_loader.tokens): |
|
gt_traj = Scene.from_scene_dict_list( |
|
scene_loader.scene_frames_dicts[token], |
|
scene_loader._sensor_blobs_path, |
|
scene_loader._scene_filter.num_history_frames, |
|
10, |
|
scene_loader._sensor_config |
|
).get_future_trajectory(int(4 / 0.5)) |
|
model_traj = subscores[token]['trajectory'] |
|
sampled_timepoints = [5 * k - 1 for k in range(1, 9)] |
|
l2_dist = ((gt_traj.poses - model_traj.poses[sampled_timepoints]) ** 2).sum() |
|
l2_dists.append(l2_dist) |
|
print(sum(l2_dists) / len(l2_dists)) |
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
with torch.no_grad(): |
|
main() |
|
|