Appendix

1. Introduction to Reinforcement Learning

Activity 1.01: Measuring the Performance of a Random Agent

  1. Import the required libraries – abc, numpy, and gym:

    import abc

    import numpy as np

    import gym

  2. Define the abstract class representing the agent:

    """

    Abstract class representing the agent

    Init with the action space and the function pi returning the action

    """

    class Agent:

        def __init__(self, action_space: gym.spaces.Space):

            """

            Constructor of the agent class.

            Args:

                action_space (gym.spaces.Space): environment action space

            """

            raise NotImplementedError("This class cannot be instantiated.")

        @abc.abstractmethod

        def pi(self, state: np.ndarray) -> np.ndarray:

            """

            Agent's policy.

            Args:

                state (np.ndarray): environment state

            Returns:

                The selected action

            """

            pass

    An agent is represented by only a constructor and an abstract method, pi. This method is the actual policy; it takes as input the environment state and returns the selected action.

  3. Define a continuous agent. A continuous agent has to initialize the probability distribution according to the action space passed as an input to the constructor:

    class ContinuousAgent(Agent):

        def __init__(self, action_space: gym.spaces.Space, seed=46):

            # setup seed

            np.random.seed(seed)

            # check the action space type

            if not isinstance(action_space, gym.spaces.Box):

                raise ValueError("This is a Continuous Agent: "

                                 "pass a Box Space as input.")

  4. If the upper and lower bounds are infinite, the probability distribution is simply a normal distribution centered at 0, with a scale that is equal to 1:

            """

            initialize the distribution according to the action space type

            """

            if ((action_space.low == -np.inf)

                    and (action_space.high == np.inf)):

                # the distribution is a normal distribution

                self._pi = lambda: np.random.normal(

                    loc=0, scale=1, size=action_space.shape)

                return

  5. If the upper and lower bounds are both finite, the distribution is a uniform distribution defined in that range:

            if ((action_space.low != -np.inf)

                    and (action_space.high != np.inf)):

                # the distribution is a uniform distribution

                self._pi = lambda: np.random.uniform(

                    low=action_space.low,

                    high=action_space.high,

                    size=action_space.shape)

                return

    If only the lower bound is infinite (equal to -np.inf), the probability distribution is a negative exponential distribution shifted by the finite upper bound:

            if action_space.low == -np.inf:

                # negative exponential distribution

                self._pi = (lambda: -np.random.exponential(

                    size=action_space.shape) + action_space.high)

                return

    If only the upper bound is infinite (equal to np.inf), the probability distribution is an exponential distribution shifted by the finite lower bound:

            if action_space.high == np.inf:

                # exponential distribution

                self._pi = (lambda: np.random.exponential(

                    size=action_space.shape) + action_space.low)

                return

  6. Define the pi method, which is simply a call to the distribution defined in the constructor:

        def pi(self, observation: np.ndarray) -> np.ndarray:

            """

            Policy: simply call the internal _pi().

            

            This is a random agent, so the action is independent

            from the observation.

            For real agents the action depends on the observation.

            """

            return self._pi()

  7. We are ready to define the discrete agent. As before, the agent has to correctly initialize the action distribution according to the action space that is passed as a parameter:

    class DiscreteAgent(Agent):

        def __init__(self, action_space: gym.spaces.Space, seed=46):

            # setup seed

            np.random.seed(seed)

            # check the action space type

            if not isinstance(action_space, gym.spaces.Discrete):

                raise ValueError("This is a Discrete Agent pass "

                                 "as input a Discrete Space.")

            """

            initialize the distribution according to the action

            space n attribute

            """

            # the distribution is a uniform distribution

            self._pi = lambda: np.random.randint(

                low=0, high=action_space.n)

        def pi(self, observation: np.ndarray) -> np.ndarray:

            """

            Policy: simply call the internal _pi().

            This is a random agent, so the action is independent

            from the observation.

            For real agents the action depends on the observation.

            """

            return self._pi()

  8. Now it is useful to define a utility function to create the correct agent type based on the action space:

    def make_agent(action_space: gym.spaces.Space, seed=46):

        """

        Returns the correct agent based on the action space type

        """

        if isinstance(action_space, gym.spaces.Discrete):

            return DiscreteAgent(action_space, seed)

        if isinstance(action_space, gym.spaces.Box):

            return ContinuousAgent(action_space, seed)

        raise ValueError("Only Box spaces or Discrete Spaces "

                         "are allowed, check the action space of "

                         "the environment")

  9. The last step is to define the RL loop in which the agent interacts with the environment and collects rewards.

    Define the parameters, and then create the environment and the agent:

    # Environment Name

    env_name = "CartPole-v0"

    # Number of episodes

    episodes = 10

    # Number of Timesteps of each episode

    timesteps = 100

    # Discount factor

    gamma = 1.0

    # seed environment

    seed = 46

    # Needed to show the environment in a notebook

    from gym import wrappers

    env = gym.make(env_name)

    env.seed(seed)

    # the last argument is needed to record all episodes

    # otherwise gym would record only some of them

    # The monitor saves the episodes inside the folder ./gym-results

    env = wrappers.Monitor(env, "./gym-results", force=True,

                           video_callable=lambda episode_id: True)

    agent = make_agent(env.action_space, seed)

  10. We have to track the returns for each episode; to do this, we can use a simple list:

    # list of returns

    episode_returns = []

  11. Start a loop for each episode:

    # loop for the episodes

    for episode_number in range(episodes):

        # here we are inside an episode

  12. Initialize the variables for the calculation of the cumulated discount factor and the current episode return:

        # reset cumulated gamma

        gamma_cum = 1

        # return of the current episode

        episode_return = 0

  13. Reset the environment and get the first observation:

        # the reset function resets the environment and returns

        # the first environment observation

        observation = env.reset()

  14. Loop for the number of timesteps:

        # loop for the given number of timesteps or

        # until the episode is terminated

        for timestep_number in range(timesteps):

  15. Render the environment, select the action, and then apply it:

            # if you want to render the environment

            # uncomment the following line

            # env.render()

            # select the action

            action = agent.pi(observation)

            # apply the selected action by calling env.step

            observation, reward, done, info = env.step(action)

  16. Increment the return, and calculate the cumulated discount factor (see the short example after the results):

            # increment the return

            episode_return += reward * gamma_cum

            # update the value of cumulated discount factor

            gamma_cum = gamma_cum * gamma

  17. If the episode is terminated, break from the timestep's loop:

            """

            if done the episode is terminated, we have to reset

            the environment

            """

            if done:

                print(f"Episode Number: {episode_number},

    Timesteps: {timestep_number}, Return: {episode_return}")

                # break from the timestep loop

                break

  18. After the timestep loop, we have to record the current return by appending it to the list of returns for each episode:

        episode_returns.append(episode_return)

  19. After the episode loop, close the environment and calculate statistics:

    # close the environment

    env.close()

    # Calculate return statistics

    avg_return = np.mean(episode_returns)

    std_return = np.std(episode_returns)

    var_return = std_return ** 2 # variance is std^2

    print(f"Statistics on Return: Average: {avg_return},

    Variance: {var_return}")

    You will get the following results:

    Episode Number: 0, Timesteps: 27, Return: 28.0

    Episode Number: 1, Timesteps: 9, Return: 10.0

    Episode Number: 2, Timesteps: 13, Return: 14.0

    Episode Number: 3, Timesteps: 16, Return: 17.0

    Episode Number: 4, Timesteps: 31, Return: 32.0

    Episode Number: 5, Timesteps: 10, Return: 11.0

    Episode Number: 6, Timesteps: 14, Return: 15.0

    Episode Number: 7, Timesteps: 11, Return: 12.0

    Episode Number: 8, Timesteps: 10, Return: 11.0

    Episode Number: 9, Timesteps: 30, Return: 31.0

    Statistics on Return: Average: 18.1, Variance: 68.89000000000001
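
As promised in step 16, here is a tiny standalone illustration of the return accumulated there: the return is the discounted sum of rewards, computed below with toy numbers that are not taken from the run above:

# toy illustration of a discounted return (illustrative numbers only)

rewards = [1.0, 1.0, 1.0]   # rewards collected over three timesteps

gamma = 0.9                 # discount factor

G = sum(r * gamma ** t for t, r in enumerate(rewards))

print(G)                    # 1.0 + 0.9 + 0.81, approximately 2.71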

In this activity, we implemented two different types of agents: a discrete agent, working with discrete environments, and a continuous agent, working with continuous environments.

Additionally, you can render the episodes inside a notebook using the following code:

# Render the episodes

import io

import base64

from IPython.display import HTML, display

episodes_to_watch = 1

for episode in range(episodes_to_watch):

    video = io.open(f"./gym-results/openaigym.video"

                    f".{env.file_infix}.video{episode:06d}.mp4",

                    "r+b").read()

    encoded = base64.b64encode(video)

    display(

        HTML(

            data="""

        <video width="360" height="auto" alt="test" controls>

        <source src="data:video/mp4;base64,{0}" type="video/mp4" />

        </video>""".format(

                encoded.decode("ascii")

            )

        )

    )

You can see the episode duration is not too long. This is because the actions are taken at random, so the pole falls after some timesteps.

Note

To access the source code for this specific section, please refer to https://packt.live/3fbxR3Y.

This section does not currently have an online interactive example and will need to be run locally.

Discrete and continuous agents are two different possibilities when facing a new RL problem.

We have designed our agents in a very flexible way so that they can be applied to almost all environments without having to change the code.

We also implemented a simple RL loop and measured the performance of our agent on a classical RL problem.

2. Markov Decision Processes and Bellman Equations

Activity 2.01: Solving Gridworld

  1. Import the required libraries:

    from enum import Enum, auto

    import matplotlib.pyplot as plt

    import numpy as np

    from scipy import linalg

    from typing import Tuple

  2. Define the visualization function:

    # helper function

    def vis_matrix(M, cmap=plt.cm.Blues):

        fig, ax = plt.subplots()

        ax.matshow(M, cmap=cmap)

        for i in range(M.shape[0]):

            for j in range(M.shape[1]):

                c = M[j, i]

                ax.text(i, j, "%.2f" % c, va="center", ha="center")

  3. Define the possible actions:

    # Define the actions

    class Action(Enum):

        UP = auto()

        DOWN = auto()

        LEFT = auto()

        RIGHT = auto()

  4. Define the Policy class, representing the random policy:

    # Agent Policy, random

    class Policy:

        def __init__(self):

            self._possible_actions = [action for action in Action]

            self._action_probs = {a: 1 / len(self._possible_actions)

                                  for a in self._possible_actions}

        def __call__(self, state: Tuple[int, int],

                     action: Action) -> float:

            """

            Returns the action probability

            """

            assert action in self._possible_actions

            # state is unused for this policy

            return self._action_probs[action]
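
    A quick optional check (not part of the activity) confirms that the action probabilities returned by this policy sum to one for any state:

    # optional check: the random policy spreads probability uniformly

    pi = Policy()

    probs = [pi((0, 0), a) for a in Action]

    print(probs)                         # [0.25, 0.25, 0.25, 0.25]

    assert abs(sum(probs) - 1.0) < 1e-9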

  5. Define the Environment class and the step function:

    class Environment:

        def __init__(self):

            self.grid_width = 5

            self.grid_height = 5

            self._good_state1 = (0, 1)

            self._good_state2 = (0, 3)

            self._to_state1 = (4, 2)

            self._to_state2 = (2, 3)

            self._bad_state1 = (1, 1)

            self._bad_state2 = (4, 4)

            self._bad_states = [self._bad_state1, self._bad_state2]

            self._good_states = [self._good_state1, self._good_state2]

            self._to_states = [self._to_state1, self._to_state2]

            self._good_rewards = [10, 5]

        def step(self, state, action):

            i, j = state

            # search among good states

            for good_state, reward, to_state in zip(

                    self._good_states,

                    self._good_rewards,

                    self._to_states):

                if (i, j) == good_state:

                    return (to_state, reward)

            reward = 0

            # if the state is a bad state, the reward is -1

            if state in self._bad_states:

                reward = -1

            # calculate next state based on the action

            if action == Action.LEFT:

                j_next = max(j - 1, 0)

                i_next = i

                if j - 1 < 0:

                    reward = -1

            elif action == Action.RIGHT:

                j_next = min(j + 1, self.grid_width - 1)

                i_next = i

                if j + 1 > self.grid_width - 1:

                    reward = -1

            elif action == Action.UP:

                j_next = j

                i_next = max(i - 1, 0)

                if i - 1 < 0:

                    reward = -1

            elif action == Action.DOWN:

                j_next = j

                i_next = min(i + 1, self.grid_height - 1)

                if i + 1 > self.grid_height - 1:

                    reward = -1

            else:

                 raise ValueError("Invalid action")

            return ((i_next, j_next), reward)

  6. Loop for all states and actions and build the transition and reward matrices:

    pi = Policy()

    env = Environment()

    # setup probability matrix and reward matrix

    P = np.zeros((env.grid_width*env.grid_height,

                  env.grid_width*env.grid_height))

    R = np.zeros_like(P)

    possible_actions = [action for action in Action]

    # Loop for all states and fill up P and R

    for i in range(env.grid_height):

        for j in range(env.grid_width):

            state = (i, j)

            # loop for all action and setup P and R

            for action in possible_actions:

                next_state, reward = env.step(state, action)

                (i_next, j_next) = next_state

                P[i*env.grid_width+j,

                  i_next*env.grid_width

                  + j_next] += pi(state, action)

                """

                the reward depends only on the starting state and

                the final state

                """

                R[i*env.grid_width+j,

                  i_next*env.grid_width + j_next] = reward

  7. Check the correctness of the matrix:

    # check the correctness

    assert((np.sum(P, axis=1) == 1).all())

  8. Calculate the expected reward for each state:

    # expected reward for each state

    R_expected = np.sum(P * R, axis=1, keepdims=True)

  9. Use the function to visualize the expected reward:

    # reshape the state values in a matrix

    R_square = R_expected.reshape((env.grid_height,env.grid_width))

    # Visualize

    vis_matrix(R_square, cmap=plt.cm.Reds)

    The function visualizes the matrix using Matplotlib. You should see something similar to this:

    Figure 2.62: The expected reward for each state


    The previous figure is a color representation of the expected reward associated with each state considering the current policy. Notice that the expected reward of bad states is exactly equal to -1. The expected reward of good states is exactly equal to 10 and 5, respectively.

  10. Now set up the matrix form of the Bellman expectation equation:

    # define the discount factor

    gamma = 0.9

    # Now it is possible to solve the Bellman Equation

    A = np.eye(env.grid_width*env.grid_height) - gamma * P

    B = R_expected

  11. Solve the Bellman equation:

    # solve using scipy linalg

    V = linalg.solve(A, B)
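
    Optionally, you can verify the solution: in matrix form, the Bellman expectation equation reads V = R + gamma * P * V, so the computed V should satisfy it up to numerical precision:

    # optional check: V satisfies the Bellman expectation equation

    assert np.allclose(V, R_expected + gamma * P.dot(V))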

  12. Visualize the result:

    # reshape the state values in a matrix

    V_square = V.reshape((env.grid_height,env.grid_width))

    # visualize results

    vis_matrix(V_square, cmap=plt.cm.Reds)

Figure 2.63: State values of Gridworld


Note that the value of good states is less than the expected reward from those states. This is because landing states have an expected reward that is negative or because landing states are close to states for which the reward is negative. You can see that the state with the highest value is state A, followed by state B. It is also interesting to note the high value of the state in position (0, 2), which is close to the good states.

Note

To access the source code for this specific section, please refer to https://packt.live/2Al9xOB.

You can also run this example online at https://packt.live/2UChxBy.

In this activity, we experimented with the Gridworld environment, one of the most common toy RL environments. We defined a random policy, and we solved the Bellman expectation equation using scipy.linalg.solve to find the state values of the policy.

It is important to visualize the results, when possible, to get a better understanding and to spot any errors.

3. Deep Learning in Practice with TensorFlow 2

Activity 3.01: Classifying Fashion Clothes Using a TensorFlow Dataset and TensorFlow 2

  1. Import all the required modules:

    from __future__ import (absolute_import, division,

                            print_function, unicode_literals)

    import numpy as np

    import matplotlib.pyplot as plt

    # TensorFlow

    import tensorflow as tf

    import tensorflow_datasets as tfds

  2. Import the Fashion MNIST dataset using TensorFlow datasets and split it into train and test splits. Then, create a list of classes:

    # Construct a tf.data.Dataset

    (train_images, train_labels), (test_images, test_labels) = tfds.as_numpy(

        tfds.load('fashion_mnist', split=['train', 'test'],

                  batch_size=-1, as_supervised=True,))

    train_images = np.squeeze(train_images)

    test_images = np.squeeze(test_images)

    classes = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress',

               'Coat','Sandal', 'Shirt', 'Sneaker', 'Bag',

               'Ankle boot']

  3. Explore the dataset to get familiar with the input features, that is, shapes, labels, and classes:

    print("Training dataset shape =", train_images.shape)

    print("Training labels length =", len(train_labels))

    print("Some training labels =", train_labels[:5])

    print("Test dataset shape =", test_images.shape)

    print("Test labels length =", len(test_labels))

    The output will be as follows:

    Training dataset shape = (60000, 28, 28)

    Training labels length = 60000

    Some training labels = [2 1 8 4 1]

    Test dataset shape = (10000, 28, 28)

    Test labels length = 10000

  4. Visualize some instances of the training set.

    It is also useful to take a look at how the images will appear. The following code snippet shows the first training set instance:

    plt.figure()

    plt.imshow(train_images[0])

    plt.colorbar()

    plt.grid(False)

    plt.show()

    The output image will be as follows:

    Figure 3.30: First training image plot


  5. Perform feature normalization:

    train_images = train_images / 255.0

    test_images = test_images / 255.0

  6. Now, let's take a look at some instances of our training set by plotting 25 of them with their corresponding labels:

    plt.figure(figsize=(10,10))

    for i in range(25):

        plt.subplot(5,5,i+1)

        plt.xticks([])

        plt.yticks([])

        plt.grid(False)

        plt.imshow(train_images[i], cmap=plt.cm.binary)

        plt.xlabel(classes[train_labels[i]])

    plt.show()

    The output image will be as follows:

    Figure 3.31: A set of 25 training samples and their corresponding labels


  7. Build the classification model. First, create the model as a sequence of layers:

    model = tf.keras.Sequential(

        [tf.keras.layers.Flatten(input_shape=(28, 28)),

         tf.keras.layers.Dense(128, activation='relu'),

         tf.keras.layers.Dense(10)])
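
    Optionally, you can inspect the architecture with model.summary(). With this configuration, the first Dense layer has 28*28*128 + 128 = 100,480 parameters and the output layer has 128*10 + 10 = 1,290, for a total of 101,770 trainable parameters:

    # optional: print the layer-by-layer summary of the network

    model.summary()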

  8. Then, associate the model with an optimizer, a loss function, and a metric:

    model.compile(optimizer='adam',

                  loss=tf.keras.losses.SparseCategoricalCrossentropy

                  (from_logits=True), metrics=['accuracy'])

  9. Train the deep neural network:

    model.fit(train_images, train_labels, epochs=10)

    The last output lines will be as follows:

    Epoch 9/10

    60000/60000 [==============================] - 2s 40us/sample - loss: 0.2467 - accuracy: 0.9076

    Epoch 10/10

    60000/60000 [==============================] - 2s 40us/sample - loss: 0.2389 - accuracy: 0.9103

  10. Test the model's accuracy. The accuracy should be in excess of 88%.
  11. Evaluate the model on the test set and print the accuracy score:

    test_loss, test_accuracy = model.evaluate(

        test_images, test_labels, verbose=2)

    print(' Test accuracy:', test_accuracy)

    The output will be as follows:

    10000/10000 - 0s - loss: 0.3221 - accuracy: 0.8878

    Test accuracy: 0.8878

    Note

    The accuracy may show slightly different values due to random sampling with a variable random seed.

  12. Perform inference and check the predictions against the ground truth.

    As a first step, add a softmax layer to the model so that it outputs probabilities instead of logits. Then, print out the probabilities of the first test instance with the following code:

    probability_model = tf.keras.Sequential(

        [model, tf.keras.layers.Softmax()])

    predictions = probability_model.predict(test_images)

    print(predictions[0:3])

    The output will be as follows:

    [[3.85897374e-06 2.33953915e-06 2.30801385e-02 4.74092474e-07

      9.55752671e-01 1.56392260e-10 2.11589299e-02 8.57651870e-08

      1.49855202e-06 1.05843508e-10]

  13. Next, compare one model prediction (that is, the class with the highest predicted probability), the one on the first test instance, with its ground truth:

    print("Class ID, predicted | real =",

          np.argmax(predictions[0]), "|", test_labels[0])

    The output will be as follows:

    Class ID, predicted | real = 4 | 4
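
    As an optional cross-check, the overall test accuracy reported by evaluate can be recomputed directly from the predicted probabilities; the value should closely match the one printed earlier:

    # recompute test accuracy from the argmax of the predicted probabilities

    predicted_classes = np.argmax(predictions, axis=1)

    print("Recomputed accuracy:", np.mean(predicted_classes == test_labels))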

  14. In order to perform a comparison that's even clearer, create the following two functions. The first one plots the i-th test set instance image with a caption showing the predicted class with the highest probability, its probability in percent, and the ground truth between round brackets. This caption will be blue for correct predictions, and red for incorrect ones:

    def plot_image(i, predictions_array, true_label, img):

        predictions_array, true_label, img = (predictions_array,

                                              true_label[i], img[i])

        plt.grid(False)

        plt.xticks([])

        plt.yticks([])

        plt.imshow(img, cmap=plt.cm.binary)

        predicted_label = np.argmax(predictions_array)

        if predicted_label == true_label:

            color = 'blue'

        else:

            color = 'red'

        plt.xlabel("{} {:2.0f}% ({})".format

                   (classes[predicted_label],

                    100*np.max(predictions_array),

                    classes[true_label]),

                    color=color)

  15. The second function creates a second image showing a bar plot of the predicted probabilities for all classes. The bar of the predicted class is colored blue if the prediction is correct, or red if it is incorrect; in the latter case, the bar corresponding to the correct label is colored blue:

    def plot_value_array(i, predictions_array, true_label):

        predictions_array, true_label = (predictions_array,

                                         true_label[i])

        plt.grid(False)

        plt.xticks(range(10))

        plt.yticks([])

        thisplot = plt.bar(range(10), predictions_array,

                   color="#777777")

        plt.ylim([0, 1])

        predicted_label = np.argmax(predictions_array)

        thisplot[predicted_label].set_color('red')

        thisplot[true_label].set_color('blue')

  16. Using these two functions, we can examine every instance of the test set. In the following snippet, the first test instance is being plotted:

    i = 0

    plt.figure(figsize=(6,3))

    plt.subplot(1,2,1)

    plot_image(i, predictions[i], test_labels, test_images)

    plt.subplot(1,2,2)

    plot_value_array(i, predictions[i], test_labels)

    plt.show()

    The output will be as follows:

    Figure 3.32: First test instance, correctly predicted


  17. The very same approach can be used to plot a user-defined number of test instances, arranging the output in subplots, as follows:

    """

    Plot the first X test images, their predicted labels, and the true labels.

    Color correct predictions in blue and incorrect predictions in red.

    """

    num_rows = 5

    num_cols = 3

    num_images = num_rows*num_cols

    plt.figure(figsize=(2*2*num_cols, 2*num_rows))

    for i in range(num_images):

        plt.subplot(num_rows, 2*num_cols, 2*i+1)

        plot_image(i, predictions[i], test_labels, test_images)

        plt.subplot(num_rows, 2*num_cols, 2*i+2)

        plot_value_array(i, predictions[i], test_labels)

    plt.tight_layout()

    plt.show()

    The output will be as follows:

    Figure 3.33: First 25 test instances with their predicted classes and ground truth comparison


Note

To access the source code for this specific section, please refer to https://packt.live/3dXv3am.

You can also run this example online at https://packt.live/2Ux5JR5.

In this activity, we faced a problem that is quite similar to a real-world one. We had to deal with complex high dimensional inputs – in our case, grayscale images – and we wanted to build a model capable of autonomously grouping them into 10 different categories. Thanks to the power of deep learning and state-of-the-art machine learning frameworks, we were able to build a fully connected neural network that achieves a classification accuracy in excess of 88%.

4. Getting started with OpenAI and TensorFlow for Reinforcement Learning

Activity 4.01: Training a Reinforcement Learning Agent to Play a Classic Video Game

  1. Import all the required modules from OpenAI Baselines and TensorFlow in order to use the PPO algorithm:

    from baselines.ppo2.ppo2 import learn

    from baselines.ppo2 import defaults

    from baselines.common.vec_env import VecEnv, VecFrameStack

    from baselines.common.cmd_util import make_vec_env, make_env

    from baselines.common.models import register

    import tensorflow as tf

    # numpy is needed later, when running the trained agent

    import numpy as np

  2. Define and register a custom convolutional neural network for the policy network:

    @register("custom_cnn")

    def custom_cnn():

        def network_fn(input_shape, **conv_kwargs):

            """

            Custom CNN

            """

            print('input shape is {}'.format(input_shape))

            x_input = tf.keras.Input(

                shape=input_shape, dtype=tf.uint8)

            h = x_input

            h = tf.cast(h, tf.float32) / 255.

            h = tf.keras.layers.Conv2D(

                filters=32, kernel_size=8, strides=4,

                padding='valid', data_format='channels_last',

                activation='relu')(h)

            h2 = tf.keras.layers.Conv2D(

                filters=64, kernel_size=4, strides=2,

                padding='valid', data_format='channels_last',

                activation='relu')(h)

            h3 = tf.keras.layers.Conv2D(

                filters=64, kernel_size=3, strides=1,

                padding='valid', data_format='channels_last',

                activation='relu')(h2)

            h3 = tf.keras.layers.Flatten()(h3)

            h3 = tf.keras.layers.Dense(

                units=512, name='fc1', activation='relu')(h3)

            network = tf.keras.Model(inputs=[x_input], outputs=[h3])

            network.summary()

            return network

        return network_fn
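
    If you want to check the network in isolation, the following optional sketch builds it for the standard 84x84 observation with 4 stacked frames (the shape used by the Atari wrappers); it assumes, as in OpenAI Baselines, that the register decorator returns the function unchanged:

    # optional, standalone check of the registered network builder

    network_fn = custom_cnn()

    network = network_fn((84, 84, 4))   # 84x84 frames, 4 stacked

    print(network.output_shape)         # expected: (None, 512)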

  3. Create a function to build the environment in the format required by OpenAI Baselines:

    def build_env(env_id, env_type):

        if env_type in {'atari', 'retro'}:

            env = make_vec_env(env_id, env_type, 1, None,

                               gamestate=None, reward_scale=1.0)

            env = VecFrameStack(env, 4)

        else:

            env = make_vec_env(env_id, env_type, 1, None,

                               reward_scale=1.0,

                               flatten_dict_observations=True)

        return env

  4. Build the PongNoFrameskip-v0 environment, choose the required policy network parameters, and train it:

    env_id = 'PongNoFrameskip-v0'

    env_type = 'atari'

    print("Env type = ", env_type)

    env = build_env(env_id, env_type)

    model = learn(network="custom_cnn", env=env, total_timesteps=1e4)

    While training, the model produces an output similar to the following (only a few lines have been reported here):

    Env type = atari

    Logging to /tmp/openai-2020-05-11-16-19-42-770612

    input shape is (84, 84, 4)

    Model: "model"

    _________________________________________________________________

    Layer (type) Output Shape Param #

    =================================================================

    input_1 (InputLayer) [(None, 84, 84, 4)] 0

    _________________________________________________________________

    tf_op_layer_Cast (TensorFlow [(None, 84, 84, 4)] 0

    _________________________________________________________________

    tf_op_layer_truediv (TensorF [(None, 84, 84, 4)] 0

    _________________________________________________________________

    conv2d (Conv2D) (None, 20, 20, 32) 8224

    _________________________________________________________________

    conv2d_1 (Conv2D) (None, 9, 9, 64) 32832

    _________________________________________________________________

    conv2d_2 (Conv2D) (None, 7, 7, 64) 36928

    _________________________________________________________________

    flatten (Flatten) (None, 3136) 0

    _________________________________________________________________

    fc1 (Dense) (None, 512) 1606144

    =================================================================

    Total params: 1,684,128

    Trainable params: 1,684,128

    Non-trainable params: 0

    _________________________________________________________________

    --------------------------------------------

    | eplenmean | 1e+03 |

    | eprewmean | -20 |

    | fps | 213 |

    | loss/approxkl | 0.00012817292 |

    | loss/clipfrac | 0.0 |

    | loss/policy_entropy | 1.7916294 |

    | loss/policy_loss | -0.00050599687 |

    | loss/value_loss | 0.06880974 |

    | misc/explained_variance | 0.000675 |

    | misc/nupdates | 1 |

    | misc/serial_timesteps | 2048 |

    | misc/time_elapsed | 9.6 |

    | misc/total_timesteps | 2048 |

    --------------------------------------------

  5. Run the trained agent in the environment and print the cumulative reward:

    obs = env.reset()

    if not isinstance(env, VecEnv):

        obs = np.expand_dims(np.array(obs), axis=0)

    episode_rew = 0

    while True:

        actions, _, state, _ = model.step(obs)

        obs, reward, done, info = env.step(actions.numpy())

        if not isinstance(env, VecEnv):

            obs = np.expand_dims(np.array(obs), axis=0)

        env.render()

        print("Reward = ", reward)

        episode_rew += reward

        if done:

            print('Episode Reward = {}'.format(episode_rew))

            break

    env.close()

    The following lines show the last part of the output:

    [...]

    Reward = [0.]

    Reward = [0.]

    Reward = [0.]

    Reward = [0.]

    Reward = [0.]

    Reward = [0.]

    Reward = [0.]

    Reward = [0.]

    Reward = [0.]

    Reward = [-1.]

    Episode Reward = [-17.]

    It also renders the environment, showing what happens in the environment in real time:

    Figure 4.14: One frame of the real-time environment, after rendering


  6. Use the built-in OpenAI Baselines run script to train PPO on the PongNoFrameskip-v0 environment:

    !python -m baselines.run --alg=ppo2 --env=PongNoFrameskip-v0 --num_timesteps=2e7 --save_path=./models/Pong_20M_ppo2 --log_path=./logs/Pong/

    The last few lines of the output will be similar to the following:

    Stepping environment...

    -------------------------------------------

    | eplenmean | 867 |

    | eprewmean | -20.8 |

    | fps | 500 |

    | loss/approxkl | 4.795634e-05 |

    | loss/clipfrac | 0.0 |

    | loss/policy_entropy | 1.7456135 |

    | loss/policy_loss | -0.0005875508 |

    | loss/value_loss | 0.050125826 |

    | misc/explained_variance | 0.145 |

    | misc/nupdates | 19 |

    | misc/serial_timesteps | 2432 |

    | misc/time_elapsed | 22 |

    | misc/total_timesteps | 9728 |

    -------------------------------------------

  7. Use the built-in OpenAI Baselines run script to run the trained model on the PongNoFrameskip-v0 environment:

    !python -m baselines.run --alg=ppo2 --env=PongNoFrameskip-v0 --num_timesteps=0 --load_path=./models/Pong_20M_ppo2 --play

    The output will be similar to the following:

    episode_rew=-21.0

    episode_rew=-20.0

    episode_rew=-20.0

    episode_rew=-19.0

  8. Use the pretrained weights provided to see the trained agent in action:

    !wget -O pong_20M_ppo2.tar.gz https://github.com/PacktWorkshops/The-Reinforcement-Learning-Workshop/blob/master/Chapter04/pong_20M_ppo2.tar.gz?raw=true

    The output will be as follows:

    Saving to: 'pong_20M_ppo2.tar.gz'

    pong_20M_ppo2.tar.g 100%[===================>] 17,44M 15,

    1MB/s in 1,2s

    2020-05-11 16:19:11 (15,1 MB/s) - 'pong_20M_ppo2.tar.gz' saved [18284569/18284569]

    You can extract the .tar archive by using the following command:

    !tar xvzf pong_20M_ppo2.tar.gz

    The output will be as follows:

    pong_20M_ppo2/ckpt-1.data-00000-of-00001

    pong_20M_ppo2/ckpt-1.index

    pong_20M_ppo2/

    pong_20M_ppo2/checkpoint

  9. Use the built-in OpenAI Baselines run script to run the pretrained model on PongNoFrameskip-v0:

    !python -m baselines.run --alg=ppo2 --env=PongNoFrameskip-v0 --num_timesteps=0 --load_path=./pong_20M_ppo2 --play

    Note

    To access the source code for this specific section, please refer to https://packt.live/30yFmOi.

    This section does not currently have an online interactive example, and will need to be run locally.

In this activity, we learned how to train a state-of-the-art reinforcement learning agent that, by only looking at screen pixels, is able to achieve better-than-human performance when playing a classic Atari video game. We made use of a convolutional neural network to encode environment observations and leveraged the state-of-the-art OpenAI tool to successfully train a PPO algorithm.

5. Dynamic Programming

Activity 5.01: Implementing Policy and Value Iteration on the FrozenLake-v0 Environment

  1. Import the required libraries:

    import numpy as np

    import gym

  2. Initialize the environment and reset the current one. Set is_slippery=False in the initializer. Show the size of the action space and the number of possible states:

    def initialize_environment():

        """initialize the OpenAI Gym environment"""

        env = gym.make("FrozenLake-v0", is_slippery=False)

        print("Initializing environment")

        # reset the current environment

        env.reset()

        # show the size of the action space

        action_size = env.action_space.n

        print(f"Action space: {action_size}")

        # Number of possible states

        state_size = env.observation_space.n

        print(f"State space: {state_size}")

        return env
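
    The functions defined next read the transition model directly from env.env.P, where each entry is a list of (probability, next_state, reward, done) tuples; here is an optional peek at its structure (deterministic here because is_slippery=False):

    # optional: inspect the transition model of the deterministic environment

    env = initialize_environment()

    print(env.env.P[0][1])   # e.g. [(1.0, 4, 0.0, False)]: moving DOWN from state 0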

  3. Perform policy evaluation iterations until the largest change in the value function is smaller than small_change:

    def policy_evaluation(V, current_policy, env,

                          gamma, small_change):

        """

        Perform policy evaluation iterations until the largest

        change is less than 'small_change'

        Args:

            V: the value function table

            current_policy: current policy

            env: the OpenAI FrozenLake-v0 environment

            gamma: future reward coefficient

            small_change: how small should the change be for the

              iterations to stop

        Returns:

            V: the value function after convergence of the evaluation

        """

        state_size = env.observation_space.n

        while True:

            biggest_change = 0

            # loop through every state present

            for state in range(state_size):

                old_V = V[state]

  4. Take the action according to the current policy:

                action = current_policy[state]

                prob, new_state, reward, done = (

                    env.env.P[state][action][0])

  5. Use the Bellman equation to update V[state]:

                V[state] = reward + gamma * V[new_state]

                # if the biggest change is small enough then it means

                # the policy has converged, so stop.

                biggest_change = max(biggest_change, abs(V[state]

                                     - old_V))

            if biggest_change < small_change:

                break

        return V

  6. Perform policy improvement using the Bellman optimality equation:

    def policy_improvement(V, current_policy, env, gamma):

        """

        Perform policy improvement using the Bellman Optimality Equation.

        Args:

            V: the value function table

            current_policy: current policy

            env: the OpenAI FrozenLake-v0 environment

            gamma: future reward coefficient

        Returns:

            current_policy: the updated policy

            policy_changed: True, if the policy was changed, else,

              False

        """

        state_size = env.observation_space.n

        action_size = env.action_space.n

        policy_changed = False

        for state in range(state_size):

            best_val = -np.inf

            best_action = -1

            # loop over all actions and select the best one

            for action in range(action_size):

                prob, new_state, reward, done = (

                    env.env.P[state][action][0])

  7. Calculate the future reward of taking this action. Note that we are using the simplified equation because all transition probabilities are equal to 1 (the environment is deterministic):

                future_reward = reward + gamma * V[new_state]

                if future_reward > best_val:

                    best_val = future_reward

                    best_action = action

  8. Using assert statements, we can avoid getting into unwanted situations:

            assert best_action != -1

            if current_policy[state] != best_action:

                policy_changed = True

  9. Update the best action for this current state:

            current_policy[state] = best_action

        # if the policy didn't change, it means we have converged

        return current_policy, policy_changed

  10. Find the most optimal policy for the FrozenLake-v0 environment using policy iteration:

    def policy_iteration(env):

        """

        Find the most optimal policy for the FrozenLake-v0

        environment using Policy

        Iteration

        Args:

            env: FrozenLake-v0 environment

        Returns:

            policy: the most optimal policy

        """

        V = dict()

        """

        initially the value function for all states

        will be random values close to zero

        """

        state_size = env.observation_space.n

        for i in range(state_size):

            V[i] = np.random.random()

        # when the change is smaller than this, stop

        small_change = 1e-20

        # future reward coefficient

        gamma = 0.9

        episodes = 0

        # train for these many episodes

        max_episodes = 50000

        # initially we will start with a random policy

        current_policy = dict()

        for s in range(state_size):

            current_policy[s] = env.action_space.sample()

        while episodes < max_episodes:

            episodes += 1

            # policy evaluation

            V = policy_evaluation(V, current_policy,

                                  env, gamma, small_change)

            # policy improvement

            current_policy, policy_changed = policy_improvement(

                V, current_policy, env, gamma)

            # if the policy didn't change, it means we have converged

            if not policy_changed:

                break

        print(f"Number of episodes trained: {episodes}")

        return current_policy

  11. Perform a test pass on the FrozenLake-v0 environment:

    def play(policy, render=False):

        """

        Perform a test pass on the FrozenLake-v0 environment

        Args:

            policy: the policy to use

            render: if the result should be rendered at every step.

              False by default

        """

        env = initialize_environment()

        rewards = []

  12. Define the maximum number of steps the agent is allowed to take. If it doesn't reach the goal within this limit, we end the episode and move on to the next one:

        max_steps = 25

        test_episodes = 50

        for episode in range(test_episodes):

            # reset the environment every new episode

            state = env.reset()

            total_rewards = 0

            print("*" * 100)

            print("Episode {}".format(episode))

            for step in range(max_steps):

  13. Take the action prescribed by the policy for the current state:

                action = policy[state]

                new_state, reward, done, info = env.step(action)

                if render:

                    env.render()

                total_rewards += reward

                if done:

                    rewards.append(total_rewards)

                    print("Score", total_rewards)

                    break

                state = new_state

        env.close()

        print("Average Score", sum(rewards) / test_episodes)

  14. Step through the FrozenLake-v0 environment randomly:

    def random_step(n_steps=5):

        """

        Steps through the FrozenLake-v0 environment randomly

        Args:

            n_steps: Number of steps to step through

        """

        # reset the environment

        env = initialize_environment()

        state = env.reset()

        for i in range(n_steps):

            # choose an action at random

            action = env.action_space.sample()

            env.render()

            new_state, reward, done, info = env.step(action)

            print(f"New State: {new_state} "

                  f"reward: {reward} "

                  f"done: {done} "

                  f"info: {info} ")

            print("*" * 20)

  15. Perform value iteration to find the most optimal policy for the FrozenLake-v0 environment:

    def value_iteration(env):

        """

        Performs Value Iteration to find the most optimal policy for the

        FrozenLake-v0 environment

        Args:

            env: FrozenLake-v0 Gym environment

        Returns:

            policy: the most optimum policy

        """

        V = dict()

        gamma = 0.9

        state_size = env.observation_space.n

        action_size = env.action_space.n

        policy = dict()

  16. Initialize the value table (to -1 for every state) and initialize the policy randomly:

        for x in range(state_size):

            V[x] = -1

            policy[x] = env.action_space.sample()

        """

        this loop repeats until the change in value function

        is less than delta

        """

        while True:

            delta = 0

            for state in reversed(range(state_size)):

                old_v_s = V[state]

                best_rewards = -np.inf

                best_action = None

                # for all the actions in current state

                for action in range(action_size):

  17. Check the reward obtained if we were to perform this action:

                    prob, new_state, reward, done = (

                        env.env.P[state][action][0])

                    potential_reward = reward + gamma * V[new_state]

                    """

                    select the one that has the best reward

                    and also save the action to the policy

                    """

                    if potential_reward > best_rewards:

                        best_rewards = potential_reward

                        best_action = action

                policy[state] = best_action

                V[state] = best_rewards

                # terminate if the change is not high

                delta = max(delta, abs(V[state] - old_v_s))

            if delta < 1e-30:

                break

        print(policy)

        print(V)

        return policy

  18. Run the code and make sure the output matches the expectation by running it in the main block:

    if __name__ == '__main__':

        env = initialize_environment()

        # policy = policy_iteration(env)

        policy = value_iteration(env)

        play(policy, render=True)

    After running this, you should be able to see the following output:

    Figure 5.27: FrozenLake-v0 environment output


As can be seen from the output, we have successfully achieved the goal of retrieving the frisbee.

Note

To access the source code for this specific section, please refer to https://packt.live/3fxtZuq.

You can also run this example online at https://packt.live/2ChI1Ss.

6. Monte Carlo Methods

Activity 6.01: Exploring the Frozen Lake Problem – the Reward Function

  1. Import the necessary libraries:

    import gym

    import numpy as np

    from collections import defaultdict

  2. Select the environment as FrozenLake. is_slippery is set to False. The environment is reset with the line env.reset() and rendered with the line env.render():

    env = gym.make("FrozenLake-v0", is_slippery=False)

    env.reset()

    env.render()

    You will get the following output:

    Figure 6.15: Frozen Lake state rendered


    This is a text grid with the letters S, F, G, and H used to represent the current environment of FrozenLake. The highlighted cell S is the current state of the agent.

  3. Print the possible values in the observation space and the number of action values using the print(env.observation_space) and print(env.action_space) functions respectively:

    print(env.observation_space)

    print(env.action_space)

    name_action = {0:'Left',1:'Down',2:'Right',3:'Up'}

    You will get the following output:

    Discrete(16)

    Discrete(4)

    16 is the number of cells in the grid, so print(env.observation_space) prints 16. 4 is the number of possible actions, so print(env.action_space) prints 4. Discrete shows the observation space and action space take only discrete values and do not take continuous values.
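
    As a small optional illustration, sampling the action space returns an integer in the range [0, 4), which name_action maps to a move:

    # optional: draw a random action and translate it into a move name

    sample_action = env.action_space.sample()

    print(sample_action, name_action[sample_action])   # e.g. 2 Right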

  4. The next step is to define a function to generate a Frozen Lake episode. We initialize the episode list and reset the environment:

    def generate_frozenlake_episode():

        episode = []

        state = env.reset()

        step = 0

  5. Navigate step by step, storing each transition in the episode, and return the episode together with the final reward:

        while (True):

            action = env.action_space.sample()

            next_state, reward, done, info = env.step(action)

            episode.append((next_state, action, reward))

            if done:

                break

            state = next_state

            step += 1

        return episode, reward

    The action is obtained with env.action_space.sample(). next_state, reward, done, and info are obtained by calling the env.step(action) function. The (next_state, action, reward) tuple is then appended to the episode, so the episode becomes a list of states, actions, and rewards.

    The key now is to calculate the success rate, that is, the likelihood of success for a batch of episodes: out of the total number of attempts in the batch, we count how many successfully reached the goal. The ratio of successful attempts to total attempts is the success rate.
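
    Since the FrozenLake reward is 1 for reaching the goal and 0 otherwise, the success rate is simply the mean of the episode rewards; here is a toy example with made-up numbers:

    # toy illustration (made-up numbers): 3 successes out of 10 attempts

    episode_rewards = [0, 1, 0, 0, 1, 0, 0, 1, 0, 0]

    success_ratio = sum(episode_rewards) / len(episode_rewards)

    print(success_ratio)   # 0.3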

  6. First, we initialize the total reward:

    def frozen_lake_prediction(batch):

        for batch_number in range(batch+1):

            total_reward = 0

  7. Generate the episode and reward for every iteration and calculate the total reward:

            for i_episode in range(100):

                episode, reward = generate_frozenlake_episode()

                total_reward += reward

  8. The success ratio is calculated by dividing total_reward by 100 and is printed:

            success_percent = total_reward/100

            print("Episode", batch_number*100,

                  "Policy Win Rate=>", float(success_percent*100),

                  "%")

  9. The frozen lake prediction is calculated using the frozen_lake_prediction function:

    frozen_lake_prediction(100)

    You will get the following output:

    Figure 6.16: Output of Frozen Lake without learning


The output prints the policy win ratio for the various episodes in batches of 100. The ratios are quite low, as this is the simulation of an agent following a random policy. We will see next how this can be improved by learning with a combination of a greedy policy and an epsilon soft policy.

Note

To access the source code for this specific section, please refer to https://packt.live/2Akh8Nm.

You can also run this example online at https://packt.live/2zruU07.

Activity 6.02: Solving Frozen Lake Using Monte Carlo Control Every Visit Epsilon Soft

  1. Import the necessary libraries:

    import gym

    import numpy as np

  2. Select the environment as FrozenLake. is_slippery is set to False:

    #Setting up the Frozen Lake environment

    env = gym.make("FrozenLake-v0", is_slippery=False)

  3. Initialize the Q value and num_state_action to zeros:

    #Initializing the Q and num_state_action

    Q = np.zeros([env.observation_space.n, env.action_space.n])

    num_state_action = np.zeros([env.observation_space.n,

                                 env.action_space.n])

  4. Set the value of num_episodes to 100000 and create rewardsList. We set epsilon to 0.30:

    num_episodes = 100000

    epsilon = 0.30

    rewardsList = []

    Setting epsilon to 0.30 means we will explore with a likelihood of 0.30 and be greedy with a likelihood of 1-0.30 or 0.70.

  5. Run the loop for num_episodes episodes. At the start of each episode, reset the environment and initialize done, results_list, and result_sum:

    for x in range(num_episodes):

        state = env.reset()

        done = False

        results_list = []

        result_sum = 0.0

  6. Start a while loop, and check whether you need to pick a random action with a probability epsilon or greedy policy with a probability of 1-epsilon:

        while not done:

            

            #random action less than epsilon

            if np.random.rand() < epsilon:

                #we go with the random action

                action = env.action_space.sample()

            else:

                """

                1 - epsilon probability, we go with the greedy algorithm

                """

                action = np.argmax(Q[state, :])

  7. Now step through the action and get new_state and reward:

            #action is performed and assigned to new_state, reward

            new_state, reward, done, info = env.step(action)

  8. The results list is appended with the state and action pair, and result_sum is incremented by the reward:

            results_list.append((state, action))

            result_sum += reward

  9. new_state is assigned to state and result_sum is appended to rewardsList:

            #new state is assigned as state

            state = new_state

        #appending the results sum to the rewards list

        rewardsList.append(result_sum)

  10. Calculate Q[s,a] using the incremental method, as Q[s,a] + (result_sum - Q[s,a]) / N(s,a) (a short standalone illustration follows at the end of this activity):

        for (state, action) in results_list:

            num_state_action[state, action] += 1.0

            sa_factor = 1.0 / num_state_action[state, action]

            Q[state, action] += (sa_factor *

                                 (result_sum - Q[state, action]))

  11. Print the value of the success rates in batches of 1000:

        if x % 1000 == 0 and x != 0:

            print('Frozen Lake Success rate=>',

                  str(sum(rewardsList) * 100 / x ), '%')

  12. Print the final success rate:

    print("Frozen Lake Success rate=>",

          str(sum(rewardsList)/num_episodes * 100), "%")

    You will get the following output initially:

    Figure 6.17: Initial output of the Frozen Lake success rate


You will get the following output finally:

Figure 6.18: Final output of the Frozen Lake success rate


The success rate starts very close to 0% but, with reinforcement learning, the agent learns and the success rate increases incrementally, going up to 60%.
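
As mentioned in step 10, the Q update is the standard incremental-mean rule: adding (result_sum - Q[s,a]) / N(s,a) after each visit converges to the ordinary average of the observed returns. Here is a quick standalone illustration with toy numbers:

# toy illustration of the incremental mean used in step 10

Q_sa, N_sa = 0.0, 0

for G in [1.0, 0.0, 1.0]:   # returns observed for one (state, action) pair

    N_sa += 1

    Q_sa += (G - Q_sa) / N_sa

print(Q_sa)   # 0.666..., identical to the batch mean of the three returns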

Note

To access the source code for this specific section, please refer to https://packt.live/2Ync9Dq.

You can also run this example online at https://packt.live/3cUJLxQ.

7. Temporal Difference Learning

Activity 7.01: Using TD(0) Q-Learning to Solve FrozenLake-v0 with Stochastic Transitions

  1. Import the required modules:

    import numpy as np

    import matplotlib.pyplot as plt

    %matplotlib inline

    import gym

  2. Instantiate the gym environment called FrozenLake-v0 using the is_slippery flag set to True in order to enable stochasticity:

    env = gym.make('FrozenLake-v0', is_slippery=True)

  3. Take a look at the action and observation spaces:

    print("Action space = ", env.action_space)

    print("Observation space = ", env.observation_space)

    This will print out the following:

    Action space = Discrete(4)

    Observation space = Discrete(16)

  4. Create two dictionaries to easily translate the action numbers into moves:

    actionsDict = {}

    actionsDict[0] = " L "

    actionsDict[1] = " D "

    actionsDict[2] = " R "

    actionsDict[3] = " U "

    actionsDictInv = {}

    actionsDictInv["L"] = 0

    actionsDictInv["D"] = 1

    actionsDictInv["R"] = 2

    actionsDictInv["U"] = 3

  5. Reset the environment and render it to take a look at the grid problem:

    env.reset()

    env.render()

    Its initial state is as follows:

    Figure 7.39: Environment's initial state

  6. Visualize the optimal policy for this environment:

    optimalPolicy = [" * "," U ","L/R/D"," U ",

                     " L "," - "," L/R "," - ",

                     " U "," D "," L "," - ",

                     " - "," R ","R/D/U"," ! ",]

    print("Optimal policy:")

    idxs = [0,4,8,12]

    for idx in idxs:

        print(optimalPolicy[idx+0], optimalPolicy[idx+1],

              optimalPolicy[idx+2], optimalPolicy[idx+3])

    This prints out the following output:

    Optimal policy:

      L/R/D U U U

        L - L/R -

        U D L -

        - R D !

  7. Define the function that will take ε-greedy actions:

    def action_epsilon_greedy(q, s, epsilon=0.05):

        if np.random.rand() > epsilon:

            return np.argmax(q[s])

        return np.random.randint(4)

  8. Define a function that will take greedy actions:

    def greedy_policy(q, s):

        return np.argmax(q[s])

  9. Define a function that will calculate the agent's average performance:

    def average_performance(policy_fct, q):

        acc_returns = 0.

        n = 500

        for i in range(n):

            done = False

            s = env.reset()

            while not done:

                a = policy_fct(q, s)

                s, reward, done, info = env.step(a)

                acc_returns += reward

        return acc_returns/n

  10. Initialize the Q-table so that all the values are equal to 1, except for the values at the terminal states:

    q = np.ones((16, 4))

    # Set q(terminal,*) equal to 0

    q[5,:] = 0.0

    q[7,:] = 0.0

    q[11,:] = 0.0

    q[12,:] = 0.0

    q[15,:] = 0.0

  11. Set the number of total episodes, the number of steps representing the interval by which we're evaluating the agent's average performance, the learning rate, the discounting factor, the ε value for the exploration policy, and an array to collect all the agent's performance evaluations during training:

    nb_episodes = 80000

    STEPS = 2000

    alpha = 0.01

    gamma = 0.99

    epsilon_expl = 0.2

    q_performance = np.ndarray(nb_episodes//STEPS)

  12. Train the Q-learning algorithm. Loop among all episodes:

    for i in range(nb_episodes):

  13. Reset the environment and start the in-episode loop:

        done = False

        s = env.reset()

        while not done:

  14. Select the exploration action with an ε-greedy policy:

            # behavior policy

            a = action_epsilon_greedy(q, s, epsilon=epsilon_expl)

  15. Step the environment with the selected exploration action and retrieve the new state, reward, and done flag:

            new_s, reward, done, info = env.step(a)

  16. Select a new action with the greedy policy:

            a_max = np.argmax(q[new_s]) # estimation policy

  17. Update the Q-table with the Q-learning TD(0) rule:

            q[s, a] = q[s, a] + alpha *

                      (reward + gamma * q[new_s, a_max] - q[s, a])

  18. Update the state with a new value:

            s = new_s

  19. Evaluate the agent's average performance every STEPS episodes:

        if i%STEPS == 0:

            q_performance[i//STEPS] = average_performance

                                      (greedy_policy, q)

  20. Plot the Q-learning agent's mean reward history during training:

    plt.plot(STEPS * np.arange(nb_episodes//STEPS), q_performance)

    plt.xlabel("Epochs")

    plt.ylabel("Average reward of an epoch")

    plt.title("Learning progress for Q-Learning")

    This generates the following output, showing the learning progress for the Q-learning algorithm:

    Text(0.5, 1.0, 'Learning progress for Q-Learning')

    The plot for this can be visualized as follows:

    Figure 7.40: Average reward of an epoch trend over training epochs

    In this case, as with Q-learning applied to the deterministic environment, the plot shows how quickly the Q-learning performance grows over epochs as the agent collects more and more experience. It also shows that, because of the stochastic transitions, the algorithm cannot reach 100% success even after learning. When compared with the SARSA method on the same stochastic environment, as seen in Figure 7.15, the algorithm's performance grows faster and more steadily.

  21. Evaluate the greedy policy's performance for the trained agent (Q-table):

    greedyPolicyAvgPerf = average_performance(greedy_policy, q=q)

    print("Greedy policy Q-learning performance =",

          greedyPolicyAvgPerf)

    This prints out the following:

    Greedy policy Q-learning performance = 0.708

  22. Display the Q-table values:

    q = np.round(q,3)

    print("(A,S) Value function =", q.shape)

    print("First row")

    print(q[0:4,:])

    print("Second row")

    print(q[4:8,:])

    print("Third row")

    print(q[8:12,:])

    print("Fourth row")

    print(q[12:16,:])

    This generates the following output:

    (A,S) Value function = (16, 4)

    First row

    [[0.543 0.521 0.516 0.515]

     [0.319 0.355 0.322 0.493]

     [0.432 0.431 0.425 0.461]

     [0.32 0.298 0.296 0.447]]

    Second row

    [[0.559 0.392 0.396 0.393]

     [0. 0. 0. 0. ]

     [0.296 0.224 0.327 0.145]

     [0. 0. 0. 0. ]]

    Third row

    [[0.337 0.366 0.42 0.595]

     [0.484 0.639 0.433 0.415]

     [0.599 0.511 0.342 0.336]

     [0. 0. 0. 0. ]]

    Fourth row

    [[0. 0. 0. 0. ]

     [0.46 0.53 0.749 0.525]

     [0.711 0.865 0.802 0.799]

     [0. 0. 0. 0. ]]

  23. Print out the greedy policy that was found and compare it with the optimal policy:

    policyFound = [actionsDict[np.argmax(q[0,:])],

                   actionsDict[np.argmax(q[1,:])],

                   actionsDict[np.argmax(q[2,:])],

                   actionsDict[np.argmax(q[3,:])],

                   actionsDict[np.argmax(q[4,:])],

                   " - ",

                   actionsDict[np.argmax(q[6,:])],

                   " - ",

                   actionsDict[np.argmax(q[8,:])],

                   actionsDict[np.argmax(q[9,:])],

                   actionsDict[np.argmax(q[10,:])],

                   " - ",

                   " - ",

                   actionsDict[np.argmax(q[13,:])],

                   actionsDict[np.argmax(q[14,:])],

                   " ! "]

    print("Greedy policy found:")

    idxs = [0,4,8,12]

    for idx in idxs:

        print(policyFound[idx+0], policyFound[idx+1],

              policyFound[idx+2], policyFound[idx+3])

    print(" ")

    print("Optimal policy:")

    idxs = [0,4,8,12]

    for idx in idxs:

        print(optimalPolicy[idx+0], optimalPolicy[idx+1],

              optimalPolicy[idx+2], optimalPolicy[idx+3])

    This generates the following output:

    Greedy policy found:

        L U U U

        L - R -

        U D L -

        - R D !

    Optimal policy:

      L/R/D U U U

        L - L/R -

        U D L -

        - R D !

This output shows that, as for all the exercises in this chapter, the off-policy, one-step Q-learning algorithm is able to find the optimal policy by simply exploring the environment, even in the context of stochastic environment transitions. As anticipated, for this setting, it is not possible to achieve the maximum reward 100% of the time.

As we can see, for every state of the grid world, the greedy policy obtained from the Q-table calculated by our algorithm prescribes an action that is in accordance with the optimal policy defined by analyzing the environment. As we already saw, there are two states in which several different actions are equally optimal, and the agent correctly implements one of them.
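
As a side note, the greedy policy table printed in the last step can also be built more compactly. The following sketch (assuming the q, actionsDict, and terminal-state layout used above) is one way to do it:

    holes_and_goal = {5, 7, 11, 12, 15}

    policyFound = [" - " if s in holes_and_goal else actionsDict[np.argmax(q[s, :])]

                   for s in range(16)]

    policyFound[15] = " ! "    # mark the goal state explicitly

    for row in range(0, 16, 4):

        print(*policyFound[row:row + 4])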

Note

To access the source code for this specific section, please refer to https://packt.live/3elMxxu.

You can also run this example online at https://packt.live/37HSDWx.

8. The Multi-Armed Bandit Problem

Activity 8.01: Queueing Bandits

  1. Import the necessary libraries and tools, as follows:

    import numpy as np

    from utils import QueueBandit

  2. Declare the bandit object, as follows:

    N_CLASSES = 3

    queue_bandit = QueueBandit(filename='data.csv')

    The N_CLASSES variable will be used by our subsequent code.

  3. Implement the Greedy algorithm, as follows:

    class GreedyQueue:

        def __init__(self, n_classes=3):

            self.n_classes = n_classes

            self.time_history = [[] for _ in range(n_classes)]

        

        def decide(self, queue_lengths):

            for class_ in range(self.n_classes):

                if queue_lengths[class_] > 0 and

                   len(self.time_history[class_]) == 0:

                    return class_

            mean_times = [np.mean(self.time_history[class_])

                          if queue_lengths[class_] > 0 else np.inf

                          for class_ in range(self.n_classes)]

            return int(np.random.choice

                      (np.argwhere

                      (mean_times == np.min(mean_times)).flatten()))

        def update(self, class_, time):

            self.time_history[class_].append(time)

    Notice that we are taking care to avoid choosing a class that does not have any customers left in it by checking whether queue_lengths[class_] is greater than 0. The remaining code is analogous to what we had in our earlier discussion of Greedy; a short usage sketch follows this step.

    Subsequently, apply the algorithm to the bandit object, as follows:

    cumulative_times = queue_bandit.repeat

                       (GreedyQueue, [N_CLASSES],

                        visualize_cumulative_times=True)

    np.max(cumulative_times), np.mean(cumulative_times)

    This will generate the following graph:

    Figure 8.24: Distribution of cumulative waiting time from Greedy

    Additionally, the following will be printed out as the max and mean cumulative waiting times:

    (1218887.7924350922, 45155.236786598274)

    While these values might appear large compared to our earlier discussions, this is because the reward/cost distributions we are working with here take on higher values. We will use these values from Greedy as a frame of reference to analyze the performance of later algorithms.
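
    The following minimal sketch (using made-up queue lengths and job times, not data from data.csv) shows how decide() and update() interact:

    greedy = GreedyQueue(n_classes=3)

    # no history yet, so the first class that still has customers is chosen

    print(greedy.decide([2, 3, 1]))    # 0

    # record a few hypothetical job lengths

    greedy.update(0, 5.0)

    greedy.update(1, 2.0)

    greedy.update(2, 8.0)

    # now the class with the shortest mean job length is preferred

    print(greedy.decide([2, 3, 1]))    # 1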

  4. Implement the Explore-then-commit algorithm using the following code:

    class ETCQueue:

        def __init__(self, n_classes=3, T=3):

            self.n_classes = n_classes

            self.T = T

            self.time_history = [[] for _ in range(n_classes)]

        def decide(self, queue_lengths):

            for class_ in range(self.n_classes):

                if queue_lengths[class_] > 0 and

                len(self.time_history[class_]) < self.T:

                    return class_

            mean_times = [np.mean(self.time_history[class_])

                          if queue_lengths[class_] > 0 else np.inf

                          for class_ in range(self.n_classes)]

            return int(np.random.choice

                      (np.argwhere(mean_times == np.min(mean_times))

                      .flatten()))

        def update(self, class_, time):

            self.time_history[class_].append(time)

  5. Apply the algorithm to the bandit object, as follows:

    cumulative_times = queue_bandit.repeat

                       (ETCQueue, [N_CLASSES, 2],

                        visualize_cumulative_times=True)

    np.max(cumulative_times), np.mean(cumulative_times)

    This will produce the following graph:

    Figure 8.25: Distribution of cumulative waiting time from Explore-then-commit

    This will also produce the max and average cumulative waiting times: (1238591.3208636027, 45909.77140562623). Compared to Greedy (1218887.7924350922, 45155.236786598274), Explore-then-commit did relatively worse on this queueing bandit problem.

  6. Implement Thompson Sampling, as follows:

    class ExpThSQueue:

        def __init__(self, n_classes=3):

            self.n_classes = n_classes

            self.time_history = [[] for _ in range(n_classes)]

            self.temp_beliefs = [(0, 0) for _ in range(n_classes)]

            

        def decide(self, queue_lengths):

            for class_ in range(self.n_classes):

                if queue_lengths[class_] > 0 and

                len(self.time_history[class_]) == 0:

                    return class_

            

            rate_draws = [np.random.gamma

                          (self.temp_beliefs[class_][0],1

                           / self.temp_beliefs[class_][1])

                         if queue_lengths[class_] > 0 else -np.inf

                         for class_ in range(self.n_classes)]

            return int(np.random.choice

                      (np.argwhere(rate_draws == np.max(rate_draws))

                      .flatten()))

        def update(self, class_, time):

            self.time_history[class_].append(time)

            

            # Update parameters according to Bayes rule

            alpha, beta = self.temp_beliefs[class_]

            alpha += 1

            beta += time

            self.temp_beliefs[class_] = alpha, beta

    Recall that in our initial discussion of Thompson Sampling, we drew random samples to estimate the reward expectation for each arm. Here, we draw random samples from the corresponding Gamma distributions (which model the service rates) to estimate the rates (the inverse of the job lengths) and choose the class with the largest drawn sample.
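
    To make the update rule concrete, here is a minimal sketch (with made-up job times, not data from data.csv) of how the Gamma posterior over a class's service rate is built up and sampled:

    alpha, beta = 0, 0

    for time in [4.0, 5.5, 3.8, 4.6]:    # hypothetical job lengths

        alpha += 1                        # one more observation

        beta += time                      # total observed service time

    # draw plausible service rates from the posterior; their mean is roughly alpha / beta

    samples = np.random.gamma(alpha, 1 / beta, size=5)

    print(samples)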

  7. This can be applied to solve the bandit problem using the following code:

    cumulative_times = queue_bandit.repeat

                       (ExpThSQueue, [N_CLASSES],

                        visualize_cumulative_times=True)

    np.max(cumulative_times), np.mean(cumulative_times)

    The following plot will be produced:

    Figure 8.26: Distribution of cumulative waiting time from Thompson Sampling

    From the max and mean waiting time (1218887.7924350922, 45129.343871806814), we can see that Thompson Sampling is able to improve on Greedy.

  8. The modified version of Thompson Sampling can be implemented as follows:

    class ExploitingThSQueue:

        def __init__(self, n_classes=3, r=1):

            self.n_classes = n_classes

            self.time_history = [[] for _ in range(n_classes)]

            self.temp_beliefs = [(0, 0) for _ in range(n_classes)]

            self.t = 0

            self.r = r

            

        def decide(self, queue_lengths):

            for class_ in range(self.n_classes):

                if queue_lengths[class_] > 0 and

                len(self.time_history[class_]) == 0:

                    return class_

            if self.t > self.r * np.sum(queue_lengths):

                mean_times = [np.mean(self.time_history[class_])

                              if queue_lengths[class_] > 0

                              else np.inf

                              for class_ in range(self.n_classes)]

                return int(np.random.choice

                          (np.argwhere

                          (mean_times == np.min(mean_times))

                          .flatten()))

            rate_draws = [np.random.gamma

                          (self.temp_beliefs[class_][0],

                           1 / self.temp_beliefs[class_][1])

                          if queue_lengths[class_] > 0 else -np.inf

                          for class_ in range(self.n_classes)]

            return int(np.random.choice

                      (np.argwhere

                      (rate_draws == np.max(rate_draws)).flatten()))

    The initialization method of this class implementation has an additional attribute, r, which we will use to implement the exploitation logic.

    In the decide() method, right before we draw samples to estimate the rates, we check whether the current time (t, the number of customers served so far) is greater than r times the number of customers still waiting (the sum of queue_lengths). With r=1, this Boolean indicates whether we have processed more than half of the customers. If so, we simply apply the logic of the Greedy algorithm and return the arm with the best average rate; otherwise, we use the actual Thompson Sampling logic (see the short sketch after this step).

    The update() method should be the same as the actual Thompson Sampling algorithm from the previous step, as follows:

        def update(self, class_, time):

            self.time_history[class_].append(time)

            self.t += 1

            

            # Update parameters according to Bayes rule

            alpha, beta = self.temp_beliefs[class_]

            alpha += 1

            beta += time

            self.temp_beliefs[class_] = alpha, beta
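
    The switching condition itself can be illustrated in isolation. The following sketch (with arbitrary numbers) shows when the agent falls back to the greedy rule:

    r = 1

    queue_lengths = [3, 2, 1]    # 6 customers still waiting

    for t in [2, 6, 7]:          # customers already served

        exploit = t > r * sum(queue_lengths)

        print(t, "-> greedy" if exploit else "-> Thompson Sampling")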

  9. Finally, apply the algorithm to the bandit problem:

    cumulative_times = queue_bandit.repeat

                       (ExploitingThSQueue, [N_CLASSES, 1],

                        visualize_cumulative_times=True)

    np.max(cumulative_times), np.mean(cumulative_times)

    We will obtain the following graph:

    Figure 8.27: Distribution of cumulative waiting time from modified Thompson Sampling

Together with the max and mean waiting time (1218887.7924350922, 45093.244027644556), we can see that this modified version of Thompson Sampling is more effective than the original at minimizing the cumulative waiting time across the experiments.

This speaks to the potential benefit of designing algorithms that are tailored to the contextual bandit problem that they are trying to solve.

Note

To access the source code for this specific section, please refer to https://packt.live/2Yuw2IQ.

You can also run this example online at https://packt.live/3hnK5Z5.

Throughout this activity, we have learned how to apply the approaches discussed in this chapter to a queueing bandit problem, an example of a potential contextual bandit process. Most notably, we have considered a variant of Thompson Sampling that was modified to fit the context of the queueing problem, thus successfully lowering the cumulative waiting time compared to the other algorithms. This activity also marks the end of this chapter.

9. What Is Deep Q-Learning?

Activity 9.01: Implementing a Double Deep Q Network in PyTorch for the CartPole Environment

  1. Open a new Jupyter notebook and import all of the required libraries:

    import gym

    import matplotlib.pyplot as plt

    import torch

    import torch.nn as nn

    from torch import optim

    import numpy as np

    import random

    import math

  2. Write code that will create a device based on the availability of a GPU environment:

    use_cuda = torch.cuda.is_available()

    device = torch.device("cuda:0" if use_cuda else "cpu")

    print(device)

  3. Create a gym environment using the 'CartPole-v0' environment:

    env = gym.make('CartPole-v0')

  4. Set the seed for torch and the environment for reproducibility:

    seed = 100

    env.seed(seed)

    torch.manual_seed(seed)

    random.seed(seed)

  5. Fetch the number of states and actions from the environment:

    number_of_states = env.observation_space.shape[0]

    number_of_actions = env.action_space.n

    print('Total number of States : {}'.format(number_of_states))

    print('Total number of Actions : {}'.format(number_of_actions))

    The output is as follows:

    Total number of States : 4

    Total number of Actions : 2

  6. Set all of the hyperparameter values required for the DDQN process:

    NUMBER_OF_EPISODES = 500

    MAX_STEPS = 1000

    LEARNING_RATE = 0.01

    DISCOUNT_FACTOR = 0.99

    HIDDEN_LAYER_SIZE = 64

    EGREEDY = 0.9

    EGREEDY_FINAL = 0.02

    EGREEDY_DECAY = 500

    REPLAY_BUFFER_SIZE = 6000

    BATCH_SIZE = 32

    UPDATE_TARGET_FREQUENCY = 200

  7. Implement the calculate_epsilon function, as described in the previous exercises:

    def calculate_epsilon(steps_done):

        """

        Decays epsilon with increasing steps

        Parameter:

        steps_done (int) : number of steps completed

        Returns:

        float - decayed epsilon

        """

        epsilon = EGREEDY_FINAL + (EGREEDY - EGREEDY_FINAL)

                  * math.exp(-1. * steps_done / EGREEDY_DECAY )

        return epsilon
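
    A quick way to see how fast this schedule decays (a check, not part of the training code) is to print a few values using the hyperparameters defined above:

    for steps_done in [0, 500, 1000, 2000, 5000]:

        print(steps_done, round(calculate_epsilon(steps_done), 3))

    # roughly: 0.9, 0.344, 0.139, 0.036, 0.02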

  8. Create a class, called DQN, that accepts the number of states as inputs and outputs Q values for the number of actions present in the environment, with the network that has a hidden layer of size 64:

    class DQN(nn.Module):

        def __init__(self , hidden_layer_size):

            super().__init__()

            self.hidden_layer_size = hidden_layer_size

            self.fc1 = nn.Linear(number_of_states,

                                 self.hidden_layer_size)

            self.fc2 = nn.Linear(self.hidden_layer_size,

                                 number_of_actions)

        def forward(self, x):

            output = torch.tanh(self.fc1(x))

            output = self.fc2(output)

            return output

  9. Implement the ExperienceReplay class, as described in the previous exercises:

    class ExperienceReplay(object):

        def __init__(self , capacity):

            self.capacity = capacity

            self.buffer = []

            self.pointer = 0

        def push(self , state, action, new_state, reward, done):

            experience = (state, action, new_state, reward, done)

            if self.pointer >= len(self.buffer):

                self.buffer.append(experience)

            else:

                self.buffer[self.pointer] = experience

            self.pointer = (self.pointer + 1) % self.capacity

        def sample(self , batch_size):

            return zip(*random.sample(self.buffer , batch_size))

        def __len__(self):

            return len(self.buffer)
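
    As a quick illustration of the circular buffer (with toy transitions rather than CartPole data), pushing more items than the capacity overwrites the oldest ones, and sample() returns column-wise batches:

    toy = ExperienceReplay(capacity=3)

    for i in range(5):

        toy.push([i], i % 2, [i + 1], 1.0, False)

    print(len(toy))    # 3 - the oldest entries were overwritten

    states, actions, new_states, rewards, dones = toy.sample(2)

    print(states, actions)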

  10. Instantiate the ExperienceReplay class by passing the buffer size as input:

    memory = ExperienceReplay(REPLAY_BUFFER_SIZE)

  11. Implement the DQN agent class with the changes discussed for the optimize function (from the code example given in the Double Deep Q Network (DDQN) section):

    class DQN_Agent(object):

        def __init__(self):

            self.dqn = DQN(HIDDEN_LAYER_SIZE).to(device)

            self.target_dqn = DQN(HIDDEN_LAYER_SIZE).to(device)

            self.criterion = torch.nn.MSELoss()

            self.optimizer = optim.Adam

                             (params=self.dqn.parameters(),

                              lr=LEARNING_RATE)

            self.target_dqn_update_counter = 0

        def select_action(self,state,EGREEDY):

            random_for_egreedy = torch.rand(1)[0]

            if random_for_egreedy > EGREEDY:

                with torch.no_grad():

                    state = torch.Tensor(state).to(device)

                    q_values = self.dqn(state)

                    action = torch.max(q_values,0)[1]

                    action = action.item()

            else:

                action = env.action_space.sample()

            return action

        def optimize(self):

            if (BATCH_SIZE > len(memory)):

                return

            state, action, new_state, reward, done = memory.sample

                                                     (BATCH_SIZE)

            state = torch.Tensor(state).to(device)

            new_state = torch.Tensor(new_state).to(device)

            reward = torch.Tensor(reward).to(device)

            action = torch.LongTensor(action).to(device)

            done = torch.Tensor(done).to(device)

            """

            select action : get the index associated with max q

            value from prediction network

            """

            new_state_indxs = self.dqn(new_state).detach()

            # to get the max new state indexes

            max_new_state_indxs = torch.max(new_state_indxs, 1)[1]

            """

            Using the best action from the prediction nn get

            the max new state value in target dqn

            """

            new_state_values = self.target_dqn(new_state).detach()

            max_new_state_values = new_state_values.gather

                                   (1, max_new_state_indxs

                                    .unsqueeze(1))

                                   .squeeze(1)

            #when done = 1 then target = reward

            target_value = reward + (1 - done) * DISCOUNT_FACTOR

                           * max_new_state_values

            predicted_value = self.dqn(state).gather

                              (1, action.unsqueeze(1))

                              .squeeze(1)

            loss = self.criterion(predicted_value, target_value)

            self.optimizer.zero_grad()

            loss.backward()

            self.optimizer.step()

            if self.target_dqn_update_counter

            % UPDATE_TARGET_FREQUENCY == 0:

                self.target_dqn.load_state_dict(self.dqn.state_dict())

            self.target_dqn_update_counter += 1
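
    The core change from a plain DQN is in the target computation: the online network picks the argmax action, while the target network evaluates it. A small numerical sketch (toy Q-values, not taken from the activity) makes the distinction visible:

    q_online = np.array([2.0, 5.0, 1.0])    # online net estimates for new_state

    q_target = np.array([1.5, 3.0, 4.0])    # target net estimates for new_state

    # vanilla DQN: max over the target network's own estimates

    dqn_target = q_target.max()                  # 4.0

    # DDQN: action chosen by the online net, value read from the target net

    ddqn_target = q_target[q_online.argmax()]    # 3.0

    print(dqn_target, ddqn_target)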

  12. Write the training process loop with the help of the following steps. First, instantiate the DQN agent using the class created earlier. Create a steps_total empty list to collect the total number of steps for each episode. Initialize steps_counter with zero and use it to calculate the decayed epsilon value for each step:

    dqn_agent = DQN_Agent()

    steps_total = []

    steps_counter = 0

    Use two loops during the training process: the outer loop iterates over the episodes, and the inner loop ensures that each episode runs for at most a fixed number of steps. Inside the inner for loop, first calculate the epsilon value for the current step.

    Using the present state and epsilon value, you can select the action to perform. The next step is to take the action. Once you take the action, the environment returns the new_state, reward, and done flags.

    Using the optimize function, perform one step of gradient descent to optimize the DQN. Now make the new state the present state for the next iteration. Finally, check whether the episode is over. If the episode is over, then you can collect and record the reward for the current episode:

    for episode in range(NUMBER_OF_EPISODES):

        state = env.reset()

        done = False

        step = 0

        for i in range(MAX_STEPS):

            step += 1

            steps_counter += 1

            EGREEDY = calculate_epsilon(steps_counter)

            action = dqn_agent.select_action(state, EGREEDY)

            new_state, reward, done, info = env.step(action)

            memory.push(state, action, new_state, reward, done)

            dqn_agent.optimize()

            state = new_state

            if done:

                steps_total.append(step)

                break

  13. Now observe the rewards. As the reward is scalar feedback that indicates how well the agent is performing, look at the overall average reward as well as the average reward for the last 100 episodes to check how the agent improves as it plays more episodes:

    print("Average reward: %.2f"

          % (sum(steps_total)/NUMBER_OF_EPISODES))

    print("Average reward (last 100 episodes): %.2f"

          % (sum(steps_total[-100:])/100))

    The output will be as follows:

    Average reward: 174.09

    Average reward (last 100 episodes): 186.06

  14. Plot the rewards collected on the y axis and the number of episodes on the x axis to visualize how the rewards have been collected with the increasing number of episodes:

    plt.figure(figsize=(12,5))

    plt.title("Rewards Collected")

    plt.xlabel('Episodes')

    plt.ylabel('Reward')

    plt.bar(np.arange(len(steps_total)), steps_total,

            alpha=0.5, color='green', width=6)

    plt.show()

    The output will be as follows:

    Figure 9.37: Plot for the rewards collected by the agent

Note

To access the source code for this specific section, please refer to https://packt.live/3hnLDTd.

You can also run this example online at https://packt.live/37ol5MK.

The following is a comparison between different DQN techniques and DDQN:

Vanilla DQN Outputs:

Average reward: 158.83

Average reward (last 100 episodes): 176.28

DQN with Experience Replay and Target Network Outputs:

Average reward: 154.41

Average reward (last 100 episodes): 183.28

DDQN Outputs:

Average reward: 174.09

Average reward (last 100 episodes): 186.06

As you can see from the preceding figure and the comparison of results shown earlier, DDQN achieves the highest average reward of the DQN implementations, and its average reward for the last 100 episodes is also the highest. We can say that DDQN improves performance significantly in comparison to the other two DQN techniques. By completing this activity, we have learned how to combine a DDQN with experience replay to overcome the issues of a vanilla DQN and achieve more stable rewards.

10. Playing an Atari Game with Deep Recurrent Q-Networks

Activity 10.01: Training a DQN with CNNs to Play Breakout

Solution

  1. Open a new Jupyter Notebook and import the relevant packages: gym, random, tensorflow, numpy, and collections:

    import gym

    import random

    import numpy as np

    from collections import deque

    import tensorflow as tf

    from tensorflow.keras.models import Sequential

    from tensorflow.keras.layers import Dense, Conv2D,

    MaxPooling2D, Flatten

    from tensorflow.keras.optimizers import RMSprop

    import datetime

  2. Set the seed for NumPy and TensorFlow to 168:

    np.random.seed(168)

    tf.random.set_seed(168)

  3. Create the DQN class with the following methods: build_model() to instantiate a CNN; get_action() to choose the action to be played with the epsilon-greedy algorithm; add_experience() to store in memory the experience acquired by playing the game; replay(), which performs experience replay by sampling experiences from the memory and trains the DQN model with a callback to save the model every two episodes; and update_epsilon() to gradually decrease the epsilon value used for epsilon-greedy (a rough sketch of get_action() and update_epsilon() follows this step's code):

    Activity10_01.ipynb

    class DQN():

        def __init__(self, env, batch_size=64, max_experiences=5000):

            self.env = env

            self.input_size = self.env.observation_space.shape[0]

            self.action_size = self.env.action_space.n

            self.max_experiences = max_experiences

            self.memory = deque(maxlen=self.max_experiences)

            self.batch_size = batch_size

            self.gamma = 1.0

            self.epsilon = 1.0

            self.epsilon_min = 0.01

            self.epsilon_decay = 0.995

            self.model = self.build_model()

            self.target_model = self.build_model()

                 

        def build_model(self):

            model = Sequential()

            model.add(Conv2D(32, 8, (4,4), activation='relu',

                             padding='valid',

                             input_shape=(IMG_SIZE, IMG_SIZE, 1)))

            model.add(Conv2D(64, 4, (2,2), activation='relu',

                             padding='valid'))

            model.add(Conv2D(64, 3, (1,1), activation='relu',

                             padding='valid'))

            model.add(Flatten())

            model.add(Dense(256, activation='relu'))

            model.add(Dense(self.action_size))

            model.compile(loss='mse',

                          optimizer=RMSprop(lr=0.00025,

                          epsilon=self.epsilon_min),

                          metrics=['accuracy'])

            return model
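
    The notebook referenced above contains the full class; the remaining methods are omitted here for brevity. As a rough sketch (the exact names and details below are assumptions, not the notebook's code), get_action() and update_epsilon() could look like this:

        def get_action(self, state):

            # epsilon-greedy: explore with probability epsilon

            if np.random.rand() <= self.epsilon:

                return self.env.action_space.sample()

            q_values = self.model.predict(np.expand_dims(state, axis=0))

            return np.argmax(q_values[0])

        def update_epsilon(self):

            # decay epsilon until it reaches its floor

            self.epsilon = max(self.epsilon_min,

                               self.epsilon * self.epsilon_decay)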

  4. Create the initialize_env() function, which will initialize the Breakout environment:

    def initialize_env(env):

        initial_state = env.reset()

        initial_done_flag = False

        initial_rewards = 0

        return initial_state, initial_done_flag, initial_rewards

  5. Create the preprocess_state() function to preprocess the input images:

    def preprocess_state(image, img_size):

        img_temp = image[31:195]

        img_temp = tf.image.rgb_to_grayscale(img_temp)

        img_temp = tf.image.resize

                   (img_temp, [img_size, img_size],

                    method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)

        img_temp = tf.cast(img_temp, tf.float32)

        return img_temp

  6. Create the play_game() function, which will play an entire game of Breakout:

    def play_game(agent, state, done, rewards):

        while not done:

            action = agent.get_action(state)

            next_state, reward, done, _ = env.step(action)

            next_state = preprocess_state(next_state, IMG_SIZE)

            agent.add_experience(state, action, reward,

                                 next_state, done)

            state = next_state

            rewards += reward

        return rewards

  7. Create the train_agent() function, which will iterate through a number of episodes where the agent will play a game and perform experience replay:

    def train_agent(env, episodes, agent):

      from collections import deque

      import numpy as np

      scores = deque(maxlen=100)

      for episode in range(episodes):

        state, done, rewards = initialize_env(env)

        state = preprocess_state(state, IMG_SIZE)

        rewards = play_game(agent, state, done, rewards)

        scores.append(rewards)

        mean_score = np.mean(scores)

        if episode % 50 == 0:

            print(f'[Episode {episode}] - Average Score: {mean_score}')

            agent.target_model.set_weights(agent.model.get_weights())

            agent.target_model.save_weights

            (f'dqn/dqn_model_weights_{episode}')

        agent.replay(episode)

      print(f"Average Score: {np.mean(scores)}")

  8. Instantiate a Breakout environment called env with the gym.make() function:

    env = gym.make('BreakoutDeterministic-v4')

  9. Create two variables, IMG_SIZE and SEQUENCE, that will take the values 84 and 4, respectively:

    IMG_SIZE = 84

    SEQUENCE = 4

  10. Instantiate a DQN object called agent:

    agent = DQN(env)

  11. Create a variable called episodes that will take the value 50:

    episodes = 50

  12. Call the train_agent function by providing env, episodes, and agent:

    train_agent(env, episodes, agent)

    The following is the output of the code:

    [Episode 0] - Average Score: 3.0

    Average Score: 0.59

    Note

    To access the source code for this specific section, please refer to https://packt.live/3hoZXdV.

    You can also run this example online at https://packt.live/3dWLwfa.

You just completed the first activity of this chapter. You successfully built and trained a DQN agent combined with CNNs to play the game Breakout. The performance of this model is very similar to that of a random agent (an average score of about 0.6). However, if you train it for longer (by increasing the number of episodes), it may achieve a better score.

Activity 10.02: Training a DRQN to Play Breakout

Solution

  1. Open a new Jupyter Notebook and import the relevant packages: gym, random, tensorflow, numpy, and collections:

    import gym

    import random

    import numpy as np

    from collections import deque

    import tensorflow as tf

    from tensorflow.keras.models import Sequential

    from tensorflow.keras.layers import Dense, Conv2D,

    MaxPooling2D, TimeDistributed, Flatten, LSTM

    from tensorflow.keras.optimizers import RMSprop

    import datetime

  2. Set the seed for NumPy and TensorFlow to 168:

    np.random.seed(168)

    tf.random.set_seed(168)

  3. Create the DRQN class with the following methods: build_model() to instantiate a CNN combined with an RNN model; get_action() to choose the action to be played with the epsilon-greedy algorithm; add_experience() to store in memory the experience acquired by playing the game; replay(), which performs experience replay by sampling experiences from the memory and trains the DRQN model with a callback to save the model every two episodes; and update_epsilon() to gradually decrease the epsilon value used for epsilon-greedy:

    Activity10_02.ipynb

    class DRQN():

        def __init__(self, env, batch_size=64, max_experiences=5000):

            self.env = env

            self.input_size = self.env.observation_space.shape[0]

            self.action_size = self.env.action_space.n

            self.max_experiences = max_experiences

            self.memory = deque(maxlen=self.max_experiences)

            self.batch_size = batch_size

            self.gamma = 1.0

            self.epsilon = 1.0

            self.epsilon_min = 0.01

            self.epsilon_decay = 0.995

           

            self.model = self.build_model()

            self.target_model = self.build_model()

                 

        def build_model(self):

            model = Sequential()

            model.add(TimeDistributed(Conv2D(32, 8, (4,4),

                                      activation='relu',

                                      padding='valid'),

                      input_shape=(SEQUENCE, IMG_SIZE, IMG_SIZE, 1)))

            model.add(TimeDistributed(Conv2D(64, 4, (2,2),

                                      activation='relu',

                                      padding='valid')))

            model.add(TimeDistributed(Conv2D(64, 3, (1,1),

                                      activation='relu',

                                      padding='valid')))

            model.add(TimeDistributed(Flatten()))

            model.add(LSTM(512))

            model.add(Dense(128, activation='relu'))

            model.add(Dense(self.action_size))

            model.compile(loss='mse',

                          optimizer=RMSprop(lr=0.00025,

                                            epsilon=self.epsilon_min),

                          metrics=['accuracy'])

            return model

  4. Create the initialize_env() function, which will initialize the Breakout environment:

    def initialize_env(env):

      initial_state = env.reset()

      initial_done_flag = False

      initial_rewards = 0

      return initial_state, initial_done_flag, initial_rewards

  5. Create the preprocess_state() function to preprocess the input images:

    def preprocess_state(image, img_size):

        img_temp = image[31:195]

        img_temp = tf.image.rgb_to_grayscale(img_temp)

        img_temp = tf.image.resize

                   (img_temp, [img_size, img_size],

                    method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)

        img_temp = tf.cast(img_temp, tf.float32)

        return img_temp

  6. Create the combine_images() function to stack the previous four screenshots:

    def combine_images(new_img, prev_img, img_size, seq=4):

        if len(prev_img.shape) == 4 and prev_img.shape[0] == seq:

            im = np.concatenate

                 ((prev_img[1:, :, :],

                   tf.reshape(new_img, [1, img_size, img_size, 1])),

                   axis=0)

        else:

            im = np.stack([new_img] * seq, axis=0)

        return im
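
    A quick shape check (with a dummy frame rather than a real Breakout screenshot) clarifies what this function returns:

    dummy = tf.zeros((84, 84, 1))

    stacked = combine_images(dummy, dummy, img_size=84, seq=4)

    print(stacked.shape)    # (4, 84, 84, 1) - the same frame repeated 4 times

    rolled = combine_images(dummy, stacked, img_size=84, seq=4)

    print(rolled.shape)     # (4, 84, 84, 1) - oldest frame dropped, new one appended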

  7. Create the play_game() function, which will play an entire game of Breakout:

    def play_game(agent, state, done, rewards):

        while not done:

            action = agent.get_action(state)

            next_state, reward, done, _ = env.step(action)

            next_state = preprocess_state(next_state, IMG_SIZE)

            next_state = combine_images

                         (new_img=next_state, prev_img=state,

                          img_size=IMG_SIZE, seq=SEQUENCE)

            agent.add_experience(state, action,

                                 reward, next_state, done)

            state = next_state

            rewards += reward 

        return rewards

  8. Create the train_agent() function, which will iterate through a number of episodes where the agent will play a game and perform experience replay:

    def train_agent(env, episodes, agent):

      from collections import deque

      import numpy as np

      scores = deque(maxlen=100)

      for episode in range(episodes):

        state, done, rewards = initialize_env(env)

        state = preprocess_state(state, IMG_SIZE)

        state = combine_images(new_img=state, prev_img=state,

                               img_size=IMG_SIZE, seq=SEQUENCE)

        rewards = play_game(agent, state, done, rewards)

        scores.append(rewards)

        mean_score = np.mean(scores)

        if episode % 50 == 0:

            print(f'[Episode {episode}] - Average Score: {mean_score}')

            agent.target_model.set_weights

            (agent.model.get_weights())

            agent.target_model.save_weights

            (f'drqn_model_weights_{episode}')

        agent.replay(episode)

      print(f"Average Score: {np.mean(scores)}")

  9. Instantiate a Breakout environment called env with gym.make():

    env = gym.make('BreakoutDeterministic-v4')

  10. Create two variables, IMG_SIZE and SEQUENCE, that will take the values 84 and 4, respectively:

    IMG_SIZE = 84

    SEQUENCE = 4

  11. Instantiate a DRQN object called agent:

    agent = DRQN(env)

  12. Create a variable called episodes that will take the value 200:

    episodes = 200

  13. Call the train_agent function by providing env, episodes, and agent:

    train_agent(env, episodes, agent)

    The following is the output of the code:

    [Episode 0] - Average Score: 0.0

    [Episode 50] - Average Score: 0.43137254901960786

    [Episode 100] - Average Score: 0.4

    [Episode 150] - Average Score: 0.54

    Average Score: 0.53

    Note

    To access the source code for this specific section, please refer to https://packt.live/2AjdgMx.

    You can also run this example online at https://packt.live/37mhlLM.

In this activity, we added an LSTM layer and built a DRQN agent. It learned how to play the Breakout game, but didn't achieve satisfactory results even after 200 episodes; the agent still seems to be in the exploration stage. You may try to train it for more episodes.

Activity 10.03: Training a DARQN to Play Breakout

Solution

  1. Open a new Jupyter Notebook and import the relevant packages: gym, random, tensorflow, numpy, and collections:

    import gym

    import random

    import numpy as np

    from collections import deque

    import tensorflow as tf

    from tensorflow.keras.models import Sequential, Model

    from tensorflow.keras.layers import Dense, Conv2D, Input,

    MaxPooling2D, TimeDistributed, Flatten, GRU, Attention

    from tensorflow.keras.optimizers import RMSprop

    import datetime

  2. Set the seed for NumPy and TensorFlow to 168:

    np.random.seed(168)

    tf.random.set_seed(168)

  3. Create the DARQN class with the following methods: build_model() to instantiate a CNN combined with an RNN model and an attention layer; get_action() to choose the action to be played with the epsilon-greedy algorithm; add_experience() to store in memory the experience acquired by playing the game; replay(), which performs experience replay by sampling experiences from the memory and trains the DARQN model with a callback to save the model every two episodes; and update_epsilon() to gradually decrease the epsilon value used for epsilon-greedy:

    Activity10_03.ipynb

    class DARQN():

        def __init__(self, env, batch_size=64, max_experiences=5000):

            self.env = env

            self.input_size = self.env.observation_space.shape[0]

            self.action_size = self.env.action_space.n

            self.max_experiences = max_experiences

            self.memory = deque(maxlen=self.max_experiences)

            self.batch_size = batch_size

            self.gamma = 1.0

            self.epsilon = 1.0

            self.epsilon_min = 0.01

            self.epsilon_decay = 0.995

            self.model = self.build_model()

            self.target_model = self.build_model()

        def build_model(self):

            inputs = Input(shape=(SEQUENCE, IMG_SIZE, IMG_SIZE, 1))

            conv1 = TimeDistributed(Conv2D(32, 8, (4,4),

                                    activation='relu',

                                    padding='valid'))(inputs)

            conv2 = TimeDistributed(Conv2D(64, 4, (2,2),

                                    activation='relu',

                                    padding='valid'))(conv1)

            conv3 = TimeDistributed(Conv2D(64, 3, (1,1),

                                    activation='relu',

                                    padding='valid'))(conv2)

            flatten = TimeDistributed(Flatten())(conv3)

            out, states = GRU(512, return_sequences=True,

                              return_state=True)(flatten)

            att = Attention()([out, states])

            output_1 = Dense(256, activation='relu')(att)

            predictions = Dense(self.action_size)(output_1)

            model = Model(inputs=inputs, outputs=predictions)

            model.compile(loss='mse',

                          optimizer=RMSprop(lr=0.00025,

                                            epsilon=self.epsilon_min),

                          metrics=['accuracy'])

            return model

  4. Create the initialize_env() function, which will initialize the Breakout environment:

    def initialize_env(env):

      initial_state = env.reset()

      initial_done_flag = False

      initial_rewards = 0

      return initial_state, initial_done_flag, initial_rewards

  5. Create the preprocess_state() function to preprocess the input images:

    def preprocess_state(image, img_size):

        img_temp = image[31:195]

        img_temp = tf.image.rgb_to_grayscale(img_temp)

        img_temp = tf.image.resize

                   (img_temp, [img_size, img_size],

                   method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)

        img_temp = tf.cast(img_temp, tf.float32)

        return img_temp

  6. Create the combine_images() function to stack the previous four screenshots:

    def combine_images(new_img, prev_img, img_size, seq=4):

        if len(prev_img.shape) == 4 and prev_img.shape[0] == seq:

            im = np.concatenate((prev_img[1:, :, :],

                                 tf.reshape

                                 (new_img, [1, img_size,

                                            img_size, 1])), axis=0)

        else:

            im = np.stack([new_img] * seq, axis=0)

        return im

  7. Create the play_game() function, which will play an entire game of Breakout:

    def play_game(agent, state, done, rewards):

        while not done:

            action = agent.get_action(state)

            next_state, reward, done, _ = env.step(action)

            next_state = preprocess_state(next_state, IMG_SIZE)

            next_state = combine_images

                         (new_img=next_state, prev_img=state,

                          img_size=IMG_SIZE, seq=SEQUENCE)

            agent.add_experience(state, action, reward,

                                 next_state, done)

            state = next_state

            rewards += reward

        return rewards

  8. Create the train_agent() function, which will iterate through a number of episodes where the agent will play a game and perform experience replay:

    def train_agent(env, episodes, agent):

      from collections import deque

      import numpy as np

      scores = deque(maxlen=100)

      for episode in range(episodes):

        state, done, rewards = initialize_env(env)

        state = preprocess_state(state, IMG_SIZE)

        state = combine_images

                (new_img=state, prev_img=state,

                 img_size=IMG_SIZE, seq=SEQUENCE)

        rewards = play_game(agent, state, done, rewards)

        scores.append(rewards)

        mean_score = np.mean(scores)

        if episode % 50 == 0:

            print(f'[Episode {episode}] - Average Score: {mean_score}')

            agent.target_model.set_weights

            (agent.model.get_weights())

            agent.target_model.save_weights

            (f'drqn_model_weights_{episode}')

        agent.replay(episode)

      print(f"Average Score: {np.mean(scores)}")

  9. Instantiate a Breakout environment called env with gym.make():

    env = gym.make('BreakoutDeterministic-v4')

  10. Create two variables, IMG_SIZE and SEQUENCE, that will take the values 84 and 4, respectively:

    IMG_SIZE = 84

    SEQUENCE = 4

  11. Instantiate a DARQN object called agent:

    agent = DARQN(env)

  12. Create a variable called episodes that will take the value 400:

    episodes = 400

  13. Call the train_agent function by providing env, episodes, and agent:

    train_agent(env, episodes, agent)

    The following is the output of the code:

    [Episode 0] - Average Score: 1.0

    [Episode 50] - Average Score: 2.4901960784313726

    [Episode 100] - Average Score: 3.92

    [Episode 150] - Average Score: 7.37

    [Episode 200] - Average Score: 7.76

    [Episode 250] - Average Score: 7.91

    [Episode 300] - Average Score: 10.33

    [Episode 350] - Average Score: 10.94

    Average Score: 10.83

In this activity, we built and trained a DARQN agent. It successfully learned how to play the Breakout game: it started with a score of 1.0 and achieved a final average score of over 10 after 400 episodes, as shown in the preceding results. This is quite a remarkable performance.

Note

To access the source code for this specific section, please refer to https://packt.live/2XUDZrH.

You can also run this example online at https://packt.live/2UDCsUP.

11. Policy-Based Methods for Reinforcement Learning

Activity 11.01: Creating an Agent That Learns a Model Using DDPG

  1. Import the necessary libraries (os, gym, and ddpg):

    import os

    import gym

    from ddpg import *

  2. First, we create our Gym environment (LunarLanderContinuous-v2), as we did previously:

    env = gym.make("LunarLanderContinuous-v2")

  3. Initialize the agent with some sensible hyperparameters, as in Exercise 11.02, Creating a Learning Agent:

    agent = Agent(alpha=0.000025, beta=0.00025,

                  inp_dimensions=[8], tau=0.001,

                  env=env, bs=64, l1_size=400, l2_size=300,

                  nb_actions=2)

  4. Set up a random seed so that our experiments are reproducible:

    np.random.seed(0)

  5. Create a blank list to store the scores; you can name it history. Iterate for at least 1,000 episodes; in each episode, reset the environment, set a running score variable to 0, and set the done flag to False. Then, while the done flag is not True, carry out the following step:

    history = []

    for i in np.arange(1000):

        observation = env.reset()

        score = 0

        done = False

        while not done:

  6. Select the observations and get the new state, reward, and done flags. Save the observation, action, reward, state_new, and done flags. Call the learn function of the agent and add the current reward to the running score. Set the new state as the observation and finally, when the done flag is True, append score to history:

    history = []

    for i in np.arange(1000):

        observation = env.reset()

        score = 0

        done = False

        while not done:

            action = agent.select_action(observation)

            state_new, reward, done, info = env.step(action)

            agent.remember(observation, action, reward,

                           state_new, int(done))

            agent.learn()

            score += reward

            observation = state_new

            # env.render() # Uncomment to see the game window

        history.append(score)

    You can print out score and the mean of history to see how the agent is learning over time (see the sketch after the figure below).

    Note

    To observe the rewards, we can simply add the print statement. The rewards will be similar to those in the previous exercise.

    Run the code for at least 1,000 iterations and watch your lander attempt to land on the lunar surface.

    Note

    To see the Lunar Lander simulation once the policy is learned, we just need to uncomment the env.render() code from the preceding code block. As seen in the previous exercise, this will open another window, where we will be able to see the game simulation.

    Here's a glimpse of how your lunar lander might behave once it has learned the policy:

    Figure 11.16: Screenshots from the environment after 1,000 rounds of training
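
    As mentioned above, a simple way to track learning progress (a sketch assuming the history list built in the training loop) is to print the running average of the last 100 episode scores:

    import numpy as np

    running_avg = [np.mean(history[max(0, i - 100):i + 1])

                   for i in range(len(history))]

    print("last score:", history[-1])

    print("average of last 100 episodes:", running_avg[-1])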

Note

To access the source code for this specific section, please refer to https://packt.live/30X03Ul.

This section does not currently have an online interactive example and will need to be run locally.

Activity 11.02: Loading the Saved Policy to Run the Lunar Lander Simulation

  1. Import the essential Python libraries:

    import os

    import gym

    import torch as T

    import numpy as np

    from PIL import Image

  2. Set your device using the device parameter:

    device = T.device("cuda:0" if T.cuda.is_available() else "cpu")

  3. Define the ReplayBuffer class, as we did in the previous exercise:

    class ReplayBuffer:

        def __init__(self):

            self.memory_actions = []

            self.memory_states = []

            self.memory_log_probs = []

            self.memory_rewards = []

            self.is_terminals = []

        def clear_memory(self):

            del self.memory_actions[:]

            del self.memory_states[:]

            del self.memory_log_probs[:]

            del self.memory_rewards[:]

            del self.is_terminals[:]

  4. Define the ActorCritic class, as we did in the previous exercise:

    Activity11_02.ipynb

    class ActorCritic(T.nn.Module):

        def __init__(self, state_dimension, action_dimension,

                     nb_latent_variables):

            super(ActorCritic, self).__init__()

            self.action_layer = T.nn.Sequential

                                (T.nn.Linear(state_dimension,

                                             nb_latent_variables),

                                T.nn.Tanh(),

                                T.nn.Linear(nb_latent_variables,

                                            nb_latent_variables),

                                T.nn.Tanh(),

                                T.nn.Linear(nb_latent_variables,

                                            action_dimension),

                                T.nn.Softmax(dim=-1))

  5. Define the Agent class, as we did in the previous exercise:

    Activity11_02.ipynb

    class Agent:

        def __init__(self, state_dimension, action_dimension,

        nb_latent_variables, lr, betas, gamma, K_epochs, eps_clip):

            self.lr = lr

            self.betas = betas

            self.gamma = gamma

            self.eps_clip = eps_clip

            self.K_epochs = K_epochs

          

            self.policy = ActorCritic(state_dimension,

                                      action_dimension,

                                      nb_latent_variables).to(device)

            self.optimizer = T.optim.Adam

                             (self.policy.parameters(),

                              lr=lr, betas=betas)

            self.policy_old = ActorCritic(state_dimension,

                                          action_dimension,

                                          nb_latent_variables)

                                          .to(device)

            self.policy_old.load_state_dict(self.policy.state_dict())

  6. Create the Lunar Lander environment. Initialize the random seed:

    env = gym.make("LunarLander-v2")

    np.random.seed(0)

    render = True

  7. Create the memory buffer and initialize the agent with hyperparameters, as in the previous exercise:

    memory = ReplayBuffer()

    agent = Agent(state_dimension=env.observation_space.shape[0],

                  action_dimension=4, nb_latent_variables=64,

                  lr=0.002, betas=(0.9, 0.999), gamma=0.99,

                  K_epochs=4, eps_clip=0.2)

  8. Load the saved policy weights into the agent's old policy from the Exercise11.03 folder:

    agent.policy_old.load_state_dict

    (T.load("../Exercise11.03/PPO_LunarLander-v2.pth"))
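    If the checkpoint was saved on a GPU but you are running this activity on a CPU-only machine, the call above may fail to map the tensors to a device. Passing map_location to T.load is the usual remedy; this optional variant assumes the same path and the device object created in step 2:

    # Optional: map the checkpoint onto whichever device is available.
    state_dict = T.load("../Exercise11.03/PPO_LunarLander-v2.pth",
                        map_location=device)
    agent.policy_old.load_state_dict(state_dict)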

  9. Finally, loop through your desired number of episodes. In every iteration, start by initializing the episode reward to 0 and resetting the environment state. Run an inner loop over the maximum number of timesteps. For each action taken, get the new state, reward, and done flag, and add the reward to the episode reward. Render the environment to see how your Lunar Lander is doing:

    for ep in range(5):

        ep_reward = 0

        state = env.reset()

        for t in range(300):

            action = agent.policy_old.act(state, memory)

            state, reward, done, _ = env.step(action)

            ep_reward += reward

            if render:

                env.render()

                img = env.render(mode="rgb_array")

                img = Image.fromarray(img)

                image_dir = "./gif"

                if not os.path.exists(image_dir):

                    os.makedirs(image_dir)

                img.save(os.path.join(image_dir, "{}.jpg".format(t)))

            if done:

                break

        print("Episode: {}, Reward: {}".format(ep, int(ep_reward)))

    env.close()

    The following is the output of the code:

    Episode: 0, Reward: 272

    Episode: 1, Reward: 148

    Episode: 2, Reward: 249

    Episode: 3, Reward: 169

    Episode: 4, Reward: 35

    You'll see the reward oscillate in the positive zone, as our Lunar Lander now has some idea of what a good policy looks like. The reward may still oscillate because there is scope for further learning; training for a few thousand more episodes should yield a better policy. Do not hesitate to tinker with the parameters specified in the code. The following screenshot shows the simulation output at some of the stages:

    Figure 11.17: The environment showing the simulation of the Lunar Lander


Before this activity, we explained the necessary concepts, such as creating a learning agent, training a policy, and saving and loading learned policies, in isolation. By carrying out this activity, you learned how to combine everything covered in this chapter into a complete RL project or working prototype of your own.
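Because the loop in step 9 saves one JPG per timestep in the gif folder, you can optionally stitch those frames into an animated GIF with PIL. The following is a minimal sketch; the make_gif helper and the output filename are ours, not part of the activity:

    import os
    from PIL import Image

    def make_gif(image_dir="./gif", out_path="./gif/lander.gif",
                 duration_ms=40):
        # Sort the frames numerically (0.jpg, 1.jpg, ...) rather than lexically.
        names = sorted((f for f in os.listdir(image_dir) if f.endswith(".jpg")),
                       key=lambda f: int(os.path.splitext(f)[0]))
        frames = [Image.open(os.path.join(image_dir, n)) for n in names]
        # Save the first frame and append the rest as an animation.
        frames[0].save(out_path, save_all=True, append_images=frames[1:],
                       duration=duration_ms, loop=0)

    make_gif()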

Note

The complete simulation output can be found in the form of images at https://packt.live/3ehPaAj.

To access the source code for this specific section, please refer to https://packt.live/2YhzrvD.

This section does not currently have an online interactive example and will need to be run locally.

12. Evolutionary Strategies for RL

Activity 12.01: Cart-Pole Activity

  1. Import the required packages as follows:

    import gym

    import numpy as np

    import math

    import tensorflow as tf

    from matplotlib import pyplot as plt

    from random import randint

    from statistics import median, mean

  2. Initialize the environment and the state and action space shapes:

    env = gym.make('CartPole-v0')

    no_states = env.observation_space.shape[0]

    no_actions = env.action_space.n

  3. Create a function that generates random initial network parameters for every candidate in the population:

    def initial(run_test):

        #initialize arrays

        i_w = []

        i_b = []

        h_w = []

        o_w = []

        no_input_nodes = 8

        no_hidden_nodes = 4

        

        for r in range(run_test):

            input_weight = np.random.rand(no_states, no_input_nodes)

            input_bias = np.random.rand((no_input_nodes))

            hidden_weight = np.random.rand(no_input_nodes,

                                           no_hidden_nodes)

            output_weight = np.random.rand(no_hidden_nodes,

                                           no_actions)

            i_w.append(input_weight)

            i_b.append(input_bias)

            h_w.append(hidden_weight)

            o_w.append(output_weight)

        chromosome =[i_w, i_b, h_w, o_w]

        return chromosome
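    As a quick sanity check, the chromosome returned by initial is a list of four parallel lists holding the weights and biases of every candidate. Assuming the CartPole-v0 spaces created in step 2 (a 4-dimensional state and 2 actions), the shapes are as follows:

    chromosome = initial(run_test=3)
    i_w, i_b, h_w, o_w = chromosome
    print(len(i_w))        # 3 candidate solutions
    print(i_w[0].shape)    # (4, 8): state -> input layer weights
    print(i_b[0].shape)    # (8,):   input layer bias
    print(h_w[0].shape)    # (8, 4): input -> hidden layer weights
    print(o_w[0].shape)    # (4, 2): hidden -> action weights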

  4. Create a function that runs the neural network on an observation, using the given set of parameters, and returns an action (a note on the activation used follows the code):

    def nnmodel(observations, i_w, i_b, h_w, o_w):

        alpha = 0.199

        observations = observations/max

                       (np.max(np.linalg.norm(observations)),1)

        #apply relu on layers

        funct1 = np.dot(observations, i_w)+ i_b.T

        layer1= tf.nn.relu(funct1)-alpha*tf.nn.relu(-funct1)

        funct2 = np.dot(layer1,h_w)

        layer2 = tf.nn.relu(funct2) - alpha*tf.nn.relu(-funct2)

        funct3 = np.dot(layer2, o_w)

        layer3 = tf.nn.relu(funct3)-alpha*tf.nn.relu(-funct3)

        #apply softmax

        layer3 = np.exp(layer3)/np.sum(np.exp(layer3))

        output = layer3.argsort().reshape(1,no_actions)

        action = output[0][0]

        return action
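    The relu(x) - alpha*relu(-x) pattern used above is simply a leaky ReLU with slope alpha on the negative side. The following equivalent pure-NumPy helper is shown only to make the activation explicit; the function name is ours:

    import numpy as np

    def leaky_relu(x, alpha=0.199):
        # relu(x) - alpha*relu(-x) equals x for x >= 0 and alpha*x for x < 0
        return np.where(x >= 0, x, alpha * x)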

  5. Create a function that runs the neural network policy for up to 300 steps and returns the total reward:

    def get_reward(env, i_w, i_b, h_w, o_w):

        current_state = env.reset()

        total_reward = 0

        for step in range(300):

            action = nnmodel(current_state, i_w, i_b, h_w, o_w)

            next_state, reward, done, info = env.step(action)

            total_reward += reward

            current_state = next_state

            if done:

                break

        return total_reward

  6. Create a function that computes the fitness score (total reward) of each element of the initial, randomly generated population:

    def get_weights(env, run_test):

        rewards = []

        chromosomes = initial(run_test)

        for trial in range(run_test):

            i_w = chromosomes[0][trial]

            i_b = chromosomes[1][trial]

            h_w = chromosomes[2][trial]

            o_w = chromosomes[3][trial]

            total_reward = get_reward(env, i_w, i_b, h_w, o_w)

            rewards = np.append(rewards, total_reward)

        chromosome_weight = [chromosomes, rewards]

        return chromosome_weight

  7. Create a mutation function:

    def mutate(parent):

        index = np.random.randint(0, len(parent))

        if(0 < index < 10):

            for idx in range(index):

                n = np.random.randint(0, len(parent))

                parent[n] = parent[n] + np.random.rand()

        mutation = parent

        return mutation
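    Note that mutate picks a random count (at most 9) and perturbs that many randomly chosen genes of the flattened chromosome, modifying its argument in place; if the drawn index is 0, or 10 and above, nothing changes. A quick check on a toy vector of ours:

    parent = np.random.rand(20)                # a toy flattened chromosome
    child = mutate(parent.copy())              # pass a copy: mutate edits in place
    print(np.count_nonzero(child != parent))   # genes changed (possibly 0)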

  8. Create a single-point crossover function:

    def crossover(list_chr):

        gen_list = []

        gen_list.append(list_chr[0])

        gen_list.append(list_chr[1])

        for i in range(10):

            m = np.random.randint(0, len(list_chr[0]))

            parent = np.append(list_chr[0][:m], list_chr[1][m:])

            child = mutate(parent)

            gen_list.append(child)

        return gen_list
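    crossover keeps the two parents and appends ten single-point crossover children, each passed through mutate, so every generation contains twelve candidates. For example, on two toy parents of ours:

    parent_a = np.random.rand(20)
    parent_b = np.random.rand(20)
    offspring = crossover([parent_a, parent_b])
    print(len(offspring))       # 12: the 2 parents plus 10 mutated children
    print(offspring[2].shape)   # (20,): children keep the flattened length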

  9. Create a function that builds the next generation by selecting the pair of chromosomes with the highest rewards:

    def generate_new_population(rewards, chromosomes):

        #2 best reward indexes selected

        best_reward_idx = rewards.argsort()[-2:][::-1]

        list_chr = []

        new_i_w =[]

        new_i_b = []

        new_h_w = []

        new_o_w = []

        new_rewards = []

  10. Continuing inside generate_new_population, get the current weight and bias parameters by looping over the two selected indices and flattening each chromosome:

        for ind in best_reward_idx:

            weight1 = chromosomes[0][ind]

            w1 = weight1.reshape(weight1.shape[1], -1)

            bias1 = chromosomes[1][ind]

            b1 = np.append(w1, bias1)

            weight2 = chromosomes[2][ind]

            w2 = np.append

                 (b1, weight2.reshape(weight2.shape[1], -1))

            weight3 = chromosomes[3][ind]

            chr = np.append(w2, weight3)

            #the 2 best parents are selected

            list_chr.append(chr)

        gen_list = crossover(list_chr)

  11. Decode each new chromosome back into network parameters, rebuild the neural network, and obtain a new reward for it (a usage sketch follows the code):

        for l in gen_list:

            chromosome_w1 = np.array(l[:chromosomes[0][0].size])

            new_input_weight = np.reshape(chromosome_w1,(-1,chromosomes[0][0].shape[1]))

            new_input_bias = np.array

                             ([l[chromosome_w1.size:chromosome_w1

                               .size+chromosomes[1][0].size]]).T

            hidden = chromosome_w1.size + new_input_bias.size

            chromosome_w2 = np.array

                            ([l[hidden:hidden

                             + chromosomes[2][0].size]])

            new_hidden_weight = np.reshape

                                (chromosome_w2,

                                (-1, chromosomes[2][0].shape[1]))

            final = (chromosome_w1.size + new_input_bias.size
                     + chromosome_w2.size)

            new_output_weight = np.array([l[final:]]).T

            new_output_weight = np.reshape

                                (new_output_weight,

                                (-1, chromosomes[3][0].shape[1]))

            new_i_w.append(new_input_weight)

            new_i_b.append(new_input_bias)

            new_h_w.append(new_hidden_weight)

            new_o_w.append(new_output_weight)

            new_reward = get_reward(env, new_input_weight,

                                    new_input_bias, new_hidden_weight,

                                    new_output_weight)

            new_rewards = np.append(new_rewards, new_reward)

        generation = [new_i_w, new_i_b, new_h_w, new_o_w]

        return generation, new_rewards
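    Putting steps 9 to 11 together: given the chromosomes and rewards of a population, generate_new_population returns the next generation (twelve candidates, as produced by crossover) along with their freshly evaluated rewards. A small sketch of how it is driven, which ga_algo in step 13 repeats in a loop:

    chromosomes, rewards = get_weights(env, run_test=5)
    next_gen, next_rewards = generate_new_population(rewards, chromosomes)
    print(len(next_gen[0]), next_rewards.shape)   # 12 candidates and their rewards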

  12. Create a function to output the convergence graph:

    def graphics(act):

        plt.plot(act)

        plt.xlabel('No. of generations')

        plt.ylabel('Rewards')

        plt.grid()

        print('Mean rewards:', mean(act))

        return plt.show()

  13. Create a function for the genetic algorithm that outputs the parameters of the neural network based on the highest average reward:

    def ga_algo(env, run_test, no_gen):

        weights = get_weights(env, run_test)

        chrom = weights[0]

        current_rewards = weights[1]

        act = []

        for n in range(no_gen):

            gen, new_rewards = generate_new_population

                               (current_rewards, chrom)

            average = np.average(current_rewards)

            new_average = np.average(new_rewards)

            if average > new_average:

                parameters = [chrom[0][0], chrom[1][0],

                              chrom[2][0], chrom[3][0]]

            else:

                parameters = [gen[0][0], gen[1][0],
                              gen[2][0], gen[3][0]]

            chrom = gen

            current_rewards = new_rewards

            max_arg = np.amax(current_rewards)

            print('Generation:{}, max reward:{}'.format(n+1, max_arg))

            act = np.append(act, max_arg)

        graphics(act)

        return parameters

  14. Create a function that unpacks the parameters list into the individual neural network parameters:

    def params(parameters):

        i_w = parameters[0]

        i_b = parameters[1]

        h_w = parameters[2]

        o_w = parameters[3]

        return i_w,i_b,h_w,o_w

  15. Set the number of generations to 50, the population size (run_test) to 15, and both the trial length and the number of trials to 500. Run the genetic algorithm, unpack the best parameters, and evaluate them over the trials:

    generations = []

    no_gen = 50

    run_test = 15

    trial_length = 500

    no_trials = 500

    rewards = []

    final_reward = 0

    parameters = ga_algo(env, run_test, no_gen)

    i_w, i_b, h_w, o_w = params(parameters)

    for trial in range(no_trials):

        current_state = env.reset()

        total_reward = 0

        for step in range(trial_length):

            env.render()

            action = nnmodel(current_state, i_w,i_b, h_w, o_w)

            next_state,reward, done, info = env.step(action)

            total_reward += reward

            current_state = next_state

            if done:

                break

        print('Trial:{}, total reward:{}'.format(trial, total_reward))

        final_reward +=total_reward

    print('Average reward:', final_reward/no_trials)

    env.close()

    The output (just the first few lines are shown here) will be similar to the following:

    Generation:1, max reward:11.0

    Generation:2, max reward:11.0

    Generation:3, max reward:10.0

    Generation:4, max reward:10.0

    Generation:5, max reward:11.0

    Generation:6, max reward:10.0

    Generation:7, max reward:10.0

    Generation:8, max reward:10.0

    Generation:9, max reward:11.0

    Generation:10, max reward:10.0

    Generation:11, max reward:10.0

    Generation:12, max reward:10.0

    Generation:13, max reward:10.0

    Generation:14, max reward:10.0

    Generation:15, max reward:10.0

    Generation:16, max reward:10.0

    Generation:17, max reward:10.0

    Generation:18, max reward:10.0

    Generation:19, max reward:11.0

    Generation:20, max reward:11.0

    The output can be visualized in a plot as follows:

    Figure 12.15: Rewards obtained over the generations


The per-trial rewards and the final average reward (just the last few lines are shown here) will be similar to the following:

Trial:486, total reward:8.0

Trial:487, total reward:9.0

Trial:488, total reward:10.0

Trial:489, total reward:10.0

Trial:490, total reward:8.0

Trial:491, total reward:9.0

Trial:492, total reward:9.0

Trial:493, total reward:10.0

Trial:494, total reward:10.0

Trial:495, total reward:9.0

Trial:496, total reward:10.0

Trial:497, total reward:9.0

Trial:498, total reward:10.0

Trial:499, total reward:9.0

Average reward: 9.384

You will notice that, depending on the start state, how quickly the genetic algorithm converges to the highest score will vary; the neural network model will also not always reach the optimal solution. The purpose of this activity was for you to implement the genetic algorithm techniques studied in this chapter and to see how evolutionary methods can be combined with neural network parameter tuning for action selection.

Note

To access the source code for this specific section, please refer to https://packt.live/2AmKR8m.

This section does not currently have an online interactive example and will need to be run locally.
