import gym
import numpy as np
from gym import spaces

from .vrp_data import VRPDataset


def assign_env_config(self, kwargs):
    """
    Set self.key = value, for each key in kwargs
    """
    for key, value in kwargs.items():
        setattr(self, key, value)


def dist(loc1, loc2):
    return ((loc1[:, 0] - loc2[:, 0]) ** 2 + (loc1[:, 1] - loc2[:, 1]) ** 2) ** 0.5
class CVRPVectorEnv(gym.Env):
    def __init__(self, *args, **kwargs):
        self.max_nodes = 50
        self.capacity_limit = 40
        self.n_traj = 50
        # if eval_data == True, load the `eval_data_idx`-th instance from the
        # `eval_partition` split instead of sampling a random instance
        self.eval_data = False
        self.eval_partition = "test"
        self.eval_data_idx = 0
        self.demand_limit = 10
        assign_env_config(self, kwargs)

        obs_dict = {"observations": spaces.Box(low=0, high=1, shape=(self.max_nodes, 2))}
        obs_dict["depot"] = spaces.Box(low=0, high=1, shape=(2,))
        obs_dict["demand"] = spaces.Box(low=0, high=1, shape=(self.max_nodes,))
        obs_dict["action_mask"] = spaces.MultiBinary(
            [self.n_traj, self.max_nodes + 1]
        )  # 1: OK, 0: cannot go
        obs_dict["last_node_idx"] = spaces.MultiDiscrete([self.max_nodes + 1] * self.n_traj)
        obs_dict["current_load"] = spaces.Box(low=0, high=1, shape=(self.n_traj,))
        self.observation_space = spaces.Dict(obs_dict)

        self.action_space = spaces.MultiDiscrete([self.max_nodes + 1] * self.n_traj)
        self.reward_space = None

        self.reset()
    def seed(self, seed):
        np.random.seed(seed)

    def _STEP(self, action):
        self._go_to(action)  # go to node `action` and set the reward
        self.num_steps += 1
        self.state = self._update_state()
        # a trajectory is done once it returns to the depot after visiting all other nodes
        self.done = (action == 0) & self.is_all_visited()
        return self.state, self.reward, self.done, self.info

    # Euclidean cost function
    def cost(self, loc1, loc2):
        return dist(loc1, loc2)

    def is_all_visited(self):
        # assumes no repetition in the first `max_nodes` steps
        return self.visited[:, 1:].all(axis=1)
    def _update_state(self):
        obs = {"observations": self.nodes[1:]}  # n x 2 array
        obs["depot"] = self.nodes[0]
        obs["action_mask"] = self._update_mask()
        obs["demand"] = self.demands
        obs["last_node_idx"] = self.last
        obs["current_load"] = self.load
        return obs

    def _update_mask(self):
        # only allow visiting unvisited nodes
        action_mask = ~self.visited
        # the depot may only be visited when the last node is not the depot,
        # or when all nodes have been visited
        action_mask[:, 0] |= self.last != 0
        action_mask[:, 0] |= self.is_all_visited()
        # do not allow visiting nodes whose demand exceeds the remaining load
        action_mask[:, 1:] &= self.demands <= (
            self.load.reshape(-1, 1) + 1e-5
        )  # small tolerance for floating point subtraction error
        return action_mask
    def _RESET(self):
        self.visited = np.zeros((self.n_traj, self.max_nodes + 1), dtype=bool)
        self.visited[:, 0] = True
        self.num_steps = 0
        self.last = np.zeros(self.n_traj, dtype=int)  # index of the last visited node
        self.load = np.ones(self.n_traj, dtype=float)  # current load

        if self.eval_data:
            self._load_orders()
        else:
            self._generate_orders()
        self.state = self._update_state()
        self.info = {}
        self.done = np.array([False] * self.n_traj)
        return self.state

    def _load_orders(self):
        data = VRPDataset[self.eval_partition, self.max_nodes, self.eval_data_idx]
        self.nodes = np.concatenate((data["depot"][None, ...], data["loc"]))
        self.demands = data["demand"]
        self.demands_with_depot = self.demands.copy()

    def _generate_orders(self):
        self.nodes = np.random.rand(self.max_nodes + 1, 2)
        self.demands = (
            np.random.randint(low=1, high=self.demand_limit, size=self.max_nodes)
            / self.capacity_limit
        )
        self.demands_with_depot = self.demands.copy()
    def _go_to(self, destination):
        dest_node = self.nodes[destination]
        dist = self.cost(dest_node, self.nodes[self.last])
        self.last = destination
        # returning to the depot refills the vehicle to full (normalized) capacity
        self.load[destination == 0] = 1
        self.load[destination > 0] -= self.demands[destination[destination > 0] - 1]
        # mark the demand of each newly visited node as fulfilled
        self.demands_with_depot[destination[destination > 0] - 1] = 0
        self.visited[np.arange(self.n_traj), destination] = True
        self.reward = -dist

    def step(self, action):
        # return last state after done,
        # for the sake of PPO's abuse of ff on done observation
        # see https://github.com/opendilab/DI-engine/issues/497
        # Not needed for CleanRL
        # if self.done.all():
        #     return self.state, self.reward, self.done, self.info
        return self._STEP(action)

    def reset(self):
        return self._RESET()
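

# ---------------------------------------------------------------------------
# Minimal usage sketch (an illustration, not part of the original module):
# rolls the vectorized env forward with random feasible actions drawn from the
# action mask. Assumes the module is executed as part of its package (e.g. via
# `python -m <package>.<module>`) so the relative `VRPDataset` import resolves.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    env = CVRPVectorEnv(max_nodes=20, n_traj=8, eval_data=False)
    obs = env.reset()
    total_reward = np.zeros(env.n_traj)
    done = np.array([False] * env.n_traj)
    while not done.all():
        # pick one random allowed node per trajectory (each mask row is boolean)
        action = np.array(
            [np.random.choice(np.nonzero(row)[0]) for row in obs["action_mask"]]
        )
        obs, reward, done, info = env.step(action)
        total_reward += reward
    print("mean tour length:", -total_reward.mean())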