The Atari games can be played in several ways. The first choice is the kind of observation we interact with: either a view of the console memory or the displayed image (which always has the same size). On top of this, the name of the environment indicates whether each action is repeated and how often. With Breakout-v0, each action is repeated for two, three, or four frames, chosen at random, before we are asked for a new one. With BreakoutDeterministic-v4, each action is always repeated for four frames.
We can start with an empty, simple Breakout game:
# Import the gym module
import gym
# Create a breakout environment
env = gym.make('BreakoutDeterministic-v4')
# Reset it, returns the starting frame
frame = env.reset()
# Render
env.render()
is_done = False
while not is_done:
    # Perform a random action, returns the new frame, reward and whether the game is over
    frame, reward, is_done, _ = env.step(env.action_space.sample())
    # Render
    env.render()
The only thing we now need to change is how we choose the next action for the game. Well, we need more than that: first we need to train a model!
Let's look at the context. We can get images from the environment (they are 160 x 210 pixels with three color channels), and since we will need to keep lots of previous images, this size may be too much to fit in the memory of one computer. We can drop every other pixel in both directions and convert the result to grayscale, so this is what preprocess will achieve. We will also add two functions that transpose our internal state. The reason is that our images are 80 x 105 with one channel, but we need to use past images to know in which direction the ball moves. To achieve this, we transpose this state on the fly to obtain an image that is 80 x 105 x state_length:
import gym
import itertools
import os
import six
import numpy as np
import tensorflow as tf
import random
from collections import deque, namedtuple

Transition = namedtuple("Transition",
                        ["state", "action", "reward", "next_state", "done"])

def to_grayscale(img):
    # Average the color channels
    return np.mean(img, axis=2).astype(np.uint8)

def downsample(img):
    # Keep every other pixel in both directions
    return img[::2, ::2]

def preprocess(img):
    # Add a leading axis so that frames can be stacked
    return to_grayscale(downsample(img))[None, :, :]

def adapt_state(state):
    # (state_length, height, width) -> batch of one (width, height, state_length)
    return [np.float32(np.transpose(state, (2, 1, 0)) / 255.0)]

def adapt_batch_state(state):
    # Same transposition for a whole batch of states
    return np.transpose(np.array(state), (0, 3, 2, 1)) / 255.0

def get_initial_state(frame):
    # Repeat the first frame state_length times
    processed_frame = preprocess(frame)
    state = [processed_frame for _ in range(state_length)]
    return np.concatenate(state)
To make this better, we could use skimage.transform.rescale instead of dropping pixels. For Breakout, we don't need it, so this is left as an exercise for the reader.
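The shapes produced by the preprocessing steps described above can be checked on a synthetic frame. This is a minimal sketch that assumes a frame array of 210 rows, 160 columns, and 3 color channels; the random frame and the name network_input are illustrative only, and preprocess is restated compactly so that the block runs on its own.

```python
import numpy as np

state_length = 4  # number of stacked frames, as in the text

def preprocess(img):
    # Keep every other pixel, average the color channels, add a leading frame axis
    small = img[::2, ::2]
    gray = np.mean(small, axis=2).astype(np.uint8)
    return gray[None, :, :]

# Synthetic stand-in for an Atari frame (assumption: 210 rows x 160 columns x 3 channels)
frame = np.random.randint(0, 256, size=(210, 160, 3), dtype=np.uint8)

processed = preprocess(frame)
state = np.concatenate([processed] * state_length)
# Transpose to (width, height, state_length), scale to [0, 1], add a batch axis
network_input = np.float32(np.transpose(state, (2, 1, 0)) / 255.0)[None]

print(processed.shape)      # (1, 105, 80)
print(state.shape)          # (4, 105, 80)
print(network_input.shape)  # (1, 80, 105, 4)
```

The internal state keeps the frame index first, which makes it cheap to drop the oldest frame and append a new one; the transposition only happens when the state is fed to the network.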
We are now going to write a set of hyperparameters, as well as some constants for the game, like the name of the environment and the size of the image:
env_name = "Breakout-v4"
width = 80 # Resized frame width
height = 105 # Resized frame height
We need to train the network for a very long time, so let's play 12000 games. To predict a new action, we will use the past 4 images:
n_episodes = 12000 # Number of runs for the agent
state_length = 4 # Number of most recent frames we input to the network
We are also going to need to set our parameters for the Q function:
gamma = 0.99 # Discount factor
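To get a feeling for what the discount factor does, here is a small sketch that computes a discounted sum of rewards. The helper name discounted_return and the reward sequence are made up for illustration; they are not part of the training code.

```python
gamma = 0.99  # discount factor, as in the text

def discounted_return(rewards, gamma):
    # Accumulate from the last reward backwards: G_t = r_t + gamma * G_{t+1}
    total = 0.0
    for reward in reversed(rewards):
        total = reward + gamma * total
    return total

# A reward now, nothing on the next step, and a reward two steps later
print(discounted_return([1.0, 0.0, 1.0], gamma))  # approximately 1.9801
```

With gamma = 0.99, a reward that arrives 100 steps in the future is still worth about a third of an immediate one, so the agent is encouraged to plan fairly far ahead.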
At the beginning, we want to try a random action very often (left or right for Breakout). Then, during training, we will progressively reduce the randomness (this is our epsilon-greedy strategy). Each time we run the network counts as one step, so let's reduce this random factor over 1 million steps:
# During all these steps, we progressively lower epsilon
exploration_steps = 1000000
initial_epsilon = 1.0 # Initial value of epsilon in epsilon-greedy
final_epsilon = 0.1 # Final value of epsilon in epsilon-greedy
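The schedule defined by these three constants can be written in closed form. The sketch below uses a hypothetical helper, epsilon_at, that returns the value of epsilon after a given number of steps; it is meant to be equivalent to subtracting a fixed amount per step until the final value is reached.

```python
initial_epsilon = 1.0
final_epsilon = 0.1
exploration_steps = 1000000

def epsilon_at(step):
    """Linearly anneal epsilon from initial_epsilon down to final_epsilon."""
    fraction = min(step / exploration_steps, 1.0)
    return initial_epsilon - fraction * (initial_epsilon - final_epsilon)

for step in (0, 250000, 500000, 1000000, 2000000):
    print(step, round(epsilon_at(step), 3))
```

After exploration_steps steps, epsilon stays at final_epsilon, so one action in ten remains random for the rest of the training.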
We need to fill our replay memory, so at the beginning we don't train; we just let the game play with random actions. This is going to be our initial training set, and over time we will add all our games to it. When it reaches 400000 elements, we start dropping the oldest, more random training states:
# Number of steps to populate the replay memory before training starts
initial_random_search = 20000
replay_memory_size = 400000 # Number of states we keep for training
batch_size = 32 # Batch size
network_update_interval = 10000 # The frequency with which the target network is updated
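A rough back-of-the-envelope estimate shows why the frames are reduced before being stored. The sketch assumes one byte per pixel per channel and that every stored transition adds one new stack of state_length frames to memory; Python object overhead is ignored, and the helper name stack_bytes is illustrative.

```python
replay_memory_size = 400000
state_length = 4

def stack_bytes(height, width, channels=1):
    # Bytes used by one stack of state_length uint8 frames
    return state_length * height * width * channels

full = replay_memory_size * stack_bytes(210, 160, channels=3)
reduced = replay_memory_size * stack_bytes(105, 80)

print("full-size color frames: %.1f GB" % (full / 1e9))
print("downsampled grayscale:  %.1f GB" % (reduced / 1e9))
print("reduction factor: %d" % (full // reduced))
```

Under these assumptions, halving both dimensions and dropping the color channels divides the memory footprint by twelve, which is the difference between a replay memory that fits on a workstation and one that does not.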
We will use RMSProp to train our network, so we set a very low learning rate with momentum:
learning_rate = 0.00025 # Learning rate used by RMSProp
momentum = 0.95 # momentum used by RMSProp
# Constant added to the squared gradient in the denominator
# of the RMSProp update
min_gradient = 0.01
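To see where these three numbers enter the update, here is a scalar sketch of one common formulation of RMSProp with momentum. It is an illustration under stated assumptions, not the library's exact implementation: the decay value of 0.9 and the function name rmsprop_step are assumptions, and whether the constant sits inside or outside the square root differs between implementations.

```python
import math

learning_rate = 0.00025
momentum = 0.95
min_gradient = 0.01  # constant added to the running mean of squared gradients
decay = 0.9          # assumption: decay rate of the running mean

def rmsprop_step(value, gradient, mean_square, velocity):
    # Running average of the squared gradient
    mean_square = decay * mean_square + (1.0 - decay) * gradient ** 2
    # Momentum-smoothed step, scaled by the root of the running average
    velocity = (momentum * velocity +
                learning_rate * gradient / math.sqrt(mean_square + min_gradient))
    return value - velocity, mean_square, velocity

value, mean_square, velocity = 1.0, 0.0, 0.0
for _ in range(3):
    value, mean_square, velocity = rmsprop_step(value, 2.0, mean_square, velocity)
    print(value, mean_square, velocity)
```

The constant keeps the denominator away from zero when gradients are tiny, which avoids very large steps early in training.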
Finally, we will store the trained network through time (with some checkpoints so that we can restart the training at some partially trained state), and we will store some information to Tensorboard, like the reward that we found and the length of a game:
network_path = 'saved_networks/' + env_name
tensorboard_path = 'summary/' + env_name
save_interval = 300000 # The frequency with which the network is saved
We can now create our network class. We will create one instance for each network. Yes, we need two networks: one to estimate the next action to take and one to estimate the Q values or targets. From time to time, we will copy the parameters of the action network (named q_estimator here) to the target network (named target_estimator):
class Estimator():
    """Q-Value Estimator neural network.
    This network is used for both the Q-Network and the Target Network.
    """
    def __init__(self, env, scope="estimator", summaries_dir=None):
        self.scope = scope
        self.num_actions = env.action_space.n
        self.epsilon = initial_epsilon
        self.epsilon_step = (
            (initial_epsilon - final_epsilon) / exploration_steps)
        # Writes Tensorboard summaries to disk
        self.summary_writer = None
        with tf.variable_scope(scope):
            # Build the graph
            self.build_model()
            if summaries_dir:
                summary_dir = os.path.join(summaries_dir,
                                           "summaries_%s" % scope)
                if not os.path.exists(summary_dir):
                    os.makedirs(summary_dir)
                self.summary_writer = tf.summary.FileWriter(summary_dir)

    def build_model(self):
        """
        Builds the Tensorflow graph.
        """
        self.X = tf.placeholder(shape=[None, width, height, state_length],
                                dtype=tf.float32, name="X")
        # The TD target value
        self.y = tf.placeholder(shape=[None], dtype=tf.float32, name="y")
        # Integer id of which action was selected
        self.actions = tf.placeholder(shape=[None], dtype=tf.int32, name="actions")
        model = tf.keras.Sequential()
        model.add(tf.keras.layers.Convolution2D(filters=32, kernel_size=8,
            strides=(4, 4), activation='relu',
            input_shape=(width, height, state_length), name="Layer1"))
        model.add(tf.keras.layers.Convolution2D(filters=64, kernel_size=4,
            strides=(2, 2), activation='relu', name="Layer2"))
        model.add(tf.keras.layers.Convolution2D(filters=64, kernel_size=3,
            strides=(1, 1), activation='relu', name="Layer3"))
        model.add(tf.keras.layers.Flatten(name="Flatten"))
        model.add(tf.keras.layers.Dense(512, activation='relu',
            name="Layer4"))
        model.add(tf.keras.layers.Dense(self.num_actions, name="Output"))
        self.predictions = model(self.X)
        # Keep only the Q value of the action that was actually taken
        a_one_hot = tf.one_hot(self.actions, self.num_actions, 1.0, 0.0)
        q_value = tf.reduce_sum(tf.multiply(self.predictions, a_one_hot),
                                reduction_indices=1)
        # Calculate the loss
        self.losses = tf.squared_difference(self.y, q_value)
        self.loss = tf.reduce_mean(self.losses)
        # Optimizer Parameters from original paper
        self.optimizer = tf.train.RMSPropOptimizer(learning_rate,
            momentum=momentum, epsilon=min_gradient)
        self.train_op = self.optimizer.minimize(self.loss,
            global_step=tf.train.get_global_step())
        # Summaries for Tensorboard
        self.summaries = tf.summary.merge([
            tf.summary.scalar("loss", self.loss),
            tf.summary.histogram("loss_hist", self.losses),
            tf.summary.histogram("q_values_hist", self.predictions),
            tf.summary.scalar("max_q_value",
                              tf.reduce_max(self.predictions))
        ])
We used Keras in this case to build our network. It stacks three convolutional layers (without max pooling layers, although the strides larger than one shrink the feature maps and so reduce the number of parameters) and then two dense layers. All of them except the output layer use a ReLU activation.
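The effect of the strides on the size of the feature maps can be worked out by hand. The sketch below assumes convolutions without padding (the Keras default, "valid") and uses a hypothetical helper, conv_output, to follow an 80 x 105 input through the three convolutional layers.

```python
def conv_output(size, kernel, stride):
    # Output length of an unpadded ("valid") convolution along one axis
    return (size - kernel) // stride + 1

width, height = 80, 105
layers = [(8, 4, 32), (4, 2, 64), (3, 1, 64)]  # (kernel_size, stride, filters)

w, h = width, height
for kernel, stride, filters in layers:
    w, h = conv_output(w, kernel, stride), conv_output(h, kernel, stride)
    print("%d x %d x %d" % (w, h, filters))

flattened = w * h * layers[-1][2]
dense_weights = flattened * 512 + 512  # weights plus biases of the 512-unit layer
print("flattened features:", flattened)   # 3456
print("first dense layer parameters:", dense_weights)
```

Most of the parameters sit in the first dense layer, so shrinking the feature maps before flattening is what keeps the network at a manageable size.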
With the network, we can now create a cost function and feed it to an optimizer. We also add some summary reports to check the distribution of Q or loss values:
    def predict(self, sess, s):
        # Q values for a batch of states
        return sess.run(self.predictions, { self.X: s })

    def update(self, sess, s, a, y):
        # One training step on a batch of states, actions and targets
        feed_dict = { self.X: s, self.y: y, self.actions: a }
        summaries, global_step, _, loss = sess.run(
            [self.summaries, tf.train.get_global_step(), self.train_op, self.loss], feed_dict)
        if self.summary_writer:
            self.summary_writer.add_summary(summaries, global_step)
        return loss

    def get_action(self, sess, state):
        # Epsilon-greedy choice: random with probability epsilon, greedy otherwise
        if self.epsilon >= random.random():
            action = random.randrange(self.num_actions)
        else:
            action = np.argmax(self.predict(sess, adapt_state(state)))
        # Decay epsilon over time
        if self.epsilon > final_epsilon:
            self.epsilon -= self.epsilon_step
        return action

    def get_trained_action(self, sess, state):
        # Greedy choice, to be used once the network is trained
        action = np.argmax(self.predict(sess, adapt_state(state)))
        return action
We add a predict method to wrap the prediction, as we will use it in several places, and an update method that actually trains this estimator. We also have two methods to retrieve an action, either with an epsilon-greedy strategy or without (after the training).
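The epsilon-greedy choice can be isolated from TensorFlow to see how it behaves. In this sketch, choose_action is a hypothetical stand-alone function and the list of Q values is invented; in the estimator, the Q values come from the network.

```python
import random

def choose_action(q_values, epsilon, rng=random):
    # With probability epsilon explore, otherwise pick the highest Q value
    if rng.random() < epsilon:
        return rng.randrange(len(q_values))
    return max(range(len(q_values)), key=lambda a: q_values[a])

q_values = [0.1, 0.7, 0.3, 0.2]  # made-up Q values for four actions

print(choose_action(q_values, epsilon=0.0))  # always greedy: 1

rng = random.Random(0)
counts = [0] * len(q_values)
for _ in range(10000):
    counts[choose_action(q_values, 0.5, rng)] += 1
print(counts)  # action 1 dominates, the others share the random half
```

With epsilon at 0.5, the greedy action is still selected more than half of the time, because random exploration can also land on it.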
def copy_model_parameters(estimator1, estimator2):
    """
    Copies the model parameters of one estimator to another.
    Args:
        estimator1: Estimator to copy the parameters from
        estimator2: Estimator to copy the parameters to
    """
    e1_params = [t for t in tf.trainable_variables()
                 if t.name.startswith(estimator1.scope)]
    e1_params = sorted(e1_params, key=lambda v: v.name)
    e2_params = [t for t in tf.trainable_variables()
                 if t.name.startswith(estimator2.scope)]
    e2_params = sorted(e2_params, key=lambda v: v.name)
    # Pair the variables by sorted name and build one assignment per pair
    update_ops = []
    for e1_v, e2_v in zip(e1_params, e2_params):
        op = e2_v.assign(e1_v)
        update_ops.append(op)
    return update_ops
This is the function that we will call to update one estimator from another. It creates a set of assignment operations that we will run in our session later.
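The pairing logic does not depend on TensorFlow: variables of both networks are selected by their scope prefix, sorted by name, and matched one to one. Here is a framework-free analogy using a plain dictionary; the variable names, the values, and the helper copy_parameters are all invented for illustration.

```python
def copy_parameters(variables, source_scope, target_scope):
    """Copy values between scopes by pairing name-sorted variables."""
    # The trailing slash avoids matching a scope that merely shares a prefix
    source_names = sorted(n for n in variables if n.startswith(source_scope + "/"))
    target_names = sorted(n for n in variables if n.startswith(target_scope + "/"))
    for source_name, target_name in zip(source_names, target_names):
        variables[target_name] = list(variables[source_name])

variables = {
    "q/Layer1/kernel": [1.0, 2.0],
    "q/Output/bias": [3.0],
    "target_q/Layer1/kernel": [0.0, 0.0],
    "target_q/Output/bias": [0.0],
}
copy_parameters(variables, "q", "target_q")
print(variables["target_q/Layer1/kernel"])  # [1.0, 2.0]
print(variables["target_q/Output/bias"])    # [3.0]
```

The pairing is only correct because both networks are built by the same code, so their sorted variable names line up.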
def create_memory(env):
    # Populate the replay memory with initial experience
    replay_memory = deque()
    frame = env.reset()
    state = get_initial_state(frame)
    for i in range(initial_random_search):
        # Play a random action and record the resulting transition
        action = np.random.choice(np.arange(env.action_space.n))
        frame, reward, done, _ = env.step(action)
        # Drop the oldest frame and append the new one
        next_state = np.append(state[1:, :, :], preprocess(frame), axis=0)
        replay_memory.append(
            Transition(state, action, reward, next_state, done))
        if done:
            frame = env.reset()
            state = get_initial_state(frame)
        else:
            state = next_state
    return replay_memory
This function fills the replay memory with an initial set of transitions. This is required so that the network can learn something: without this set of initial states, we cannot train it. So we just play random moves for a while and hope this gives our network some first-hand knowledge of the game. Of course, our epsilon-greedy strategy will also add new moves to the memory later, which helps a lot.
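The behavior of a bounded replay memory can be tried on a toy example. This sketch uses deque(maxlen=...) from the standard library, which discards the oldest entry automatically; this is an alternative to removing old entries by hand, and the tiny capacity and integer "states" are for demonstration only.

```python
import random
from collections import deque, namedtuple

Transition = namedtuple("Transition",
                        ["state", "action", "reward", "next_state", "done"])

replay_memory = deque(maxlen=5)  # tiny capacity for demonstration
for step in range(8):
    replay_memory.append(Transition(step, 0, 0.0, step + 1, False))

# The three oldest transitions have been discarded
print([t.state for t in replay_memory])  # [3, 4, 5, 6, 7]

# Draw a random training batch without replacement
batch = random.sample(list(replay_memory), 3)
states, actions, rewards, next_states, dones = zip(*batch)
print(states)
```

Sampling at random rather than replaying transitions in order breaks the correlation between consecutive frames, which is the main purpose of the replay memory.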
def setup_summary():
    with tf.variable_scope("episode"):
        episode_total_reward = tf.Variable(0., name="EpisodeTotalReward")
        tf.summary.scalar('Total Reward', episode_total_reward)
        episode_avg_max_q = tf.Variable(0., name="EpisodeAvgMaxQ")
        tf.summary.scalar('Average Max Q', episode_avg_max_q)
        episode_duration = tf.Variable(0., name="EpisodeDuration")
        tf.summary.scalar('Duration', episode_duration)
        episode_avg_loss = tf.Variable(0., name="EpisodeAverageLoss")
        tf.summary.scalar('Average Loss', episode_avg_loss)
        summary_vars = [episode_total_reward, episode_avg_max_q,
                        episode_duration, episode_avg_loss]
        # One placeholder and one assignment per summary variable
        summary_placeholders = [
            tf.placeholder(tf.float32) for _ in range(len(summary_vars))]
        update_ops = [summary_vars[i].assign(summary_placeholders[i])
                      for i in range(len(summary_vars))]
        summary_op = tf.summary.merge_all(scope="episode")
    return summary_placeholders, update_ops, summary_op
We defined here all the variables we want to visualize in Tensorboard on top of the histograms from the estimator.
We can start our main training loop by setting up the environment, estimators, and helper functions:
if __name__ == "__main__":
    from tqdm import tqdm
    env = gym.make(env_name)
    tf.reset_default_graph()
    # Create a global step variable
    global_step = tf.Variable(0, name='global_step', trainable=False)
    # Create estimators
    q_estimator = Estimator(env, scope="q",
                            summaries_dir=tensorboard_path)
    target_estimator = Estimator(env, scope="target_q")
    copy_model = copy_model_parameters(q_estimator, target_estimator)
    summary_placeholders, update_ops, summary_op = setup_summary()
    replay_memory = create_memory(env)
We can start our Tensorflow session and restore the network if there is a previous version stored in our save location:
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        saver = tf.train.Saver()
        # Load a previous checkpoint if we find one
        latest_checkpoint = tf.train.latest_checkpoint(network_path)
        if latest_checkpoint:
            print("Loading model checkpoint %s... " % latest_checkpoint)
            saver.restore(sess, latest_checkpoint)
        total_t = sess.run(tf.train.get_global_step())
From here, we can start playing games. We do that first by saving the network if we need to, and then we set up the game state:
        for episode in tqdm(range(n_episodes)):
            if total_t % save_interval == 0:
                # Save the current checkpoint
                saver.save(tf.get_default_session(), network_path)
            frame = env.reset()
            state = get_initial_state(frame)
            total_reward = 0
            total_loss = 0
            total_q_max = 0
We iterate until the game is over, taking an action and saving the resulting transition in our replay memory. This way, when the network learns to play better, we also save these better moves to learn from them later:
            for duration in itertools.count():
                # Maybe update the target estimator
                if total_t % network_update_interval == 0:
                    sess.run(copy_model)
                action = q_estimator.get_action(sess, state)
                frame, reward, terminal, _ = env.step(action)
                processed_frame = preprocess(frame)
                # Drop the oldest frame and append the new one
                next_state = np.append(state[1:, :, :], processed_frame, axis=0)
                reward = np.clip(reward, -1, 1)
                replay_memory.append(
                    Transition(state, action, reward, next_state, terminal))
                if len(replay_memory) > replay_memory_size:
                    replay_memory.popleft()
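The sliding window over the most recent frames and the reward clipping used in this step can be checked on tiny arrays. In this sketch, each 2 x 2 "frame" is filled with its own index so that the shift is easy to see; the sizes and values are invented for illustration.

```python
import numpy as np

state_length = 4
# Four stacked 2 x 2 frames filled with their frame index: 0, 1, 2, 3
state = np.stack([np.full((2, 2), i, dtype=np.uint8) for i in range(state_length)])
new_frame = np.full((1, 2, 2), 4, dtype=np.uint8)

# Drop the oldest frame (index 0) and append the new one at the end
next_state = np.append(state[1:, :, :], new_frame, axis=0)
print(next_state.shape)     # (4, 2, 2)
print(next_state[:, 0, 0])  # [1 2 3 4]

# Rewards are clipped to [-1, 1] so that their scale stays bounded
print(np.clip(5.0, -1, 1))  # 1.0
```

np.append returns a new array, so the state stored in an earlier transition is not modified when the window moves forward.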
We draw a random batch of transitions from our replay memory, each with its reward and the action that was used, to estimate our target Q values. The online network picks the best next action and the target network evaluates it (this is Double DQN). Once we have these targets, we optimize the network to improve its behavior:
                samples = random.sample(replay_memory, batch_size)
                (states_batch, action_batch, reward_batch,
                 next_states_batch, done_batch) = map(np.array, zip(*samples))
                # Calculate q values and targets (Double DQN)
                adapted_state = adapt_batch_state(next_states_batch)
                q_values_next = q_estimator.predict(sess, adapted_state)
                best_actions = np.argmax(q_values_next, axis=1)
                q_values_next_target = target_estimator.predict(sess, adapted_state)
                targets_batch = (
                    reward_batch + np.invert(done_batch).astype(np.float32) *
                    gamma * q_values_next_target[np.arange(batch_size), best_actions])
                # Perform gradient descent update
                states_batch = adapt_batch_state(states_batch)
                loss = q_estimator.update(sess, states_batch, action_batch, targets_batch)
                total_q_max += np.max(q_values_next)
                total_loss += loss
                total_t += 1
                total_reward += reward
                if terminal:
                    break
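The target computation in this step can be followed on a small batch with made-up numbers. In this sketch, q_online and q_target stand in for the predictions of the two networks on the next states; all values are hypothetical.

```python
import numpy as np

gamma = 0.99
batch_size = 3

rewards = np.array([1.0, 0.0, 1.0], dtype=np.float32)
done = np.array([False, False, True])

# Hypothetical network outputs for the next states (batch_size x num_actions)
q_online = np.array([[0.2, 0.9], [0.5, 0.1], [0.3, 0.4]], dtype=np.float32)
q_target = np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]], dtype=np.float32)

# The online network selects the action...
best_actions = np.argmax(q_online, axis=1)
# ...and the target network provides the value of that action
next_values = q_target[np.arange(batch_size), best_actions]
# Terminal transitions keep only the immediate reward
targets = rewards + np.invert(done).astype(np.float32) * gamma * next_values

print(best_actions)  # [1 0 1]
print(next_values)   # [2. 3. 6.]
print(targets)       # approximately [2.98 2.97 1.]
```

Separating the choice of the action from its evaluation is what distinguishes this scheme from taking the maximum of the target network directly, and it tends to reduce the overestimation of Q values.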
Once the game is finished, we save our variables to Tensorboard as well as capture a screenshot of the endgame:
            stats = [total_reward, total_q_max / duration, duration, total_loss / duration]
            for i in range(len(stats)):
                sess.run(update_ops[i], feed_dict={
                    summary_placeholders[i]: float(stats[i])
                })
            summary_str = sess.run(summary_op)
            q_estimator.summary_writer.add_summary(summary_str, episode)
            # CHART_DIR is not defined in this listing; it must name an existing output directory
            env.env.ale.saveScreenPNG(six.b('%s/test_image_%05i.png' % (CHART_DIR, episode)))
We can train our network over our 12,000 games with this final loop. For each iteration, we get a new action from the trained network (starting with lots of random ones) and we train our network.
Here is an example of the Tensorboard graphs for the previous code:
After a long time, we can see the average Q slowly improving, although the reward stays low. We can see that after the training the network is a bit better, but it will still require lots of games to be good!
This is another view available in Tensorboard once we call summary_writer.add_graph(sess.graph):