import retro
import gym
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count
from gym import spaces
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import cv2
import torch
from torch.utils.tensorboard import SummaryWriter
class MaxAndSkipEnv(gym.Wrapper):
    """Repeat each action for `skip` frames and max-pool the buffered frames.

    `step` sums the rewards of the repeated frames and returns the
    element-wise maximum of the two frames held in the internal buffer.
    """

    def __init__(self, env, skip=4):
        """Return only every `skip`-th frame.

        Args:
            env: Environment to wrap; its ``observation_space.shape`` sizes
                the two-frame buffer.
            skip: Number of times each action is repeated per `step` call.
        """
        gym.Wrapper.__init__(self, env)
        # most recent raw observations (for max pooling across time steps)
        self._obs_buffer = np.zeros((2,) + env.observation_space.shape, dtype=np.uint8)
        self._skip = skip

    def step(self, action):
        """Repeat action, sum reward, and max over last observations.

        Args:
            action: Action forwarded unchanged to the wrapped environment on
                every repeated frame.

        Returns:
            Tuple ``(max_frame, total_reward, done, info)`` where `done` and
            `info` come from the last inner step that was executed.
        """
        total_reward = 0.0
        done = None
        info = {}  # keeps the return well-defined even if `skip` is 0
        for i in range(self._skip):
            obs, reward, done, info = self.env.step(action)
            # Only the last two frames of the repeat window are buffered.
            if i == self._skip - 2:
                self._obs_buffer[0] = obs
            if i == self._skip - 1:
                self._obs_buffer[1] = obs
            total_reward += reward
            if done:
                # The episode is over: stop repeating the action.
                break
        # Note that the observation on the done=True frame
        # doesn't matter
        max_frame = self._obs_buffer.max(axis=0)
        return max_frame, total_reward, done, info

    def reset(self, **kwargs):
        """Reset the wrapped environment, forwarding all keyword arguments."""
        return self.env.reset(**kwargs)
class LazyFrames(object):
    """Lazily concatenated stack of frames.

    The individual frames are kept by reference and are only joined along
    axis 2 the first time array data is requested; the joined array is then
    cached and the frame list is released.
    """

    def __init__(self, frames):
        """This object ensures that common frames between the observations are only stored once.
        It exists purely to optimize memory usage which can be huge for DQN's 1M frames replay
        buffers.
        This object should only be converted to numpy array before being passed to the model.

        Args:
            frames: Sequence of arrays that can be concatenated along axis 2.
        """
        self._out = None
        self._frames = frames

    def _force(self):
        """Return the concatenated array, building and caching it on first use."""
        if self._out is not None:
            return self._out
        self._out = np.concatenate(self._frames, axis=2)
        # The source frames are no longer needed once the result is cached.
        self._frames = None
        return self._out

    def __array__(self, dtype=None):
        """Return the stacked frames, converted to `dtype` when one is given."""
        stacked = self._force()
        return stacked if dtype is None else stacked.astype(dtype)

    def __len__(self):
        """Length of the stacked array along its first axis."""
        stacked = self._force()
        return len(stacked)

    def __getitem__(self, i):
        """Index into the stacked array."""
        stacked = self._force()
        return stacked[i]
class FrameStack(gym.Wrapper):
    """Wrapper that returns the `k` most recent frames as one observation."""

    def __init__(self, env, k):
        """Stack k last frames.

        Returns lazy array, which is much more memory efficient.

        See Also:
            LazyFrames

        Args:
            env: Environment to wrap; its observation shape must have three
                dimensions.
            k: Number of most recent frames kept in the stack.
        """
        gym.Wrapper.__init__(self, env)
        self.k = k
        self.frames = deque([], maxlen=k)
        shp = env.observation_space.shape
        # Frames are joined along the last axis, so only that dimension grows.
        self.observation_space = spaces.Box(
            low=0,
            high=255,
            shape=(shp[0], shp[1], shp[2] * k),
            dtype=env.observation_space.dtype,
        )

    def reset(self):
        """Reset the environment and fill the stack with the first frame."""
        ob = self.env.reset()
        for _ in range(self.k):
            # Seed every slot with the initial frame so the stack is full.
            self.frames.append(ob)
        return self._get_ob()

    def step(self, action):
        """Step the environment and push the new frame onto the stack.

        Returns:
            Tuple ``(stacked_observation, reward, done, info)``.
        """
        ob, reward, done, info = self.env.step(action)
        # The deque's maxlen drops the oldest frame automatically.
        self.frames.append(ob)
        return self._get_ob(), reward, done, info

    def _get_ob(self):
        """Wrap the current frames in a `LazyFrames`; requires a full stack."""
        assert len(self.frames) == self.k
        return LazyFrames(list(self.frames))
