# Source code for mindspore.nn.transformer.layers

# Copyright 2021 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""
The basic layer of the Transformer Networks. This is an experimental interface that is subject to
change or deletion.
"""
from __future__ import absolute_import

from functools import wraps, partial
import inspect
import math
import numpy as np

from mindspore import nn, context
from mindspore.common.parameter import Parameter
from mindspore.common.initializer import initializer, Tensor
import mindspore.common.dtype as mstype
from mindspore.common.seed import _get_graph_seed
from mindspore.ops import operations as P
from mindspore._extends import cell_attr_register
from mindspore.nn.cell import Cell
from mindspore.nn.layer.activation import get_activation
from mindspore.ops import functional as F
from mindspore._checkparam import Validator
from mindspore.ops.primitive import constexpr
from mindspore.parallel._utils import _get_parallel_mode, _is_sharding_propagation
from mindspore.context import ParallelMode
from mindspore.nn.transformer.op_parallel_config import default_dpmp_config, OpParallelConfig, MoEParallelConfig
from mindspore import log as logger

# Public API of this module: `FixedSparseAttention` is the only name listed, so it is the only one
# picked up by a star-import. The underscore-prefixed helpers below are internal.
__all__ = [
    "FixedSparseAttention"
]


def _args_type_validator_check(*type_args, **type_kwargs):
    """
    Build a decorator that validates the arguments passed to the decorated function.

    The validators are bound to the decorated function's signature (positionally or by keyword), and
    each one is invoked as ``validator(value, name)`` for every argument the caller passes explicitly.
    Parameters left at their default value are not validated.

    If the decorated function declares a ``**kwargs`` style parameter and validators (or call values)
    land in it, only the entries of that mapping are considered; validators/values of the named
    parameters are ignored in that case.

    Args:
        type_args: Validators bound positionally to the decorated function's parameters.
        type_kwargs: Validators bound by parameter name.

    Returns:
        function, a decorator that returns the wrapped function. Any exception raised by a validator
        propagates to the caller and the decorated function is not executed.
    """

    def type_check(func):
        sig = inspect.signature(func)
        # Name of the ``**kwargs`` style parameter (if any), detected by kind rather than by spelling.
        var_keyword = next((param.name for param in sig.parameters.values()
                            if param.kind is inspect.Parameter.VAR_KEYWORD), None)
        validators = sig.bind_partial(*type_args, **type_kwargs).arguments
        # Resolve the var-keyword indirection once, at decoration time. Rebinding a closure variable
        # on every call would unwrap it again on later calls and break as soon as one of the checked
        # keywords has the same name as the var-keyword parameter itself.
        if var_keyword in validators:
            validators = validators[var_keyword]

        @wraps(func)
        def wrapper(*args, **kwargs):
            arguments = sig.bind(*args, **kwargs).arguments
            if var_keyword in arguments:
                arguments = arguments[var_keyword]
            for name, value in arguments.items():
                if name in validators:
                    validators[name](value, name)
            return func(*args, **kwargs)

        return wrapper

    return type_check


def _valid_type_checks(types, class_name):
    """
    Create a validator that checks the Python type of an argument against `types`.

    Args:
        types (list): The accepted types.
        class_name (str): The name passed to the check as `prim_name`.

    Returns:
        function, ``validator(value, name)`` that forwards ``type(value)`` to ``Validator.check_type_name``.
    """

    def validator_check_func(value, name):
        # Validators are called as (value, name) by `_args_type_validator_check`, whereas
        # Validator.check_type_name takes (arg_name, arg_type, valid_types, prim_name): reorder here.
        return Validator.check_type_name(name, type(value), valid_types=types, prim_name=class_name)

    return validator_check_func


def _valid_value_checks(types, class_name):
    """
    Create a validator that checks the argument itself (not its Python type) against `types`.

    In this file it is used for dtype arguments, where the value passed by the caller must be one of
    the listed dtypes.

    Args:
        types (list): The accepted values.
        class_name (str): The name passed to the check as `prim_name`.

    Returns:
        function, ``validator(value, name)`` that forwards `value` to ``Validator.check_type_name``.
    """

    def validator_check_func(value, name):
        # Validators are called as (value, name) by `_args_type_validator_check`, whereas
        # Validator.check_type_name takes (arg_name, arg_type, valid_types, prim_name): reorder here.
        return Validator.check_type_name(name, value, valid_types=types, prim_name=class_name)

    return validator_check_func


class _LayerInputCheck:
    """
    Shape checking helpers for the inputs of the transformer layers.

    Every method is static, raises ``ValueError`` when the check fails and returns ``True`` otherwise.
    """

    @staticmethod
    def check_shape_length(input_shape, param_name, func_name, target_len):
        """
        Check that the number of dimensions of `input_shape` is one of the expected values.

        Args:
            input_shape (Union[list, tuple]): The shape to check.
            param_name (str): The name of the checked parameter, used in the error message.
            func_name (str): The name of the calling function or class, used in the error message.
            target_len (Union[int, list, tuple]): The accepted length, or a sequence of accepted lengths.

        Returns:
            bool, True when the check passes.

        Raises:
            ValueError: If ``len(input_shape)`` is not one of `target_len`.
        """
        if not isinstance(target_len, (list, tuple)):
            target_len = [target_len]
        if len(input_shape) not in target_len:
            raise ValueError(f"{func_name} {param_name} shape length must be one of {target_len} dimension, "
                             f"but got shape {input_shape}")
        return True

    @staticmethod
    def check_shape_equal(input_shape, param_name, func_name, target_shape):
        """
        Check that `input_shape` is equal to the expected shape, or to one of several expected shapes.

        Args:
            input_shape (Union[list, tuple]): The shape to check.
            param_name (str): The name of the checked parameter, used in the error message.
            func_name (str): The name of the calling function or class, used in the error message.
            target_shape (Union[list, tuple]): A single expected shape, or a sequence of expected shapes.
                Lists and tuples are treated alike.

        Returns:
            bool, True when the check passes.

        Raises:
            ValueError: If `input_shape` matches none of the expected shapes.
        """
        # A flat (or empty) target is a single candidate shape; wrap it so the code below always
        # iterates over a sequence of candidates.
        if not target_shape or not isinstance(target_shape[0], (list, tuple)):
            target_shape = [target_shape]
        # Normalise both sides to lists: a tuple never compares equal to a list, so without this a
        # tuple target would be rejected even when it has the same values as the input shape.
        target_shape = [list(item) for item in target_shape]
        if isinstance(input_shape, tuple):
            input_shape = list(input_shape)
        _LayerInputCheck.check_shape_length(input_shape, param_name, func_name,
                                            [len(item) for item in target_shape])
        if input_shape not in target_shape:
            raise ValueError(f"{func_name} {param_name} shape must be one of {target_shape},"
                             f"but got {input_shape}")
        return True

    @staticmethod
    def check_shape_value_on_axis(input_shape, dim, param_name, cls_name, target_value):
        """
        Check that ``input_shape[dim]`` is equal to `target_value`.

        Raises:
            ValueError: If the value on axis `dim` differs from `target_value`.
        """
        if input_shape[dim] != target_value:
            raise ValueError(f"{cls_name} {param_name} at {dim} shape must be {target_value},"
                             f"but got {input_shape[dim]}")
        return True

    @staticmethod
    def check_shape_equal_without_batch(input_shape, param_name, func_name, target_shape):
        """
        Check `input_shape` against a two-element `target_shape` of ``(length, hidden)``.

        The input may have the same number of dimensions as `target_shape` or one more. The checks are:
        the last dimension equals ``hidden``, the first dimension is not 0 and, for a 2-D input, the
        first dimension is divisible by ``length``. The batch size itself is not checked.

        Returns:
            bool, True when the check passes.

        Raises:
            ValueError: If any of the checks above fails.
        """
        length, hidden = target_shape
        if isinstance(input_shape, tuple):
            input_shape = list(input_shape)
        _LayerInputCheck.check_shape_length(input_shape, param_name, func_name,
                                            [len(target_shape), len(target_shape) + 1])
        if input_shape[-1] != hidden:
            raise ValueError(f"For {func_name}, the last dimension of {param_name} shape must be {hidden},"
                             f"but got the last dimension {input_shape[-1]} in {input_shape}.")
        if input_shape[0] == 0:
            raise ValueError(f"For {func_name}, the first dimension of {param_name} shape greater than 0,"
                             f"but got the first dimension {input_shape[0]} in {input_shape}.")
        if len(input_shape) == 2 and input_shape[0] % length != 0:
            raise ValueError(f"For {func_name}, the first dimension of {param_name} shape should be divisible "
                             f"by {length}, "
                             f"but got the first dimension {input_shape[0]} in {input_shape}.")
        return True


@constexpr
def _check_past_none_input_none(use_past, param_name, func_name, default_value, is_tensor, is_default):
    """
    Check that an optional input agrees with the `use_past` flag.

    Args:
        use_past (bool): Whether `use_past` is enabled.
        param_name (str): The name of the checked input, used in the error message.
        func_name (str): The name of the calling function or class, used in the error message.
        default_value: The value the input is expected to have when `use_past` is False; only used in
            the error message.
        is_tensor (bool): Whether the input is a tensor.
        is_default (bool): Whether the input is equal to its default value.

    Returns:
        bool, True when the check passes.

    Raises:
        TypeError: If `use_past` is False and the input is a tensor or is not the default value.
        TypeError: If `use_past` is True and the input is not a tensor.
    """
    if not use_past:
        if is_tensor:
            raise TypeError(f"{func_name} {param_name} must be {default_value}, if use_past is False, but found "
                            f"a tensor")
        if not is_default:
            raise TypeError(f"{func_name} {param_name} must be {default_value}, if use_past is False.")
    else:
        if not is_tensor:
            raise TypeError(f"{func_name} {param_name} must be tensor, if use_past is True")
    return True


@constexpr
def _check_input_dtype(input_dtype, param_name, allow_dtypes, cls_name):
    """Check that `input_dtype` is one of `allow_dtypes` by delegating to ``Validator.check_type_name``."""
    Validator.check_type_name(param_name, input_dtype, valid_types=allow_dtypes, prim_name=cls_name)


@constexpr
def _check_input_shape(input_shape, param_name, func_name, target_len):
    """Check that the number of dimensions of `input_shape` is one of `target_len`."""
    _LayerInputCheck.check_shape_length(input_shape=input_shape, param_name=param_name,
                                        func_name=func_name, target_len=target_len)


@constexpr
def _check_shape_equal(input_shape, param_name, func_name, target_shape):
    """Check that `input_shape` is equal to `target_shape` (or to one of several target shapes)."""
    _LayerInputCheck.check_shape_equal(input_shape=input_shape, param_name=param_name,
                                       func_name=func_name, target_shape=target_shape)


@constexpr
def _check_input_shape_value(input_shape, dim, param_name, cls_name, target_value):
    """Check that ``input_shape[dim]`` is equal to `target_value`."""
    _LayerInputCheck.check_shape_value_on_axis(input_shape=input_shape, dim=dim, param_name=param_name,
                                               cls_name=cls_name, target_value=target_value)


@constexpr
def _check_shape_equal_without_batch(input_shape, param_name, func_name, target_shape):
    """Check `input_shape` against a ``(length, hidden)`` target without checking the batch size."""
    _LayerInputCheck.check_shape_equal_without_batch(input_shape=input_shape, param_name=param_name,
                                                     func_name=func_name, target_shape=target_shape)


class _Dropout(nn.Cell):
    r"""
    A Dropout cell for parallel training.

    When the device target is Ascend it is implemented with P.DropoutGenMask and P.DropoutDoMask;
    on every other device target it uses P.Dropout.

    Args:
        keep_prob (float): A float in range (0, 1], forwarded to the dropout primitives. When it is
            equal to 1 the input is returned unchanged. Default: 0.5.
        dtype (mindspore.dtype): Must be a subclass of mstype.number_type. It is only stored (on Ascend)
            and is not read by `construct`. Default: mstype.float32.

    Inputs:
        - **x** (Tensor) - The input tensor.

    Outputs:
        Tensor, `x` itself when the cell is not in training mode or `keep_prob` is 1, otherwise the
        output of the dropout primitives.

    Raises:
        ValueError: If `keep_prob` is not in range (0, 1].
    """

    def __init__(self, keep_prob=0.5, dtype=mstype.float32):
        super(_Dropout, self).__init__()
        # The range check runs before the type checks below.
        if keep_prob <= 0 or keep_prob > 1:
            raise ValueError(
                "dropout probability should be a number in range (0, 1], but got {}".format(
                    keep_prob))
        Validator.check_subclass("dtype", dtype, mstype.number_type, self.cls_name)
        Validator.check_value_type('keep_prob', keep_prob, [float], self.cls_name)
        self.keep_prob = keep_prob
        self.is_ascend = context.get_context('device_target') in ["Ascend"]
        if self.is_ascend:
            # Ascend: generate the mask and apply it with two separate primitives.
            seed0, seed1 = _get_graph_seed(0, "dropout")
            self.seed0 = seed0
            self.seed1 = seed1
            self.dtype = dtype
            self.get_shape = P.Shape()
            self.dropout_gen_mask = P.DropoutGenMask(Seed0=self.seed0, Seed1=self.seed1)
            self.dropout_do_mask = P.DropoutDoMask()
            self.cast = P.Cast()
        else:
            # Other device targets: a single Dropout primitive.
            self.dropout = P.Dropout(keep_prob)

    def construct(self, x):
        r"""
           Input: a tensor
           Returns: a tensor. The input is returned as is outside of training or when keep_prob is 1.
        """
        if not self.training:
            return x

        if self.keep_prob == 1:
            return x

        if not self.is_ascend:
            # P.Dropout returns two outputs; only the first one is used.
            out, _ = self.dropout(x)
            return out

        shape = self.get_shape(x)
        dtype = P.DType()(x)
        # keep_prob is cast to the dtype of the input before it is passed to the mask primitives.
        keep_prob = self.cast(self.keep_prob, dtype)
        output = self.dropout_gen_mask(shape, keep_prob)
        return self.dropout_do_mask(x, output, keep_prob)

    def extend_repr(self):
        """Return the extra text shown in the representation of the cell."""
        return 'keep_prob={}'.format(self.keep_prob)

    def shard(self, strategy):
        """
        Set the shard strategy of the dropout primitives used on the current device target.

        Args:
            strategy (tuple): The strategy passed unchanged to every primitive.
        """
        if self.is_ascend:
            self.dropout_gen_mask.shard(strategy)
            self.dropout_do_mask.shard(strategy)
        else:
            self.dropout.shard(strategy)


class _LayerNorm(Cell):
    r"""
        A layer norm operation over the last axis, implemented either with P.LayerNorm or with
        a self-defined composition of element-wise and reduce-mean primitives.

        Args:
            normalized_shape (tuple): The shape of the `gamma` and `beta` parameters.
            eps (float): The epsilon value added to the variance in the denominator. Default 1e-5.
            param_init_type: The init type of `gamma` and `beta`, must be mstype.float32 or mstype.float16.
                Default mstype.float32.
            is_self_defined (bool): If True, compute the normalization with ReduceMean/Square/Sqrt/Sub/
                Add/Mul/RealDiv primitives; otherwise use P.LayerNorm. Default False.
        Inputs:
            - **x** (Tensor) - Tensor of shape :math:`(batch, seq\_length, hidden\_size)`.

        Outputs:
            Tensor of shape :math:`(batch, seq\_length, hidden\_size)`.

        Raises:
            TypeError: If `param_init_type` is not mstype.float32 or mstype.float16.
    """

    def __init__(self, normalized_shape, eps=1e-5, param_init_type=mstype.float32, is_self_defined=False):
        super(_LayerNorm, self).__init__()
        if param_init_type not in [mstype.float32, mstype.float16]:
            raise TypeError("The type of parameter 'param_init_type' should in [float32, float16], "
                            "but got the type : {}.".format(type(param_init_type)))
        self.is_self_defined = is_self_defined
        if not self.is_self_defined:
            self.layer_norm = P.LayerNorm(begin_norm_axis=-1,
                                          begin_params_axis=-1,
                                          epsilon=eps)
        # gamma/beta are created with parallel_optimizer=False.
        self.gamma = Parameter(initializer('ones', normalized_shape, param_init_type), name="gamma",
                               parallel_optimizer=False)
        self.beta = Parameter(initializer('zeros', normalized_shape, param_init_type), name="beta",
                              parallel_optimizer=False)
        # The primitives below are always created, but `construct` only uses them when
        # is_self_defined is True (`sub2` is sharded but not used by `construct`).
        self.mean = P.ReduceMean(keep_dims=True)
        self.square = P.Square()
        self.sqrt = P.Sqrt()
        self.sub1 = P.Sub()
        self.sub2 = P.Sub()
        self.add = P.Add()
        self.eps = eps
        self.mul = P.Mul()
        self.add2 = P.Add()
        self.real_div = P.RealDiv()

    def construct(self, x):
        r"""
          x : batch x seq_length x hidden_size
        """
        if self.is_self_defined:
            # output = (x - mean) / sqrt(var + eps) * gamma + beta, reduced over the last axis.
            mean = self.mean(x, -1)
            diff = self.sub1(x, mean)
            variance = self.mean(self.square(diff), -1)
            variance_eps = self.sqrt(self.add(variance, self.eps))
            output = self.real_div(diff, variance_eps)
            output = self.add2(self.mul(output, self.gamma), self.beta)
        else:
            # P.LayerNorm returns three outputs; only the first one is used.
            output, _, _ = self.layer_norm(x, self.gamma, self.beta)
        return output

    def shard(self, strategy):
        r"""
        Set the shard for the layer norm. the strategy size should be equal to the inputs.

        Note:
            It is valid only in semi auto parallel or auto parallel mode.
            In other parallel modes, strategies set here will be ignored.

        Args:
            strategy (tuple): The strategy for the layer norm. Should be the same shape as the inputs.
                Only ``strategy[0]`` is used for the input; `gamma` and `beta` are not split.

        Returns:
            The cell itself.

        Examples:
            >>> import mindspore
            >>> net = mindspore.parallel.nn.transformer.LayerNorm(normalized_shape=(1024, 10))
            >>> net.shard(((10, 2, 1),))
        """
        if self.is_self_defined:
            self.mean.shard(strategy)
            self.square.shard(strategy)
            self.sqrt.shard(strategy)
            self.sub1.shard((strategy[0], strategy[0]))
            self.sub2.shard((strategy[0], strategy[0]))
            # eps is a scalar, hence the empty strategy for the second input.
            self.add.shard((strategy[0], ()))
            self.mul.shard((strategy[0], (1,)))
            self.add2.shard((strategy[0], (1,)))
            self.real_div.shard((strategy[0], strategy[0]))
        else:
            self.layer_norm.shard((strategy[0], (1,), (1,)))

        return self


class _Linear(Cell):
    r"""
    The dense connected layer. Once the parallel mode is enabled, the input shape should be
    3-D tensor.

    Applies dense connected layer for the input. This layer implements the operation as:

    .. math::
        \text{outputs} = \text{activation}(\text{X} * \text{kernel} + \text{bias}),

    where :math:`X` is the input tensors, :math:`\text{activation}` is the activation function passed as the activation
    argument (if passed in), :math:`\text{kernel}` is a weight matrix with the same
    data type as the :math:`X` created by the layer, and :math:`\text{bias}` is a bias vector
    with the same data type as the :math:`X` created by the layer (only if has_bias is True).

    Args:
        in_channels (int): The number of channels in the input space.
        out_channels (int): The number of channels in the output space.
        weight_init (Union[Tensor, str, Initializer, numbers.Number]): The trainable weight_init parameter. The dtype
            is same as `x`. The values of str refer to the function `initializer`. Default: 'normal'.
        bias_init (Union[Tensor, str, Initializer, numbers.Number]): The trainable bias_init parameter. The dtype is
            same as `x`. The values of str refer to the function `initializer`. Default: 'zeros'.
        has_bias (bool): Specifies whether the layer uses a bias vector. Default: True.
        activation (str): activate function applied to the output of the fully connected layer,
            eg. 'ReLU'. A subclass of nn.Cell is also accepted and is instantiated without arguments.
            Default: None.
        transpose_b (bool): Whether the weight is stored as (out_channels, in_channels) and transposed in the
            matmul. If False, the weight is stored as (in_channels, out_channels). Default: True.
        expert_num (int): The number of experts used in this Linear. Here, for the case expert_num > 1, BatchMatMul is
            used and the first dimension in BatchMatMul indicate expert_num. Default: 1.
        outer_batch (int): The replication number of experts. The replication is effective only when MoE is applied.
            Default: 1.
        expert_group_size (int): The number of tokens in each data parallel group. Default: None.
        param_init_type (dtype.Number): The init type of the weight and the bias, mstype.float32 or mstype.float16.
            Default: mstype.float32
        compute_dtype (dtype.Number): The computation type. Default: mstype.float16
    Inputs:
        - **x** (Tensor) - Tensor of shape :math:`(*, in\_channels)`. The `in_channels` in `Args` should be equal
          to :math:`in\_channels` in `Inputs`.

    Outputs:
        Tensor of shape :math:`(*, out\_channels)`.

    Raises:
        TypeError: If `in_channels` or `out_channels` is not an int.
        TypeError: If `has_bias` is not a bool.
        TypeError: If `activation` is not one of str, Cell, Primitive, None.
        ValueError: If length of shape of `weight_init` is not equal to 2 or shape[0] of `weight_init`
                    is not equal to `out_channels` or shape[1] of `weight_init` is not equal to `in_channels`.
        ValueError: If length of shape of `bias_init` is not equal to 1
                    or shape[0] of `bias_init` is not equal to `out_channels`.
        ValueError: If `expert_num` is greater than 1 in auto parallel mode (without sharding propagation)
                    and `expert_group_size` is None.

    Supported Platforms:
        ``Ascend`` ``GPU``
    """

    @cell_attr_register
    @_args_type_validator_check(in_channels=Validator.check_positive_int,
                                out_channels=Validator.check_positive_int,
                                has_bias=Validator.check_bool,
                                transpose_b=Validator.check_bool,
                                expert_num=Validator.check_positive_int,
                                outer_batch=Validator.check_positive_int,
                                param_init_type=_valid_value_checks([mstype.float32, mstype.float16],
                                                                    "Linear"),
                                compute_dtype=_valid_value_checks([mstype.float32, mstype.float16],
                                                                  "Linear"))
    def __init__(self,
                 in_channels,
                 out_channels,
                 weight_init='normal',
                 bias_init='zeros',
                 has_bias=True,
                 activation=None,
                 transpose_b=True,
                 expert_num=1,
                 outer_batch=1,
                 expert_group_size=None,
                 param_init_type=mstype.float32,
                 compute_dtype=mstype.float16):
        super(_Linear, self).__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        if not (isinstance(activation, str) or activation is None or issubclass(activation, nn.Cell)):
            raise TypeError(f"For Linear cell, the activation should str type or nn.Cell type, but got {activation}.")
        if isinstance(weight_init, Tensor) and (weight_init.ndim != 2 or weight_init.shape[0] != out_channels or
                                                weight_init.shape[1] != in_channels):
            raise ValueError("The shape of parameter 'weight_init' is error, please check shape of 'weight_init'.")
        weight_shape = [out_channels, in_channels] if transpose_b else [in_channels, out_channels]
        self.expert_num = expert_num
        self.outer_batch = outer_batch
        self.expert_group_size = expert_group_size
        if self.expert_num > 1:
            # MoE case: one weight matrix per expert, multiplied with BatchMatMul.
            self.expert_flag = True
            self.weight = Parameter(initializer(weight_init, [self.expert_num] + weight_shape, param_init_type),
                                    name="weight")
            self.matmul = P.BatchMatMul(transpose_b=transpose_b)
        else:
            self.expert_flag = False
            self.weight = Parameter(initializer(weight_init, weight_shape, param_init_type), name="weight")
            self.matmul = P.MatMul(transpose_b=transpose_b)
        # expert_group_size is only used with experts, in auto parallel mode without sharding propagation.
        self.use_expert_group_size = _get_parallel_mode() in (ParallelMode.AUTO_PARALLEL,) \
                                     and not _is_sharding_propagation() and self.expert_flag is True
        if self.use_expert_group_size is True and self.expert_group_size is None:
            raise ValueError("'expert_group_size' should be configured as an integer in MoEConfig.")
        self.bias = None
        self.has_bias = has_bias
        if self.has_bias:
            if isinstance(bias_init, Tensor) and (bias_init.ndim != 1 or bias_init.shape[0] != out_channels):
                raise ValueError("The shape of parameter 'bias_init' is error, please check shape of 'bias_init'.")
            if self.expert_flag:
                self.bias = Parameter(initializer(bias_init,
                                                  [1, self.expert_num, 1, out_channels], param_init_type), name="bias")
            else:
                self.bias = Parameter(initializer(bias_init, [out_channels], param_init_type), name="bias")
            self.bias.parallel_optimizer = False
            self.bias_add = P.Add()
        # act_name keeps what the user passed (a str, a Cell class or None); `shard` dispatches on it.
        self.act_name = activation
        if callable(activation):
            self.activation = activation()
        else:
            self.activation = get_activation(activation) if isinstance(activation, str) else activation
        self.activation_flag = self.activation is not None
        self.dtype = compute_dtype
        self.cast = P.Cast()

    def construct(self, x):
        """Apply the linear transformation (and the optional bias and activation) to `x`."""
        out_shape = P.Shape()(x)[:-1] + (self.out_channels,)
        x = P.Reshape()(x, (-1, self.in_channels))
        if self.expert_flag:
            if self.use_expert_group_size is True:
                x = P.Reshape()(x, (-1, self.expert_num, self.expert_group_size, self.in_channels))
            else:
                x = P.Reshape()(x, (self.outer_batch, self.expert_num, -1, self.in_channels))
        # Compute in `compute_dtype`, then cast the result back to the dtype of the input.
        ori_dtype = F.dtype(x)
        weight = self.cast(self.weight, self.dtype)
        x = self.cast(x, self.dtype)
        x = self.matmul(x, weight)
        if self.has_bias:
            x = self.bias_add(x, self.cast(self.bias, self.dtype))
        if self.activation_flag:
            x = self.activation(x)
        x = F.cast(x, ori_dtype)
        output = P.Reshape()(x, out_shape)
        return output

    def shard(self, strategy_matmul, strategy_bias=None, strategy_activation=None):
        r"""
        Set the shard for the linear. the strategy size should be equal to the inputs.

        Note:
            It is valid only in semi auto parallel or auto parallel mode.
            In other parallel modes, strategies set here will be ignored.

        Args:
            strategy_matmul (tuple): The strategy for the matmul. Should be the same shape as the inputs.
            strategy_bias (tuple): The strategy for the bias_add. Should be the same shape as the inputs.
            strategy_activation (tuple): The strategy for the strategy_activation. Should be the same shape as
            the inputs.

        Returns:
            The cell itself.

        Raises:
            ValueError: If the activation is 'LogSoftmax', or if the activation is a user-defined cell with
                an `activation_shard` method and the first strategy in `strategy_activation` does not have
                2 or 4 elements.
        """
        self.matmul.shard(strategy_matmul)
        if self.has_bias:
            self.bias_add.shard(strategy_bias)
        if self.activation_flag and isinstance(self.act_name, str):
            # some operations has many primitives, need to manually set the shard
            if self.act_name.lower() == "leakyrelu":
                self.activation.select_op.shard((strategy_activation[0], strategy_activation[0]))
            elif self.act_name.lower() == "logsigmoid":
                self.activation.mul.shard((strategy_activation[0], ()))
                self.activation.exp.shard(strategy_activation)
                self.activation.add.shard((strategy_activation[0], ()))
                self.activation.rec.shard(strategy_activation)
                self.activation.log.shard(strategy_activation)
            elif self.act_name.lower() == "logsoftmax":
                raise ValueError("The 'LogSoftmax' function is not supported in semi auto parallel "
                                 "or auto parallel mode.")
            else:
                getattr(self.activation, self.act_name).shard(strategy_activation)
        elif self.activation_flag and isinstance(self.activation, Cell):
            if hasattr(self.activation, 'activation_shard') and strategy_activation:
                # Rebuild a parallel config from the first strategy and let the cell shard itself.
                shard_tuple = strategy_activation[0]
                if len(shard_tuple) == 2:
                    parallel_config = OpParallelConfig(data_parallel=shard_tuple[0],
                                                       model_parallel=shard_tuple[1])
                elif len(shard_tuple) == 4:
                    parallel_config = MoEParallelConfig(data_parallel=shard_tuple[0],
                                                        expert_parallel=shard_tuple[1],
                                                        model_parallel=shard_tuple[2])
                else:
                    raise ValueError("The user-defined activation function currently only supports the case where the "
                                     "input policy is 2 or 4, so that relevant policies can be extracted from it."
                                     "To avoid this error, you need to add the function of extracting "
                                     "'ParallelConfig' or 'OpParallelConfig' for the incoming strategy_activation ")
                self.activation.activation_shard(parallel_config)
            else:
                # Name the activation cell in the warning; `activation_flag` is always True in this branch.
                logger.warning(f"The user passed the custom defined activation function "
                               f"{type(self.activation).__name__}. "
                               f"If the user want to enable shard for the activation cell, "
                               f"the user should set the shard for each primitives in the cell.")
        return self


[文档]class FixedSparseAttention(nn.Cell): """ Fixed Sparse Attention Layer. This function contains the sparse attention primitives used in Sparse Transformers (see paper) `Generating Long Sequences with Sparse Transformers <https://arxiv.org/abs/1904.10509>`_. Specifically, it includes the following: 1. A faster implementation of normal attention (the upper triangle is not computed, and many operations are fused). 2. An implementation of "strided" and "fixed" attention, as in the Sparse Transformers paper. Args: batch_size(int): Number of input batch size. num_heads(int): Number of attention heads. size_per_head(int): An integer determining embedding size of each attention head, only supports 64, 128 for now. block_size(int): An integer determining the block size. Current implementation of sparse self-attention is based on blocked sparse matrices. In which this parameter defines the size of such blocks, Block X Block. Only supports 64 for now. seq_length(int): length of input sequence, only supports 1024 for now. Default 1024. num_different_global_patterns(int): An integer determining the number of different global attentions layouts. While global attention can be fixed by which block/s are representative of any local window, since there are multi-heads, each head can use a different global representative, only supports 4 for now. Default 4. parallel_config(OpParallelConfig): The config of parallel setting, see `OpParallelConfig`. Default `default_dpmp_config`, an instance of `OpParallelConfig` with default args. Inputs: - **q** (Tensor) - Tensor query (:class:`mstype.fp16` [batch_size, seq_length, hidden_size]): Sequence of queries to query the context. - **k** (Tensor) - Tensor key (:class:`mstype.fp16` [batch_size, seq_length, hidden_size]): Sequence of queries to query the context. - **v** (Tensor) - Tensor value (:class:`mstype.fp16` [batch size, sequence length, Embedding Size]): Sequence of queries to query the context. 
- **attention_mask** (Tensor) - Float Tensor the mask of (:class:`mstype.fp32`, :class:`mstype.fp16` [batch_size, seq_length, seq_length]): Lower triangular matrix to pass masked information. Outputs: A Tensor. The output of the attention with shape [batch_size, seq_length, hidden_size] Supported Platforms: ``Ascend`` Examples: >>> import numpy as np >>> from mindspore import dtype as mstype >>> from mindspore.nn.transformer import FixedSparseAttention >>> from mindspore import Tensor >>> model = FixedSparseAttention(batch_size=2, ... num_heads=8, ... size_per_head=64, ... block_size=64) >>> q = Tensor(np.ones((2, 1024, 8*64)), mstype.float16) >>> k = Tensor(np.ones((2, 1024, 8*64)), mstype.float16) >>> v = Tensor(np.ones((2, 1024, 8*64)), mstype.float16) >>> attention_mask = Tensor(np.ones((2, 1024, 1024)), mstype.float32) >>> output = model(q, k, v, attention_mask) >>> print(output.shape) (2, 1024, 512) """ @_args_type_validator_check(batch_size=Validator.check_positive_int, num_heads=Validator.check_positive_int, size_per_head=Validator.check_positive_int, block_size=Validator.check_positive_int, seq_length=Validator.check_positive_int, num_different_global_patterns=Validator.check_positive_int, parallel_config=_valid_type_checks([OpParallelConfig], "FixedSparseAttention")) def __init__(self, batch_size, num_heads, size_per_head, block_size, seq_length=1024, num_different_global_patterns=4, parallel_config=default_dpmp_config): super(FixedSparseAttention, self).__init__() dp, mp = parallel_config.data_parallel, parallel_config.model_parallel if num_heads % mp != 0: raise ValueError(f"The number of heads {num_heads} must be a " f"multiple of parallel_config.model_parallel {mp}.") if batch_size % dp != 0: raise ValueError(f"The batch_size {batch_size} must be a " f"multiple of parallel_config.data_parallel {parallel_config.data_parallel}.") self.seq_length = seq_length self.batch_size = batch_size self.hidden_size = size_per_head * num_heads self.num_heads = 
num_heads self.block_size = block_size self.block_num = seq_length // block_size self.size_per_head = size_per_head self.global_size = seq_length // 4 self.reshape = P.Reshape() self.transpose = P.Transpose().shard(((dp, 1, mp, 1),)) self.batch_matmul = P.BatchMatMul().shard(((dp, 1, 1, 1), (dp, 1, 1, 1))) self.multiply = P.Mul().shard(((dp, 1, 1, 1), (1, 1, 1))) self.multiply_data = Tensor([-10000.0], dtype=mstype.float32) self.parallel_config = parallel_config size_per_head_list = [64, 128] if self.seq_length != 1024: raise ValueError("For 'FixedSparseAttention', the class variable 'seq_length' must be 1024, " "but got the value : {}.".format(seq_length)) if self.block_size != 64: raise ValueError("For 'FixedSparseAttention', the class variable 'block_size' must be 64, " "but got the value : {}.".format(block_size)) if num_different_global_patterns != 4: raise ValueError("For 'FixedSparseAttention', the class variable 'num_different_global_patterns' " "must be 4, but got the value : {}".format(num_different_global_patterns)) if self.size_per_head not in size_per_head_list: raise ValueError("For 'FixedSparseAttention', the class variable 'size_per_head' only supports {}, " "but got the value : {}.".format(size_per_head_list, self.size_per_head)) local_ones = np.ones((self.block_size, self.block_size), dtype=np.float16) global_mask_original = np.ones((self.seq_length, self.global_size), dtype=np.float16) for i in range(self.seq_length): for j in range(self.global_size): if i // 16 >= (j // 16 + 1) * 4: global_mask_original[i, j] = 0.0 global_mask_original = -10000 * global_mask_original global_mask_fx = global_mask_original.reshape((self.seq_length // 16, 16, self.global_size // 16, 16)) global_mask = np.transpose(global_mask_fx, (2, 0, 1, 3)) global_mask = np.repeat(global_mask[np.newaxis, :, :, :, :], self.batch_size, axis=0) global_mask = global_mask.reshape((self.batch_size * self.global_size // 16, self.seq_length // 16, 16, 16)) self.global_mask = 
Tensor(global_mask, mstype.float32) self.local_mask_triangle = Tensor(np.tril(local_ones), mstype.float32) self.scale_factor = Tensor((math.sqrt(self.size_per_head))) self.matmul_dds = P.MatmulDDS(self.batch_size, self.num_heads).shard(((mp, dp, 1, 1), (mp, dp, 1, 1), (1, dp, 1, 1), (dp, 1, 1, 1))) self.matmul_dsd = P.DSDMatmul().shard(((dp, mp, 1, 1, 1, 1, 1), (dp, mp, 1, 1, 1, 1, 1), (dp, mp, 1, 1))) self.sub1 = P.Sub().shard(((1,), (dp, 1, 1, 1))) self.mul1 = P.Mul().shard(((dp, 1, 1, 1), (1,))) self.transpose1 = P.Transpose().shard(((dp, 1, 1, 1),)) self.transpose2 = P.Transpose().shard(((dp, 1, 1, 1),)) self.transpose3 = P.Transpose().shard(((dp, mp, 1, 1, 1, 1),)) self.transpose4 = P.Transpose().shard(((dp, mp, 1, 1),)) self.div = P.RealDiv().shard(((mp, dp, 1, 1), ())) self.slice1 = P.StridedSlice().shard(((dp, 1, 1),)) def construct(self, q, k, v, attention_mask): _check_shape_equal(F.shape(q), "q", self.cls_name, [self.batch_size, self.seq_length, self.hidden_size]) _check_input_dtype(F.dtype(q), "q", [mstype.float16], self.cls_name) _check_shape_equal(F.shape(k), "k", self.cls_name, [self.batch_size, self.seq_length, self.hidden_size]) _check_input_dtype(F.dtype(k), "k", [mstype.float16], self.cls_name) _check_shape_equal(F.shape(v), "v", self.cls_name, [self.batch_size, self.seq_length, self.hidden_size]) _check_input_dtype(F.dtype(v), "v", [mstype.float16], self.cls_name) _check_shape_equal(F.shape(attention_mask), "attention_mask", self.cls_name, [self.batch_size, self.seq_length, self.seq_length]) _check_input_dtype(F.dtype(attention_mask), "attention_mask", [mstype.float32, mstype.float16], self.cls_name) q, k, v = self._transpose_inputs(q, k, v) local_mask, global_mask = self._generate_attention_mask(attention_mask) q = self.div(q, F.cast(self.scale_factor, F.dtype(q))) k = self.div(k, F.cast(self.scale_factor, F.dtype(k))) local_prob, global_prob = self.matmul_dds(q, k, local_mask, global_mask) attention = self.matmul_dsd(local_prob, global_prob, 
v) attention_merge = self.transpose3(attention, (0, 1, 3, 4, 2, 5)) attention_merge = F.reshape( attention_merge, (-1, self.num_heads, self.seq_length, self.size_per_head)) attention_merge = self.transpose4(attention_merge, (0, 2, 1, 3)) attention_merge = F.reshape( attention_merge, (-1, self.seq_length, self.size_per_head * self.num_heads)) return attention_merge def _generate_attention_mask(self, attention_mask): """ generate global attention mask and local attention mask from origin attention mask """ attention_mask = self.reshape(attention_mask, (-1, self.seq_length, self.seq_length)) input_mask = self.slice1(attention_mask, (0, self.seq_length - 1, 0), (self.batch_size, self.seq_length, self.seq_length), (1, 1, 1)) input_mask = self.reshape(input_mask, (-1, self.seq_length)) input_shape = P.Shape()(input_mask) # bs, seq_length # bs, block_num, 1, block_size local_shape_right = (input_shape[0], self.block_num, 1, self.block_size) # bs, block_num, block_size, 1 local_shape_left = (input_shape[0], self.block_num, self.block_size, 1) local_mask_left = self.reshape(input_mask, local_shape_left) local_mask_right = self.reshape(input_mask, local_shape_right) # bs, block_num, block_size, block_size local_attention_mask = self.batch_matmul(local_mask_left, local_mask_right) lower_triangle = P.ExpandDims()(self.local_mask_triangle, 0) local_attention_mask = self.multiply(local_attention_mask, lower_triangle) local_multiplied_out = self.sub1(P.Cast()(F.tuple_to_array((1.0,)), mstype.float32), P.Cast()(local_attention_mask, mstype.float32)) local_adder = self.mul1(local_multiplied_out, self.multiply_data) local_mask_original = self.transpose1(local_adder, (0, 2, 1, 3)) local_mask_original = self.reshape( local_mask_original, (self.batch_size * self.block_size, self.block_num * self.block_size)) local_mask_fx = self.reshape( local_mask_original, (self.batch_size * self.block_size // 16, 16, self.block_num * self.block_size // 16, 16)) local_mask = 
self.transpose2(local_mask_fx, (2, 0, 1, 3)) global_mask = self.global_mask return local_mask, global_mask def _transpose_inputs(self, q, k, v): """ do reshape and transpose to inputs """ q = self.transpose( self.reshape( q, (-1, 16, self.num_heads * self.size_per_head // 16, 16)), (2, 0, 1, 3)) k = self.transpose( self.reshape( k, (-1, 16, self.num_heads * self.size_per_head // 16, 16)), (2, 0, 1, 3)) v = self.transpose( self.reshape( v, (-1, 16, self.num_heads * self.size_per_head // 16, 16)), (0, 2, 3, 1)) return q, k, v