from typing import List, Dict, Any, Tuple, Union
from collections import namedtuple
import torch
import treetensor as ttorch

from ding.rl_utils import get_train_sample
from ding.torch_utils import Adam, to_device
from ding.utils import POLICY_REGISTRY, split_data_generator
from ding.utils.data import default_collate, default_decollate
from .base_policy import Policy
from .common_utils import default_preprocess_learn


@POLICY_REGISTRY.register('pg')
class PGPolicy(Policy):
    r"""
    Overview:
        Policy class of the Policy Gradient (REINFORCE) algorithm.
    """
    config = dict(
        # (str) RL policy register name (refer to "POLICY_REGISTRY.register").
        type='pg',
        # (bool) whether to use cuda for network.
        cuda=False,
        # (bool) whether to use the on-policy training pipeline (behaviour policy and training policy are the same).
        on_policy=True,  # PG is a strictly on-policy algorithm, so this field should not be modified by users.
        # (str) action space type: ['discrete', 'continuous'].
        action_space='discrete',
        # (bool) whether to use deterministic action for evaluation.
        deterministic_eval=True,
        learn=dict(
            # (int) the number of samples for one update.
            batch_size=64,
            # (float) the step size of one gradient descent step, i.e. the learning rate of the optimizer.
            learning_rate=0.001,
            # ==============================================================
            # The following configs are algorithm-specific.
            # ==============================================================
            # (float) loss weight of the entropy regularization; the weight of the policy loss is fixed to 1.
            entropy_weight=0.01,
            # (float) max grad norm value for gradient clipping.
            grad_norm=5,
            # (bool) whether to ignore the done signal for non-terminating (e.g. time-limited) envs.
            ignore_done=False,
        ),
        collect=dict(
            # (int) how many complete episodes to collect for one training iteration.
            # n_episode=8,
            # (int) trajectory unroll length.
            unroll_len=1,
            # ==============================================================
            # The following configs are algorithm-specific.
            # ==============================================================
            # (float) discount factor for future reward, in range [0, 1].
            discount_factor=0.99,
            collector=dict(get_train_sample=True),
        ),
        eval=dict(),
    )
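    # A minimal, hypothetical override of the defaults above (field names are the ones consumed
    # by ``_init_learn`` / ``_init_collect``; the values are illustrative only):
    #   user_cfg = dict(
    #       action_space='discrete',
    #       learn=dict(batch_size=32, learning_rate=3e-4, entropy_weight=0.01, grad_norm=5),
    #       collect=dict(unroll_len=1, discount_factor=0.99),
    #   )
    # In DI-engine, such a dict is usually merged into the defaults by the surrounding config
    # system before the policy is constructed.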

    def default_model(self) -> Tuple[str, List[str]]:
        return 'pg', ['ding.model.template.pg']

    def _init_learn(self) -> None:
        r"""
        Overview:
            Learn mode init method. Called by ``self.__init__``.
            Init the optimizer and the algorithm-specific attributes (entropy weight, grad norm).
        """
        # Optimizer
        self._optimizer = Adam(self._model.parameters(), lr=self._cfg.learn.learning_rate)
        self._entropy_weight = self._cfg.learn.entropy_weight
        self._grad_norm = self._cfg.learn.grad_norm
        self._learn_model = self._model  # for compatibility

    def _forward_learn(self, data: dict) -> List[Dict[str, Any]]:
        r"""
        Overview:
            Forward and backward function of learn mode.
        Arguments:
            - data (:obj:`dict`): Dict type data, including at least ['obs', 'action', 'return'].
        Returns:
            - return_infos (:obj:`List[Dict[str, Any]]`): One info dict per mini-batch update, \
                including current lr and loss.
        """
        data = default_preprocess_learn(data, ignore_done=self._cfg.learn.ignore_done, use_nstep=False)
        if self._cuda:
            data = to_device(data, self._device)
        self._model.train()

        return_infos = []
        for batch in split_data_generator(data, self._cfg.learn.batch_size, shuffle=True):
            # forward
            output = self._learn_model.forward(batch['obs'])
            return_ = batch['return']
            dist = output['dist']
            # calculate PG loss
            log_prob = dist.log_prob(batch['action'])
            policy_loss = -(log_prob * return_).mean()
            entropy_loss = -self._entropy_weight * dist.entropy().mean()
            total_loss = policy_loss + entropy_loss
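            # Note: the quantity minimized above is the REINFORCE surrogate loss
            #   L(theta) = -E[ log pi_theta(a_t | s_t) * G_t ] - entropy_weight * H(pi_theta(.|s_t)),
            # where G_t is the discounted return attached to each transition in
            # ``_get_train_sample``; the entropy bonus discourages premature collapse
            # to a deterministic policy.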
            # update
            self._optimizer.zero_grad()
            total_loss.backward()
            grad_norm = torch.nn.utils.clip_grad_norm_(
                list(self._learn_model.parameters()),
                max_norm=self._grad_norm,
            )
            self._optimizer.step()

            # only the last update's information is recorded in the logger
            return_info = {
                'cur_lr': self._optimizer.param_groups[0]['lr'],
                'total_loss': total_loss.item(),
                'policy_loss': policy_loss.item(),
                'entropy_loss': entropy_loss.item(),
                'return_abs_max': return_.abs().max().item(),
                'grad_norm': grad_norm,
            }
            return_infos.append(return_info)
        return return_infos

    def _init_collect(self) -> None:
        self._unroll_len = self._cfg.collect.unroll_len
        self._gamma = self._cfg.collect.discount_factor

    def _forward_collect(self, data: dict) -> dict:
        data_id = list(data.keys())
        data = default_collate(list(data.values()))
        if self._cuda:
            data = to_device(data, self._device)
        self._model.eval()
        with torch.no_grad():
            output = self._model.forward(data)
            output['action'] = output['dist'].sample()
        if self._cuda:
            output = to_device(output, 'cpu')
        output = default_decollate(output)
        return {i: d for i, d in zip(data_id, output)}
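    # Note on the collect/eval data flow (shapes are illustrative): ``data`` arrives keyed by env id,
    # e.g. {0: {'obs': obs_0}, 1: {'obs': obs_1}}; ``default_collate`` stacks the values into one
    # batch for a single forward pass, and ``default_decollate`` splits the batched output back into
    # per-env dicts that are re-associated with their env ids on return.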

    def _process_transition(self, obs: Any, model_output: dict, timestep: namedtuple) -> dict:
        r"""
        Overview:
            Generate dict type transition data from inputs.
        Arguments:
            - obs (:obj:`Any`): Env observation.
            - model_output (:obj:`dict`): Output of collect model, including at least ['action'].
            - timestep (:obj:`namedtuple`): Output after env step, including at least ['obs', 'reward', 'done'] \
                (here 'obs' indicates obs after env step).
        Returns:
            - transition (:obj:`dict`): Dict type transition data.
        """
        return {
            'obs': obs,
            'action': model_output['action'],
            'reward': timestep.reward,
            'done': timestep.done,
        }
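    # A single transition produced above might look like (values purely illustrative):
    #   {'obs': obs_t, 'action': tensor(2), 'reward': tensor([1.0]), 'done': False}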

    def _get_train_sample(self, data: list) -> Union[None, List[Any]]:
        r"""
        Overview:
            Compute the discounted return for every transition of a complete episode, \
            then pack the trajectory into training samples.
        Arguments:
            - data (:obj:`list`): The trajectory's buffer list.
        Returns:
            - samples (:obj:`Union[None, List[Any]]`): The training samples generated.
        """
        assert data[-1]['done'], "PG needs a complete episode"
        if self._cfg.learn.ignore_done:
            raise NotImplementedError
        R = 0.
        if isinstance(data, list):
            for i in reversed(range(len(data))):
                R = self._gamma * R + data[i]['reward']
                data[i]['return'] = R
            return get_train_sample(data, self._unroll_len)
        elif isinstance(data, ttorch.Tensor):
            data_size = data['done'].shape[0]
            data['return'] = ttorch.torch.zeros(data_size)
            for i in reversed(range(data_size)):
                R = self._gamma * R + data['reward'][i]
                data['return'][i] = R
            return get_train_sample(data, self._unroll_len)
        else:
            raise ValueError
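        # Worked example of the recursion above: with gamma = 0.9 and episode rewards
        # [1.0, 0.0, 2.0], the returns are filled back-to-front as
        #   G_2 = 2.0
        #   G_1 = 0.0 + 0.9 * 2.0 = 1.8
        #   G_0 = 1.0 + 0.9 * 1.8 = 2.62
        # so each transition i carries data[i]['return'] = G_i.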

    def _init_eval(self) -> None:
        pass

    def _forward_eval(self, data: dict) -> dict:
        data_id = list(data.keys())
        data = default_collate(list(data.values()))
        if self._cuda:
            data = to_device(data, self._device)
        self._model.eval()
        with torch.no_grad():
            output = self._model.forward(data)
            if self._cfg.deterministic_eval:
                if self._cfg.action_space == 'discrete':
                    output['action'] = output['logit'].argmax(dim=-1)
                elif self._cfg.action_space == 'continuous':
                    output['action'] = output['logit']['mu']
                else:
                    raise KeyError("invalid action_space: {}".format(self._cfg.action_space))
            else:
                output['action'] = output['dist'].sample()
        if self._cuda:
            output = to_device(output, 'cpu')
        output = default_decollate(output)
        return {i: d for i, d in zip(data_id, output)}

    def _monitor_vars_learn(self) -> List[str]:
        return super()._monitor_vars_learn() + ['policy_loss', 'entropy_loss', 'return_abs_max', 'grad_norm']
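

if __name__ == '__main__':
    # Minimal, self-contained sanity sketch of the REINFORCE loss computed in ``_forward_learn``,
    # using a hypothetical toy categorical policy and made-up returns; it relies only on plain
    # torch APIs and does not touch DI-engine's training pipeline.
    from torch.distributions import Categorical

    logits = torch.randn(4, 3, requires_grad=True)  # 4 samples, 3 discrete actions
    dist = Categorical(logits=logits)
    action = dist.sample()
    return_ = torch.tensor([1.0, 0.5, -0.2, 2.0])  # stand-in discounted returns
    policy_loss = -(dist.log_prob(action) * return_).mean()
    entropy_loss = -0.01 * dist.entropy().mean()
    total_loss = policy_loss + entropy_loss
    total_loss.backward()  # gradients flow into ``logits``
    print(policy_loss.item(), entropy_loss.item(), logits.grad.abs().sum().item())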