class ClipRewardEnv(gym.RewardWrapper):
    """Reward wrapper that keeps only the sign of each reward."""

    def __init__(self, env):
        """Wrap `env` without adding any state of its own."""
        super().__init__(env)

    def reward(self, reward):
        """Bin reward to {+1, 0, -1} by its sign."""
        clipped = np.sign(reward)
        return clipped
class ImageToPyTorch(gym.ObservationWrapper):
    """Observation wrapper that moves the third axis of each frame to the front."""

    def __init__(self, env):
        """Declare an observation space with the last dimension moved first."""
        super().__init__(env)
        source_shape = self.observation_space.shape
        channel_first = (source_shape[-1], source_shape[0], source_shape[1])
        # NOTE(review): the space is declared as float32 in [0, 1], but
        # `observation` neither rescales nor casts the frame -- confirm
        # whether a scaling wrapper is expected downstream.
        self.observation_space = gym.spaces.Box(
            low=0.0,
            high=1.0,
            shape=channel_first,
            dtype=np.float32,
        )

    def observation(self, observation):
        """Return `observation` with axis 2 moved to position 0."""
        reordered = np.moveaxis(observation, 2, 0)
        return reordered
class WarpFrame(gym.ObservationWrapper):
    """Observation wrapper producing 84x84 single-channel grayscale frames."""

    def __init__(self, env):
        """Warp frames to 84x84 as done in the Nature paper and later work."""
        super().__init__(env)
        self.width = 84
        self.height = 84
        target_shape = (self.height, self.width, 1)
        self.observation_space = spaces.Box(
            low=0, high=255, shape=target_shape, dtype=np.uint8
        )

    def observation(self, frame):
        """Convert an RGB frame to grayscale, resize it and add a channel axis."""
        gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
        # cv2.resize takes the target size as (width, height).
        target_size = (self.width, self.height)
        resized = cv2.resize(gray, target_size, interpolation=cv2.INTER_AREA)
        return resized[:, :, None]
class AirstrikerDiscretizer(gym.ActionWrapper):
    """Expose a small discrete action set on top of the 12-button action array.

    Each discrete action index maps to a boolean array with one entry per
    button, in which the buttons belonging to that action are pressed.
    """

    # Initialization
    def __init__(self, env):
        """Build the discrete-index -> button-array lookup table."""
        super(AirstrikerDiscretizer, self).__init__(env)
        buttons = ['B', 'A', 'MODE', 'START', 'UP', 'DOWN', 'LEFT', 'RIGHT', 'C', 'Y', 'X', 'Z']
        actions = [['LEFT'], ['RIGHT'], ['B']]
        self._actions = []
        for action in actions:
            arr = np.array([False] * len(buttons))
            for button in action:
                arr[buttons.index(button)] = True
            # Register the combination; without this the action space is empty.
            self._actions.append(arr)
        self.action_space = gym.spaces.Discrete(len(self._actions))

    # Get the action
    def action(self, a):
        """Translate discrete index `a` into a copy of its button array."""
        return self._actions[a].copy()
# Wrappers are applied in order; each one consumes the output of the previous.
_WRAPPER_CHAIN = (
    MaxAndSkipEnv,                        # return only every `skip`-th frame
    WarpFrame,                            # reshape image
    ImageToPyTorch,                       # invert shape
    lambda inner: FrameStack(inner, 4),   # stack last 4 frames
    # ScaledFloatFrame (scale frames) stays disabled
    AirstrikerDiscretizer,
    ClipRewardEnv,
)
env = retro.make(game='Airstriker-Genesis')
for _wrap in _WRAPPER_CHAIN:
    env = _wrap(env)

# set up matplotlib
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display

