# This is the Python adaptation and derivative work of Myia (https://github.com/mila-iqia/myia/).
#
# Copyright 2020-2024 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Basic composite operations."""
from __future__ import absolute_import
from functools import partial
from types import FunctionType, MethodType
import numpy as np
import mindspore as ms
from mindspore import context
from mindspore.common.parameter import Parameter, ParameterTuple
from mindspore.parallel._utils import _grads_divided_by_device_num_if_recomputation
from mindspore._c_expression import GradOperation_, HyperMap_, Map_, MultitypeFuncGraph_, Tail_, \
TupleAdd_, UnpackCall_, ZipOperation_, ListAppend_, TupleGetItemTensor_, ListInsert_, \
SequenceSliceGetItem_, ListSliceSetItem_, VmapOperation_, TaylorOperation_, ListPop_, \
ListClear_, ListReverse_, ListExtend_, DictClear_, DictHasKey_, DictUpdate_, DictFromKeys_, \
ZerosLike_, TensorIndexGetitem_, TensorIndexSetitem_, ListAdd_, DictSetItem_, \
HandleBoolTensor_, PreSetitemByTuple_, StarredGetItem_, \
StarredUnpack_, StarredUnpackMerge_, IterConverter_, HasNext_, Next_, MSContext
from mindspore.common import dtype as mstype
from mindspore.common.api import jit, _pynative_executor, _wrap_func
from mindspore.common.api import _add_flags, _core
from mindspore.ops.primitive import Primitive
from mindspore.ops import signature as sig
__all__ = [TupleAdd_, ListAdd_, UnpackCall_, TupleGetItemTensor_, SequenceSliceGetItem_,
ListSliceSetItem_, ZerosLike_, TensorIndexGetitem_, TensorIndexSetitem_,
HandleBoolTensor_, PreSetitemByTuple_]
def add_flags(fn=None, **flags):
"""
A decorator that adds a flag to the function.
Note:
        Only supports bool values.
    Args:
        fn (Function): Function or cell to add the flag to. Default: ``None`` .
        flags (dict): Flags passed as keyword arguments. Default: ``None`` .
Returns:
Function, the function with added flags.
Examples:
        >>> net = Net()
        >>> net = add_flags(net, predict=True)
>>> print(hasattr(net, '_func_graph_flags'))
True
"""
return _add_flags(fn, **flags)
def core(fn=None, **flags):
"""
A decorator that adds a flag to the function.
    By default, the function is marked with ``core=True``, so this decorator can be used to
    set flags on a graph.
Args:
fn (Function, optional): Function to add flag. Default: ``None`` .
        flags (dict, optional): Flags to set, such as ``core``, which indicates that this is a core function,
            or any other flag. Default: ``None`` .
Supported Platforms:
``Ascend`` ``GPU`` ``CPU``
Examples:
>>> net = Net()
        >>> net = core(net, predict=True)
>>> print(hasattr(net, '_func_graph_flags'))
True
"""
return _core(fn, **flags)
def _get_grad_weights_id(weights=None):
"""generate id of parameters"""
res = ""
if isinstance(weights, Parameter):
res = weights.name + str(weights.requires_grad)
if isinstance(weights, ParameterTuple):
res = ''.join(item.name + str(item.requires_grad) for item in weights)
if isinstance(weights, list):
res = ''.join(item.name + str(item.requires_grad) for item in weights if isinstance(item, Parameter))
return res
class GradOperation(GradOperation_):
"""
A higher-order function which is used to generate the gradient function for the input function.
The gradient function generated by `GradOperation` higher-order function can be customized by
construction arguments.
For example, given an input function `net = Net()` that takes `x` and `y` as inputs, and has a parameter `z`,
see `Net` in Examples.
- Used to get the derivative of the input:
1. Returns gradients with respect to the first input (see `GradNetWrtX` in Examples).
1) Construct a `GradOperation` higher-order function with default arguments: `grad_op = GradOperation()`.
2) Call it with input function as argument to get the gradient function: `gradient_function = grad_op(net)`.
         3) Call the gradient function with the input function's inputs to get the gradients with respect to
            the first input: `gradient_function(x, y)`.
2. Returns gradients with respect to all inputs (see `GradNetWrtXY` in Examples).
         1) Construct a `GradOperation` higher-order function with `get_all=True`, which indicates getting
            gradients with respect to all inputs; they are `x` and `y` in the example function `Net()`:
`grad_op = GradOperation(get_all=True)`.
2) Call it with input function as argument to get the gradient function: `gradient_function = grad_op(net)`.
3) Call the gradient function with input function's inputs to get the gradients with respect to all inputs:
`gradient_function(x, y)`.
- Used to get the derivative of the parameters:
Returns gradients with respect to given parameters (see `GradNetWithWrtParams` in Examples).
1. Construct a `GradOperation` higher-order function with `get_by_list=True`:
`grad_op = GradOperation(get_by_list=True)`.
      2. Construct a `ParameterTuple` that will be passed along with the input function when constructing
         the `GradOperation` higher-order function; it will be used as a parameter filter that determines
         which gradients to return: `params = ParameterTuple(net.trainable_params())`.
3. Call it with input function and `params` as arguments to get the gradient function:
`gradient_function = grad_op(net, params)`.
4. Call the gradient function with input function's inputs to get the gradients with
respect to given parameters: `gradient_function(x, y)`.
- Used to get the derivative of the inputs and parameters at the same time:
Returns gradients with respect to all inputs and given parameters in the format of ((dx, dy), (dz))
(see `GradNetWrtInputsAndParams` in Examples).
1. Construct a `GradOperation` higher-order function with `get_all=True` and `get_by_list=True`:
`grad_op = GradOperation(get_all=True, get_by_list=True)`.
      2. Construct a `ParameterTuple` that will be passed along with the input function when constructing
`GradOperation` higher-order function: `params = ParameterTuple(net.trainable_params())`.
3. Call it with input function and `params` as arguments to get the gradient function:
`gradient_function = grad_op(net, params)`.
4. Call the gradient function with input function's inputs to get the gradients with respect to
all inputs and given parameters: `gradient_function(x, y)`.
    - The sensitivity (gradient with respect to output) can be configured by setting `sens_param` to ``True``
      and passing an extra sensitivity input to the gradient function. The sensitivity input must have the
      same shape and type as the input function's output (see `GradNetWrtXYWithSensParam` in Examples).
1. Construct a `GradOperation` higher-order function with `get_all=True` and `sens_param=True`:
`grad_op = GradOperation(get_all=True, sens_param=True)`.
2. Define `grad_wrt_output` as `sens_param` which works as the gradient with respect to output:
`grad_wrt_output = Tensor(np.ones([2, 2]).astype(np.float32))`.
3. Call it with input function as argument to get the gradient function: `gradient_function = grad_op(net)`.
4. Call the gradient function with input function's inputs and `sens_param` to
get the gradients with respect to all inputs: `gradient_function(x, y, grad_wrt_output)`.
Note:
        For the above gradient functions, the form of the returned result depends on the number of gradient elements:
- Return a single value if only one result.
- Return a tuple for multiple results.
- Return an empty tuple for no result.
Args:
get_all (bool): If ``True`` , get all the gradients with respect to inputs. Default: ``False`` .
get_by_list (bool): If ``True`` , get all the gradients with respect to Parameter free variables.
            If get_all and get_by_list are both ``False`` , get the gradient with respect to the first input.
If get_all and get_by_list are both ``True`` , get the gradients with respect to inputs and
Parameter free variables at the same time in the form of ("gradients with respect to inputs",
"gradients with respect to parameter free variables"). Default: ``False`` .
sens_param (bool): Whether to append sensitivity (gradient with respect to output) as input.
If sens_param is ``False`` , a 'ones_like(outputs)' sensitivity will be attached automatically.
Default: ``False`` .
            If sens_param is ``True`` , a sensitivity (gradient with respect to output) needs to be transferred
through the positional parameter or key-value pair parameter. If the value is transferred through
the key-value pair parameter, the key must be sens.
Returns:
The higher-order function which takes a function as argument and returns gradient function for it.
Raises:
TypeError: If `get_all`, `get_by_list` or `sens_param` is not a bool.
Supported Platforms:
``Ascend`` ``GPU`` ``CPU``
Examples:
>>> import mindspore
>>> import numpy as np
>>> from mindspore import dtype as mstype
>>> from mindspore import Tensor, ops, nn, Parameter
>>> class Net(nn.Cell):
... def __init__(self):
... super(Net, self).__init__()
... self.matmul = ops.MatMul()
... self.z = Parameter(Tensor(np.array([1.0], np.float32)), name='z')
... def construct(self, x, y):
... x = x * self.z
... out = self.matmul(x, y)
... return out
...
>>> class GradNetWrtX(nn.Cell):
... def __init__(self, net):
... super(GradNetWrtX, self).__init__()
... self.net = net
... self.grad_op = ops.GradOperation()
... def construct(self, x, y):
... gradient_function = self.grad_op(self.net)
... return gradient_function(x, y)
...
>>> x = Tensor([[0.5, 0.6, 0.4], [1.2, 1.3, 1.1]], dtype=mstype.float32)
>>> y = Tensor([[0.01, 0.3, 1.1], [0.1, 0.2, 1.3], [2.1, 1.2, 3.3]], dtype=mstype.float32)
>>> output = GradNetWrtX(Net())(x, y)
>>> print(output)
[[1.4100001 1.5999999 6.6 ]
[1.4100001 1.5999999 6.6 ]]
>>>
>>> class GradNetWrtXY(nn.Cell):
... def __init__(self, net):
... super(GradNetWrtXY, self).__init__()
... self.net = net
... self.grad_op = ops.GradOperation(get_all=True)
... def construct(self, x, y):
... gradient_function = self.grad_op(self.net)
... return gradient_function(x, y)
>>>
>>> x = Tensor([[0.8, 0.6, 0.2], [1.8, 1.3, 1.1]], dtype=mstype.float32)
>>> y = Tensor([[0.1, 3.3, 1.1], [1.1, 0.2, 1.4], [1.1, 2.2, 0.3]], dtype=mstype.float32)
>>> output = GradNetWrtXY(Net())(x, y)
>>> print(output)
(Tensor(shape=[2, 3], dtype=Float32, value=
[[ 4.50000000e+00, 2.70000005e+00, 3.60000014e+00],
[ 4.50000000e+00, 2.70000005e+00, 3.60000014e+00]]), Tensor(shape=[3, 3], dtype=Float32, value=
[[ 2.59999990e+00, 2.59999990e+00, 2.59999990e+00],
[ 1.89999998e+00, 1.89999998e+00, 1.89999998e+00],
[ 1.30000007e+00, 1.30000007e+00, 1.30000007e+00]]))
>>>
>>> class GradNetWrtXYWithSensParam(nn.Cell):
... def __init__(self, net):
... super(GradNetWrtXYWithSensParam, self).__init__()
... self.net = net
... self.grad_op = ops.GradOperation(get_all=True, sens_param=True)
... self.grad_wrt_output = Tensor([[0.1, 0.6, 0.2], [0.8, 1.3, 1.1]], dtype=mstype.float32)
... def construct(self, x, y):
... gradient_function = self.grad_op(self.net)
... return gradient_function(x, y, self.grad_wrt_output)
>>>
>>> x = Tensor([[0.8, 0.6, 0.2], [1.8, 1.3, 1.1]], dtype=mstype.float32)
>>> y = Tensor([[0.11, 3.3, 1.1], [1.1, 0.2, 1.4], [1.1, 2.2, 0.3]], dtype=mstype.float32)
>>> output = GradNetWrtXYWithSensParam(Net())(x, y)
>>> print(output)
(Tensor(shape=[2, 3], dtype=Float32, value=
[[ 2.21099997e+00, 5.09999990e-01, 1.49000001e+00],
[ 5.58800030e+00, 2.68000007e+00, 4.07000017e+00]]), Tensor(shape=[3, 3], dtype=Float32, value=
[[ 1.51999998e+00, 2.81999993e+00, 2.14000010e+00],
[ 1.09999990e+00, 2.04999995e+00, 1.54999995e+00],
[ 9.00000036e-01, 1.54999995e+00, 1.25000000e+00]]))
>>>
>>> class GradNetWithWrtParams(nn.Cell):
... def __init__(self, net):
... super(GradNetWithWrtParams, self).__init__()
... self.net = net
... self.params = ParameterTuple(net.trainable_params())
... self.grad_op = ops.GradOperation(get_by_list=True)
... def construct(self, x, y):
... gradient_function = self.grad_op(self.net, self.params)
... return gradient_function(x, y)
>>>
>>> x = Tensor([[0.8, 0.6, 0.2], [1.8, 1.3, 1.1]], dtype=mstype.float32)
>>> y = Tensor([[0.11, 3.3, 1.1], [1.1, 0.2, 1.4], [1.1, 2.2, 0.3]], dtype=mstype.float32)
>>> output = GradNetWithWrtParams(Net())(x, y)
>>> print(output)
(Tensor(shape=[1], dtype=Float32, value= [ 2.15359993e+01]),)
>>>
>>> class GradNetWrtInputsAndParams(nn.Cell):
... def __init__(self, net):
... super(GradNetWrtInputsAndParams, self).__init__()
... self.net = net
... self.params = ParameterTuple(net.trainable_params())
... self.grad_op = ops.GradOperation(get_all=True, get_by_list=True)
... def construct(self, x, y):
... gradient_function = self.grad_op(self.net, self.params)
... return gradient_function(x, y)
>>>
>>> x = Tensor([[0.1, 0.6, 1.2], [0.5, 1.3, 0.1]], dtype=mstype.float32)
>>> y = Tensor([[0.12, 2.3, 1.1], [1.3, 0.2, 2.4], [0.1, 2.2, 0.3]], dtype=mstype.float32)
>>> output = GradNetWrtInputsAndParams(Net())(x, y)
>>> print(output)
((Tensor(shape=[2, 3], dtype=Float32, value=
[[ 3.51999998e+00, 3.90000010e+00, 2.59999990e+00],
[ 3.51999998e+00, 3.90000010e+00, 2.59999990e+00]]), Tensor(shape=[3, 3], dtype=Float32, value=
[[ 6.00000024e-01, 6.00000024e-01, 6.00000024e-01],
[ 1.89999998e+00, 1.89999998e+00, 1.89999998e+00],
[ 1.30000007e+00, 1.30000007e+00, 1.30000007e+00]])), (Tensor(shape=[1], dtype=Float32, value=
[ 1.29020004e+01]),))
"""
def __init__(self, get_all=False, get_by_list=False, sens_param=False):
"""Initialize GradOperation."""
if not isinstance(get_all, bool):
raise TypeError(f"For 'GradOperation', the 'get_all' should be bool, but got {type(get_all).__name__}")
if not isinstance(get_by_list, bool):
raise TypeError(f"For 'GradOperation', the 'get_by_list' should be bool, "
f"but got {type(get_by_list).__name__}")
if not isinstance(sens_param, bool):
raise TypeError(f"For 'GradOperation', the 'sens_param' should be bool, "
f"but got {type(sens_param).__name__}")
self.get_all = get_all
self.get_by_list = get_by_list
self.sens_param = sens_param
GradOperation_.__init__(self, 'grad', get_all, get_by_list, sens_param, False, False, False, False, False)
self.grad_fn = None
self.fn = None
self.weights_id = None
self.pynative_ = False
self.grad_position = (0,)
def __call__(self, fn, weights=None):
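        # Reuse the cached gradient function when the same fn and the same weights id have already
        # been processed; otherwise build a new gradient function below.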
weights_id = ''
if context.get_context("mode") == context.GRAPH_MODE:
weights_id = _get_grad_weights_id(weights)
if self.grad_fn is not None and self.fn == fn and self.weights_id == weights_id:
return self.grad_fn
grad_ = GradOperation(self.get_all, self.get_by_list, self.sens_param)
        # If Grad is called in GRAPH_MODE, or inside functions decorated with 'jit', do grad in GRAPH_MODE.
        # If Grad is called in pure PYNATIVE_MODE, do grad in PYNATIVE_MODE.
        # In pure PYNATIVE_MODE, the outer after_grad is only used to set the pynative flag for the inner
        # GradOperation. In PYNATIVE_MODE, when Grad is called from functions decorated with 'jit', the outer
        # after_grad is used to do grad in GRAPH_MODE.
if context.get_context("mode") == context.GRAPH_MODE:
dynamic_shape_inputs = None
if isinstance(fn, ms.nn.Cell):
dynamic_shape_inputs = fn.get_inputs()
fn.grad_ops_label = True
if self.get_by_list:
@jit(input_signature=dynamic_shape_inputs)
def after_grad(*args, **kwargs):
return grad_(fn, weights)(*args, **kwargs)
else:
@jit(input_signature=dynamic_shape_inputs)
def after_grad(*args, **kwargs):
return grad_(fn)(*args, **kwargs)
elif self.pynative_:
if not _pynative_executor.enable_grad():
raise RuntimeError("In no_grad context, you can not calculate gradient")
@_wrap_func
def after_grad(*args, **kwargs):
run_args = self._pynative_forward_run(fn, grad_, weights, *args, **kwargs)
out = _pynative_executor.grad(fn, grad_, weights, self.grad_position, *run_args)
out = _grads_divided_by_device_num_if_recomputation(out)
return out
else:
MSContext.get_instance()._set_not_convert_jit(True)
grad_.pynative_ = True
if not _pynative_executor.enable_grad():
raise RuntimeError("In no_grad context, you can not calculate gradient")
# after_grad of this branch can't use @jit, just directly call grad_
if self.get_by_list:
def after_grad(*args, **kwargs):
return grad_(fn, weights)(*args, **kwargs)
else:
def after_grad(*args, **kwargs):
return grad_(fn)(*args, **kwargs)
self.grad_fn = after_grad
self.fn = fn
self.weights_id = weights_id
return self.grad_fn
def _pynative_forward_run(self, fn, grad, weights, *args, **kwargs):
""" PyNative forward run to build grad graph. """
sens = None
if self.sens_param:
if 'sens' in kwargs.keys():
sens = kwargs.pop('sens')
else:
# default use args last elem as sens
sens = args[-1]
args = args[:-1]
run_args = args
if kwargs:
run_args = args + tuple(kwargs.values())
# check run exclude sens
if isinstance(fn, (FunctionType, MethodType)):
if not _pynative_executor.check_run(grad, fn, weights, None, *run_args):
_pynative_executor.set_grad_flag(True)
_pynative_executor.new_graph(fn, *args, **kwargs)
output = fn(*args, **kwargs)
_pynative_executor.end_graph(fn, output, *args, **kwargs)
else:
# Check if fn has run already
if not _pynative_executor.check_run(grad, fn, weights, None, *run_args):
requires_grad = fn.requires_grad
fn.requires_grad = True
fn(*args, **kwargs)
fn.requires_grad = requires_grad
# If it has sens, keep sens as the last element
if sens is not None:
            run_args += (sens,) if not isinstance(sens, tuple) else sens
return run_args
class _TaylorOperation(TaylorOperation_):
"""
Generate the higher order derivatives function for the input function.
"""
def __init__(self):
"""Initialize TaylorOperation."""
TaylorOperation_.__init__(self, 'taylorgrad')
self.grad_fn = None
self.fn = None
def __call__(self, fn):
if self.grad_fn is not None and self.fn == fn:
return self.grad_fn
taylor_grad_ = _TaylorOperation()
# If calling Grad in GRAPH_MODE or calling Grad in functions decorated with 'jit', do grad in GRAPH_MODE
@jit
def after_taylor_grad(*args):
return taylor_grad_(fn)(*args)
self.grad_fn = after_taylor_grad
self.fn = fn
return self.grad_fn
def _combine_weight(grad_position, weights, out, out_with_ids):
""" Making resulting tuple for weight, when return_ids is set to True. """
weight_tuple = []
position = 0
if isinstance(weights, (list, ParameterTuple, tuple)) and grad_position:
for weight in weights:
weight_tuple.append((weight.name, out[1][position]))
position += 1
elif isinstance(weights, (list, ParameterTuple, tuple)):
for weight in weights:
weight_tuple.append((weight.name, out[position]))
position += 1
elif grad_position:
weight_tuple.append(weights.name)
weight_tuple.append(out[1])
else:
weight_tuple.append(weights.name)
weight_tuple.append(out)
if grad_position:
out_with_ids.append(tuple(weight_tuple))
else:
out_with_ids = weight_tuple
return out_with_ids
def _combine_position(grad_position, weights, out, out_with_ids):
""" Making resulting tuple for position, when return_ids is set to True. """
position_tuple = []
position = 0
if grad_position == (0,) and weights is not None:
position_tuple.append(0)
position_tuple.append(out[0])
elif grad_position == (0,):
position_tuple.append(0)
position_tuple.append(out)
elif weights is not None:
for index in grad_position:
position_tuple.append((index, out[0][position]))
position += 1
else:
for index in grad_position:
position_tuple.append((index, out[position]))
position += 1
if weights:
out_with_ids.append(tuple(position_tuple))
else:
out_with_ids = position_tuple
return out_with_ids
def _combine_with_ids(grad_position, weights, out):
""" Making resulting tuple, when return_ids is set to True. """
out_with_ids = []
if grad_position:
out_with_ids = _combine_position(
grad_position, weights, out, out_with_ids)
if weights is not None:
out_with_ids = _combine_weight(
grad_position, weights, out, out_with_ids)
if not out_with_ids:
raise ValueError(f"output tuple should not be a empty tuple.")
return tuple(out_with_ids)
class _Grad(GradOperation_):
"""
A higher-order function which is used to generate the gradient function by position for the input function.
"""
def __init__(self, get_all=False, get_by_list=False, sens_param=False, get_by_position=False, has_aux=False,
get_value=False, return_ids=False, merge_forward=False):
"""Initialize _Grad."""
if not isinstance(get_by_position, bool):
raise TypeError(f"For '_Grad', the 'get_by_position' should be bool, "
f"but got {type(get_by_position).__name__}")
if not isinstance(get_by_list, bool):
raise TypeError(f"For '_Grad', the 'get_by_list' should be bool, "
f"but got {type(get_by_list).__name__}")
if not isinstance(sens_param, bool):
raise TypeError(f"For '_Grad', the 'sens_param' should be bool, "
f"but got {type(sens_param).__name__}")
if not isinstance(has_aux, bool):
raise TypeError(f"For '_Grad', the 'has_aux' should be bool, "
f"but got {type(has_aux).__name__}")
if not isinstance(get_value, bool):
raise TypeError(f"For '_Grad', the 'get_value' should be bool, "
f"but got {type(get_value).__name__}")
if not isinstance(return_ids, bool):
raise TypeError(f"For '_Grad', the 'return_ids' should be bool, "
f"but got {type(return_ids).__name__}")
self.get_all = get_all
self.get_by_position = get_by_position
self.get_by_list = get_by_list
self.sens_param = sens_param
self.has_aux = has_aux
self.get_value = get_value
self.return_ids = return_ids
self.merge_forward = merge_forward
GradOperation_.__init__(self, 'grad', get_all, get_by_list, sens_param, get_by_position, has_aux, get_value,
return_ids, merge_forward)
self.grad_fn = None
self.fn = None
self.pynative_ = False
self.grad_position = None
self.weights_id = None
def __call__(self, fn, weights=None, grad_position=0):
weights_id = ''
if context.get_context("mode") == context.GRAPH_MODE:
weights_id = _get_grad_weights_id(weights)
if self.grad_fn is not None and self.fn == fn and self.grad_position == grad_position and \
self.weights_id == weights_id:
return self.grad_fn
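        # When has_aux is True, only the first output of fn contributes to the gradient; the remaining
        # outputs are wrapped with StopGradient and carried along as auxiliary values.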
def aux_fn(*args, **kwargs):
outputs = fn(*args, **kwargs)
if not isinstance(outputs, tuple) or len(outputs) < 2:
raise ValueError("When has_aux is True, origin fn requires more than one outputs.")
res = (outputs[0],)
stop_gradient = Primitive("StopGradient")
for item in outputs[1:]:
res += (stop_gradient(item),)
return res
grad_ = _Grad(self.get_all, self.get_by_list, self.sens_param, self.get_by_position, self.has_aux,
self.get_value, self.return_ids, self.merge_forward)
        # If Grad is called in GRAPH_MODE, or inside functions decorated with 'jit', do grad in GRAPH_MODE.
        # If Grad is called in pure PYNATIVE_MODE, do grad in PYNATIVE_MODE.
        # In pure PYNATIVE_MODE, the outer after_grad is only used to set the pynative flag for the inner
        # GradOperation. In PYNATIVE_MODE, when Grad is called from functions decorated with 'jit', the outer
        # after_grad is used to do grad in GRAPH_MODE.
if context.get_context("mode") == context.GRAPH_MODE:
dynamic_shape_inputs = None
if isinstance(fn, ms.nn.Cell):
dynamic_shape_inputs = fn.get_inputs()
if self.get_by_position:
@jit(input_signature=dynamic_shape_inputs)
def after_grad(*args):
return grad_(fn, weights, grad_position)(*args)
else:
if self.get_by_list:
@jit(input_signature=dynamic_shape_inputs)
def after_grad(*args):
return grad_(fn, weights)(*args)
else:
@jit(input_signature=dynamic_shape_inputs)
def after_grad(*args):
return grad_(fn)(*args)
elif self.pynative_:
if not _pynative_executor.enable_grad():
raise RuntimeError("In no_grad context, you can not calculate gradient")
@_wrap_func
def after_grad(*args, **kwargs):
run_args, res = self._pynative_forward_run(fn, grad_, weights, *args, **kwargs)
out = _pynative_executor.grad(fn, grad_, weights, grad_position, *run_args)
out = _grads_divided_by_device_num_if_recomputation(out)
if self.return_ids and out:
out = _combine_with_ids(grad_position, weights, out)
if self.get_value:
return res, out
if self.has_aux:
return out, res[1:]
return out
else:
MSContext.get_instance()._set_not_convert_jit(True)
if not _pynative_executor.enable_grad():
raise RuntimeError("In no_grad context, you can not calculate gradient")
grad_.pynative_ = True
fn_ = fn
if self.has_aux:
fn_ = aux_fn
# after_grad of this branch can't use @jit, just directly call grad_
if self.get_by_position:
def after_grad(*args, **kwargs):
return grad_(fn_, weights, grad_position)(*args, **kwargs)
else:
if self.get_by_list:
def after_grad(*args, **kwargs):
return grad_(fn_, weights)(*args, **kwargs)
else:
def after_grad(*args, **kwargs):
return grad_(fn_)(*args, **kwargs)
self.grad_fn = after_grad
self.fn = fn
self.grad_position = grad_position
self.weights_id = weights_id
return self.grad_fn
def _pynative_forward_run(self, fn, grad, weights, *args, **kwargs):
""" PyNative forward runs to build grad graph. """
sens = None
if self.sens_param:
if 'sens' in kwargs.keys():
sens = kwargs.pop('sens')
else:
# default use args last elem as sens
sens = args[-1]
args = args[:-1]
run_args = args
if kwargs:
run_args = args + tuple(kwargs.values())
# check run exclude sens
outputs = ()
run_forward = False
if isinstance(fn, (FunctionType, MethodType)):
if not _pynative_executor.check_run(grad, fn, weights, self.grad_position, *run_args):
_pynative_executor.set_grad_flag(True)
_pynative_executor.new_graph(fn, *args, **kwargs)
outputs = fn(*args, **kwargs)
_pynative_executor.end_graph(fn, outputs, *args, **kwargs)
run_forward = True
else:
# Check if fn has run already.
if not _pynative_executor.check_run(grad, fn, weights, self.grad_position, *run_args):
requires_grad = fn.requires_grad
fn.requires_grad = True
outputs = fn(*args, **kwargs)
fn.requires_grad = requires_grad
run_forward = True
# If it has sens, keep sens as the last element
if sens is not None:
            run_args += (sens,) if not isinstance(sens, tuple) else sens
# Normal run grad
if run_forward:
return run_args, outputs
if (self.get_value or self.has_aux) and not outputs:
outputs = fn(*args, **kwargs)
return run_args, outputs
class _Vmap(VmapOperation_):
"""
A higher-order function which is used to generate the vectorizing map function.
"""
def __init__(self):
"""Initialize _Vmap."""
VmapOperation_.__init__(self, 'vmap')
self.vmap_fn = None
self.fn = None
self.in_axes = None
self.out_axes = None
def __call__(self, fn, in_axes=0, out_axes=0):
if self.vmap_fn is not None and self.fn == fn and self.in_axes == in_axes and self.out_axes == out_axes:
return self.vmap_fn
vmap_ = self
@jit
def after_vmap(*args, **kwargs):
return vmap_(fn, in_axes, out_axes)(*args, **kwargs)
self.vmap_fn = after_vmap
self.fn = fn
self.in_axes = in_axes
self.out_axes = out_axes
return self.vmap_fn
class MultitypeFuncGraph(MultitypeFuncGraph_):
"""
    MultitypeFuncGraph is a class used to generate overloaded functions that accept different types as inputs.
    Initialize a `MultitypeFuncGraph` object with a name, and use `register` with the input types as the decorator
    for the function to be registered. The object can then be called with different types of inputs,
    and it also works with `HyperMap` and `Map`.
Args:
name (str): Operator name.
        read_value (bool, optional): If the registered function does not need to set values on Parameters,
            and all inputs are passed by value, set `read_value` to ``True`` . Default: ``False`` .
Raises:
ValueError: If failed to find a matching function for the given arguments.
Supported Platforms:
``Ascend`` ``GPU`` ``CPU``
Examples:
>>> # `add` is a metagraph object which will add two objects according to
>>> # input type using ".register" decorator.
>>> from mindspore import Tensor
>>> from mindspore import dtype as mstype
>>> from mindspore import ops
>>>
>>> tensor_add = ops.Add()
>>> add = ops.MultitypeFuncGraph('add')
>>> @add.register("Number", "Number")
... def add_scala(x, y):
... return x + y
>>> @add.register("Tensor", "Tensor")
... def add_tensor(x, y):
... return tensor_add(x, y)
>>> output = add(1, 2)
>>> print(output)
3
>>> output = add(Tensor([0.1, 0.6, 1.2], dtype=mstype.float32), Tensor([0.1, 0.6, 1.2], dtype=mstype.float32))
>>> print(output)
[0.2 1.2 2.4]
"""
def __init__(self, name, read_value=False):
"""Initialize MultitypeFuncGraph."""
MultitypeFuncGraph_.__init__(self, name)
self.entries = list()
self.default_func = None
if read_value:
self.set_signatures((
sig.make_sig('args', sig.sig_rw.RW_READ, sig.sig_kind.KIND_VAR_POSITIONAL),))
def __call__(self, *args):
if callable(self.default_func):
return self.default_func(*args)
for arg in args:
if isinstance(arg, np.ndarray):
raise TypeError("For 'MultitypeFuncGraph', the input can not be numpy.ndarray")
if len(self.entries) == 1:
output = self.entries[0][1](*args)
return output
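        # Dispatch: infer the MindSpore dtype of each argument and call the first registered
        # implementation whose signature matches all of the inferred types.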
types = tuple(map(mstype.get_py_obj_dtype, args))
for sigs, fn in self.entries:
if len(sigs) != len(types):
continue
if any(not mstype._issubclass_(type_, sig) for sig, type_ in zip(sigs, types)): # pylint: disable=W0212
continue
output = fn(*args)
return output
raise ValueError(f"For 'MultitypeFuncGraph', cannot find fn match given args. Got (sigs, fn): {self.entries}, "
f"and (dtype, args): {types}.")
    def register(self, *type_names):
"""
Register a function for the given type string.
Args:
type_names (Union[str, :class:`mindspore.dtype`]): Inputs type names or types list.
        Returns:
            Function, a decorator that registers the decorated function to run when it is called with the
            types described in `type_names`.
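        Examples:
            >>> # A minimal usage sketch mirroring the class-level example above; the names `add`
            >>> # and `add_number` are illustrative.
            >>> from mindspore import ops
            >>> add = ops.MultitypeFuncGraph('add')
            >>> @add.register("Number", "Number")
            ... def add_number(x, y):
            ...     return x + y
            >>> print(add(1, 2))
            3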
"""
def deco(fn):
def convert_type(type_input):
if isinstance(type_input, str):
return mstype.typing.str_to_type(type_input)
if not isinstance(type_input, mstype.Type):
raise TypeError(f"For 'MultitypeFuncGraph', register only support str or {mstype.Type}, but got "
f"'type_input': {type_input}.")
return type_input
types = tuple(map(convert_type, type_names))
self.register_fn(type_names, fn)
self.entries.append((types, fn))
return fn
return deco
    def register_default(self):
        """Register the default function for this MultitypeFuncGraph."""
        def deco(fn):
            self.default_func = fn
            return fn
        return deco
# pylint: disable=missing-docstring
def set_doc_url(self, doc_url):
self.set_doc_url_(doc_url)
def set_need_raise(self):
self.set_need_raise_()
class HyperMap(HyperMap_):
"""
    HyperMap will apply the specified operation to the input sequences.
    Apply the operation to every element of the sequence or nested sequence. Different
    from `mindspore.ops.Map`, `HyperMap` supports applying the operation to nested structures.
    `HyperMap` also supports dynamic sequences as input, but it does not extend this
    support to nested dynamic sequences.
    Args:
        ops (Union[MultitypeFuncGraph, None], optional): `ops` is the operation to apply. If `ops` is `None`,
            the operation should be passed as the first input of the instance. Default: ``None`` .
        reverse (bool, optional): In some scenarios, the optimizer needs to apply the operation in reverse
            order to improve parallel performance; general users can ignore this. `reverse` is the flag that
            decides whether to apply the operation in reverse order. Only supported in graph mode.
            Default: ``False`` .
Inputs:
- **args** (Tuple[sequence]) -
- If `ops` is not `None`, all the inputs should be sequences with the same length.
And each row of the sequences will be the inputs of the operation.
- If `ops` is `None`, the first input is the operation, and the others are inputs.
Note:
Except for the operation input, the number of inputs should be equal to the number of inputs to `ops`.
Outputs:
        Sequence or nested sequence, the sequence of outputs after applying the function,
        e.g. `operation(args[0][i], args[1][i])`, where `operation` is the function specified by `ops`.
Raises:
TypeError: If `ops` is neither :class:`mindspore.ops.MultitypeFuncGraph` nor None.
TypeError: If `args` is not a Tuple.
Supported Platforms:
``Ascend`` ``GPU`` ``CPU``
Examples:
>>> from mindspore import Tensor, ops
>>> from mindspore import dtype as mstype
>>> nest_tensor_list = ((Tensor(1, mstype.float32), Tensor(2, mstype.float32)),
... (Tensor(3, mstype.float32), Tensor(4, mstype.float32)))
>>> # square all the tensor in the nested list
>>>
>>> square = ops.MultitypeFuncGraph('square')
>>> @square.register("Tensor")
... def square_tensor(x):
... return ops.square(x)
>>>
>>> common_map = ops.HyperMap()
>>> output = common_map(square, nest_tensor_list)
>>> print(output)
((Tensor(shape=[], dtype=Float32, value= 1), Tensor(shape=[], dtype=Float32, value= 4)),
(Tensor(shape=[], dtype=Float32, value= 9), Tensor(shape=[], dtype=Float32, value= 16)))
>>> square_map = ops.HyperMap(square, False)
>>> output = square_map(nest_tensor_list)
>>> print(output)
((Tensor(shape=[], dtype=Float32, value= 1), Tensor(shape=[], dtype=Float32, value= 4)),
(Tensor(shape=[], dtype=Float32, value= 9), Tensor(shape=[], dtype=Float32, value= 16)))
"""
def __init__(self, ops=None, reverse=False):
"""Initialize HyperMap."""
self.ops = ops
if ops:
HyperMap_.__init__(self, reverse, ops)
else:
HyperMap_.__init__(self, reverse)
def __call__(self, *args):
func = self.ops
args_list = args
hypermap = self
if self.ops is None:
func = args[0]
args_list = args[1:]
hypermap = partial(self, func)
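        # When args_list holds nested tuples/lists, recurse element-wise; partial(self, func) keeps the
        # operation bound while descending into the nested structure.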
# is leaf
if not isinstance(args_list[0], (tuple, list)):
return func(*args_list)
return tuple(map(hypermap, *args_list))
class Map(Map_):
"""
    Map will apply the specified operation to the input sequences.
    Apply the operation to every element of the sequence.
    Args:
        ops (Union[MultitypeFuncGraph, None]): `ops` is the operation to apply. If `ops` is `None`,
            the operation should be passed as the first input of the instance. Default: ``None`` .
        reverse (bool): In some scenarios, the optimizer needs to apply the operation in reverse order to
            improve parallel performance; general users can ignore this. `reverse` is the flag that decides
            whether to apply the operation in reverse order. Only supported in graph mode. Default: ``False`` .
    Inputs:
        - **args** (Tuple[sequence]) - If `ops` is not `None`, all the inputs should be sequences of the same
          length, and each row of the sequences is an input of the operation. e.g. If the length of `args` is 2,
          then for each `i` within the length of each sequence, `(args[0][i], args[1][i])` will be the input of
          the operation. If `ops` is `None`, the first input is the operation, and the others are the sequences.
Outputs:
Sequence, the sequence of output after applying the ops function. e.g. `ops(args[0][i], args[1][i])`.
Supported Platforms:
``Ascend`` ``GPU`` ``CPU``
Examples:
>>> from mindspore import dtype as mstype
>>> from mindspore import Tensor, ops
>>> from mindspore.ops import MultitypeFuncGraph, Map
>>> tensor_list = (Tensor(1, mstype.float32), Tensor(2, mstype.float32), Tensor(3, mstype.float32))
>>> # square all the tensor in the list
>>>
>>> square = MultitypeFuncGraph('square')
>>> @square.register("Tensor")
... def square_tensor(x):
... return ops.square(x)
>>>
>>> common_map = Map()
>>> output = common_map(square, tensor_list)
>>> print(output)
(Tensor(shape=[], dtype=Float32, value= 1), Tensor(shape=[], dtype=Float32, value= 4),
Tensor(shape=[], dtype=Float32, value= 9))
>>> square_map = Map(square, False)
>>> output = square_map(tensor_list)
>>> print(output)
(Tensor(shape=[], dtype=Float32, value= 1), Tensor(shape=[], dtype=Float32, value= 4),
Tensor(shape=[], dtype=Float32, value= 9))
"""
def __init__(self, ops=None, reverse=False):
"""Initialize Map."""
self.ops = ops
if ops:
Map_.__init__(self, reverse, ops)
else:
Map_.__init__(self, reverse)
def __call__(self, *args):
func = self.ops
args_list = args
if self.ops is None:
func = args[0]
args_list = args[1:]
return tuple(map(func, *args_list))
class _ListAppend(ListAppend_):
"""
    A metafuncgraph class that appends one element to a list.
Args:
name (str): The name of the metafuncgraph object.
"""
    def __init__(self, name):
        """Initialize _ListAppend."""
        ListAppend_.__init__(self, name)
def __call__(self, *args):
pass
_append = _ListAppend("append")
class _ListInsert(ListInsert_):
"""
    A metafuncgraph class that inserts one element into a list.
Args:
name (str): The name of the metafuncgraph object.
"""
def __init__(self, name):
"""Initialize _ListInsert."""
ListInsert_.__init__(self, name)
def __call__(self, *args):
pass
_insert = _ListInsert("insert")
class _ListPop(ListPop_):
"""
    A metafuncgraph class that pops one element from a list.
Args:
name (str): The name of the metafuncgraph object.
"""
def __init__(self, name):
"""Initialize _ListPop."""
ListPop_.__init__(self, name)
def __call__(self, *args):
pass
_pop = _ListPop("pop")
class _ListClear(ListClear_):
"""
    A metafuncgraph class that clears a list.
Args:
name (str): The name of the metafuncgraph object.
"""
def __init__(self, name):
"""Initialize _ListClear."""
ListClear_.__init__(self, name)
def __call__(self, *args):
pass
_list_clear = _ListClear("clear")
class _ListReverse(ListReverse_):
"""
    A metafuncgraph class that reverses a list.
Args:
name (str): The name of the metafuncgraph object.
"""
def __init__(self, name):
"""Initialize _ListReverse."""
ListReverse_.__init__(self, name)
def __call__(self, *args):
pass
_reverse = _ListReverse("reverse")
class _ListExtend(ListExtend_):
"""
    A metafuncgraph class that appends another list to the end of a list.
Args:
name (str): The name of the metafuncgraph object.
"""
def __init__(self, name):
"""Initialize _ListExtend."""
ListExtend_.__init__(self, name)
def __call__(self, *args):
pass
_extend = _ListExtend("extend")
class _DictSetItem(DictSetItem_):
"""
    A metafuncgraph class that sets an item in a dict.
Args:
name (str): The name of the metafuncgraph object.
"""
def __init__(self, name):
"""Initialize _DictClear."""
DictSetItem_.__init__(self, name)
def __call__(self, *args):
pass
_dict_setitem = _DictSetItem("setitem")
class _DictClear(DictClear_):
"""
    A metafuncgraph class that clears a dict.
Args:
name (str): The name of the metafuncgraph object.
"""
def __init__(self, name):
"""Initialize _DictClear."""
DictClear_.__init__(self, name)
def __call__(self, *args):
pass
_dict_clear = _DictClear("clear")
class _DictHasKey(DictHasKey_):
"""
    A metafuncgraph class that checks whether a key is in a dict.
Args:
name (str): The name of the metafuncgraph object.
"""
def __init__(self, name):
"""Initialize _DictHasKey."""
DictHasKey_.__init__(self, name)
def __call__(self, *args):
pass
_haskey = _DictHasKey("has_key")
class _DictUpdate(DictUpdate_):
"""
    A metafuncgraph class that updates a dict with another dict.
Args:
name (str): The name of the metafuncgraph object.
"""
def __init__(self, name):
"""Initialize _DictUpdate."""
DictUpdate_.__init__(self, name)
def __call__(self, *args):
pass
_update = _DictUpdate("update")
class _DictFromKeys(DictFromKeys_):
"""
A metafuncgraph class that creates a new dict from the given sequence and value.
Args:
name (str): The name of the metafuncgraph object.
"""
def __init__(self, name):
"""Initialize _DictFromKeys."""
DictFromKeys_.__init__(self, name)
def __call__(self, *args):
pass
_fromkeys = _DictFromKeys("fromkeys")
class _Tail(Tail_):
"""
A metafuncgraph class that generates tail elements of the tuple.
Args:
name (str): The name of the metafuncgraph object.
"""
def __init__(self, name):
"""Initialize _Tail."""
Tail_.__init__(self, name)
def __call__(self, *args):
pass
tail = _Tail('tail')
class _ZipOperation(ZipOperation_):
"""Generates a tuple of zip iterations for inputs."""
def __init__(self, name):
"""Initialize _ZipOperation."""
ZipOperation_.__init__(self, name)
def __call__(self, *args):
pass
zip_operation = _ZipOperation('zip_operation')
"""`zip_operation` will generate a tuple of zip iterations of inputs."""
class _StarredGetItem(StarredGetItem_):
"""Generates a list of starred get_item for inputs."""
def __init__(self, name):
"""Initialize _StarredGetItem."""
StarredGetItem_.__init__(self, name)
def __call__(self, *args):
pass
starred_get_item = _StarredGetItem('starred_get_item')
"""`starred_get_item` will generate a list of starred get_item for inputs."""
class _StarredUnpack(StarredUnpack_):
"""Generates a tuple of starred unpack for inputs."""
def __init__(self, name):
"""Initialize _StarredUnpack."""
StarredUnpack_.__init__(self, name)
def __call__(self, *args):
pass
starred_unpack = _StarredUnpack('starred_unpack')
"""`starred_unpack` will generate a tuple of starred unpack for inputs."""
class _StarredUnpackMerge(StarredUnpackMerge_):
"""Generates a tuple of starred unpack merge for inputs."""
def __init__(self, name):
"""Initialize _StarredUnpackMerge."""
StarredUnpackMerge_.__init__(self, name)
def __call__(self, *args):
pass
starred_unpack_merge = _StarredUnpackMerge('starred_unpack_merge')
"""`starred_unpack_merge` will generate a tuple of starred unpack merge for inputs."""
class _IterConverter(IterConverter_):
"""Convert input to interable object"""
def __init__(self, name):
"""Initialize _IterConverter."""
IterConverter_.__init__(self, name)
def __call__(self, *args):
pass
iter_converter = _IterConverter('iter_converter')
"""`iter_converter` will convert input to ietrable object"""
class _HasNext(HasNext_):
"""Check whether the input has next value"""
def __init__(self, name):
"""Initialize _HasNext."""
HasNext_.__init__(self, name)
def __call__(self, *args):
pass
ms_hasnext = _HasNext('has_next')
"""`ms_hasnext` will check whether the input has next value"""
class _Next(Next_):
"""Get next element and res elements for input"""
def __init__(self, name):
"""Initialize _Next."""
Next_.__init__(self, name)
def __call__(self, *args):
pass
ms_next = _Next('next')
"""`ms_next` will get next element and res elements for input"""