"""
Deep Deterministic Policy Gradient (DDPG)
-----------------------------------------
An algorithm that concurrently learns a Q-function and a policy.
It uses off-policy data and the Bellman equation to learn the Q-function,
and uses the Q-function to learn the policy.
Reference
---------
Deterministic Policy Gradient Algorithms, Silver et al. 2014
Continuous Control With Deep Reinforcement Learning, Lillicrap et al. 2016
MorvanZhou's tutorial page: https://morvanzhou.github.io/tutorials/
MorvanZhou's code: https://github.com/MorvanZhou/Reinforcement-learning-with-tensorflow/
Prerequisites
-------------
tensorflow >=2.0.0a0
tensorflow-probability 0.6.0
tensorlayer >=2.0.0
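Usage (sketch)
--------------
A minimal, illustrative way to drive this class is through the generic RLzoo
entry points; the helpers below (build_env, call_default_params) follow the
RLzoo quick-start and are assumptions from outside this module:
    from rlzoo.common.env_wrappers import build_env
    from rlzoo.common.utils import call_default_params
    from rlzoo.algorithms import DDPG
    env = build_env('Pendulum-v0', 'classic_control')
    alg_params, learn_params = call_default_params(env, 'classic_control', 'DDPG')
    alg = DDPG(**alg_params)
    alg.learn(env=env, mode='train', render=False, **learn_params)
    alg.learn(env=env, mode='test', render=True, **learn_params)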
"""
import time
import gym
import numpy as np
import tensorflow as tf
from rlzoo.common.utils import *
from rlzoo.common.buffer import *
from rlzoo.common.policy_networks import *
from rlzoo.common.value_networks import *
############################### DDPG ####################################
class DDPG(object):
"""
DDPG class
"""
def __init__(self, net_list, optimizers_list, replay_buffer_size, action_range=1., tau=0.01):
"""
:param net_list: a list of networks (value and policy) used in the algorithm, from common functions or customization
:param optimizers_list: a list of optimizers for all networks and differentiable variables
:param replay_buffer_size: the size of buffer for storing explored samples
:param action_range: scale factor applied to the policy output
:param tau: soft update factor for the target networks
"""
assert len(net_list) == 4
assert len(optimizers_list) == 2
self.name = 'DDPG'
self.critic, self.critic_target, self.actor, self.actor_target = net_list
assert isinstance(self.critic, QNetwork)
assert isinstance(self.critic_target, QNetwork)
assert isinstance(self.actor, DeterministicPolicyNetwork)
assert isinstance(self.actor_target, DeterministicPolicyNetwork)
assert isinstance(self.actor.action_space, gym.spaces.Box)
def copy_para(from_model, to_model):
for i, j in zip(from_model.trainable_weights, to_model.trainable_weights):
j.assign(i)
copy_para(self.actor, self.actor_target)
copy_para(self.critic, self.critic_target)
self.replay_buffer_size = replay_buffer_size
self.buffer = ReplayBuffer(replay_buffer_size)
self.ema = tf.train.ExponentialMovingAverage(decay=1 - tau) # soft replacement
self.action_range = action_range
self.critic_opt, self.actor_opt = optimizers_list
def ema_update(self):
"""
Soft updating by exponential smoothing
:return: None
"""
paras = self.actor.trainable_weights + self.critic.trainable_weights
self.ema.apply(paras)
for i, j in zip(self.actor_target.trainable_weights + self.critic_target.trainable_weights, paras):
i.assign(self.ema.average(j))
def sample_action(self):
""" generate random actions for exploration """
a = tf.random.uniform(self.actor.action_space.shape, self.actor.action_space.low, self.actor.action_space.high)
return a
def get_action(self, s, noise_scale):
"""
Choose action with exploration
:param s: state
:param noise_scale: scale of the Gaussian noise added to the action for exploration
:return: action
"""
a = self.actor([s])[0].numpy()*self.action_range
# add randomness to action selection for exploration
noise = np.random.normal(0, 1, a.shape) * noise_scale
a += noise
a = np.clip(a, self.actor.action_space.low, self.actor.action_space.high)
return a
def get_action_greedy(self, s):
"""
Choose action greedily, without exploration noise
:param s: state
:return: action
"""
return self.actor([s])[0].numpy()*self.action_range
def update(self, batch_size, gamma):
"""
Update parameters
:param batch_size: update batch size
:param gamma: reward decay factor
:return: None
"""
bs, ba, br, bs_, bd = self.buffer.sample(batch_size)
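# Bellman target computed with the target networks (kept outside the gradient tape):
# y = r + gamma * (1 - done) * Q_target(s', mu_target(s'))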
ba_ = self.actor_target(bs_)*self.action_range
q_ = self.critic_target([bs_, ba_])
y = br + (1 - bd) * gamma * q_
with tf.GradientTape() as tape:
q = self.critic([bs, ba])
td_error = tf.losses.mean_squared_error(y, q)
c_grads = tape.gradient(td_error, self.critic.trainable_weights)
self.critic_opt.apply_gradients(zip(c_grads, self.critic.trainable_weights))
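# Deterministic policy gradient step: update the actor to maximize Q(s, mu(s)),
# implemented as minimizing the negated mean critic value below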
with tf.GradientTape() as tape:
a = self.actor(bs)*self.action_range
q = self.critic([bs, a])
a_loss = - tf.reduce_mean(q) # maximize the q
a_grads = tape.gradient(a_loss, self.actor.trainable_weights)
self.actor_opt.apply_gradients(zip(a_grads, self.actor.trainable_weights))
self.ema_update()
def store_transition(self, s, a, r, s_, d):
"""
Store data in data buffer
:param s: state
:param a: action
:param r: reward
:param s_: next state
:param d: done flag
:return: None
"""
d = 1 if d else 0
self.buffer.push(s, a, [r], s_, d)
def save_ckpt(self, env_name):
"""
save trained weights
:param env_name: environment name, used in the checkpoint path
:return: None
"""
save_model(self.actor, 'model_policy_net', self.name, env_name)
save_model(self.actor_target, 'model_target_policy_net', self.name, env_name)
save_model(self.critic, 'model_q_net', self.name, env_name)
save_model(self.critic_target, 'model_target_q_net', self.name, env_name)
def load_ckpt(self, env_name):
"""
load trained weights
:param env_name: environment name, used in the checkpoint path
:return: None
"""
load_model(self.actor, 'model_policy_net', self.name, env_name)
load_model(self.actor_target, 'model_target_policy_net', self.name, env_name)
load_model(self.critic, 'model_q_net', self.name, env_name)
load_model(self.critic_target, 'model_target_q_net', self.name, env_name)
def learn(self, env, train_episodes=200, test_episodes=100, max_steps=200, save_interval=10, explore_steps=500,
mode='train', render=False, batch_size=32, gamma=0.9, noise_scale=1., noise_scale_decay=0.995,
plot_func=None):
"""
learn function
:param env: learning environment
:param train_episodes: total number of episodes for training
:param test_episodes: total number of episodes for testing
:param max_steps: maximum number of steps for one episode
:param save_interval: number of episodes between checkpoint saves
:param explore_steps: number of random-action steps at the beginning of training
:param mode: train or test mode
:param render: render each step
:param batch_size: update batch size
:param gamma: reward decay factor
:param noise_scale: initial scale of the Gaussian action noise for exploration
:param noise_scale_decay: noise scale decay factor
:param plot_func: additional function for interactive module
:return: None
"""
t0 = time.time()
if mode == 'train': # train
print('Training... | Algorithm: {} | Environment: {}'.format(self.name, env.spec.id))
reward_buffer = []
frame_idx = 0
for i in range(1, train_episodes + 1):
s = env.reset()
ep_reward = 0
for j in range(max_steps):
if render:
env.render()
# Add exploration noise
if frame_idx > explore_steps:
a = self.get_action(s, noise_scale)
else:
a = self.sample_action()
frame_idx += 1
s_, r, done, info = env.step(a)
self.store_transition(s, a, r, s_, done)
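# only start gradient updates once the replay buffer has been filled to capacity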
if len(self.buffer) >= self.replay_buffer_size:
self.update(batch_size, gamma)
noise_scale *= noise_scale_decay
s = s_
ep_reward += r
if done:
break
print(
'Episode: {}/{} | Episode Reward: {:.4f} | Running Time: {:.4f}'.format(
i, train_episodes, ep_reward,
time.time() - t0
)
)
reward_buffer.append(ep_reward)
if plot_func is not None:
plot_func(reward_buffer)
if i and not i % save_interval:
self.save_ckpt(env_name=env.spec.id)
plot_save_log(reward_buffer, algorithm_name=self.name, env_name=env.spec.id)
self.save_ckpt(env_name=env.spec.id)
plot_save_log(reward_buffer, algorithm_name=self.name, env_name=env.spec.id)
# test
elif mode == 'test':
self.load_ckpt(env_name=env.spec.id)
print('Testing... | Algorithm: {} | Environment: {}'.format(self.name, env.spec.id))
reward_buffer = []
for eps in range(1, test_episodes+1):
ep_rs_sum = 0
s = env.reset()
for step in range(max_steps):
if render:
env.render()
action = self.get_action_greedy(s)
s, reward, done, info = env.step(action)
ep_rs_sum += reward
if done:
break
print('Episode: {}/{} | Episode Reward: {:.4f} | Running Time: {:.4f}'.format(
eps, test_episodes, ep_rs_sum, time.time() - t0)
)
reward_buffer.append(ep_rs_sum)
if plot_func:
plot_func(reward_buffer)
else:
print('unknown mode type, please use "train" or "test"')