# if gpu is to be used
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# One replay-buffer record: (state, action, next_state, reward).
Transition = namedtuple('Transition', ['state', 'action', 'next_state', 'reward'])
class ReplayMemory(object):
    """Fixed-capacity replay buffer of `Transition` records.

    Backed by a bounded deque, so the oldest transition is discarded once
    `capacity` is exceeded.
    """

    def __init__(self, capacity):
        """Create an empty buffer holding at most `capacity` transitions."""
        self.memory = deque([], maxlen=capacity)

    def push(self, *args):
        """Save a transition.

        Args:
            *args: Field values of one `Transition`, in field order.
        """
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        """Return `batch_size` distinct stored transitions chosen at random.

        Raises:
            ValueError: If fewer than `batch_size` transitions are stored.
        """
        return random.sample(self.memory, batch_size)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)
class DQN(nn.Module):
    """Convolutional Q-network: three conv stages followed by two linear layers."""

    def __init__(self, n_observations, n_actions):
        """Create the layers.

        Args:
            n_observations: Number of input channels of the first convolution.
            n_actions: Number of outputs, one Q-value per action.
        """
        super().__init__()
        self.layer1 = nn.Conv2d(n_observations, 32, kernel_size=8, stride=4)
        self.layer2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.layer3 = nn.Sequential(
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.Flatten(),
        )
        # The input width is fixed: it must equal the flattened size of
        # layer3's output (e.g. an 84x336 input gives 64 * 7 * 38 = 17024).
        self.layer4 = nn.Linear(17024, 512)
        self.layer5 = nn.Linear(512, n_actions)

    # Called with either one element to determine next action, or a batch
    # during optimization. Returns one row of Q-values per input element.
    def forward(self, x):
        """Run the network and return the raw (un-activated) Q-values."""
        hidden = x
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            hidden = F.relu(stage(hidden))
        return self.layer5(hidden)
# BATCH_SIZE is the number of transitions sampled from the replay buffer
# GAMMA is the discount factor
# EPS_START is the starting value of epsilon
# EPS_END is the final value of epsilon
# EPS_DECAY controls the rate of exponential decay of epsilon, higher means a slower decay
# TAU is the update rate of the target network
# LR is the learning rate of the AdamW optimizer
# NOTE(review): BATCH_SIZE and EPS_START are used by optimize_model and
# select_action but had no definition here; the values below are the ones
# from the PyTorch DQN tutorial -- confirm them against the intended setup.
BATCH_SIZE = 128
GAMMA = 0.99
EPS_START = 0.9
EPS_END = 0.01
EPS_DECAY = 10000
TAU = 0.005
# LR = 1e-4
LR = 0.00025

# Get number of actions from gym action space
n_actions = env.action_space.n
state = env.reset()
# The length of the observation's leading axis becomes the number of input
# channels of the first convolution.
n_observations = len(state)

policy_net = DQN(n_observations, n_actions).to(device)
target_net = DQN(n_observations, n_actions).to(device)
# Start the target network as an exact copy of the policy network; otherwise
# the two begin from unrelated random initialisations.
target_net.load_state_dict(policy_net.state_dict())

optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)
memory = ReplayMemory(10000)

# Number of actions selected so far; drives the epsilon decay schedule.
steps_done = 0
def select_action(state):
    """Pick an action epsilon-greedily and advance the global step counter.

    Args:
        state: Input passed to `policy_net` when the greedy branch is taken.

    Returns:
        Tuple ``(action, eps_threshold)``: `action` is a 1x1 long tensor and
        `eps_threshold` is the exploration probability used for this call.
    """
    global steps_done
    roll = random.random()
    decay = math.exp(-1. * steps_done / EPS_DECAY)
    eps_threshold = EPS_END + (EPS_START - EPS_END) * decay
    steps_done += 1

    if roll <= eps_threshold:
        # Explore: draw a uniformly random action from the action space.
        random_action = torch.tensor(
            [[env.action_space.sample()]], device=device, dtype=torch.long
        )
        return random_action, eps_threshold

    # Exploit: max(1) yields (values, indices) per row; the index of the
    # largest Q-value is the chosen action.
    with torch.no_grad():
        greedy_action = policy_net(state).max(1)[1].view(1, 1)
    return greedy_action, eps_threshold
# Length (in agent steps) of every finished episode, in order.
episode_durations = []


