import atexit
import copy
import os
import shutil

import numpy as np
import pytest
import torch
from mmcv import Config
from pytorch_lightning import seed_everything
from torch.utils.data import DataLoader

from risk_biased.scene_dataset.loaders import SceneDataLoaders
from risk_biased.scene_dataset.scene import (
    RandomSceneParams,
    SceneDataset,
    load_create_dataset,
)
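
# These tests exercise the random scene dataset and its data loaders. Assuming
# pytest is installed and the risk_biased package is importable, they can be
# run with, e.g.:
#   pytest path/to/this_test_file.py -v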


def clean_up_dataset_dir():
    """Delete the dataset directories that might have been created, even if
    the test fails early, by being called on exit."""
    current_dir = os.path.dirname(os.path.realpath(__file__))
    dataset_dir0 = os.path.join(current_dir, "scene_dataset_000")
    if os.path.exists(dataset_dir0):
        shutil.rmtree(dataset_dir0)
    dataset_dir1 = os.path.join(current_dir, "scene_dataset_001")
    if os.path.exists(dataset_dir1):
        shutil.rmtree(dataset_dir1)


atexit.register(clean_up_dataset_dir)


@pytest.fixture
def params():
    seed_everything(0)
    cfg = Config()
    cfg.batch_size = 4
    cfg.time_scene = 5.0
    cfg.dt = 0.1
    cfg.sample_times = [t * cfg.dt for t in range(0, int(cfg.time_scene / cfg.dt))]
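    # With time_scene=5.0 and dt=0.1 this yields 50 sample times: 0.0, 0.1, ..., 4.9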
    cfg.ego_ref_speed = 14
    cfg.ego_speed_init_low = 4.0
    cfg.ego_speed_init_high = 16.0
    cfg.ego_acceleration_mean_low = -1.5
    cfg.ego_acceleration_mean_high = 1.5
    cfg.ego_acceleration_std = 1.5
    cfg.ego_length = 4
    cfg.ego_width = 1.75
    cfg.fast_speed = 2.0
    cfg.slow_speed = 1.0
    cfg.p_change_pace = 0.2
    cfg.proportion_fast = 0.5
    cfg.perception_noise_std = 0.03
    cfg.state_dim = 2
    cfg.num_steps = 3
    cfg.num_steps_future = len(cfg.sample_times) - cfg.num_steps
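    # 50 sample times minus 3 observed steps leaves 47 future steps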
    cfg.file_name = "test_scene_data"
    cfg.datasets_sizes = {"train": 100, "val": 10, "test": 30}
    cfg.datasets = list(cfg.datasets_sizes.keys())
    cfg.num_workers = 2
    cfg.dataset_parameters = {
        "dt": cfg.dt,
        "time_scene": cfg.time_scene,
        "sample_times": cfg.sample_times,
        "ego_ref_speed": cfg.ego_ref_speed,
        "ego_speed_init_low": cfg.ego_speed_init_low,
        "ego_speed_init_high": cfg.ego_speed_init_high,
        "ego_acceleration_mean_low": cfg.ego_acceleration_mean_low,
        "ego_acceleration_mean_high": cfg.ego_acceleration_mean_high,
        "ego_acceleration_std": cfg.ego_acceleration_std,
        "fast_speed": cfg.fast_speed,
        "slow_speed": cfg.slow_speed,
        "p_change_pace": cfg.p_change_pace,
        "proportion_fast": cfg.proportion_fast,
        "file_name": cfg.file_name,
        "datasets_sizes": cfg.datasets_sizes,
        "state_dim": cfg.state_dim,
        "num_steps": cfg.num_steps,
        "num_steps_future": cfg.num_steps_future,
        "perception_noise_std": cfg.perception_noise_std,
    }
    return cfg
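

# test_load_data needs a parametrization to run, which is missing from the
# source. The value sets below are illustrative assumptions (not the original
# parametrization), chosen so that n_data divides evenly into batches.
@pytest.mark.parametrize(
    "n_data, batch_size, sample_times, state_dim",
    [
        (8, 4, [0.0, 0.1, 0.2, 0.3], 2),
        (6, 2, [0.0, 0.5, 1.0], 2),
    ],
)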
def test_load_data(params, n_data, batch_size, sample_times, state_dim):
    params = copy.deepcopy(params)
    params.batch_size = batch_size
    params.sample_times = sample_times
    scene_params = RandomSceneParams.from_config(params)
    dataset_rand = SceneDataset(n_data, scene_params, pre_fetch=False)
    data_loader_rand = DataLoader(
        dataset_rand, batch_size, collate_fn=dataset_rand.collate_fn, shuffle=False
    )
    dataset_prefetch = SceneDataset(n_data, scene_params, pre_fetch=True)
    data_loader_prefetch = DataLoader(
        dataset_prefetch,
        batch_size,
        collate_fn=dataset_prefetch.collate_fn,
        shuffle=False,
    )
    for i, (batch_rand, batch_prefetch) in enumerate(
        zip(data_loader_rand, data_loader_prefetch)
    ):
        if i == 0:
            first_batch_prefetch = batch_prefetch
            first_batch_rand = batch_rand
        # Check that the shape of the data is the expected one
        assert (
            batch_rand.shape
            == batch_prefetch.shape
            == (batch_size, 1, len(sample_times), state_dim)
        )
    # With shuffle=False, the pre-fetched dataset should loop back to the same first batch
    assert torch.allclose(next(iter(data_loader_prefetch)), first_batch_prefetch)
    # With shuffle=False but random generation, the first batch should not reappear
    assert not torch.allclose(next(iter(data_loader_rand)), first_batch_rand)


class TestDataset:
    @pytest.fixture(autouse=True)
    def setup(self, params):
        clean_up_dataset_dir()
        current_dir = os.path.dirname(os.path.realpath(__file__))
        [data_train, data_val, data_test] = load_create_dataset(params, current_dir)
        self.loaders = SceneDataLoaders(
            params.state_dim,
            params.num_steps,
            params.num_steps_future,
            params.batch_size,
            data_train=data_train,
            data_val=data_val,
            data_test=data_test,
            num_workers=params.num_workers,
        )
        assert os.path.exists(os.path.join(current_dir, "scene_dataset_000"))
        assert not os.path.exists(os.path.join(current_dir, "scene_dataset_001"))
        self.batch = torch.rand(
            params.batch_size,
            params.num_steps + params.num_steps_future,
            params.state_dim,
        )
        self.normalized_batch, self.offset = self.loaders.normalize_trajectory(
            self.batch
        )
        (
            self.normalized_batch_past,
            self.normalized_batch_future,
        ) = self.loaders.split_trajectory(self.normalized_batch)
        # Setup is done; yield to the test, then clean up afterwards
        yield
        # Remove the data directory after use
        dataset_dir = os.path.join(current_dir, "scene_dataset_000")
        shutil.rmtree(dataset_dir)
    def test_setup_datasets(self, params):
        current_dir = os.path.dirname(os.path.realpath(__file__))
        assert os.path.exists(os.path.join(current_dir, "scene_dataset_000"))
        # Should only load from the directory that was created, not create a new one
        [train_set, val_set, test_set] = load_create_dataset(
            params, base_dir=current_dir
        )
        assert not os.path.exists(os.path.join(current_dir, "scene_dataset_001"))
        train_path = os.path.join(
            current_dir, "scene_dataset_000", "scene_dataset_train.npy"
        )
        val_path = os.path.join(
            current_dir, "scene_dataset_000", "scene_dataset_val.npy"
        )
        test_path = os.path.join(
            current_dir, "scene_dataset_000", "scene_dataset_test.npy"
        )
        # Make sure the dataset files exist on disk
        assert os.path.exists(train_path)
        assert os.path.exists(val_path)
        assert os.path.exists(test_path)
        # Make sure the saved datasets match the specifications made in the config
        assert np.load(train_path).shape == (
            2,
            params.datasets_sizes["train"],
            1,
            params.num_steps + params.num_steps_future,
            params.state_dim,
        )
        assert np.load(val_path).shape == (
            2,
            params.datasets_sizes["val"],
            1,
            params.num_steps + params.num_steps_future,
            params.state_dim,
        )
        assert np.load(test_path).shape == (
            2,
            params.datasets_sizes["test"],
            1,
            params.num_steps + params.num_steps_future,
            params.state_dim,
        )
        # The returned datasets should have the same shapes as the saved files
        total_steps = params.num_steps + params.num_steps_future
        assert list(train_set.shape) == [
            2,
            params.datasets_sizes["train"],
            1,
            total_steps,
            params.state_dim,
        ]
        assert list(val_set.shape) == [
            2,
            params.datasets_sizes["val"],
            1,
            total_steps,
            params.state_dim,
        ]
        assert list(test_set.shape) == [
            2,
            params.datasets_sizes["test"],
            1,
            total_steps,
            params.state_dim,
        ]
    def test_split_trajectory(self, params):
        batch_history, batch_future = self.loaders.split_trajectory(self.batch)
        # Make sure split_trajectory splits the batch into history and future
        assert torch.all(torch.eq(batch_history, self.batch[:, : params.num_steps, :]))
        assert torch.all(torch.eq(batch_future, self.batch[:, params.num_steps :, :]))
    def test_normalize_trajectory(self, params):
        batch_copied = self.batch.detach().clone()
        # Make sure the input batch remains unmodified
        assert torch.all(torch.eq(batch_copied, self.batch))
        # Test normalization of the whole batch
        assert torch.allclose(
            self.normalized_batch + self.offset.unsqueeze(1), self.batch
        )
        assert torch.allclose(
            self.batch - self.offset.unsqueeze(1), self.normalized_batch
        )
        batch_past, batch_fut = self.loaders.split_trajectory(self.batch)
        # Test normalization of the history
        assert torch.allclose(
            self.normalized_batch_past + self.offset.unsqueeze(1), batch_past
        )
    def test_unnormalize_trajectory(self, params):
        batch_future_test = self.loaders.unnormalize_trajectory(
            self.normalized_batch_future, self.offset
        )
        # Test unnormalization
        assert torch.allclose(
            self.normalized_batch_future + self.offset.unsqueeze(1), batch_future_test
        )
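
# Note: the asserts above pin down normalize/unnormalize as a per-trajectory
# translation. A minimal sketch of the implied round trip (the exact choice of
# offset is an assumption here, e.g. the last observed state):
#   offset = batch[:, num_steps - 1, :]
#   normalized = batch - offset.unsqueeze(1)
#   restored = normalized + offset.unsqueeze(1)  # recovers batch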