import numpy as np
import torch


class ReplayMemory:
    """Circular buffer of environment transitions for off-policy training."""

    def __init__(self, state_dim, action_dim, capacity, device):
        self.capacity = int(capacity)
        self.device = device

        # Preallocate contiguous float32 storage; rows are overwritten in place.
        self.states = np.empty((self.capacity, int(state_dim)), dtype=np.float32)
        self.actions = np.empty((self.capacity, int(action_dim)), dtype=np.float32)
        self.rewards = np.empty((self.capacity, 1), dtype=np.float32)
        self.next_states = np.empty((self.capacity, int(state_dim)), dtype=np.float32)
        self.masks = np.empty((self.capacity, 1), dtype=np.float32)

        self.idx = 0       # next write position
        self.full = False  # set once the buffer has wrapped around

    def append(self, state, action, reward, next_state, mask):
        # Copy into the preallocated rows rather than storing references.
        np.copyto(self.states[self.idx], state)
        np.copyto(self.actions[self.idx], action)
        np.copyto(self.rewards[self.idx], reward)
        np.copyto(self.next_states[self.idx], next_state)
        np.copyto(self.masks[self.idx], mask)

        # Advance the circular write index; mark full on wrap-around.
        self.idx = (self.idx + 1) % self.capacity
        self.full = self.full or self.idx == 0

    def sample(self, batch_size):
        # Sample uniformly (with replacement) from the filled portion only.
        # Note: raises ValueError if called before any transition is stored.
        idxs = np.random.randint(
            0, self.capacity if self.full else self.idx, size=batch_size
        )

        states = torch.as_tensor(self.states[idxs], device=self.device)
        actions = torch.as_tensor(self.actions[idxs], device=self.device)
        rewards = torch.as_tensor(self.rewards[idxs], device=self.device)
        next_states = torch.as_tensor(self.next_states[idxs], device=self.device)
        masks = torch.as_tensor(self.masks[idxs], device=self.device)

        return states, actions, rewards, next_states, masks


class DiffusionMemory:
    """Buffer pairing states with the best actions found for them so far."""

    def __init__(self, state_dim, action_dim, capacity, device):
        self.capacity = int(capacity)
        self.device = device

        self.states = np.empty((self.capacity, int(state_dim)), dtype=np.float32)
        self.best_actions = np.empty((self.capacity, int(action_dim)), dtype=np.float32)

        self.idx = 0       # next write position
        self.full = False  # set once the buffer has wrapped around

    def append(self, state, action):
        np.copyto(self.states[self.idx], state)
        np.copyto(self.best_actions[self.idx], action)

        self.idx = (self.idx + 1) % self.capacity
        self.full = self.full or self.idx == 0

    def sample(self, batch_size):
        idxs = np.random.randint(
            0, self.capacity if self.full else self.idx, size=batch_size
        )

        states = torch.as_tensor(self.states[idxs], device=self.device)
        best_actions = torch.as_tensor(self.best_actions[idxs], device=self.device)

        # Track gradients w.r.t. the sampled actions so the caller can improve
        # them (e.g., by gradient ascent on a Q-function); idxs are returned so
        # the updated actions can be written back via replace().
        best_actions.requires_grad_(True)

        return states, best_actions, idxs

    def replace(self, idxs, best_actions):
        # Use fancy-index assignment: advanced indexing returns a copy, so the
        # previous np.copyto(self.best_actions[idxs], ...) wrote into a
        # temporary array and silently left the buffer unchanged.
        self.best_actions[idxs] = best_actions
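

# A minimal smoke-test sketch of intended usage, not part of the original
# module: it assumes a CPU device and toy dimensions, and stands in for the
# training loop that would normally drive these buffers.
if __name__ == "__main__":
    device = torch.device("cpu")
    state_dim, action_dim = 4, 2

    replay = ReplayMemory(state_dim, action_dim, capacity=100, device=device)
    diffusion = DiffusionMemory(state_dim, action_dim, capacity=100, device=device)

    # Fill both buffers with random transitions.
    for _ in range(10):
        s = np.random.randn(state_dim).astype(np.float32)
        a = np.random.randn(action_dim).astype(np.float32)
        s2 = np.random.randn(state_dim).astype(np.float32)
        replay.append(s, a, reward=1.0, next_state=s2, mask=1.0)
        diffusion.append(s, a)

    states, actions, rewards, next_states, masks = replay.sample(batch_size=8)
    print(states.shape, actions.shape, rewards.shape)  # (8, 4) (8, 2) (8, 1)

    # Sample, nudge the actions (a placeholder for a real Q-gradient step),
    # and write the improved versions back into the buffer.
    s_batch, best_a, idxs = diffusion.sample(batch_size=8)
    improved = (best_a + 0.01).detach().cpu().numpy()
    diffusion.replace(idxs, improved)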