Piyush Thakur
Before coming to the topic, let me first show you what Reinforcement Learning has achieved in the field of games and machine control. This learning technique has been applied to a system that learned to play just about any Atari game from scratch and eventually outperformed humans at most of them. Can you imagine humans building a system that could defeat humans? It actually happened in 2016, with the victory of the system AlphaGo against Lee Sedol, a legendary professional player of the game of Go. Since then, this learning technique has been successfully applied to controlling robots, analyzing stock market prices, self-driving cars, recommender systems, and much more.
In short, here is an overview of what Reinforcement Learning actually does. There is a software agent. This “agent” can be the program controlling a robot, or one playing a board game such as Go. The agent makes observations within an environment and takes actions based on those observations. In return, the agent receives rewards: if the actions taken are the right ones, the agent receives positive rewards; if not, it receives negative rewards. Now, you must be asking one question: how does this agent decide which action to take? The answer is clear, and many of you will have guessed it by now: an algorithm, that is, a set of instructions for achieving a particular task. In Reinforcement Learning, this algorithm is called the POLICY. It takes “observations” as input and gives “actions” as output.
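The observe–act–reward loop described above can be sketched in a few lines of Python. This is only a toy illustration: `ToyEnv` and `policy` are made-up names, and the “environment” here is just a number line where the agent should move toward position 0.

```python
class ToyEnv:
    """A made-up one-dimensional environment: the agent should reach position 0."""
    def __init__(self):
        self.position = 5

    def observe(self):
        return self.position

    def step(self, action):
        # action is -1 (move left) or +1 (move right)
        old = self.position
        self.position += action
        # reward: +1 if the agent moved closer to 0, -1 otherwise
        return 1 if abs(self.position) < abs(old) else -1

def policy(observation):
    # the "policy": observations in, actions out
    return -1 if observation > 0 else 1

env = ToyEnv()
total_reward = 0
for _ in range(5):
    obs = env.observe()               # the agent observes the environment
    action = policy(obs)              # the policy maps the observation to an action
    total_reward += env.step(action)  # the environment returns a reward
```

Starting from position 5, this policy moves the agent one step closer to 0 each time, so every step earns a positive reward.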
Now, let’s consider that we have to balance a pole on a moving cart. How can we achieve this? As said before, we need an algorithm to train the system so that the pole stays balanced on the moving cart. There are different approaches, but here I will present a hard-coded policy and a well-known optimization technique called Policy Gradients.
Before coming to Policy Gradients: one of the challenges of Reinforcement Learning is that in order to train an agent, you first need a working environment. If you want to program a walking robot, the working environment is the real world, and training in the real world has many repercussions, so we need a simulated environment. OpenAI Gym serves this purpose best. OpenAI Gym is a great toolkit for training agents and for developing and comparing Reinforcement Learning algorithms. It provides many simulated environments (Atari games, board games, 2D and 3D physical simulations) for your learning agents to interact with.
To run Gym in Colab, you have to install prerequisites like xvfb, OpenGL, and other python-dev packages. For rendering the environment, you can use pyvirtualdisplay. This is clearly covered in my repo. Head over to this link and get it done; it is shown in the Colab Preambles part of the code: OpenAI Gym prerequisites
To start working with the CartPole environment, we first have to import all the required Python packages. Head over to this link and get it done; it is shown in the Setup and Introduction to OpenAI gym part of the code: Setup
Create a CartPole environment. This is a 2D simulation in which a cart can be accelerated left or right in order to balance a pole placed on top of it. The agent must move the cart left or right to keep the pole upright. Create an environment with make().
env = gym.make('CartPole-v1')
After an environment is created, you must initialize it using the reset() method. This returns the first observation, obs; observations depend on the type of environment.
env.seed(42)
obs = env.reset()
obs
For the CartPole environment, each observation is a 1D NumPy array containing four floats: these floats represent the cart’s horizontal position (0.0 = center), its velocity (positive means right), the angle of the pole (0.0 = vertical), and its angular velocity (positive means clockwise).
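For instance, given a sample observation array (the values below are made up for illustration, not taken from a live environment), the four components can be unpacked like this:

```python
import numpy as np

# a hypothetical CartPole observation (illustrative values only)
obs = np.array([-0.01258566, -0.00156614, 0.04207708, -0.00180545])

position, velocity, angle, angular_velocity = obs
# position < 0: the cart is slightly left of center
# velocity < 0: the cart is moving toward the left
# angle > 0: the pole is tilted toward the right
# angular_velocity < 0: the pole is rotating counterclockwise
```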
Let’s plot the current observation:
def plot_environment(env, figsize=(5, 4)):
    plt.figure(figsize=figsize)
    img = env.render(mode="rgb_array")
    plt.imshow(img)
    plt.axis("off")
    return img
plot_environment(env)
plt.show()
Let's see how to interact with an environment. Your agent will need to select an action from an action space (the set of possible actions).
env.action_space
Discrete(2) means that the possible actions are integers 0 and 1, which represent accelerating the cart left (0) or right (1).
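Sampling from this space simply returns 0 or 1 uniformly at random; `env.action_space.sample()` does exactly this. A quick stand-in using only Python’s `random` module (so it runs without Gym) looks like:

```python
import random

random.seed(42)
# equivalent in spirit to calling Discrete(2).sample() ten times
actions = [random.randrange(2) for _ in range(10)]
# every sampled action is either 0 (accelerate left) or 1 (accelerate right)
```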
Now, we can make use of this action space to play with the cart-pole environment.
action = 1 # accelerate right
obs, reward, done, info = env.step(action)
obs
The cart is now moving toward the right (obs[1] > 0). The pole is still tilted toward the right (obs[2] > 0), but its angular velocity is now negative (obs[3] < 0), so it will likely be tilted toward the left after the next step.
The step() method executes the action and returns 4 values: obs (the next observation), reward (1.0 at every step in CartPole, since the goal is to keep the pole up as long as possible), done (True when the episode is over, for example when the pole tilts too much or the cart goes off-screen), and info (an environment-specific dictionary with extra debug information).
Let’s move on to the main business: training the model so that the pole balances itself on the moving cart. First, we will try a hard-coded policy: a simple policy that accelerates the cart left when the pole is leaning toward the left, and accelerates it right when the pole is leaning toward the right. We will run this policy to see the average reward it gets over 500 episodes:
This is the hard coded policy:
env.seed(42)
def basic_policy(obs):
    angle = obs[2]
    return 0 if angle < 0 else 1

totals = []
for episode in range(500):
    episode_rewards = 0
    obs = env.reset()
    for step in range(200):
        action = basic_policy(obs)
        obs, reward, done, info = env.step(action)
        episode_rewards += reward
        if done:
            break
    totals.append(episode_rewards)
np.mean(totals), np.std(totals), np.min(totals), np.max(totals)
From the output, we can see that np.max(totals) returns 68. Even over 500 episodes, this policy never managed to keep the pole upright for more than 68 consecutive steps. Not great. If you watch the simulation, you will see the cart oscillating left and right more and more strongly until the pole tilts too much.
The hard-coded policy didn’t live up to expectations, as this environment is considered solved when the agent keeps the pole up for 200 steps.
So, we will create a Neural Network Policy, which takes an observation as input and outputs an action. It estimates a probability for each action, and an action is then selected randomly according to the estimated probabilities. In the CartPole environment, where there are only two actions (left or right), we need just one output neuron: it outputs the probability p of action 0 (left), and the probability of action 1 (right) is then 1 - p. The CartPole problem is simple because the observations are noise-free; if the observations were noisy, you would have to use the past few observations to estimate the most likely current state. The observations also contain the environment’s full state; if, say, the environment only revealed the position of the cart but not its velocity, you would have to consider not only the current observation but also the previous one in order to estimate the current velocity.
Here is a simple Sequential model to define the neural policy network:
keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)
n_inputs = 4
model = keras.models.Sequential([
    keras.layers.Dense(5, activation="elu", input_shape=[n_inputs]),
    keras.layers.Dense(1, activation="sigmoid"),
])
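Given the probability of going left that this network outputs, an action can be sampled as follows. This is a sketch using a fixed example value for the probability (0.7 is made up, not a real model output), mirroring the sampling rule used later in play_one_step():

```python
import numpy as np

rng = np.random.default_rng(42)

def sample_action(left_proba):
    # action 0 (left) with probability left_proba,
    # action 1 (right) with probability 1 - left_proba
    return int(rng.uniform() > left_proba)

left_proba = 0.7  # an illustrative model output
counts = [0, 0]
for _ in range(10_000):
    counts[sample_action(left_proba)] += 1
# roughly 70% of the sampled actions should be 0 (left)
```

Sampling an action according to the estimated probability, rather than always picking the most likely one, lets the agent keep exploring new actions while still favoring the ones that look good.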
While evaluating actions, we run into a well-known difficulty called the credit assignment problem. In Reinforcement Learning, the only guidance the agent gets is through rewards, and these rewards are sparse and delayed. For example, if the agent manages to balance the pole for 50 steps and then loses it, we don’t know which particular actions caused the pole to fall: the last action certainly contributed, but the earlier actions may be just as responsible.
To tackle this problem, we need the Policy Gradients algorithm. Basically, it optimizes the parameters of a policy by following the gradients toward higher rewards. The process is: first, let the neural network policy play the game several times and, at every step, compute the gradients that would make the chosen action even more likely, but don’t apply them yet. Then, once several episodes have been played, compute each action’s score using the discounted and normalized rewards (described below). Next, multiply each gradient vector by its action’s score, so that gradients from good actions are reinforced and gradients from bad actions are reversed. Finally, compute the mean of all the resulting gradients and use it to perform a Gradient Descent step.
Let us use tf.keras to implement this algorithm. We will start by creating a play_one_step() function, which plays a single step.
def play_one_step(env, obs, model, loss_fn):
    with tf.GradientTape() as tape:
        left_proba = model(obs[np.newaxis])
        action = (tf.random.uniform([1, 1]) > left_proba)
        y_target = tf.constant([[1.]]) - tf.cast(action, tf.float32)
        loss = tf.reduce_mean(loss_fn(y_target, left_proba))
    grads = tape.gradient(loss, model.trainable_variables)
    obs, reward, done, info = env.step(int(action[0, 0].numpy()))
    return obs, reward, done, grads
Let us see what these lines of code do. Inside a GradientTape block, the model is called with a single observation (reshaped into a batch of one) to get left_proba, the probability of going left. A random float is then drawn: if it is greater than left_proba, the action is True (1, i.e. right); otherwise it is False (0, i.e. left), so the agent goes left with probability left_proba. Next, y_target is defined as the target probability of going left: 1.0 if the chosen action is 0, and 0.0 if it is 1. The loss is computed with the given loss function, and the tape is used to compute the gradients of the loss with respect to the model’s trainable variables. Finally, the action is played in the environment, and the function returns the new observation, the reward, whether the episode is done, and the gradients.
Now, we have to create another function play_multiple_episodes() which will depend on the play_one_step() function and will play multiple episodes. This code will return a list of reward lists (one reward list per episode, containing one reward per step), as well as a list of gradient lists (one gradient list per episode, each containing one tuple of gradients per step and each tuple containing one gradient tensor per trainable variable).
def play_multiple_episodes(env, n_episodes, n_max_steps, model, loss_fn):
    all_rewards = []
    all_grads = []
    for episode in range(n_episodes):
        current_rewards = []
        current_grads = []
        obs = env.reset()
        for step in range(n_max_steps):
            obs, reward, done, grads = play_one_step(env, obs, model, loss_fn)
            current_rewards.append(reward)
            current_grads.append(grads)
            if done:
                break
        all_rewards.append(current_rewards)
        all_grads.append(current_grads)
    return all_rewards, all_grads
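To make the nested return structure concrete, here is a tiny fabricated example (the numbers are dummy values, not real rewards or gradients, and plain floats stand in for gradient tensors):

```python
# two episodes: the first lasted 3 steps, the second 2 steps
all_rewards = [[1.0, 1.0, 1.0], [1.0, 1.0]]

# all_grads[episode][step][variable]: one entry per trainable variable,
# faked here with a tuple of two floats per step instead of tensors
all_grads = [[(0.1, 0.2), (0.3, 0.4), (0.5, 0.6)],
             [(0.7, 0.8), (0.9, 1.0)]]

n_steps_per_episode = [len(rewards) for rewards in all_rewards]  # [3, 2]
```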
The Policy Gradient algorithm will use this play_multiple_episodes() function to play the game several times, then it will go back and look at all the rewards, discount them and normalize them as previously mentioned.
To discount and normalize, we will need two more functions:
The first one is the discount_rewards() function which will compute the sum of future discounted rewards at each step.
def discount_rewards(rewards, discount_rate):
    discounted = np.array(rewards)
    for step in range(len(rewards) - 2, -1, -1):
        discounted[step] += discounted[step + 1] * discount_rate
    return discounted
The second one is the discount_and_normalize_rewards() function which will normalize all the previously computed discounted rewards across many episodes by subtracting the mean and dividing by the standard deviation.
def discount_and_normalize_rewards(all_rewards, discount_rate):
    all_discounted_rewards = [discount_rewards(rewards, discount_rate)
                              for rewards in all_rewards]
    flat_rewards = np.concatenate(all_discounted_rewards)
    reward_mean = flat_rewards.mean()
    reward_std = flat_rewards.std()
    return [(discounted_rewards - reward_mean) / reward_std
            for discounted_rewards in all_discounted_rewards]
Let me show you what actually happens with these two functions:

discount_rewards([10, 0, -50], discount_rate=0.8)
# returns array([-22, -40, -50]): the last reward stays -50,
# the middle one becomes 0 + 0.8 * (-50) = -40,
# and the first becomes 10 + 0.8 * (-40) = -22

discount_and_normalize_rewards([[10, 0, -50], [10, 20]], discount_rate=0.8)
# normalizes the discounted rewards across both episodes: all the actions
# of the first episode end up with negative scores, and all the actions
# of the second episode end up with positive scores
Now we are almost ready to run the algorithm; we just need to define the hyperparameters.
n_iterations = 150
n_episodes_per_update = 10
n_max_steps = 200
discount_rate = 0.95
This will run 150 training iterations, will play 10 episodes per iteration, and each episode will last at most 200 steps. We will use a discount factor of 0.95.
We will also need an optimizer and a loss function. We will use the Adam optimizer with a learning rate of 0.01, and the binary cross-entropy loss function, because we are training a binary classifier (two possible actions: left or right).
optimizer = keras.optimizers.Adam(learning_rate=0.01)
loss_fn = keras.losses.binary_crossentropy
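To see why binary cross-entropy fits here: for a target y and a predicted probability p, the loss is -[y·log(p) + (1 - y)·log(1 - p)], which is small when p agrees with the target and large when it doesn’t. A hand-rolled check, independent of Keras:

```python
import numpy as np

def binary_crossentropy(y_target, proba):
    # -[y*log(p) + (1-y)*log(1-p)]
    return -(y_target * np.log(proba) + (1 - y_target) * np.log(1 - proba))

# target is "left" (y = 1) and the model is confident in left (p = 0.9): small loss
low = binary_crossentropy(1.0, 0.9)   # -log(0.9) ≈ 0.105
# the model is confident in the wrong direction (p = 0.1): large loss
high = binary_crossentropy(1.0, 0.1)  # -log(0.1) ≈ 2.303
```

Minimizing this loss pushes left_proba toward y_target, which is exactly how play_one_step() nudges the policy toward the action it just took.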
Let’s move forward and run the training loop:
for iteration in range(n_iterations):
    all_rewards, all_grads = play_multiple_episodes(
        env, n_episodes_per_update, n_max_steps, model, loss_fn)
    total_rewards = sum(map(sum, all_rewards))
    print("\rIteration: {}, mean rewards: {:.1f}".format(
        iteration, total_rewards / n_episodes_per_update), end="")
    all_final_rewards = discount_and_normalize_rewards(all_rewards, discount_rate)
    all_mean_grads = []
    for var_index in range(len(model.trainable_variables)):
        mean_grads = tf.reduce_mean(
            [final_reward * all_grads[episode_index][step][var_index]
             for episode_index, final_rewards in enumerate(all_final_rewards)
             for step, final_reward in enumerate(final_rewards)], axis=0)
        all_mean_grads.append(mean_grads)
    optimizer.apply_gradients(zip(all_mean_grads, model.trainable_variables))
env.close()
Let us see what these lines of code do. At each training iteration, the loop plays 10 episodes with play_multiple_episodes(), then computes each action’s normalized score with discount_and_normalize_rewards(). Next, for each trainable variable, it computes the mean of the gradients over all episodes and all steps, with each gradient weighted by its action’s score. Finally, it applies these mean gradients using the optimizer.
Hurray, we trained our neural network policy, and now we can see how well it learned through an animation:
frames = render_policy_net(model)
plot_animation(frames)
I know that just reading gets boring (I get bored myself), so head over to the code in my repo and try it out. Here is the link: A Tour of Policy Gradients

You will surely enjoy it.
Thank you.