def plot_durations(show_result=False):
    """Plot episode durations and, once available, their 100-episode mean.

    Args:
        show_result: When True the figure is titled as the final result;
            otherwise the figure is cleared and redrawn as a training view.
    """
    plt.figure(1)
    durations_t = torch.tensor(episode_durations, dtype=torch.float)
    if show_result:
        plt.title('Result')
    else:
        plt.clf()
        plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(durations_t.numpy())
    # Take 100 episode averages and plot them too
    if len(durations_t) >= 100:
        means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
        # Pad with zeros so the curve lines up with the episode axis.
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # pause a bit so that plots are updated
    if is_ipython:
        if not show_result:
            display.display(plt.gcf())
            display.clear_output(wait=True)
        else:
            display.display(plt.gcf())
def optimize_model():
    """Run one optimisation step of `policy_net` on a sampled replay batch.

    Does nothing until the replay memory holds at least `BATCH_SIZE`
    transitions. Otherwise samples a batch, builds the one-step TD target
    from `target_net`, and applies one clipped-gradient optimizer step on
    the Huber loss.
    """
    if len(memory) < BATCH_SIZE:
        # Not enough experience collected yet.
        return
    transitions = memory.sample(BATCH_SIZE)
    # Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for
    # detailed explanation). This converts batch-array of Transitions
    # to Transition of batch-arrays.
    batch = Transition(*zip(*transitions))

    # Compute a mask of non-final states and collect the batch elements
    # (a final state is one stored with next_state set to None)
    non_final_mask = torch.tensor(
        [s is not None for s in batch.next_state],
        device=device,
        dtype=torch.bool,
    )
    non_final_next_states = [s for s in batch.next_state if s is not None]
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
    # columns of actions taken. These are the actions which would've been taken
    # for each batch state according to policy_net
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # Compute V(s_{t+1}) for all next states.
    # Expected values of actions for non_final_next_states are computed based
    # on the "older" target_net; selecting their best reward with max(1)[0].
    # This is merged based on the mask, such that we'll have either the expected
    # state value or 0 in case the state was final.
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    if non_final_next_states:
        # torch.cat rejects an empty list, so skip when every state is final.
        with torch.no_grad():
            next_state_values[non_final_mask] = target_net(
                torch.cat(non_final_next_states)
            ).max(1)[0]
    # Compute the expected Q values
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Compute Huber loss. The target is cast to the prediction's dtype because
    # reward tensors built without an explicit dtype may be float64.
    criterion = nn.SmoothL1Loss()
    target = expected_state_action_values.unsqueeze(1).to(state_action_values.dtype)
    loss = criterion(state_action_values, target)

    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    # In-place gradient clipping
    torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
    optimizer.step()
# Train the agent, logging per-episode reward and epsilon to TensorBoard.
with SummaryWriter() as writer:
    if torch.cuda.is_available():
        num_episodes = 600
    else:
        num_episodes = 50
    epsilon = 1
    episode_rewards = []
    for i_episode in range(num_episodes):
        # Initialize the environment and get its state
        state = env.reset()
        state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
        episode_reward = 0
        for t in count():
            action, epsilon = select_action(state)
            observation, reward, done, info = env.step(action.item())
            # Explicit float32 keeps the TD target in the network's dtype.
            reward = torch.tensor([reward], device=device, dtype=torch.float32)
            # Treat the game's own game-over flag as end of episode too;
            # .get() tolerates an info dict without that key.
            done = done or info.get("gameover") == 1
            if done:
                episode_durations.append(t + 1)
                print(f"Episode {i_episode} done")
                # plot_durations()

            # NOTE(review): terminal transitions are stored with the last
            # observation rather than None, so optimize_model still
            # bootstraps from them -- confirm this is intended.
            next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)

            # Store the transition in memory
            memory.push(state, action, next_state, reward)
            episode_reward += reward.item()

            # Move to the next state
            state = next_state

            # Perform one step of the optimization (on the policy network)
            optimize_model()

            # Soft update of the target network's weights
            # θ′ ← τ θ + (1 −τ )θ′
            target_net_state_dict = target_net.state_dict()
            policy_net_state_dict = policy_net.state_dict()
            for key in policy_net_state_dict:
                target_net_state_dict[key] = policy_net_state_dict[key]*TAU + target_net_state_dict[key]*(1-TAU)
            # Write the blended weights back into the target network.
            target_net.load_state_dict(target_net_state_dict)

            if done:
                # count() never ends on its own; leave once the episode ends.
                break

        # episode_rewards.append(episode_reward)
        writer.add_scalar("Rewards/Episode", episode_reward, i_episode)
        writer.add_scalar("Epsilon", epsilon, i_episode)