"""
Vanilla Policy Gradient (VPG or REINFORCE)
------------------------------------------
The policy gradient algorithm works by updating policy parameters via stochastic gradient ascent on policy performance.
It is an on-policy algorithm that can be used for environments with either discrete or continuous action spaces.
Here is an example on the discrete action space game CartPole-v0.
To apply it to a continuous action space, you need to change the last softmax layer and the get_action function.
Reference
---------
Cookbook: Sutton R S, Barto A G. Reinforcement Learning: An Introduction. 1998.
MorvanZhou's tutorial page: https://morvanzhou.github.io/tutorials/
MorvanZhou's code: https://github.com/MorvanZhou/Reinforcement-learning-with-tensorflow/
Prerequisites
--------------
tensorflow >=2.0.0a0
tensorflow-probability 0.6.0
tensorlayer >=2.0.0
"""
import time
from rlzoo.common.utils import *
from rlzoo.common.policy_networks import *
############################### PG ####################################
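# Training loop implemented by the class below (one gradient step per episode):
#   1. roll out an episode with the current stochastic policy, storing (s, a, r) per step;
#   2. compute discounted returns G_t backwards through the episode and normalize them;
#   3. take one gradient step on the REINFORCE loss (see the note after update() below).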
class PG:
    """
    PG class
    """

    def __init__(self, net_list, optimizers_list):
        """
        :param net_list: a list of networks (value and policy) used in the algorithm, from common functions or customization
        :param optimizers_list: a list of optimizers for all networks and differentiable variables
        """
        assert len(net_list) == 1
        assert len(optimizers_list) == 1
        self.name = 'PG'
        self.model = net_list[0]
        assert isinstance(self.model, StochasticPolicyNetwork)
        self.buffer = []
        print('Policy Network', self.model)
        self.optimizer = optimizers_list[0]

    def get_action(self, s):
        """
        Choose an action by sampling from the policy distribution.

        :param s: state
        :return: act
        """
        return self.model([s])[0].numpy()
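    # Example (illustrative): for CartPole-v0 the call above returns a single discrete
    # action (0 or 1) sampled from the categorical distribution given by the policy's
    # softmax output, e.g. get_action(env.reset()) -> 1.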

    def get_action_greedy(self, s):
        """
        Choose an action with the greedy (deterministic) policy.

        :param s: state
        :return: act
        """
        return self.model([s], greedy=True).numpy()[0]

    def store_transition(self, s, a, r):
        """
        Store a transition in the memory buffer.

        :param s: state
        :param a: act
        :param r: reward
        :return: None
        """
        self.buffer.append([s, np.array(a, np.float32), np.array(r, np.float32)])
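    # Note: the buffer is expected to hold the transitions of exactly one episode;
    # update() consumes it as a single batch and then clears it, so store_transition()
    # is called once per step and update() once per episode (see learn() below).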

    def update(self, gamma):
        """
        Update policy parameters via stochastic gradient ascent.

        :param gamma: reward discount factor
        :return: normalized discounted episode returns
        """
        # discount and normalize episode reward
        s, a, r = zip(*self.buffer)
        s, a, r = np.array(s), np.array(a), np.array(r).flatten()
        discounted_ep_rs_norm = self._discount_and_norm_rewards(r, gamma)

        with tf.GradientTape() as tape:
            self.model(s)
            neg_log_prob = self.model.policy_dist.neglogp(a)
            loss = tf.reduce_mean(neg_log_prob * discounted_ep_rs_norm)  # reward guided loss
        grad = tape.gradient(loss, self.model.trainable_weights)
        self.optimizer.apply_gradients(zip(grad, self.model.trainable_weights))

        self.buffer = []
        return discounted_ep_rs_norm
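    # Note on the loss used above: minimizing
    #     L(theta) = mean_t[ -log pi_theta(a_t | s_t) * G_t ]
    # (with G_t the normalized discounted return) is stochastic gradient ascent on the
    # REINFORCE objective, since grad L = -mean_t[ grad log pi_theta(a_t | s_t) * G_t ].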

    def _discount_and_norm_rewards(self, reward_list, gamma):
        """
        Compute discounted episode returns and normalize them.

        :param reward_list: rewards of one episode
        :param gamma: reward discount factor
        :return: normalized discounted returns
        """
        # discount episode rewards
        discounted_ep_rs = np.zeros_like(reward_list)
        running_add = 0
        for t in reversed(range(0, len(reward_list))):
            running_add = running_add * gamma + reward_list[t]
            discounted_ep_rs[t] = running_add

        # normalize episode rewards
        discounted_ep_rs -= np.mean(discounted_ep_rs)
        std = np.std(discounted_ep_rs)
        if std != 0:
            discounted_ep_rs /= std
        discounted_ep_rs = discounted_ep_rs[:, np.newaxis]
        return discounted_ep_rs
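    # Worked example for the method above (illustrative numbers): with gamma = 0.9 and
    # an episode reward list [1., 1., 1.], the backward pass gives discounted returns
    # [2.71, 1.9, 1.0]; subtracting their mean (~1.87) and dividing by their standard
    # deviation (~0.70) yields roughly [1.20, 0.04, -1.25], so actions late in the
    # episode get a negative weight in the loss and are discouraged relative to early ones.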

    def save_ckpt(self, env_name):
        """
        Save trained weights.

        :param env_name: environment name
        :return: None
        """
        save_model(self.model, 'model_policy', self.name, env_name)

    def load_ckpt(self, env_name):
        """
        Load trained weights.

        :param env_name: environment name
        :return: None
        """
        load_model(self.model, 'model_policy', self.name, env_name)

    def learn(self, env, train_episodes=200, test_episodes=100, max_steps=200, save_interval=100,
              mode='train', render=False, gamma=0.95, plot_func=None):
        """
        :param env: learning environment
        :param train_episodes: total number of episodes for training
        :param test_episodes: total number of episodes for testing
        :param max_steps: maximum number of steps for one episode
        :param save_interval: number of episodes between checkpoint saves
        :param mode: 'train' or 'test'
        :param render: render each step
        :param gamma: reward discount factor
        :param plot_func: additional function for the interactive module
        :return: None
        """
        if mode == 'train':
            print('Training... | Algorithm: {} | Environment: {}'.format(self.name, env.spec.id))
            reward_buffer = []
            t0 = time.time()
            for i_episode in range(1, train_episodes + 1):
                observation = env.reset()
                ep_rs_sum = 0
                for step in range(max_steps):
                    if render:
                        env.render()
                    action = self.get_action(observation)
                    observation_, reward, done, info = env.step(action)
                    self.store_transition(observation, action, reward)
                    ep_rs_sum += reward
                    observation = observation_
                    if done:
                        break

                print('Episode: {}/{} | Episode Reward: {:.4f} | Running Time: {:.4f}'.format(
                    i_episode, train_episodes, ep_rs_sum, time.time() - t0)
                )
                reward_buffer.append(ep_rs_sum)
                if plot_func is not None:
                    plot_func(reward_buffer)

                self.update(gamma)

                if i_episode and i_episode % save_interval == 0:
                    self.save_ckpt(env_name=env.spec.id)
                    plot_save_log(reward_buffer, algorithm_name='PG', env_name=env.spec.id)

            self.save_ckpt(env_name=env.spec.id)
            plot_save_log(reward_buffer, algorithm_name='PG', env_name=env.spec.id)

        elif mode == 'test':
            # test with the greedy policy using saved weights
            self.load_ckpt(env_name=env.spec.id)
            print('Testing... | Algorithm: {} | Environment: {}'.format(self.name, env.spec.id))
            t0 = time.time()
            for eps in range(test_episodes):
                observation = env.reset()
                ep_rs_sum = 0
                for step in range(max_steps):
                    if render:
                        env.render()
                    action = self.get_action_greedy(observation)
                    observation, reward, done, info = env.step(action)
                    ep_rs_sum += reward
                    if done:
                        break

                print('Episode: {}/{} | Episode Reward: {:.4f} | Running Time: {:.4f}'.format(
                    eps + 1, test_episodes, ep_rs_sum, time.time() - t0)
                )
        else:
            print('unknown mode type')
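
# Minimal usage sketch (an assumption, not part of this module): rlzoo's documented entry
# point builds the environment and default hyperparameters with the build_env and
# call_default_params helpers; check the repository README for their exact signatures.
if __name__ == '__main__':
    from rlzoo.common.env_wrappers import build_env
    from rlzoo.common.utils import call_default_params

    env = build_env('CartPole-v0', 'classic_control')
    # alg_params holds net_list and optimizers_list; learn_params holds the training
    # hyperparameters (train_episodes, max_steps, gamma, ...).
    alg_params, learn_params = call_default_params(env, 'classic_control', 'PG')
    alg = PG(**alg_params)
    alg.learn(env=env, mode='train', render=False, **learn_params)
    alg.learn(env=env, mode='test', render=True, **learn_params)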