from typing import List, Dict, Any, Optional, Tuple, Union
from collections import namedtuple, defaultdict
import copy
import numpy as np
import torch
import torch.nn.functional as F
from torch.distributions import Normal, Independent
from ding.torch_utils import Adam, to_device
from ding.rl_utils import v_1step_td_data, v_1step_td_error, get_train_sample, \
    qrdqn_nstep_td_data, qrdqn_nstep_td_error, get_nstep_return_data
from ding.policy import Policy
from ding.model import model_wrap
from ding.utils import POLICY_REGISTRY, DatasetNormalizer
from ding.utils.data import default_collate, default_decollate
from .common_utils import default_preprocess_learn


@POLICY_REGISTRY.register('pd')
class PDPolicy(Policy):
    r"""
    Overview:
        Policy class of the Implicit Plan Diffuser algorithm.
        https://arxiv.org/pdf/2205.09991.pdf
    """
    config = dict(
        type='pd',
        # (bool) Whether to use cuda for network.
        cuda=False,
        # (bool) priority: Whether to use priority sampling in the replay buffer. Default to False.
        priority=False,
        # (bool) Whether to use Importance Sampling Weight to correct biased update. If True, priority must be True.
        priority_IS_weight=False,
        # (int) Number of training samples (randomly collected) in replay buffer when training starts.
        # Default to 10000.
        random_collect_size=10000,
        nstep=1,
        # normalizer type
        normalizer='GaussianNormalizer',
        model=dict(
            diffuser_model='GaussianDiffusion',
            diffuser_model_cfg=dict(
                # the type of model
                model='TemporalUnet',
                # config of model
                model_cfg=dict(
                    # model dim: for GaussianInvDynDiffusion it is obs_dim, otherwise obs_dim + action_dim
                    transition_dim=23,
                    dim=32,
                    dim_mults=[1, 2, 4, 8],
                    # whether to use return as a condition
                    returns_condition=False,
                    condition_dropout=0.1,
                    # whether to use energy-based calculation
                    calc_energy=False,
                    kernel_size=5,
                    # whether to use attention
                    attention=False,
                ),
                # horizon of the trajectory generated by the model
                horizon=80,
                # timesteps of diffusion
                n_timesteps=1000,
                # whether to predict epsilon
                predict_epsilon=True,
                # discount of loss
                loss_discount=1.0,
                # whether to clip the denoised output
                clip_denoised=False,
                action_weight=10,
            ),
            value_model='ValueDiffusion',
            value_model_cfg=dict(
                # the type of model
                model='TemporalValue',
                # config of model
                model_cfg=dict(
                    horizon=4,
                    # model dim: for GaussianInvDynDiffusion it is obs_dim, otherwise obs_dim + action_dim
                    transition_dim=23,
                    dim=32,
                    dim_mults=[1, 2, 4, 8],
                    kernel_size=5,
                ),
                # horizon of the trajectory generated by the model
                horizon=80,
                # timesteps of diffusion
                n_timesteps=1000,
                # whether to predict epsilon
                predict_epsilon=True,
                # discount of loss
                loss_discount=1.0,
                # whether to clip the denoised output
                clip_denoised=False,
                action_weight=1.0,
            ),
            # number of guide steps for p_sample
            n_guide_steps=2,
            # scale of grad for p_sample
            scale=0.1,
            # t of stopgrad for p_sample
            t_stopgrad=2,
            # whether to use std as a scale for grad
            scale_grad_by_std=True,
        ),
        learn=dict(
            # How many updates (iterations) to train after collector's one collection.
            # Bigger "update_per_collect" means bigger off-policy.
            # collect data -> update policy -> collect data -> ...
            update_per_collect=1,
            # (int) Minibatch size for gradient descent.
            batch_size=100,
            # (float) learning_rate: Learning rate for the model. Default to 3e-4.
            # Please set to 1e-3 when model.value_network is True.
            learning_rate=3e-4,
            # (bool) Whether to ignore done (usually for env with max-step termination, e.g. pendulum).
            # Note: Gym wraps the MuJoCo envs by default with TimeLimit wrappers, which limit HalfCheetah
            # and several other MuJoCo envs to a max length of 1000. Such time-limit terminations do not
            # mean the MDP has ended, so when the episode step exceeds the max episode step we replace
            # done==True with done==False to keep the TD-error computation
            # (``gamma * (1 - done) * next_v + reward``) accurate.
            ignore_done=False,
            # (float) target_theta: Used for soft update of the target network,
            # aka. interpolation factor in polyak averaging for target networks.
            # Default to 0.005.
            target_theta=0.005,
            # (float) discount factor for the discounted sum of rewards, aka. gamma.
            discount_factor=0.99,
            # number of gradient accumulation steps before each optimizer step
            gradient_accumulate_every=2,
            # train_epoch = train_epoch * gradient_accumulate_every
            train_epoch=60000,
            # batch size of each env during evaluation (planning batch size)
            plan_batch_size=64,
            # step at which target model updates start, and the update frequency
            step_start_update_target=2000,
            update_target_freq=10,
            # EMA weight for the target network update
            target_weight=0.995,
            # number of iterations for which the value model is trained
            value_step=200e3,
            # whether the dataset includes returns in sampled data
            include_returns=True,
            # (float) Weight uniform initialization range in the last output layer
            init_w=3e-3,
        ),
    )
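
    # A minimal sketch of how the defaults above are usually overridden in a user config
    # (``halfcheetah_pd_config`` and the chosen values are illustrative, not part of this file;
    # DI-engine deep-merges such a dict over ``PDPolicy.config`` when compiling the experiment config):
    #
    #     from easydict import EasyDict
    #     halfcheetah_pd_config = EasyDict(dict(
    #         policy=dict(
    #             cuda=True,
    #             model=dict(diffuser_model_cfg=dict(model_cfg=dict(transition_dim=23))),
    #             learn=dict(batch_size=32, learning_rate=2e-4),
    #         ),
    #     ))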

    def default_model(self) -> Tuple[str, List[str]]:
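        r"""
        Overview:
            Return this policy's default model name and the import path of the model module.
        """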
        return 'pd', ['ding.model.template.diffusion']

    def _init_learn(self) -> None:
        r"""
        Overview:
            Learn mode init method. Called by ``self.__init__``.
            Init the plan (diffuser) and value optimizers, algorithm config, main and target models.
        """
        # Init
        self._priority = self._cfg.priority
        self._priority_IS_weight = self._cfg.priority_IS_weight
        self.action_dim = self._cfg.model.diffuser_model_cfg.action_dim
        self.obs_dim = self._cfg.model.diffuser_model_cfg.obs_dim
        self.n_timesteps = self._cfg.model.diffuser_model_cfg.n_timesteps
        self.gradient_accumulate_every = self._cfg.learn.gradient_accumulate_every
        self.plan_batch_size = self._cfg.learn.plan_batch_size
        self.gradient_steps = 1
        self.update_target_freq = self._cfg.learn.update_target_freq
        self.step_start_update_target = self._cfg.learn.step_start_update_target
        self.target_weight = self._cfg.learn.target_weight
        self.value_step = self._cfg.learn.value_step
        self.use_target = False
        self.horizon = self._cfg.model.diffuser_model_cfg.horizon
        self.include_returns = self._cfg.learn.include_returns

        # Optimizers
        self._plan_optimizer = Adam(
            self._model.diffuser.model.parameters(),
            lr=self._cfg.learn.learning_rate,
        )
        if self._model.value:
            self._value_optimizer = Adam(
                self._model.value.model.parameters(),
                lr=self._cfg.learn.learning_rate,
            )

        # Algorithm config
        self._gamma = self._cfg.learn.discount_factor

        # Main and target models
        self._target_model = copy.deepcopy(self._model)
        # self._target_model = model_wrap(
        #     self._target_model,
        #     wrapper_name='target',
        #     update_type='momentum',
        #     update_kwargs={'theta': self._cfg.learn.target_theta}
        # )
        self._learn_model = model_wrap(self._model, wrapper_name='base')
        self._learn_model.reset()
        # self._target_model.reset()

        self._forward_learn_cnt = 0

    def _forward_learn(self, data: dict) -> Dict[str, Any]:
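        r"""
        Overview:
            Forward and backward function of learn mode: compute the diffusion (and optional value)
            losses on a batch of trajectories, accumulate gradients, step the optimizers every
            ``gradient_accumulate_every`` calls, and periodically update the target model.
        Arguments:
            - data (:obj:`dict`): Dict type data, including at least
              ['trajectories', 'condition_id', 'condition_val'] and optionally ['returns'].
        Returns:
            - loss_dict (:obj:`Dict[str, Any]`): Current losses and monitored statistics.
        """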
        loss_dict = {}
        data = default_preprocess_learn(
            data,
            use_priority=self._priority,
            use_priority_IS_weight=self._cfg.priority_IS_weight,
            ignore_done=self._cfg.learn.ignore_done,
            use_nstep=False
        )
        conds = {}
        vals = data['condition_val']
        ids = data['condition_id']
        for i in range(len(ids)):
            conds[ids[i][0].item()] = vals[i]
        if len(ids) > 1:
            self.use_target = True
        data['conditions'] = conds
        if 'returns' in data.keys():
            data['returns'] = data['returns'].unsqueeze(-1)
        if self._cuda:
            data = to_device(data, self._device)
        self._learn_model.train()
        # self._target_model.train()

        x = data['trajectories']
        batch_size = len(x)
        t = torch.randint(0, self.n_timesteps, (batch_size, ), device=x.device).long()
        cond = data['conditions']
        if 'returns' in data.keys():
            target = data['returns']
        loss_dict['diffuse_loss'], loss_dict['a0_loss'] = self._model.diffuser_loss(x, cond, t)
        loss_dict['diffuse_loss'] = loss_dict['diffuse_loss'] / self.gradient_accumulate_every
        loss_dict['diffuse_loss'].backward()
        if self._forward_learn_cnt < self.value_step and self._model.value:
            loss_dict['value_loss'], logs = self._model.value_loss(x, cond, target, t)
            loss_dict['value_loss'] = loss_dict['value_loss'] / self.gradient_accumulate_every
            loss_dict['value_loss'].backward()
            loss_dict.update(logs)
        if self.gradient_steps >= self.gradient_accumulate_every:
            self._plan_optimizer.step()
            self._plan_optimizer.zero_grad()
            if self._forward_learn_cnt < self.value_step and self._model.value:
                self._value_optimizer.step()
                self._value_optimizer.zero_grad()
            self.gradient_steps = 1
        else:
            self.gradient_steps += 1
        self._forward_learn_cnt += 1
        if self._forward_learn_cnt % self.update_target_freq == 0:
            if self._forward_learn_cnt < self.step_start_update_target:
                self._target_model.load_state_dict(self._model.state_dict())
            else:
                self.update_model_average(self._target_model, self._learn_model)
        if 'returns' in data.keys():
            loss_dict['max_return'] = target.max().item()
            loss_dict['min_return'] = target.min().item()
            loss_dict['mean_return'] = target.mean().item()
        loss_dict['max_traj'] = x.max().item()
        loss_dict['min_traj'] = x.min().item()
        loss_dict['mean_traj'] = x.mean().item()
        return loss_dict

    def update_model_average(self, ma_model, current_model):
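        r"""
        Overview:
            Softly update the parameters of ``ma_model`` (the target model) towards those of
            ``current_model`` with exponential moving average weight ``self.target_weight``.
        """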
        for current_params, ma_params in zip(current_model.parameters(), ma_model.parameters()):
            old_weight, up_weight = ma_params.data, current_params.data
            if old_weight is None:
                ma_params.data = up_weight
            else:
                ma_params.data = old_weight * self.target_weight + (1 - self.target_weight) * up_weight

    def _monitor_vars_learn(self) -> List[str]:
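        r"""
        Overview:
            Return the list of variables logged during training, i.e. the keys of the dict
            returned by ``_forward_learn``.
        """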
        return [
            'diffuse_loss',
            'value_loss',
            'max_return',
            'min_return',
            'mean_return',
            'max_traj',
            'min_traj',
            'mean_traj',
            'mean_pred',
            'max_pred',
            'min_pred',
            'a0_loss',
        ]

    def _state_dict_learn(self) -> Dict[str, Any]:
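        r"""
        Overview:
            Return the state_dict of learn mode, including the model, target model and optimizers.
        """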
        if self._model.value:
            return {
                'model': self._learn_model.state_dict(),
                'target_model': self._target_model.state_dict(),
                'plan_optimizer': self._plan_optimizer.state_dict(),
                'value_optimizer': self._value_optimizer.state_dict(),
            }
        else:
            return {
                'model': self._learn_model.state_dict(),
                'target_model': self._target_model.state_dict(),
                'plan_optimizer': self._plan_optimizer.state_dict(),
            }

    def _init_eval(self):
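        r"""
        Overview:
            Eval mode init method. Called by ``self.__init__``. Wrap the target model for evaluation
            and, when goal conditions are used, init the cached plan sequence.
        """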
        self._eval_model = model_wrap(self._target_model, wrapper_name='base')
        self._eval_model.reset()
        if self.use_target:
            self._plan_seq = []

    def init_data_normalizer(self, normalizer: DatasetNormalizer = None):
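        r"""
        Overview:
            Set the dataset normalizer used to normalize observations and unnormalize actions
            during evaluation.
        """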
        self.normalizer = normalizer

    def _forward_eval(self, data: dict) -> Dict[str, Any]:
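        r"""
        Overview:
            Forward function of eval mode. Normalize observations, build the diffusion conditions
            (current obs, plus goal obs when a target condition is used), generate a plan or action
            with the eval model, and return the unnormalized actions.
        Arguments:
            - data (:obj:`dict`): Dict of env_id -> observation.
        Returns:
            - output (:obj:`dict`): Dict of env_id -> {'action': action}.
        """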
        data_id = list(data.keys())
        data = default_collate(list(data.values()))
        self._eval_model.eval()
        if self.use_target:
            cur_obs = self.normalizer.normalize(data[:, :self.obs_dim], 'observations')
            target_obs = self.normalizer.normalize(data[:, self.obs_dim:], 'observations')
        else:
            obs = self.normalizer.normalize(data, 'observations')
        with torch.no_grad():
            if self.use_target:
                cur_obs = torch.tensor(cur_obs)
                target_obs = torch.tensor(target_obs)
                if self._cuda:
                    cur_obs = to_device(cur_obs, self._device)
                    target_obs = to_device(target_obs, self._device)
                conditions = {0: cur_obs, self.horizon - 1: target_obs}
            else:
                obs = torch.tensor(obs)
                if self._cuda:
                    obs = to_device(obs, self._device)
                conditions = {0: obs}
            if self.use_target:
                if self._plan_seq == [] or 0 in self._eval_t:
                    plan_traj = self._eval_model.get_eval(conditions, self.plan_batch_size)
                    plan_traj = to_device(plan_traj, 'cpu').numpy()
                    if self._plan_seq == []:
                        self._plan_seq = plan_traj
                        self._eval_t = [0] * len(data_id)
                    else:
                        for id in data_id:
                            if self._eval_t[id] == 0:
                                self._plan_seq[id] = plan_traj[id]
                action = []
                for id in data_id:
                    if self._eval_t[id] < len(self._plan_seq[id]) - 1:
                        next_waypoint = self._plan_seq[id][self._eval_t[id] + 1]
                    else:
                        next_waypoint = self._plan_seq[id][-1].copy()
                        next_waypoint[2:] = 0
                    cur_ob = cur_obs[id]
                    cur_ob = to_device(cur_ob, 'cpu').numpy()
                    act = next_waypoint[:2] - cur_ob[:2] + (next_waypoint[2:] - cur_ob[2:])
                    action.append(act)
                    self._eval_t[id] += 1
            else:
                action = self._eval_model.get_eval(conditions, self.plan_batch_size)
                if self._cuda:
                    action = to_device(action, 'cpu')
                action = self.normalizer.unnormalize(action, 'actions')
        action = torch.tensor(action).to('cpu')
        output = {'action': action}
        output = default_decollate(output)
        return {i: d for i, d in zip(data_id, output)}

    def _reset_eval(self, data_id: Optional[List[int]] = None) -> None:
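        r"""
        Overview:
            Reset eval mode for the given env ids: restart the cached plan step counters
            so that a new plan is generated for those envs.
        """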
        if self.use_target and data_id:
            for id in data_id:
                self._eval_t[id] = 0
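
    # This policy is trained from an offline dataset, so the collect-related interfaces below
    # are not used and are left unimplemented.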

    def _init_collect(self) -> None:
        pass

    def _forward_collect(self, data: dict, **kwargs) -> dict:
        pass

    def _process_transition(self, obs: Any, model_output: dict, timestep: namedtuple) -> dict:
        pass

    def _get_train_sample(self, data: list) -> Union[None, List[Any]]:
        pass