JobShopCPRL / MyDummyVecEnv.py
Pierre Tassel
improvements
9a90bc0
from collections import OrderedDict
from typing import Any, Callable, List, Optional, Sequence, Type, Union
import gym
import numpy as np
from stable_baselines3.common.vec_env.base_vec_env import VecEnv, VecEnvIndices, VecEnvObs, VecEnvStepReturn
from stable_baselines3.common.vec_env.util import dict_to_obs, obs_space_info
import torch
class MyDummyVecEnv(VecEnv):
"""
Creates a simple vectorized wrapper for multiple environments, calling each environment in sequence on the current
Python process. This is useful for computationally simple environment such as ``cartpole-v1``,
as the overhead of multiprocess or multithread outweighs the environment computation time.
This can also be used for RL methods that
require a vectorized environment, but that you want a single environments to train with.
:param env_fns: a list of functions
that return environments to vectorize
"""
def __init__(self, env_fns: List[Callable[[], gym.Env]], device):
self.envs = [fn() for fn in env_fns]
env = self.envs[0]
VecEnv.__init__(self, len(env_fns), env.observation_space, env.action_space)
obs_space = env.observation_space
self.keys, shapes, dtypes = obs_space_info(obs_space)
self.device = device
self.buf_obs = OrderedDict(
[(k, torch.zeros((self.num_envs,) + tuple(shapes[k]), dtype=torch.float, device=self.device)) for k in self.keys])
self.buf_dones = np.zeros((self.num_envs,), dtype=bool)
self.buf_rews = np.zeros((self.num_envs,), dtype=np.float32)
self.buf_infos = [{} for _ in range(self.num_envs)]
self.actions = None
def step_async(self, actions: np.ndarray) -> None:
self.actions = actions
def step_wait(self) -> VecEnvStepReturn:
for env_idx in range(self.num_envs):
obs, self.buf_rews[env_idx], self.buf_dones[env_idx], self.buf_infos[env_idx] = self.envs[env_idx].step(
self.actions[env_idx]
)
if self.buf_dones[env_idx]:
# save final observation where user can get it, then reset
self.buf_infos[env_idx]["terminal_observation"] = obs
obs = self.envs[env_idx].reset()
self._save_obs(env_idx, obs)
return (self._obs_from_buf(), self.buf_rews, self.buf_dones, self.buf_infos)
def seed(self, seed: Optional[int] = None) -> List[Union[None, int]]:
seeds = list()
for idx, env in enumerate(self.envs):
seeds.append(env.seed(seed + idx))
return seeds
def reset(self) -> VecEnvObs:
for env_idx in range(self.num_envs):
obs = self.envs[env_idx].reset()
self._save_obs(env_idx, obs)
return self._obs_from_buf()
def close(self) -> None:
for env in self.envs:
env.close()
def get_images(self) -> Sequence[np.ndarray]:
return [env.render(mode="rgb_array") for env in self.envs]
def render(self, mode: str = "human") -> Optional[np.ndarray]:
"""
Gym environment rendering. If there are multiple environments then
they are tiled together in one image via ``BaseVecEnv.render()``.
Otherwise (if ``self.num_envs == 1``), we pass the render call directly to the
underlying environment.
Therefore, some arguments such as ``mode`` will have values that are valid
only when ``num_envs == 1``.
:param mode: The rendering type.
"""
if self.num_envs == 1:
return self.envs[0].render(mode=mode)
else:
return super().render(mode=mode)
def _save_obs(self, env_idx: int, obs: VecEnvObs) -> None:
for key in self.keys:
self.buf_obs[key][env_idx] = torch.from_numpy(obs[key]).to(self.device, non_blocking=True)
def _obs_from_buf(self) -> VecEnvObs:
return dict_to_obs(self.observation_space, self.buf_obs)
def get_attr(self, attr_name: str, indices: VecEnvIndices = None) -> List[Any]:
"""Return attribute from vectorized environment (see base class)."""
target_envs = self._get_target_envs(indices)
return [getattr(env_i, attr_name) for env_i in target_envs]
def set_attr(self, attr_name: str, value: Any, indices: VecEnvIndices = None) -> None:
"""Set attribute inside vectorized environments (see base class)."""
target_envs = self._get_target_envs(indices)
for env_i in target_envs:
setattr(env_i, attr_name, value)
def env_method(self, method_name: str, *method_args, indices: VecEnvIndices = None, **method_kwargs) -> List[Any]:
"""Call instance methods of vectorized environments."""
target_envs = self._get_target_envs(indices)
return [getattr(env_i, method_name)(*method_args, **method_kwargs) for env_i in target_envs]
def env_is_wrapped(self, wrapper_class: Type[gym.Wrapper], indices: VecEnvIndices = None) -> List[bool]:
"""Check if worker environments are wrapped with a given wrapper"""
target_envs = self._get_target_envs(indices)
# Import here to avoid a circular import
from stable_baselines3.common import env_util
return [env_util.is_wrapped(env_i, wrapper_class) for env_i in target_envs]
def _get_target_envs(self, indices: VecEnvIndices) -> List[gym.Env]:
indices = self._get_indices(indices)
return [self.envs[i] for i in indices]