Source code for rlzoo.common.distributions

"""Definition of parametrized distributions. Adapted from openai/baselines"""
import copy
from functools import wraps

import numpy as np
import tensorflow as tf
from gym import spaces

def expand_dims(func):
    """Decorator that appends a trailing axis to the result of ``func``.

    The decorated callable is invoked with the caller's positional and
    keyword arguments unchanged; whatever it returns is passed through
    ``tf.expand_dims(..., axis=-1)`` before being handed back.

    Args:
        func (callable): function whose return value should gain a last axis

    Returns:
        callable: wrapper carrying ``func``'s metadata (via ``functools.wraps``)
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        return tf.expand_dims(func(*args, **kwargs), axis=-1)

    return wrapper
class Distribution:
    """Abstract interface shared by the probability distributions below.

    Every method except :meth:`logp` raises ``NotImplementedError`` and is
    meant to be overridden by a concrete subclass.
    """

    def set_param(self, *args, **kwargs):
        """Install the distribution parameters (subclass responsibility)."""
        raise NotImplementedError

    def sample(self, *args, **kwargs):
        """Draw a sample from the distribution.

        Extra arguments are accepted so subclasses can take exploration
        parameters.
        """
        raise NotImplementedError

    def logp(self, x):
        """Return the log probability of ``x``, i.e. the negated ``neglogp``."""
        negative_log_prob = self.neglogp(x)
        return -negative_log_prob

    def neglogp(self, x):
        """Return the negative log probability of ``x`` (subclass responsibility)."""
        raise NotImplementedError

    def kl(self, *parameters):
        """Return the Kullback-Leibler divergence (subclass responsibility)."""
        raise NotImplementedError

    def entropy(self):
        """Return the entropy of the distribution (subclass responsibility)."""
        raise NotImplementedError
class Categorical(Distribution):
    """Categorical distribution parametrized by unnormalized logits."""

    def __init__(self, ndim, logits=None):
        """
        Args:
            ndim (int): total number of actions
            logits (tensor): logits variables
        """
        self._ndim = ndim
        # ``param`` always mirrors the currently stored logits.
        self._logits = self.param = logits

    @property
    def ndim(self):
        """Copy of the number of actions given at construction."""
        return copy.copy(self._ndim)

    @staticmethod
    def _stable_parts(logits):
        """Return ``(shifted, exp(shifted), sum(exp(shifted)))`` for ``logits``.

        The logits are shifted by their maximum along the last axis before
        exponentiation; the sum keeps the reduced axis.
        """
        shifted = logits - tf.reduce_max(logits, axis=-1, keepdims=True)
        exponentials = tf.exp(shifted)
        partition = tf.reduce_sum(exponentials, axis=-1, keepdims=True)
        return shifted, exponentials, partition

    def set_param(self, logits):
        """
        Args:
            logits (tensor): logits variables to set
        """
        self._logits = self.param = logits

    def get_param(self):
        """Return a deep copy of the stored logits."""
        return copy.deepcopy(self._logits)

    def sample(self):
        """
        Sample actions from the distribution with the Gumbel-max trick:
        perturb the logits with Gumbel noise and take the argmax over the
        last axis.
        """
        uniform = np.asarray(
            np.random.uniform(0, 1, size=np.shape(self._logits)),
            dtype=np.float32,
        )
        # log(-log(u)) is the negated Gumbel noise, hence the subtraction.
        neg_gumbel = tf.math.log(-tf.math.log(uniform))
        return tf.argmax(self._logits - neg_gumbel, axis=-1)

    def greedy_sample(self):
        """Return the index of the most probable action along the last axis."""
        probabilities = tf.nn.softmax(self._logits)
        return tf.argmax(probabilities, axis=-1)

    def logp(self, x):
        """Return the log probability of actions ``x`` (negated ``neglogp``)."""
        return -self.neglogp(x)

    @expand_dims
    def neglogp(self, x):
        """Negative log probability of the discrete actions ``x``.

        The actions are one-hot encoded over ``ndim`` classes and scored with
        softmax cross-entropy against the stored logits; the decorator
        appends a trailing axis to the result.

        Raises:
            ValueError: if any entry of ``x`` has a fractional part
        """
        actions = np.array(x)
        if np.any(actions % 1):
            raise ValueError('Input float actions in discrete action space')
        action_ids = tf.convert_to_tensor(actions, tf.int32)
        one_hot_actions = tf.one_hot(action_ids, self._ndim, axis=-1)
        return tf.nn.softmax_cross_entropy_with_logits(one_hot_actions, self._logits)

    @expand_dims
    def kl(self, logits):
        """KL divergence from this distribution to the one given by ``logits``.

        Args:
            logits (tensor): logits variables of another distribution
        """
        own_shifted, own_exp, own_z = self._stable_parts(self._logits)
        other_shifted, _, other_z = self._stable_parts(logits)
        own_probs = own_exp / own_z
        log_ratio = (
            own_shifted - tf.math.log(own_z) - other_shifted + tf.math.log(other_z)
        )
        return tf.reduce_sum(own_probs * log_ratio, axis=-1)

    @expand_dims
    def entropy(self):
        """Entropy of the distribution, reduced over the last axis."""
        shifted, exponentials, partition = self._stable_parts(self._logits)
        probabilities = exponentials / partition
        return tf.reduce_sum(
            probabilities * (tf.math.log(partition) - shifted), axis=-1)
class DiagGaussian(Distribution):
    """Diagonal Gaussian distribution parametrized by mean and log std."""

    def __init__(self, ndim, mean_logstd=None):
        """
        Args:
            ndim (int): the dimension of actions
            mean_logstd: two-element iterable unpacked as ``(mean, logstd)``;
                when ``None`` the parameters stay unset until
                :meth:`set_param` is called
        """
        self._ndim = ndim
        self.mean = None
        self.logstd = None
        self.std = None
        # Action normalization constants read by ``neglogp``. They are never
        # assigned inside this class; callers set them on the instance when
        # actions are rescaled. ``None`` means "no normalization".
        self.action_mean = None
        self.action_scale = None
        self.param = self.mean, self.logstd
        if mean_logstd is not None:
            self.set_param(mean_logstd)

    @property
    def ndim(self):
        """Copy of the action dimension given at construction."""
        return copy.copy(self._ndim)

    def set_param(self, mean_logstd):
        """
        Args:
            mean_logstd: two-element iterable unpacked as ``(mean, logstd)``
        """
        self.mean, self.logstd = mean_logstd
        self.std = tf.math.exp(self.logstd)
        self.param = self.mean, self.logstd

    def get_param(self):
        """Return deep copies of ``(mean, logstd)``."""
        return copy.deepcopy(self.mean), copy.deepcopy(self.logstd)

    def sample(self):
        """Return the ingredients of a stochastic action.

        Returns:
            tuple: ``(mean, noise)`` where ``noise`` is ``std`` multiplied by
            standard normal draws shaped like ``mean``. The two parts are
            returned separately rather than summed.
        """
        return self.mean, self.std * np.random.normal(0, 1, np.shape(self.mean))

    def greedy_sample(self):
        """Return the mean as the deterministic action."""
        return self.mean

    def logp(self, x):
        """Return the log probability of ``x`` (negated ``neglogp``)."""
        return -self.neglogp(x)

    @expand_dims
    def neglogp(self, x):
        """Negative log probability of the action ``x``.

        ``x`` is first mapped back through the action normalization,
        ``(x - action_mean) / action_scale``, so that it is comparable with
        ``mean``. A normalization attribute that is still ``None`` is skipped
        instead of raising ``TypeError``. The decorator appends a trailing
        axis to the result.
        """
        # Reverse the action normalization so the density is evaluated in
        # the same space as ``self.mean``; unset constants act as identity.
        if self.action_mean is not None:
            x = x - self.action_mean
        if self.action_scale is not None:
            x = x / self.action_scale
        return 0.5 * tf.reduce_sum(tf.square((x - self.mean) / self.std), axis=-1) \
            + 0.5 * np.log(2.0 * np.pi) * float(self._ndim) \
            + tf.reduce_sum(self.logstd, axis=-1)

    @expand_dims
    def kl(self, mean_logstd):
        """KL divergence from this distribution to another diagonal Gaussian.

        Args:
            mean_logstd: two-element iterable unpacked as ``(mean, logstd)``
                of the other distribution
        """
        mean, logstd = mean_logstd
        return tf.reduce_sum(
            logstd - self.logstd +
            (tf.square(self.std) + tf.square(self.mean - mean))
            / (2.0 * tf.square(tf.math.exp(logstd))) - 0.5, axis=-1)

    @expand_dims
    def entropy(self):
        """Entropy of the distribution, summed over the last axis."""
        return tf.reduce_sum(
            self.logstd + 0.5 * np.log(2.0 * np.pi * np.e), axis=-1)
def make_dist(ac_space):
    """Build the distribution object matching an action space.

    :param ac_space: gym.spaces.Space
    :return: ``Categorical`` sized by ``ac_space.n`` for ``Discrete`` spaces,
        ``DiagGaussian`` sized by ``ac_space.shape[0]`` for ``Box`` spaces
    :raises NotImplementedError: for any other kind of space
    """
    if isinstance(ac_space, spaces.Discrete):
        return Categorical(ac_space.n)
    if not isinstance(ac_space, spaces.Box):
        raise NotImplementedError
    # Only one-dimensional Box spaces are supported.
    assert len(ac_space.shape) == 1
    return DiagGaussian(ac_space.shape[0])