Source code for mindspore.nn.probability.distribution.transformed_distribution

# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Transformed Distribution"""
import numpy as np
from mindspore._checkparam import Validator as validator
from mindspore.ops import operations as P
from mindspore.common import dtype as mstype
import mindspore.nn as nn
from .distribution import Distribution
from ._utils.utils import raise_not_impl_error
from ._utils.custom_ops import exp_generic, log_generic


[docs]class TransformedDistribution(Distribution): """ Transformed Distribution. This class contains a bijector and a distribution and transforms the original distribution to a new distribution through the operation defined by the bijector. Args: bijector (Bijector): The transformation to perform. distribution (Distribution): The original distribution. Must has a float dtype. seed (int): The seed is used in sampling. The global seed is used if it is None. Default:None. If this seed is given when a TransformedDistribution object is initialized, the object's sampling function will use this seed; elsewise, the underlying distribution's seed will be used. name (str): The name of the transformed distribution. Default: 'transformed_distribution'. Supported Platforms: ``Ascend`` ``GPU`` Note: The arguments used to initialize the original distribution cannot be None. For example, mynormal = msd.Normal(dtype=mindspore.float32) cannot be used to initialized a TransformedDistribution since `mean` and `sd` are not specified. `batch_shape` is the batch_shape of the original distribution. `broadcast_shape` is the broadcast shape between the original distribution and bijector. `is_scalar_batch` is only true if both the original distribution and the bijector are scalar batches. `default_parameters`, `parameter_names` and `parameter_type` are set to be consistent with the original distribution. Derived class can overwrite `default_parameters` and `parameter_names` by calling `reset_parameters` followed by `add_parameter`. Examples: >>> import numpy as np >>> import mindspore >>> import mindspore.nn as nn >>> import mindspore.nn.probability.distribution as msd >>> import mindspore.nn.probability.bijector as msb >>> from mindspore import Tensor >>> class Net(nn.Cell): ... def __init__(self, shape, dtype=mindspore.float32, seed=0, name='transformed_distribution'): ... super(Net, self).__init__() ... # create TransformedDistribution distribution ... self.exp = msb.Exp() ... self.normal = msd.Normal(0.0, 1.0, dtype=dtype) ... self.lognormal = msd.TransformedDistribution(self.exp, self.normal, seed=seed, name=name) ... self.shape = shape ... ... def construct(self, value): ... cdf = self.lognormal.cdf(value) ... sample = self.lognormal.sample(self.shape) ... return cdf, sample >>> shape = (2, 3) >>> net = Net(shape=shape, name="LogNormal") >>> x = np.array([2.0, 3.0, 4.0, 5.0]).astype(np.float32) >>> tx = Tensor(x, dtype=mindspore.float32) >>> cdf, sample = net(tx) >>> print(sample.shape) (2, 3) """ def __init__(self, bijector, distribution, seed=None, name="transformed_distribution"): """ Constructor of transformed_distribution class. """ param = dict(locals()) validator.check_value_type('bijector', bijector, [nn.probability.bijector.Bijector], type(self).__name__) validator.check_value_type('distribution', distribution, [Distribution], type(self).__name__) validator.check_type_name("dtype", distribution.dtype, mstype.float_type, type(self).__name__) super(TransformedDistribution, self).__init__(seed, distribution.dtype, name, param) self._bijector = bijector self._distribution = distribution # set attributes self._is_linear_transformation = self.bijector.is_constant_jacobian self._dtype = self.distribution.dtype self._is_scalar_batch = self.distribution.is_scalar_batch and self.bijector.is_scalar_batch self._batch_shape = self.distribution.batch_shape self.default_parameters = self.distribution.default_parameters self.parameter_names = self.distribution.parameter_names # by default, set the parameter_type to be the distribution's parameter_type self.parameter_type = self.distribution.parameter_type self.exp = exp_generic self.log = log_generic self.isnan = P.IsNan() self.cast_base = P.Cast() self.equal_base = P.Equal() self.select_base = P.Select() self.fill_base = P.Fill() # broadcast bijector batch_shape and distribution batch_shape self._broadcast_shape = self._broadcast_bijector_dist() @property def bijector(self): return self._bijector @property def distribution(self): return self._distribution @property def dtype(self): return self._dtype @property def is_linear_transformation(self): return self._is_linear_transformation def _broadcast_bijector_dist(self): """ check if the batch shape of base distribution and the bijector is broadcastable. """ if self.batch_shape is None or self.bijector.batch_shape is None: return None bijector_shape_tensor = self.fill_base(self.dtype, self.bijector.batch_shape, 0.0) dist_shape_tensor = self.fill_base(self.dtype, self.batch_shape, 0.0) return (bijector_shape_tensor + dist_shape_tensor).shape def _cdf(self, value, *args, **kwargs): r""" .. math:: Y = g(X) P(Y <= a) = P(X <= g^{-1}(a)) """ inverse_value = self.bijector("inverse", value) return self.distribution("cdf", inverse_value, *args, **kwargs) def _log_cdf(self, value, *args, **kwargs): return self.log(self._cdf(value, *args, **kwargs)) def _survival_function(self, value, *args, **kwargs): return 1.0 - self._cdf(value, *args, **kwargs) def _log_survival(self, value, *args, **kwargs): return self.log(self._survival_function(value, *args, **kwargs)) def _log_prob(self, value, *args, **kwargs): r""" .. math:: Y = g(X) Py(a) = Px(g^{-1}(a)) * (g^{-1})'(a) \log(Py(a)) = \log(Px(g^{-1}(a))) + \log((g^{-1})'(a)) """ inverse_value = self.bijector("inverse", value) unadjust_prob = self.distribution("log_prob", inverse_value, *args, **kwargs) log_jacobian = self.bijector("inverse_log_jacobian", value) isneginf = self.equal_base(unadjust_prob, -np.inf) isnan = self.equal_base(unadjust_prob + log_jacobian, np.nan) return self.select_base(isneginf, self.select_base(isnan, unadjust_prob + log_jacobian, unadjust_prob), unadjust_prob + log_jacobian) def _prob(self, value, *args, **kwargs): return self.exp(self._log_prob(value, *args, **kwargs)) def _sample(self, *args, **kwargs): org_sample = self.distribution("sample", *args, **kwargs) return self.bijector("forward", org_sample) def _mean(self, *args, **kwargs): """ Note: This function maybe overridden by derived class. """ if not self.is_linear_transformation: raise_not_impl_error("mean") return self.bijector("forward", self.distribution("mean", *args, **kwargs))