# Source: DIPO/agent/replay_memory.py
# Hosting-page metadata: uploader Wyatt-Huang, commit "Upload 10 files" (f761808, verified).
import numpy as np
import torch
class ReplayMemory():
    """Buffer to store environment transitions.

    Transitions are kept in preallocated float32 NumPy arrays that are used as
    a ring: the write cursor ``idx`` wraps to 0 after ``capacity`` writes, so
    later entries overwrite the oldest ones.
    """

    def __init__(self, state_dim, action_dim, capacity, device):
        """Allocate storage for ``capacity`` transitions.

        Args:
            state_dim: Length of one state vector (cast to ``int``).
            action_dim: Length of one action vector (cast to ``int``).
            capacity: Number of transitions the buffer can hold (cast to ``int``).
            device: Forwarded to ``torch.as_tensor`` when sampling.
        """
        self.capacity = int(capacity)
        self.device = device

        state_shape = (self.capacity, int(state_dim))
        action_shape = (self.capacity, int(action_dim))
        scalar_shape = (self.capacity, 1)

        # np.empty does not initialise memory; rows hold arbitrary values until
        # append() writes them, which is why sample() only draws written rows.
        self.states = np.empty(state_shape, dtype=np.float32)
        self.actions = np.empty(action_shape, dtype=np.float32)
        self.rewards = np.empty(scalar_shape, dtype=np.float32)
        self.next_states = np.empty(state_shape, dtype=np.float32)
        self.masks = np.empty(scalar_shape, dtype=np.float32)

        self.idx = 0        # next row to be written
        self.full = False   # becomes True once idx has wrapped around

    def append(self, state, action, reward, next_state, mask):
        """Write one transition at the cursor and advance the cursor.

        Each value is copied (and broadcast/cast by ``np.copyto``) into the
        row at ``self.idx``. ``reward`` and ``mask`` land in width-1 rows.
        NOTE(review): ``mask`` is presumably a not-done flag - confirm with
        the caller; this class only stores it.
        """
        row = self.idx
        np.copyto(self.states[row], state)
        np.copyto(self.actions[row], action)
        np.copyto(self.rewards[row], reward)
        np.copyto(self.next_states[row], next_state)
        np.copyto(self.masks[row], mask)

        self.idx = (row + 1) % self.capacity
        # Wrapping back to row 0 means every row has been written at least once.
        if self.idx == 0:
            self.full = True

    def _sample_indices(self, batch_size):
        """Draw row indices uniformly, with replacement, from the written rows."""
        filled = self.capacity if self.full else self.idx
        try:
            return np.random.randint(0, filled, size=batch_size)
        except ValueError as err:
            # NumPy reports an empty range as "low >= high"; name the real cause.
            if filled == 0:
                raise ValueError(
                    "cannot sample from an empty ReplayMemory; "
                    "append at least one transition first"
                ) from err
            raise

    def sample(self, batch_size):
        """Return a random batch of stored transitions as tensors.

        Args:
            batch_size: ``size`` argument for ``np.random.randint``.

        Returns:
            Tuple ``(states, actions, rewards, next_states, masks)`` of tensors
            created with ``torch.as_tensor(..., device=self.device)``.

        Raises:
            ValueError: If nothing has been appended yet.
        """
        idxs = self._sample_indices(batch_size)

        fields = (
            self.states,
            self.actions,
            self.rewards,
            self.next_states,
            self.masks,
        )
        states, actions, rewards, next_states, masks = (
            torch.as_tensor(field[idxs], device=self.device) for field in fields
        )
        return states, actions, rewards, next_states, masks
class DiffusionMemory():
    """Buffer to store best actions.

    Pairs each stored state with a "best action" in preallocated float32 NumPy
    arrays used as a ring: the write cursor ``idx`` wraps to 0 after
    ``capacity`` writes, so later entries overwrite the oldest ones.
    """

    def __init__(self, state_dim, action_dim, capacity, device):
        """Allocate storage for ``capacity`` (state, best action) pairs.

        Args:
            state_dim: Length of one state vector (cast to ``int``).
            action_dim: Length of one action vector (cast to ``int``).
            capacity: Number of pairs the buffer can hold (cast to ``int``).
            device: Forwarded to ``torch.as_tensor`` when sampling.
        """
        self.capacity = int(capacity)
        self.device = device

        # np.empty does not initialise memory; rows hold arbitrary values until
        # append() writes them, which is why sample() only draws written rows.
        self.states = np.empty((self.capacity, int(state_dim)), dtype=np.float32)
        self.best_actions = np.empty(
            (self.capacity, int(action_dim)), dtype=np.float32
        )

        self.idx = 0        # next row to be written
        self.full = False   # becomes True once idx has wrapped around

    def append(self, state, action):
        """Write one (state, action) pair at the cursor and advance the cursor."""
        row = self.idx
        np.copyto(self.states[row], state)
        np.copyto(self.best_actions[row], action)

        self.idx = (row + 1) % self.capacity
        # Wrapping back to row 0 means every row has been written at least once.
        if self.idx == 0:
            self.full = True

    def _sample_indices(self, batch_size):
        """Draw row indices uniformly, with replacement, from the written rows."""
        filled = self.capacity if self.full else self.idx
        try:
            return np.random.randint(0, filled, size=batch_size)
        except ValueError as err:
            # NumPy reports an empty range as "low >= high"; name the real cause.
            if filled == 0:
                raise ValueError(
                    "cannot sample from an empty DiffusionMemory; "
                    "append at least one entry first"
                ) from err
            raise

    def sample(self, batch_size):
        """Return a random batch of states and their stored best actions.

        Args:
            batch_size: ``size`` argument for ``np.random.randint``.

        Returns:
            Tuple ``(states, best_actions, idxs)``. The first two are tensors
            created with ``torch.as_tensor(..., device=self.device)``;
            ``best_actions`` has ``requires_grad`` enabled. ``idxs`` is the
            NumPy array of sampled rows, suitable for passing to ``replace``.

        Raises:
            ValueError: If nothing has been appended yet.
        """
        idxs = self._sample_indices(batch_size)

        states = torch.as_tensor(self.states[idxs], device=self.device)
        best_actions = torch.as_tensor(self.best_actions[idxs], device=self.device)
        best_actions.requires_grad_(True)

        return states, best_actions, idxs

    def replace(self, idxs, best_actions):
        """Overwrite the stored best actions at rows ``idxs``.

        Args:
            idxs: Row index or indices (e.g. the array returned by ``sample``).
            best_actions: New action values, broadcastable to the selected
                rows. A ``torch.Tensor`` is detached and moved to CPU first.

        If ``idxs`` contains the same row more than once, one of the
        corresponding values ends up stored (NumPy indexed-assignment rule).
        """
        if isinstance(best_actions, torch.Tensor):
            best_actions = best_actions.detach().cpu().numpy()

        # Indexed assignment writes into the buffer itself. Indexing first and
        # then np.copyto-ing would, for an index array, write into a temporary
        # copy and leave the buffer unchanged.
        self.best_actions[idxs] = best_actions