Source code for sponge.metrics.metrics

# Copyright 2021-2023 @ Shenzhen Bay Laboratory &
#                       Peking University &
#                       Huawei Technologies Co., Ltd
#
# This code is a part of MindSPONGE:
# MindSpore Simulation Package tOwards Next Generation molecular modelling.
#
# MindSPONGE is open-source software based on the AI-framework:
# MindSpore (https://www.mindspore.cn/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""
Metrics for collective variables
"""

from typing import Union
import numpy as np
import mindspore.common.dtype as mstype
import mindspore.communication.management as D
import mindspore.nn as nn
import mindspore.numpy as mnp

from mindspore import Parameter, Tensor
from mindspore.ops import functional as F
from mindspore.ops import operations as P
from mindspore.nn import Metric as _Metric

from ..colvar import Colvar, get_colvar
from ..function import Units


def get_metrics(metrics: Union[dict, set]) -> dict:
    """
    Get metrics used in analysis.

    Args:
        metrics (Union[dict, set]): Dict or set of Metric or Colvar to be evaluated by the model
                                    during MD running or analysis.

    Returns:
        dict, the key is metric name, the value is class instance of metric method.

    Raises:
        TypeError: If the type of argument `metrics` is not ``None``, dict or set.

    Supported Platforms:
        ``Ascend`` ``GPU``

    Examples:
        >>> from mindspore import Tensor
        >>> from sponge.colvar import Distance
        >>> from sponge.metrics import get_metrics
        >>> cv = Distance([0,1])
        >>> metric = get_metrics({"distance": cv})
        >>> coordinate = Tensor([[0.0, 0.0, 0.0], [0.0, 0.0, 1.0]])
        >>> metric.update(coordinate)
        >>> print(metric.eval())
        [1.]
    """
    if metrics is None:
        return metrics

    if isinstance(metrics, dict):
        for name, metric in metrics.items():
            if not isinstance(name, str):
                raise TypeError(f"The key in 'metrics' must be string and but got key: {type(name)}.")
            if isinstance(metric, Colvar):
                metrics[name] = MetricCV(metric)
            elif not isinstance(metric, Metric):
                raise TypeError(f"The value in 'metrics' must be Metric or Colvar, but got: {type(metric)}.")
        return metrics

    if isinstance(metrics, set):
        out_metrics = {}
        for metric in metrics:
            if not isinstance(metric, Colvar):
                raise TypeError(f"When 'metrics' is set, the type of the value in 'metrics must be Colvar, '"
                                f"but got: {type(metric)}")
            out_metrics[metric.name] = MetricCV(metric)
        return out_metrics

    raise TypeError("For 'get_metrics', the argument 'metrics' must be None, dict or set, "
                    "but got {}".format(metrics))


[docs]class Metric(_Metric): """Metric is fundamental tool used to assess the state and performance of a simulation system. Which provides a mechanism to track the changes in various physical quantities within the simulation system. The base class of Metrics defines a set of methods that are used to update the state information of the simulation system and to calculate the corresponding metrics."""
[docs] def update(self, coordinate: Tensor, pbc_box: Tensor = None, energy: Tensor = None, force: Tensor = None, potentials: Tensor = None, total_bias: Tensor = None, biases: Tensor = None, ): """ update the state information of the simulation system. Args: coordinate (Tensor): Tensor of shape (B, A, D). Data type is float. Position coordinate of atoms in system. pbc_box (Tensor, optional): Tensor of shape (B, D). Data type is float. Tensor of PBC box. Default: ``None``. energy (Tensor, optional): Tensor of shape (B, 1). Data type is float. Total energy of the simulation system. Default: ``None``. force (Tensor, optional): Tensor of shape (B, A, D). Data type is float. Force on each atoms of the simulation system. Default: ``None``. potentials (Tensor, optional): Tensor of shape (B, U). Data type is float. All potential energies. Default: ``None``. total_bias (Tensor, optional): Tensor of shape (B, 1). Data type is float. Total bias energy for reweighting. Default: ``None``. biases (Tensor, optional): Tensor of shape (B, V). Data type is float All bias potential energies. Default: ``None``. Note: - B: Batchsize, i.e. number of walkers in simulation. - A: Number of atoms of the simulation system. - D: Dimension of the space of the simulation system. Usually is 3. - U: Number of potential energies. - V: Number of bias potential energies. Supported Platforms: ``Ascend`` ``GPU`` Examples: >>> import numpy as np >>> from mindspore import Tensor >>> from sponge.metrics import Metric >>> net = Metric() """ #pylint: disable=unused-argument raise NotImplementedError
[docs]class MetricCV(Metric): """Metric for collective variables (CVs)""" def __init__(self, colvar: Colvar, ): super().__init__() self.colvar = get_colvar(colvar) self._value = None @property def shape(self) -> tuple: return self.colvar.shape @property def ndim(self) -> int: return self.colvar.ndim @property def dtype(self) -> type: return self.colvar.dtype
[docs] def get_unit(self, units: Units = None) -> str: r"""Return unit of the collective variables. Args: units (Units, optional): Units of the collective variables. Default: ``None``. """ return self.colvar.get_unit(units)
def clear(self): self._value = 0
[docs] def update(self, coordinate: Tensor, pbc_box: Tensor = None, energy: Tensor = None, force: Tensor = None, potentials: Tensor = None, total_bias: Tensor = None, biases: Tensor = None, ): """ update the state information of the system. Args: coordinate (Tensor): Tensor of shape (B, A, D). Data type is float. Position coordinate of atoms in system. pbc_box (Tensor, optional): Tensor of shape (B, D). Data type is float. Tensor of PBC box. Default: ``None``. energy (Tensor, optional): Tensor of shape (B, 1). Data type is float. Total potential energy of the simulation system. Default: ``None``. force (Tensor, optional): Tensor of shape (B, A, D). Data type is float. Force on each atoms of the simulation system. Default: ``None``. potentials (Tensor, optional): Tensor of shape (B, U). Data type is float. Original potential energies from force field. Default: ``None``. total_bias (Tensor, optional): Tensor of shape (B, 1). Data type is float. Total bias energy for reweighting. Default: ``None``. biases (Tensor, optional): Tensor of shape (B, V). Data type is float. Original bias potential energies from bias functions. Default: ``None``. Note: - B: Batchsize, i.e. number of walkers in simulation. - A: Number of atoms of the simulation system. - D: Dimension of the space of the simulation system. Usually is 3. - U: Number of potential energies. - V: Number of bias potential energies. Supported Platforms: ``Ascend`` ``GPU`` Examples: >>> from mindspore import Tensor >>> from sponge.colvar import Distance >>> from sponge.metrics import MetricCV >>> cv = Distance([0,1]) >>> coordinate = Tensor([[0.0, 0.0, 0.0], [0.0, 0.0, 1.0]]) >>> metric = MetricCV(cv) >>> metric.update(coordinate) >>> print(metric.eval()) [1.] """ #pylint: disable=unused-argument colvar = self.colvar(coordinate, pbc_box) self._value = self._convert_data(colvar)
def eval(self): return self._value
class Average(Metric): """Average of collective variables (CVs)""" def __init__(self, colvar: Colvar, ): super().__init__() self.colvar = get_colvar(colvar) self._value = None self._average = None self._weights = 0 def clear(self): self._value = 0 self._weights = 0 def update(self, coordinate: Tensor, pbc_box: Tensor = None, energy: Tensor = None, force: Tensor = None, potentials: Tensor = None, total_bias: Tensor = None, biases: Tensor = None, ): """ Args: coordinate (Tensor): Tensor of shape (B, A, D). Data type is float. Position coordinate of atoms in system. pbc_box (Tensor, optional): Tensor of shape (B, D). Data type is float. Tensor of PBC box. Default: ``None``. energy (Tensor, optional): Tensor of shape (B, 1). Data type is float. Total potential energy of the simulation system. Default: ``None``. force (Tensor, optional): Tensor of shape (B, A, D). Data type is float. Force on each atoms of the simulation system. Default: ``None``. potentials (Tensor, optional): Tensor of shape (B, U). Data type is float. Original potential energies from force field. Default: ``None``. total_bias (Tensor, optional): Tensor of shape (B, 1). Data type is float. Total bias energy for reweighting. Default: ``None``. biases (Tensor, optional): Tensor of shape (B, V). Data type is float. Original bias potential energies from bias functions. Default: ``None``. Note: - B: Batchsize, i.e. number of walkers in simulation. - A: Number of atoms of the simulation system. - D: Dimension of the space of the simulation system. Usually is 3. - U: Number of potential energies. - V: Number of bias potential energies. """ #pylint: disable=unused-argument colvar = self.colvar(coordinate, pbc_box) self._average += self._convert_data(colvar) self._weights += 1 def eval(self): return self._average / self._weights
[docs]class BalancedMSE(nn.Cell): r""" Balanced MSE error Compute Balanced MSE error between the prediction and the ground truth to solve unbalanced labels in regression task. Refer to `Ren, Jiawei, et al. 'Balanced MSE for Imbalanced Visual Regression' <https://arxiv.org/abs/2203.16427>`_. .. math:: L =-\log \mathcal{N}\left(\boldsymbol{y} ; \boldsymbol{y}_{\text {pred }}, \sigma_{\text {noise }}^{2} \mathrm{I}\right) +\log \sum_{i=1}^{N} p_{\text {train }}\left(\boldsymbol{y}_{(i)}\right) \cdot \mathcal{N}\left(\boldsymbol{y}_{(i)} ; \boldsymbol{y}_{\text {pred }}, \sigma_{\text {noise }}^{2} \mathrm{I}\right) Args: first_break (float): The begin value of bin. last_break (float): The end value of bin. num_bins (int): The bin numbers. beta (float, optional): The moving average coefficient, default: ``0.99``. reducer_flag (bool, optional): Whether to aggregate the label values of multiple devices, default: ``False``. Inputs: - **prediction** (Tensor) - Predict values, shape is :math:`(batch\_size, ndim)`. - **target** (Tensor) - Label values, shape is :math:`(batch\_size, ndim)`. Outputs: Tensor, shape is :math:`(batch\_size, ndim)`. Supported Platforms: ``Ascend`` ``GPU`` Examples: >>> import numpy as np >>> from sponge.metrics import BalancedMSE >>> from mindspore import Tensor >>> net = BalancedMSE(0, 1, 20) >>> prediction = Tensor(np.random.randn(32, 10).astype(np.float32)) >>> target = Tensor(np.random.randn(32, 10).astype(np.float32)) >>> out = net(prediction, target) >>> print(out.shape) (32, 10) """ def __init__(self, first_break, last_break, num_bins, beta=0.99, reducer_flag=False): super(BalancedMSE, self).__init__() self.beta = beta self.first_break = first_break self.last_break = last_break self.num_bins = num_bins self.breaks = mnp.linspace(self.first_break, self.last_break, self.num_bins) self.width = self.breaks[1] - self.breaks[0] bin_width = 2 start_n = 1 stop = self.num_bins * 2 centers = mnp.divide(mnp.arange(start=start_n, stop=stop, step=bin_width), num_bins * 2.0) self.centers = centers/(self.last_break-self.first_break) + self.first_break self.log_noise_scale = Parameter(Tensor([0.], mstype.float32)) self.p_bins = Parameter(Tensor(np.ones((self.num_bins)) / self.num_bins, dtype=mstype.float32), \ name='p_bins', requires_grad=False) self.softmax = nn.Softmax(-1) self.zero = Tensor([0.]) self.onehot = nn.OneHot(depth=self.num_bins) self.reducer_flag = reducer_flag if self.reducer_flag: self.allreduce = P.AllReduce() self.device_num = D.get_group_size() def construct(self, prediction, target): """construct""" p_bins = self._compute_p_bins(prediction) log_sigma2 = self.log_noise_scale * 1. log_sigma2 = 5. * P.Tanh()(log_sigma2 / 5.) sigma2 = mnp.exp(log_sigma2) + 0.25 * self.width tau = 2. * sigma2 a = - F.square(prediction - target) / tau ndim = prediction.ndim y_bins = mnp.reshape(self.centers * 1., ndim * (1,) + (-1,)) b_term = - F.square(mnp.expand_dims(prediction, -1) - y_bins) / tau p_clip = mnp.clip(p_bins, 1e-8, 1 - 1e-8) log_p = mnp.log(p_clip) log_p = mnp.reshape(log_p, ndim * (1,) + (-1,)) b_term += log_p b = nn.ReduceLogSumExp(-1, False)(b_term) err = -a + b return err def _compute_p_bins(self, y_gt): """compute bins""" ndim = y_gt.ndim breaks = mnp.reshape(self.breaks, (1,) * ndim + (-1,)) y_gt = mnp.expand_dims(y_gt, -1) y_bins = (y_gt > breaks).astype(mstype.float32) y_bins = P.ReduceSum()(y_bins, -1).astype(mstype.int32) p_gt = self.onehot(y_bins) p_gt = P.Reshape()(p_gt, (-1, self.num_bins)) p_bins = P.ReduceMean()(p_gt, 0) if self.reducer_flag: p_bins = self.allreduce(p_bins) / self.device_num p_bins = self.beta * self.p_bins + (1 - self.beta) * p_bins P.Assign()(self.p_bins, p_bins) return p_bins
[docs]class MultiClassFocal(nn.Cell): r"""Focal error for multi-class classifications. Compute the multiple classes focal error between `prediction` and the ground truth `target`. Refer to `Lin, Tsung-Yi, et al. 'Focal loss for dense object detection' <https://arxiv.org/abs/1708.02002>`_ . Args: num_class (int): The class numbers. beta (float, optional): The moving average coefficient, default: ``0.99``. gamma (float, optional): The hyperparameters, default: ``2.0``. e (float, optional): The proportion of focal loss, default: ``0.1``. neighbors(int, optional): The neighbors to be mask in the target, default ``2``. not_focal (bool, optional): Whether focal loss, default: ``False``. reducer_flag (bool, optional): Whether to aggregate the label values of multiple devices, default: ``False``. Inputs: - **prediction** (Tensor) - Predict values, shape is :math:`(batch\_size, ndim)`. - **target** (Tensor) - Label values, shape is :math:`(batch\_size, ndim)`. Outputs: Tensor, shape is :math:`(batch\_size, )`. Supported Platforms: ``Ascend`` ``GPU`` Examples: >>> import numpy as np >>> from mindspore import Tensor >>> from sponge.metrics import MultiClassFocal >>> net = MultiClassFocal(10) >>> prediction = Tensor(np.random.randn(32, 10).astype(np.float32)) >>> target = Tensor(np.random.randn(32, 10).astype(np.float32)) >>> out = net(prediction, target) >>> print(out.shape) (32,) """ def __init__(self, num_class, beta=0.99, gamma=2., e=0.1, neighbors=2, not_focal=False, reducer_flag=False): super(MultiClassFocal, self).__init__() self.num_class = num_class self.beta = beta self.gamma = gamma self.e = e self.neighbors = neighbors self.not_focal = not_focal neighbor_mask = np.ones((self.num_class, self.num_class)) neighbor_mask = neighbor_mask - np.triu(neighbor_mask, neighbors) - np.tril(neighbor_mask, -neighbors) neighbor_mask = neighbor_mask / (np.sum(neighbor_mask, axis=-1, keepdims=True) + 1e-10) self.neighbor_mask = Tensor(neighbor_mask, mstype.float32) self.class_weights = Parameter(Tensor(np.ones((self.num_class)) / self.num_class, dtype=mstype.float32), \ name='class_weights', requires_grad=False) self.softmax = nn.Softmax(-1) self.cross_entropy = P.SoftmaxCrossEntropyWithLogits() self.zero = Tensor([0.]) self.reducer_flag = reducer_flag if self.reducer_flag: self.allreduce = P.AllReduce() def construct(self, prediction, target): """construct""" prediction_tensor = self.softmax(prediction) zeros = mnp.zeros_like(prediction_tensor) one_minus_p = mnp.where(target > 1e-5, target - prediction_tensor, zeros) ft = -1 * mnp.power(one_minus_p, self.gamma) * mnp.log(mnp.clip(prediction_tensor, 1e-8, 1.0)) classes_num = self._compute_classes_num(target) total_num = mnp.sum(classes_num) classes_w_t1 = total_num / classes_num sum_ = mnp.sum(classes_w_t1) classes_w_t2 = classes_w_t1 / sum_ classes_w_tensor = F.cast(classes_w_t2, mstype.float32) weights = self.beta * self.class_weights + (1 - self.beta) * classes_w_tensor P.Assign()(self.class_weights, weights) classes_weight = mnp.broadcast_to(mnp.expand_dims(weights, 0), target.shape) alpha = mnp.where(target > zeros, classes_weight, zeros) balanced_fl = alpha * ft balanced_fl = mnp.sum(balanced_fl, -1) labels = P.MatMul()(target, self.neighbor_mask) xent, _ = self.cross_entropy(prediction, target) final_loss = (1 - self.e) * balanced_fl + self.e * xent if self.not_focal: softmax_xent, _ = self.cross_entropy(prediction, labels) final_loss = (1 - self.e) * softmax_xent + self.e * xent return final_loss def _compute_classes_num(self, target): "get global classes number" classes_num = mnp.sum(target, 0) if self.reducer_flag: classes_num = self.allreduce(classes_num) classes_num = F.cast(classes_num, mstype.float32) classes_num += 1. return classes_num
[docs]class BinaryFocal(nn.Cell): r""" Focal error for Binary classifications. Compute the binary classes focal error between `prediction` and the ground truth `target`. Refer to `Lin, Tsung-Yi, et al. 'Focal loss for dense object detection' <https://arxiv.org/abs/1708.02002>`_ . .. math:: \mathrm{FL}\left(p_{\mathrm{t}}\right)=-\alpha_{\mathrm{t}}\left(1-p_{\mathrm{t}}\right)^{\gamma} \log \left(p_{\mathrm{t}}\right) Args: alpha (float, optional): The weight of cross entropy, default: ``0.25``. gamma (float, optional): The hyperparameters, modulating loss from hard to easy, default: ``2.0``. feed_in (bool, optional): Whether to convert prediction, default: ``False``. not_focal (bool, optional): Whether focal loss, default: ``False``. Inputs: - **prediction** (Tensor) - Predict values, shape is :math:`(batch\_size, ndim)`. - **target** (Tensor) - Label values, shape is :math:`(batch\_size, ndim)`. Outputs: Tensor, shape is :math:`(batch\_size,)`. Supported Platforms: ``Ascend`` ``GPU`` Examples: >>> import numpy as np >>> from mindspore import Tensor >>> from sponge.metrics import BinaryFocal >>> net = BinaryFocal() >>> prediction = Tensor(np.random.randn(32, 10).astype(np.float32)) >>> target = Tensor(np.random.randn(32, 10).astype(np.float32)) >>> out = net(prediction, target) >>> print(out.shape) (32,) """ def __init__(self, alpha=0.25, gamma=2., feed_in=False, not_focal=False): super(BinaryFocal, self).__init__() self.alpha = alpha self.gamma = gamma self.feed_in = feed_in self.not_focal = not_focal self.cross_entropy = P.BinaryCrossEntropy(reduction='none') self.sigmoid = P.Sigmoid() self.epsilon = 1e-8 def construct(self, prediction, target): """construct""" epsilon = self.epsilon target = F.cast(target, mstype.float32) probs = F.cast(prediction, mstype.float32) if self.feed_in: probs = self.sigmoid(prediction) else: prediction = self._convert(prediction) ones_tensor = mnp.ones_like(target) positive_pt = mnp.where(target > 1e-5, probs, ones_tensor) negative_pt = mnp.where(target < 1e-5, 1 - probs, ones_tensor) focal_loss = -self.alpha * mnp.power(1 - positive_pt, self.gamma) * \ mnp.log(mnp.clip(positive_pt, epsilon, 1.)) - (1 - self.alpha) * \ mnp.power(1 - negative_pt, self.gamma) * mnp.log(mnp.clip(negative_pt, epsilon, 1.)) focal_loss *= 2. if self.not_focal: focal_loss = self.cross_entropy(prediction, target, ones_tensor) return focal_loss def _convert(self, probs): """convert function""" probs = mnp.clip(probs, 1e-5, 1. - 1e-5) prediction = mnp.log(probs / (1 - probs)) return prediction