Source code for rlzoo.common.value_networks

"""
Functions for utilization.

# Requirements
tensorflow==2.0.0a0
tensorlayer==2.0.1

"""
import copy
from collections import OrderedDict

import numpy as np
import tensorflow as tf
import tensorlayer as tl
from gym import spaces
from tensorlayer.layers import BatchNorm, Dense, Input
from tensorlayer.models import Model

from rlzoo.common.basic_nets import *


class ValueNetwork(Model):
    def __init__(self, state_space, hidden_dim_list, w_init=tf.keras.initializers.glorot_normal(),
                 activation=tf.nn.relu, output_activation=None, trainable=True, name=None):
        """ Value network with multiple fully-connected layers or convolutional layers (according to state shape)

        :param state_space: (gym.spaces) space of the state from gym environments
        :param hidden_dim_list: (list[int]) a list of dimensions of hidden layers
        :param w_init: (callable) weights initialization
        :param activation: (callable) activation function
        :param output_activation: (callable or None) output activation function
        :param trainable: (bool) set training and evaluation mode
        :param name: (str) name the model
        """
        self._state_space = state_space

        obs_inputs, current_layer, self._state_shape = CreateInputLayer(state_space)

        if isinstance(state_space, spaces.Dict):
            assert isinstance(obs_inputs, OrderedDict)
            assert isinstance(current_layer, OrderedDict)
            self.input_dict = obs_inputs
            obs_inputs = list(obs_inputs.values())
            current_layer = tl.layers.Concat(-1)(list(current_layer.values()))

        with tf.name_scope('MLP'):
            for i, dim in enumerate(hidden_dim_list):
                current_layer = Dense(n_units=dim, act=activation, W_init=w_init,
                                      name='hidden_layer%d' % (i + 1))(current_layer)

        with tf.name_scope('Output'):
            outputs = Dense(n_units=1, act=output_activation, W_init=w_init)(current_layer)

        super().__init__(inputs=obs_inputs, outputs=outputs, name=name)
        if trainable:
            self.train()
        else:
            self.eval()
    def __call__(self, states, *args, **kwargs):
        if isinstance(self._state_space, spaces.Dict):
            states = np.array(states).transpose([1, 0]).tolist()
        else:
            if np.shape(states)[1:] != self.state_shape:
                raise ValueError(
                    'Input state shape error. Expected shape {} but got shape {}'.format(
                        (None,) + self.state_shape, np.shape(states)))
            states = np.array(states, dtype=np.float32)
        return super().__call__(states, *args, **kwargs)
    @property
    def state_space(self):
        return copy.deepcopy(self._state_space)

    @property
    def state_shape(self):
        return copy.deepcopy(self._state_shape)
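
# Example usage (a minimal sketch, not part of the original module): build a
# state-value estimator for a gym environment. Assumes `gym` is installed;
# 'Pendulum-v0' is used only for illustration.
#
#     import gym
#     env = gym.make('Pendulum-v0')
#     V = ValueNetwork(env.observation_space, hidden_dim_list=[64, 64])
#     batch = np.array([env.reset(), env.reset()])  # batch of 2 states, shape (2, 3)
#     values = V(batch)                             # value estimates, shape (2, 1)
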
class MlpQNetwork(Model):
    def __init__(self, state_shape, action_shape, hidden_dim_list,
                 w_init=tf.keras.initializers.glorot_normal(), activation=tf.nn.relu,
                 output_activation=None, trainable=True):
        """ Q-value network with multiple fully-connected layers

        Input: a single tensor that concatenates the state and the action along the last axis

        :param state_shape: (tuple[int]) shape of the state, for example, (state_dim, ) for single-dimensional state
        :param action_shape: (tuple[int]) shape of the action, for example, (action_dim, ) for single-dimensional action
        :param hidden_dim_list: (list[int]) a list of dimensions of hidden layers
        :param w_init: (callable) weights initialization
        :param activation: (callable) activation function
        :param output_activation: (callable or None) output activation function
        :param trainable: (bool) set training and evaluation mode
        """
        input_shape = tuple(map(sum, zip(action_shape, state_shape)))
        input_dim = input_shape[0]
        assert len(state_shape) == 1

        with tf.name_scope('MLP'):
            inputs, l = MLP(input_dim, hidden_dim_list, w_init, activation)

        with tf.name_scope('Output'):
            outputs = Dense(n_units=1, act=output_activation, W_init=w_init)(l)

        super().__init__(inputs=inputs, outputs=outputs)
        if trainable:
            self.train()
        else:
            self.eval()
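
# Example usage (a minimal sketch, not part of the original module): because the
# network has a single input head of dimension state_dim + action_dim, callers
# concatenate state and action before the forward pass. Shapes are illustrative.
#
#     qnet = MlpQNetwork(state_shape=(3,), action_shape=(1,), hidden_dim_list=[64, 64])
#     s = np.zeros((8, 3), dtype=np.float32)     # batch of 8 states
#     a = np.zeros((8, 1), dtype=np.float32)     # batch of 8 actions
#     q = qnet(np.concatenate([s, a], axis=-1))  # Q-values, shape (8, 1)
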
class QNetwork(Model):
    def __init__(self, state_space, action_space, hidden_dim_list,
                 w_init=tf.keras.initializers.glorot_normal(), activation=tf.nn.relu,
                 output_activation=None, trainable=True, name=None, state_only=False, dueling=False):
        """ Q-value network with multiple fully-connected layers or convolutional layers (according to state shape)

        :param state_space: (gym.spaces) space of the state from gym environments
        :param action_space: (gym.spaces) space of the action from gym environments
        :param hidden_dim_list: (list[int]) a list of dimensions of hidden layers
        :param w_init: (callable) weights initialization
        :param activation: (callable) activation function
        :param output_activation: (callable or None) output activation function
        :param trainable: (bool) set training and evaluation mode
        :param name: (str) name the model
        :param state_only: (bool) whether the network takes only the state as input (discrete action space only)
        :param dueling: (bool) whether to use the dueling output (discrete action space only)
        """
        self._state_space, self._action_space = state_space, action_space
        self.state_only = state_only
        self.dueling = dueling

        # create state input layer
        obs_inputs, current_layer, self._state_shape = CreateInputLayer(state_space)

        # create action input layer
        if isinstance(self._action_space, spaces.Discrete):
            self._action_shape = self._action_space.n,
            if not self.state_only:
                act_inputs = Input((None,), name='Act_Input_Layer', dtype=tf.int64)
        elif isinstance(self._action_space, spaces.Box):
            self._action_shape = self._action_space.shape
            assert len(self._action_shape) == 1
            act_inputs = Input((None,) + self._action_shape, name='Act_Input_Layer')
        else:
            raise NotImplementedError

        # concat multi-head state
        if isinstance(state_space, spaces.Dict):
            assert isinstance(obs_inputs, dict)
            assert isinstance(current_layer, dict)
            self.input_dict = obs_inputs
            obs_inputs = list(obs_inputs.values())
            current_layer = tl.layers.Concat(-1)(list(current_layer.values()))

        if isinstance(self._action_space, spaces.Box):
            current_layer = tl.layers.Concat(-1)([current_layer, act_inputs])

        with tf.name_scope('QNet_MLP'):
            for i, dim in enumerate(hidden_dim_list):
                current_layer = Dense(n_units=dim, act=activation, W_init=w_init,
                                      name='mlp_hidden_layer%d' % (i + 1))(current_layer)

        with tf.name_scope('Outputs'):
            if isinstance(self._action_space, spaces.Discrete):
                if self.dueling:
                    v = Dense(1, None, tf.initializers.Orthogonal(1.0))(current_layer)
                    q = Dense(n_units=self._action_shape[0], act=output_activation, W_init=w_init)(current_layer)
                    mean_q = tl.layers.Lambda(lambda x: tf.reduce_mean(x, 1, True))(q)
                    current_layer = tl.layers.Lambda(lambda x: x[0] + x[1] - x[2])((v, q, mean_q))
                else:
                    current_layer = Dense(n_units=self._action_shape[0], act=output_activation,
                                          W_init=w_init)(current_layer)
                if not self.state_only:
                    # discrete action choice to one-hot vector
                    act_one_hot = tl.layers.OneHot(depth=self._action_shape[0], axis=1)(act_inputs)
                    outputs = tl.layers.Lambda(
                        lambda x: tf.reduce_sum(tf.reduce_prod(x, axis=0), axis=1))((current_layer, act_one_hot))
                else:
                    outputs = current_layer
            elif isinstance(self._action_space, spaces.Box):
                outputs = Dense(n_units=1, act=output_activation, W_init=w_init)(current_layer)
            else:
                raise ValueError("Action Space Not Accepted!")

        if isinstance(state_space, spaces.Dict):
            if self.state_only:
                super().__init__(inputs=obs_inputs, outputs=outputs, name=name)
            else:
                super().__init__(inputs=obs_inputs + [act_inputs], outputs=outputs, name=name)
        else:
            if self.state_only:
                super().__init__(inputs=obs_inputs, outputs=outputs, name=name)
            else:
                super().__init__(inputs=[obs_inputs, act_inputs], outputs=outputs, name=name)

        if trainable:
            self.train()
        else:
            self.eval()
    def __call__(self, inputs, *args, **kwargs):
        if self.state_only:
            states = inputs
        else:
            states, actions = inputs

        # states and actions must have the same length
        if not self.state_only and len(states) != len(actions):
            raise ValueError(
                'Length of states and actions does not match. States length is {} but actions length is {}'.format(
                    len(states), len(actions)))

        if isinstance(self._state_space, spaces.Dict):
            states = np.array(states).transpose([1, 0]).tolist()  # batch states to multi-head
            ssv = list(self._state_shape.values())  # check state shape
            for i, each_head in enumerate(states):
                if np.shape(each_head)[1:] != ssv[i]:
                    raise ValueError('Input state shape error.')
        else:
            if np.shape(states)[1:] != self.state_shape:
                raise ValueError(
                    'Input state shape error. Expected shape {} but got shape {}'.format(
                        (None,) + self.state_shape, np.shape(states)))
            states = np.array(states, dtype=np.float32)

        if not self.state_only:
            if isinstance(self._action_space, spaces.Discrete) and np.any(actions % 1):
                raise ValueError('Input float actions in discrete action space')
            if isinstance(self._action_space, spaces.Discrete):
                actions = tf.convert_to_tensor(actions, dtype=tf.int64)
            elif isinstance(self._action_space, spaces.Box):
                actions = tf.convert_to_tensor(actions, dtype=tf.float32)
            if isinstance(self._state_space, spaces.Dict):
                return super().__call__(states + [actions], *args, **kwargs)
            else:
                return super().__call__([states, actions], *args, **kwargs)
        else:
            return super().__call__(states, *args, **kwargs)
    @property
    def state_space(self):
        return copy.deepcopy(self._state_space)

    @property
    def action_space(self):
        return copy.deepcopy(self._action_space)

    @property
    def state_shape(self):
        return copy.deepcopy(self._state_shape)

    @property
    def action_shape(self):
        return copy.deepcopy(self._action_shape)
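
# Example usage (a minimal sketch, not part of the original module): a dueling
# Q-network over a discrete action space. With state_only=True the forward pass
# returns Q-values for every action; the dueling head combines the streams as
# Q(s, a) = V(s) + A(s, a) - mean_a' A(s, a').
#
#     import gym
#     env = gym.make('CartPole-v0')
#     qnet = QNetwork(env.observation_space, env.action_space,
#                     hidden_dim_list=[64, 64], state_only=True, dueling=True)
#     q_all = qnet(np.array([env.reset()]))  # Q-values for both actions, shape (1, 2)
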
class NAFLayer(tl.layers.Layer):
    def __init__(self, action_dim, name=None):
        super(NAFLayer, self).__init__(name)
        self.action_dim = action_dim

    def forward(self, inputs):
        L, u, mu, value = inputs
        pivot = 0
        rows = []
        for idx in range(self.action_dim):
            offset = self.action_dim - idx
            diag = tf.exp(tf.slice(L, (0, pivot), (-1, 1)))
            nondiag = tf.slice(L, (0, pivot + 1), (-1, offset - 1))
            row = tf.pad(tf.concat([diag, nondiag], 1), ((0, 0), (idx, 0)))
            pivot += offset
            rows.append(row)
        L_T = tf.stack(rows, axis=1)
        P = tf.matmul(tf.transpose(L_T, (0, 2, 1)), L_T)  # L L^T
        temp = tf.expand_dims(u - mu, -1)
        adv = tf.squeeze(-0.5 * tf.matmul(tf.transpose(temp, [0, 2, 1]), tf.matmul(P, temp)), -1)
        return adv + value

    def build(self, inputs_shape=None):
        pass


class NAFQNetwork(Model):
    def __init__(self, state_space, action_space, hidden_dim_list,
                 w_init=tf.keras.initializers.glorot_normal(), activation=tf.nn.tanh,
                 trainable=True, name=None):
        """ NAF Q-value network with multiple fully-connected layers

        :param state_space: (gym.spaces) space of the state from gym environments
        :param action_space: (gym.spaces) space of the action from gym environments
        :param hidden_dim_list: (list[int]) a list of dimensions of hidden layers
        :param w_init: (callable) weights initialization
        :param activation: (callable) activation function
        :param trainable: (bool) set training and evaluation mode
        :param name: (str) name the model
        """
        assert isinstance(action_space, spaces.Box)
        self._state_space, self._action_space = state_space, action_space
        self._action_shape = self._action_space.shape
        assert len(self._action_shape) == 1
        act_inputs = Input((None,) + self._action_shape, name='Act_Input_Layer')

        # create state input layer
        obs_inputs, current_layer, self._state_shape = CreateInputLayer(state_space)

        # concat multi-head state
        if isinstance(state_space, spaces.Dict):
            assert isinstance(obs_inputs, dict)
            assert isinstance(current_layer, dict)
            self.input_dict = obs_inputs
            obs_inputs = list(obs_inputs.values())
            current_layer = tl.layers.Concat(-1)(list(current_layer.values()))

        # calculate value
        current_layer = BatchNorm()(current_layer)
        with tf.name_scope('NAF_VALUE_MLP'):
            for i, dim in enumerate(hidden_dim_list):
                current_layer = Dense(n_units=dim, act=activation, W_init=w_init,
                                      name='mlp_hidden_layer%d' % (i + 1))(current_layer)
            value = Dense(n_units=1, W_init=w_init, name='naf_value_mlp_output')(current_layer)

        # calculate advantage and Q-value
        dim = self._action_shape[0]
        with tf.name_scope('NAF_ADVANTAGE'):
            mu = Dense(n_units=dim, act=activation, W_init=w_init, name='mu')(current_layer)
            L = Dense(n_units=int((dim * (dim + 1)) / 2), W_init=w_init, name='L')(current_layer)
            qvalue = NAFLayer(dim)([L, act_inputs, mu, value])

        # mirror QNetwork: multi-head (Dict) states contribute a list of inputs
        if isinstance(state_space, spaces.Dict):
            super().__init__(inputs=obs_inputs + [act_inputs], outputs=qvalue, name=name)
        else:
            super().__init__(inputs=[obs_inputs, act_inputs], outputs=qvalue, name=name)

        if trainable:
            self.train()
        else:
            self.eval()

    def __call__(self, inputs, *args, **kwargs):
        states, actions = inputs
        # states and actions must have the same length
        if len(states) != len(actions):
            raise ValueError(
                'Length of states and actions does not match. States length is {} but actions length is {}'.format(
                    len(states), len(actions)))

        if isinstance(self._state_space, spaces.Dict):
            states = np.array(states).transpose([1, 0]).tolist()  # batch states to multi-head
            ssv = list(self._state_shape.values())  # check state shape
            for i, each_head in enumerate(states):
                if np.shape(each_head)[1:] != ssv[i]:
                    raise ValueError('Input state shape error.')
        else:
            if np.shape(states)[1:] != self.state_shape:
                raise ValueError(
                    'Input state shape error. Expected shape {} but got shape {}'.format(
                        (None,) + self.state_shape, np.shape(states)))
            states = np.array(states, dtype=np.float32)

        actions = tf.convert_to_tensor(actions, dtype=tf.float32)
        if isinstance(self._state_space, spaces.Dict):
            return super().__call__(states + [actions], *args, **kwargs)
        else:
            return super().__call__([states, actions], *args, **kwargs)

    @property
    def state_space(self):
        return copy.deepcopy(self._state_space)

    @property
    def action_space(self):
        return copy.deepcopy(self._action_space)

    @property
    def state_shape(self):
        return copy.deepcopy(self._state_shape)

    @property
    def action_shape(self):
        return copy.deepcopy(self._action_shape)
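
# Note on the NAF decomposition implemented above (Gu et al., 2016, "Continuous
# Deep Q-Learning with Model-based Acceleration"): NAFLayer computes
#
#     Q(s, a) = V(s) + A(s, a)
#     A(s, a) = -1/2 (a - mu(s))^T P(s) (a - mu(s)),   P(s) = L(s) L(s)^T
#
# where L(s) is a lower-triangular matrix whose dim*(dim+1)/2 free entries come
# from the 'L' Dense head, with the diagonal exponentiated so that P(s) is
# positive definite. Since A(s, a) <= 0 with its maximum at a = mu(s), mu(s) is
# the greedy action and max_a Q(s, a) = V(s).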