Source code for rlzoo.algorithms.trpo.trpo

"""
Trust Region Policy Optimization (TRPO)
---------------------------------------
A policy-gradient update with too large a step can collapse the policy's performance,
while even a small step in parameter space can cause a large change in the policy.
TRPO constrains the update step in policy space using the KL divergence (rather than in parameter space),
which enables monotonic performance improvement and avoids collapsed updates.

Reference
---------
Trust Region Policy Optimization, Schulman et al. 2015
High Dimensional Continuous Control Using Generalized Advantage Estimation, Schulman et al. 2016
Approximately Optimal Approximate Reinforcement Learning, Kakade and Langford 2002
openai/spinningup : http://spinningup.openai.com/en/latest/algorithms/trpo.html

Prerequisites
--------------
tensorflow >=2.0.0a0
tensorflow-probability 0.6.0
tensorlayer >=2.0.0

"""
import copy
import time

import numpy as np
import tensorflow as tf

from rlzoo.common.utils import *
from rlzoo.common.policy_networks import *
from rlzoo.common.value_networks import *

EPS = 1e-8  # small constant to avoid division by zero

"""

Trust Region Policy Optimization 

(with support for Natural Policy Gradient)

"""


class TRPO:
    """ TRPO class """

    def __init__(self, net_list, optimizers_list, damping_coeff=0.1, cg_iters=10, delta=0.01):
        """
        :param net_list: a list of networks (value and policy) used in the algorithm, from common functions or customization
        :param optimizers_list: a list of optimizers for all networks and differentiable variables
        :param damping_coeff: damping added to the Hessian-vector product for numerical stability
        :param cg_iters: number of conjugate gradient iterations to perform
        :param delta: KL-divergence limit for the TRPO update
        """
        assert len(net_list) == 2
        assert len(optimizers_list) == 1
        self.name = 'TRPO'

        self.critic, self.actor = net_list

        assert isinstance(self.critic, ValueNetwork)
        assert isinstance(self.actor, StochasticPolicyNetwork)

        self.damping_coeff, self.cg_iters = damping_coeff, cg_iters
        self.delta = delta

        # Optimizer for value function
        self.critic_opt, = optimizers_list

        self.old_dist = make_dist(self.actor.action_space)

    @staticmethod
    def flat_concat(xs):
        """ flatten and concatenate a list of tensors

        :param xs: a list of tensors
        :return: flat tensor
        """
        return tf.concat([tf.reshape(x, (-1,)) for x in xs], axis=0)

    @staticmethod
    def assign_params_from_flat(x, params):
        """ assign network parameters from a flat vector

        :param x: flat parameter vector
        :param params: list of parameter tensors to assign to
        :return: group of assign ops
        """
        flat_size = lambda p: int(np.prod(p.shape.as_list()))  # the 'int' is important for scalars
        splits = tf.split(x, [flat_size(p) for p in params])
        new_params = [tf.reshape(p_new, p.shape) for p, p_new in zip(params, splits)]
        return tf.group([p.assign(p_new) for p, p_new in zip(params, new_params)])

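    # Illustrative round trip (the name `net` below is hypothetical): for a network with
    # weights of shapes (2, 2) and (2,), flat_concat returns a length-6 vector, and
    # assign_params_from_flat writes such a vector back into the original shapes, e.g.
    #   flat = TRPO.flat_concat(net.trainable_weights)
    #   TRPO.assign_params_from_flat(flat, net.trainable_weights)
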
    def get_action(self, s):
        """ Choose an action by sampling from the stochastic policy

        :param s: state
        :return: clipped action
        """
        return self.actor([s])[0].numpy()

    def get_action_greedy(self, s):
        """ Choose an action greedily (deterministic mode of the policy)

        :param s: state
        :return: clipped action
        """
        return self.actor([s], greedy=True)[0].numpy()

    def cal_adv(self, tfs, tfdc_r):
        """ Calculate advantage as the discounted return minus the value baseline

        :param tfs: state
        :param tfdc_r: discounted cumulative reward
        :return: advantage
        """
        tfdc_r = np.array(tfdc_r, dtype=np.float32)
        advantage = tfdc_r - self.critic(tfs)
        return advantage.numpy()

    def get_v(self, s):
        """ Compute value

        :param s: state
        :return: value
        """
        if s.ndim < 2:
            s = s[np.newaxis, :]
        res = self.critic(s)[0, 0]
        return res

    def c_train(self, tfdc_r, s):
        """ Update the critic (value) network

        :param tfdc_r: discounted cumulative reward
        :param s: state
        :return: None
        """
        tfdc_r = np.array(tfdc_r, dtype=np.float32)
        with tf.GradientTape() as tape:
            v = self.critic(s)
            advantage = tfdc_r - v
            closs = tf.reduce_mean(tf.square(advantage))
        grad = tape.gradient(closs, self.critic.trainable_weights)
        self.critic_opt.apply_gradients(zip(grad, self.critic.trainable_weights))

    def save_ckpt(self, env_name):
        """ save trained weights

        :return: None
        """
        save_model(self.actor, 'actor', self.name, env_name)
        save_model(self.critic, 'critic', self.name, env_name)

    def load_ckpt(self, env_name):
        """ load trained weights

        :return: None
        """
        load_model(self.actor, 'actor', self.name, env_name)
        load_model(self.critic, 'critic', self.name, env_name)

    # TRPO losses
    def pi_loss(self, inputs):
        """ calculate the surrogate policy loss

        :param inputs: a list of x_ph, a_ph, adv_ph, ret_ph, logp_old_ph and other inputs
        :return: pi loss
        """
        x_ph, a_ph, adv_ph, ret_ph, logp_old_ph, *info_values = inputs
        pi, logp, logp_pi, info, info_phs, d_kl = self.actor.cal_outputs_1(x_ph, a_ph, *info_values)
        ratio = tf.exp(logp - logp_old_ph)  # pi(a|s) / pi_old(a|s)
        pi_loss = -tf.reduce_mean(ratio * adv_ph)
        return pi_loss

    # Flat policy gradient needed for the CG solver
    def gradient(self, inputs):
        """ pi gradients

        :param inputs: a list of x_ph, a_ph, adv_ph, ret_ph, logp_old_ph and other inputs
        :return: flat gradient
        """
        pi_params = self.actor.trainable_weights
        with tf.GradientTape() as tape:
            loss = self.pi_loss(inputs)
        grad = tape.gradient(loss, pi_params)
        grad = self.flat_concat(grad)
        return grad

    # Getting and setting the flat policy parameters
    def get_pi_params(self):
        """ get actor trainable parameters

        :return: flat actor trainable parameters
        """
        pi_params = self.actor.trainable_weights
        return self.flat_concat(pi_params)

    def set_pi_params(self, v_ph):
        """ set actor trainable parameters from a flat vector

        :param v_ph: flat parameter vector
        :return: None
        """
        pi_params = self.actor.trainable_weights
        self.assign_params_from_flat(v_ph, pi_params)

    def cg(self, Ax, b):
        """
        Conjugate gradient algorithm
        (see https://en.wikipedia.org/wiki/Conjugate_gradient_method)
        """
        x = np.zeros_like(b)
        r = copy.deepcopy(b)  # Note: should be 'b - Ax(x)', but for x=0, Ax(x)=0. Change if doing warm start.
        p = copy.deepcopy(r)
        r_dot_old = np.dot(r, r)
        for _ in range(self.cg_iters):
            z = Ax(p)
            alpha = r_dot_old / (np.dot(p, z) + EPS)
            x += alpha * p
            r -= alpha * z
            r_dot_new = np.dot(r, r)
            p = r + (r_dot_new / r_dot_old) * p
            r_dot_old = r_dot_new
        return x

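    # Hypothetical sanity check for cg (not part of the training loop): with a small
    # symmetric positive-definite matrix,
    #   A = np.array([[4., 1.], [1., 3.]]); b = np.array([1., 2.])
    #   x = agent.cg(lambda v: A @ v, b)
    # x converges to np.linalg.solve(A, b) within a couple of iterations.
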
    def eval(self, bs, ba, badv, oldpi_prob):
        """ evaluate the surrogate loss and the mean KL divergence to the old policy """
        _ = self.actor(bs)
        pi_prob = tf.exp(self.actor.policy_dist.logp(ba))
        ratio = pi_prob / (oldpi_prob + EPS)

        surr = ratio * badv
        aloss = -tf.reduce_mean(surr)
        kl = self.old_dist.kl(self.actor.policy_dist.param)
        # kl = tfp.distributions.kl_divergence(oldpi, pi)
        kl = tf.reduce_mean(kl)
        return aloss, kl

    def a_train(self, s, a, adv, oldpi_prob, backtrack_iters, backtrack_coeff):
        """ update the actor with a natural-gradient step followed by a backtracking line search """
        s = np.array(s)
        a = np.array(a, np.float32)
        adv = np.array(adv, np.float32)

        with tf.GradientTape() as tape:
            aloss, kl = self.eval(s, a, adv, oldpi_prob)
        a_grad = tape.gradient(aloss, self.actor.trainable_weights)
        a_grad = self.flat_concat(a_grad)
        pi_l_old = aloss

        Hx = lambda x: self.hessian_vector_product(s, a, adv, oldpi_prob, x)
        x = self.cg(Hx, a_grad)
        alpha = np.sqrt(2 * self.delta / (np.dot(x, Hx(x)) + EPS))

        old_params = self.flat_concat(self.actor.trainable_weights)
        for j in range(backtrack_iters):
            self.set_pi_params(old_params - alpha * x * backtrack_coeff ** j)
            pi_l_new, kl = self.eval(s, a, adv, oldpi_prob)  # eval returns (loss, kl)
            if kl <= self.delta and pi_l_new <= pi_l_old:
                # Accepting new params at step j of line search.
                break
            if j == backtrack_iters - 1:
                # Line search failed! Keeping old params.
                self.set_pi_params(old_params)

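    # Step-size note for a_train above: under the quadratic approximation of the KL
    # constraint, the natural-gradient direction x scaled by
    #   alpha = sqrt(2 * delta / (x^T H x))
    # lands exactly on the trust-region boundary; multiplying by backtrack_coeff**j
    # shrinks the step until both the KL check and the loss-improvement check pass.
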
    def hessian_vector_product(self, s, a, adv, oldpi_prob, v_ph):
        """ for H the Hessian of the mean KL divergence, compute the product H v_ph """
        params = self.actor.trainable_weights

        with tf.GradientTape() as tape1:
            with tf.GradientTape() as tape0:
                aloss, kl = self.eval(s, a, adv, oldpi_prob)
            g = tape0.gradient(kl, params)
            g = self.flat_concat(g)

            assert v_ph.shape == g.shape
            v = tf.reduce_sum(g * v_ph)
        grad = tape1.gradient(v, params)
        hvp = self.flat_concat(grad)

        if self.damping_coeff > 0:
            hvp += self.damping_coeff * v_ph
        return hvp

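    # Identity used above: H v = d/d(theta) [ (d KL / d(theta)) . v ], so two nested
    # gradient tapes give the Hessian-vector product at the cost of two backward passes
    # without ever forming H; with damping the result is (H + damping_coeff * I) v.
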
    def update(self, bs, ba, br, train_critic_iters, backtrack_iters, backtrack_coeff):
        """ update trpo

        :param bs: state batch
        :param ba: action batch
        :param br: discounted-return batch
        :return: None
        """
        adv = self.cal_adv(bs, br)

        _ = self.actor(bs)
        oldpi_prob = tf.exp(self.actor.policy_dist.logp(ba))
        oldpi_prob = tf.stop_gradient(oldpi_prob)

        oldpi_param = self.actor.policy_dist.get_param()
        self.old_dist.set_param(oldpi_param)

        self.a_train(bs, ba, adv, oldpi_prob, backtrack_iters, backtrack_coeff)

        for _ in range(train_critic_iters):
            self.c_train(br, bs)

    def learn(self, env, train_episodes=200, test_episodes=100, max_steps=200, save_interval=10,
              gamma=0.9, mode='train', render=False, batch_size=32, backtrack_iters=10,
              backtrack_coeff=0.8, train_critic_iters=80, plot_func=None):
        """ learn function

        :param env: learning environment
        :param train_episodes: total number of episodes for training
        :param test_episodes: total number of episodes for testing
        :param max_steps: maximum number of steps for one episode
        :param save_interval: time steps for saving
        :param gamma: reward discount factor
        :param mode: train or test
        :param render: render each step
        :param batch_size: update batch size
        :param backtrack_iters: maximum number of steps allowed in the backtracking line search
        :param backtrack_coeff: how far back to step during the backtracking line search
        :param train_critic_iters: critic update iteration steps
        :param plot_func: additional function for plotting the reward curve
        :return: None
        """
        t0 = time.time()

        if mode == 'train':
            print('Training... | Algorithm: {} | Environment: {}'.format(self.name, env.spec.id))
            reward_buffer = []
            for ep in range(1, train_episodes + 1):
                s = env.reset()
                buffer_s, buffer_a, buffer_r = [], [], []
                ep_rs_sum = 0
                for t in range(max_steps):  # in one episode
                    if render:
                        env.render()
                    a = self.get_action(s)
                    s_, r, done, _ = env.step(a)
                    buffer_s.append(s)
                    buffer_a.append(a)
                    buffer_r.append(r)
                    s = s_
                    ep_rs_sum += r

                    # update trpo
                    if (t + 1) % batch_size == 0 or t == max_steps - 1 or done:
                        if done:
                            v_s_ = 0
                        else:
                            try:
                                v_s_ = self.get_v(s_)
                            except:
                                v_s_ = self.get_v(s_[np.newaxis, :])  # for raw-pixel input

                        # compute discounted returns backwards from the bootstrap value
                        discounted_r = []
                        for r in buffer_r[::-1]:
                            v_s_ = r + gamma * v_s_
                            discounted_r.append(v_s_)
                        discounted_r.reverse()

                        bs = buffer_s
                        ba, br = buffer_a, np.array(discounted_r)[:, np.newaxis]
                        buffer_s, buffer_a, buffer_r = [], [], []

                        self.update(bs, ba, br, train_critic_iters, backtrack_iters, backtrack_coeff)

                    if done:
                        break

                print(
                    'Episode: {}/{} | Episode Reward: {:.4f} | Running Time: {:.4f}'.format(
                        ep, train_episodes, ep_rs_sum, time.time() - t0
                    )
                )

                reward_buffer.append(ep_rs_sum)
                if plot_func is not None:
                    plot_func(reward_buffer)
                if ep and not ep % save_interval:
                    self.save_ckpt(env_name=env.spec.id)
                    plot_save_log(reward_buffer, self.name, env.spec.id)

            self.save_ckpt(env_name=env.spec.id)
            plot_save_log(reward_buffer, self.name, env.spec.id)

        # test
        elif mode == 'test':
            self.load_ckpt(env_name=env.spec.id)
            print('Testing... | Algorithm: {} | Environment: {}'.format(self.name, env.spec.id))

            reward_buffer = []
            for eps in range(test_episodes):
                ep_rs_sum = 0
                s = env.reset()
                for step in range(max_steps):
                    if render:
                        env.render()
                    action = self.get_action_greedy(s)
                    s, reward, done, info = env.step(action)
                    ep_rs_sum += reward
                    if done:
                        break

                print('Episode: {}/{} | Episode Reward: {:.4f} | Running Time: {:.4f}'.format(
                    eps, test_episodes, ep_rs_sum, time.time() - t0)
                )
                reward_buffer.append(ep_rs_sum)
                if plot_func:
                    plot_func(reward_buffer)

        else:
            print('unknown mode type')
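

# Minimal usage sketch (illustrative only; the ValueNetwork / StochasticPolicyNetwork
# constructor arguments below are assumptions, and in practice rlzoo builds the networks
# and optimizers through its common helper functions):
#
#   import gym
#   env = gym.make('Pendulum-v0')
#   critic = ValueNetwork(env.observation_space, hidden_dim_list=[64, 64])
#   actor = StochasticPolicyNetwork(env.observation_space, env.action_space,
#                                   hidden_dim_list=[64, 64])
#   agent = TRPO(net_list=[critic, actor], optimizers_list=[tf.optimizers.Adam(1e-3)])
#   agent.learn(env, train_episodes=200, max_steps=200, mode='train')
#   agent.learn(env, test_episodes=10, max_steps=200, mode='test')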