# Copyright 2021-2022 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Custom operator"""
from __future__ import absolute_import
import json
import os
import re
import ast
import hashlib
import stat
import inspect
import importlib
import platform
import subprocess
import numpy as np
from mindspore._c_expression import Oplib, typing
from mindspore import context
from mindspore.common import Tensor
from mindspore.common import dtype as mstype
from mindspore.ops import DataType
from mindspore import log as logger
from mindspore import ops
from mindspore.communication.management import get_rank, GlobalComm
from ._ms_kernel import determine_variable_usage
from ._custom_grad import autodiff_bprop
from ._pyfunc_registry import add_pyfunc
if platform.system() != "Windows":
import fcntl
def _get_cache_path():
"""
Automatically get the path to kernel_meta
Returns:
str, the path to the dir of kernel_meta.
"""
cache_path = os.getenv('MS_COMPILER_CACHE_PATH')
if cache_path is None:
cache_path = "./kernel_meta/"
elif cache_path[-1] != "/":
cache_path = cache_path + "/"
if not os.path.exists(cache_path):
os.makedirs(cache_path, exist_ok=True)
return cache_path
def _compile_aot(file):
"""
Automatically compile the source file for custom aot
Args:
file (str): The path to the source file.
Returns:
str, the path to the compiled library.
"""
cache_path = _get_cache_path()
# for distributed case, we create folders separately to avoid conflict
if GlobalComm.INITED:
cache_path = os.path.join(cache_path, "rank_" + str(get_rank()), "")
os.makedirs(cache_path, exist_ok=True)
search_res = importlib.util.find_spec("mindspore")
if search_res is None:
raise RuntimeError("Cannot find mindspore module!")
res_path = search_res.origin
find_pos = res_path.find("__init__.py")
if find_pos == -1:
raise RuntimeError(
"Find module mindspore __init__.py file failed!")
include_file = "-I{}include/api/".format(res_path[:find_pos])
file_name = file.split('/')[-1]
func_path = cache_path + file_name + ".so"
if not os.path.exists(func_path):
if file.endswith("cpp") or file.endswith("cc"):
cmd = ["g++", "-std=c++17", "--shared", "-fPIC"]
cmd += [include_file, "-o", func_path, file]
elif file.endswith("cu"):
cmd = ["nvcc"]
cmd += ["--shared", "-Xcompiler", "-fPIC", "-O3", "-gencode", "arch=compute_70, code=sm_70"]
cmd += ["--use_fast_math", "--expt-relaxed-constexpr", "--expt-extended-lambda"]
def _get_cuda_bare_metal_version():
raw_output = subprocess.check_output(["nvcc", "-V"],
universal_newlines=True)
output = raw_output.split()
release_idx = output.index("release") + 1
release = output[release_idx].split(".")
version_major = release[0]
return int(version_major)
if _get_cuda_bare_metal_version() >= 11:
cmd += ["-gencode", "arch=compute_80,code=sm_80"]
cmd += [include_file, "-o", func_path, file]
else:
raise ValueError("The source file must be a cc/cpp/cu file, but get: {}".format(file))
proc = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
(out, _) = proc.communicate(timeout=30)
if proc.returncode != 0:
msg = "Compilation error in compiling {}:\n".format(file)
msg += out.decode('utf-8')
raise RuntimeError(msg)
if not bytearray(open(func_path, "rb").read()):
raise RuntimeError(
"Compilation error: empty result is generated for {}".format(file))
return func_path
[文档]class Custom(ops.PrimitiveWithInfer):
r"""
`Custom` primitive is used for user defined operators and is to enhance the expressive ability of built-in
primitives. You can construct a `Custom` object with a predefined function, which describes the computation
logic of a user defined operator. You can also construct another `Custom` object with another predefined
function if needed. Then these `Custom` objects can be directly used in neural networks.
.. warning::
This is an experimental prototype that is subject to change.
.. note::
The supported platforms are determined by the input `func_type`:
- "hybrid": supports ["Ascend", "GPU", "CPU"].
- "akg": supports ["Ascend", "GPU", "CPU"].
- "tbe": supports ["Ascend"].
- "aot": supports ["GPU", "CPU"].
- "pyfunc": supports ["CPU"].
- "julia": supports ["CPU"].
- "aicpu": supports ["Ascend"].
Args:
func (Union[function, str]):
- function: If func is of function type, then func should be a Python function which describes the
computation logic of a user defined operator. The function can be one of the following:
1. A AKG operator implementation function, which can use ir builder/tvm compute/hybrid grammar.
2. A TBE operator implementation function.
3. A pure python function
4. An ms_kernel decorated function written by the Hybrid DSL.
- str: If func is of str type, then str should be a path of file along with a function name.
This could be used when func_type is "aot" or "julia".
1. for "aot":
Currently "aot" supports GPU/CPU(linux only) platform.
"aot" means ahead of time, in which case Custom directly launches user defined "xxx.so" file as an
operator. Users need to compile a handwriting "xxx.cu"/"xxx.cc" file into "xxx.so" ahead of time,
and offer the path of the file along with a function name.
- "xxx.so" file generation:
1) GPU Platform: Given user defined "xxx.cu" file (ex. "{path}/add.cu"), use nvcc command to compile
it.(ex. "nvcc --shared -Xcompiler -fPIC -o add.so add.cu")
2) CPU Platform: Given user defined "xxx.cc" file (ex. "{path}/add.cc"), use g++/gcc command to
compile it.(ex. "g++ --shared -fPIC -o add.so add.cc")
- Define a "xxx.cc"/"xxx.cu" file:
"aot" is a cross-platform identity. The functions defined in "xxx.cc" or "xxx.cu" share
the same args. Typically, the function should be as:
.. code-block::
int func(int nparam, void **params, int *ndims, int64_t **shapes, const char **dtypes,
void *stream, void *extra)
Parameters:
- nparam(int): total number of inputs plus outputs; suppose the operator has 2 inputs and 3 outputs,
then nparam=5
- params(void \*\*): a pointer to the array of inputs and outputs' pointer; the pointer type of
inputs and outputs is void \* ; suppose the operator has 2 inputs and 3 outputs, then the first
input's pointer is params[0] and the second output's pointer is params[3]
- ndims(int \*): a pointer to the array of inputs and outputs' dimension num; suppose params[i] is a
1024x1024 tensor and params[j] is a 77x83x4 tensor, then ndims[i]=2, ndims[j]=3.
- shapes(int64_t \*\*): a pointer to the array of inputs and outputs' shapes(int64_t \*); the ith
input's jth dimension's size is shapes[i][j](0<=j<ndims[i]); suppose params[i] is a 2x3 tensor and
params[j] is a 3x3x4 tensor, then shapes[i][0]=2, shapes[j][2]=4.
- dtypes(const char \*\*): a pointer to the array of inputs and outputs' types(const char \*);
(ex. "float32", "float16", "float", "float64", "int", "int8", "int16", "int32", "int64", "uint",
"uint8", "uint16", "uint32", "uint64", "bool")
- stream(void \*): stream pointer, only used in cuda file
- extra(void \*): used for further extension
Return Value(int):
- 0: MindSpore will continue to run if this aot kernel is successfully executed
- others: MindSpore will raise exception and exit
Examples: see details in tests/st/ops/graph_kernel/custom/aot_test_files/
- Use it in Custom:
.. code-block::
Custom(func="{dir_path}/{file_name}:{func_name}",...)
(ex. Custom(func="./reorganize.so:CustomReorganize", out_shape=[1], out_dtype=mstype.float32,
"aot"))
2. for "julia":
Currently "julia" supports CPU(linux only) platform.
For julia use JIT compiler, and julia support c api to call julia code.
The Custom can directly launches user defined "xxx.jl" file as an operator.
Users need to write a "xxx.jl" file which include modules and functions,
and offer the path of the file along with a module name and function name.
Examples: see details in tests/st/ops/graph_kernel/custom/julia_test_files/
- Use it in Custom:
.. code-block::
Custom(func="{dir_path}/{file_name}:{module_name}:{func_name}",...)
(ex. Custom(func="./add.jl:Add:add", out_shape=[1], out_dtype=mstype.float32, "julia"))
out_shape (Union[function, list, tuple]): The output shape infer function or the value of output shape of
`func`. Default: None.
If func has single output, then the value of output shape is a list or tuple of int.
If func has multiple outputs, then the value of output shape is a tuple, each item represents the shape
of each output.
The input can be None only when the func_type input is "hybrid". In this case, the automatic infer
shape mechanic will be enabled.
out_dtype (Union[function, :class:`mindspore.dtype`, tuple[:class:`mindspore.dtype`]]): The output data type
infer function or the value of output data type of `func`. Default: None.
If func has single output, then the value of output shape is a `mindspore.dtype`.
If func has multiple outputs, then the value of output shape is a tuple of `mindspore.dtype`, each item
represents the data type of each output.
The input can be None only when the func_type input is "hybrid". In this case, the automatic infer
value mechanic will be enabled.
func_type (str): The implementation type of `func`, should be one of
["hybrid", "akg", "tbe", "aot", "pyfunc", "julia", "aicpu"].
Each `func_type` only supports specific platforms(targets). Default: "hybrid".
The supported platforms of `func_type`:
- "hybrid": supports ["Ascend", "GPU", "CPU"].
- "akg": supports ["Ascend", "GPU", "CPU"].
- "tbe": supports ["Ascend"].
- "aot": supports ["GPU", "CPU"].
- "pyfunc": supports ["CPU"].
- "julia": supports ["CPU"].
- "aicpu": supports ["Ascend"].
bprop (function): The back propagation function of `func`. Default: None.
reg_info (Union[str, dict, list, tuple]): Represents the registration information(reg info) of `func` with
json format of type str or dict. The reg info specifies supported data types and formats of inputs and
outputs, attributes and target of `func`. Default: None.
If reg info is a list or tuple, then each item should be with json format of type str or dict, which
represents the registration information of `func` in a specific target. You need to invoke `CustomRegOp`
or the subclass of `RegOp` to generate the reg info for `func`. Then you can invoke
`custom_info_register` to bind the reg info to `func` or just pass the reg info to `reg_info` parameter.
The `reg_info` parameter takes higher priority than `custom_info_register` and the reg info in a
specific target will be registered only once.
If reg info is not set, then we will infer the data types and formats from the inputs of `Custom` operator.
Please note that, if `func_type` is "tbe" or the `func` only supports some specified data types and formats,
or it has attribute inputs, then you should set the reg info for `func`.
Inputs:
- **input** (Union(tuple, list)) - The input tuple or list is made up of multiple tensors, and attributes
value(optional).
Outputs:
Tensor or tuple[Tensor], execution results.
Raises:
TypeError: If the type of `func` is invalid or the type of register information for `func` is invalid.
ValueError: If `func_type` is invalid.
ValueError: If the register information is invalid, including the target is not supported, the input numbers
or the attributes of `func` differs in different targets.
Supported Platforms:
``Ascend`` ``GPU`` ``CPU``
Examples:
>>> import mindspore.ops as ops
>>> import numpy as np
>>> from mindspore.ops import CustomRegOp, custom_info_register, DataType, ms_kernel
>>> from mindspore.common import dtype as mstype
>>> from mindspore.nn import Cell
>>> input_x = Tensor(np.ones([16, 16]).astype(np.float32))
>>> input_y = Tensor(np.ones([16, 16]).astype(np.float32))
>>>
>>> # Example, func_type = "hybrid"
>>> # This is the default func_type in Custom,
>>> # and both out_shape and out_dtype can be None(default value).
>>> # In this case, the input func must be a function written in the Hybrid DSL
>>> # and decorated by @ms_kernel.
>>> @ms_kernel
... def outer_product_script(a, b):
... c = output_tensor(a.shape, a.dtype)
... for i0 in range(a.shape[0]):
... for i1 in range(b.shape[1]):
... c[i0, i1] = 0.0
... for i2 in range(a.shape[1]):
... c[i0, i1] = c[i0, i1] + (a[i0, i2] * b[i2, i1])
... return c
>>>
>>> test_op_hybrid = ops.Custom(outer_product_script)
>>> output = test_op_hybrid(input_x, input_y)
>>> # the result will be a 16 * 16 tensor with all elements 16
>>> print(output.shape)
(16, 16)
>>> # Example, func_type = "akg"
>>> def outer_product(a_1, b_1):
... d = output_tensor(a_1.shape, a_1.dtype)
... for i0 in range(a_1.shape[0]):
... for i1 in range(b_1.shape[1]):
... d[i0, i1] = 0.0
... for i2 in range(a.shape[1]):
... d[i0, i1] = d[i0, i1] + (a_1[i0, i2] * b_1[i2, i1])
... return d
>>>
>>> # Example, func_type = "tbe"
>>> square_with_bias_op_info = CustomRegOp() \
... .fusion_type("OPAQUE") \
... .attr("bias", "required", "float") \
... .input(0, "x") \
... .output(0, "y") \
... .dtype_format(DataType.F32_Default, DataType.F32_Default) \
... .dtype_format(DataType.F16_Default, DataType.F16_Default) \
... .target("Ascend") \
... .get_op_info()
>>>
>>> @custom_info_register(square_with_bias_op_info)
... def square_with_bias(input_x, output_y, bias=0.0, kernel_name="square_with_bias"):
... import te.lang.cce
... from te import tvm
... from topi.cce import util
...
... shape = input_x.get("shape")
... dtype = input_x.get("dtype").lower()
...
... shape = util.shape_refine(shape)
... data = tvm.placeholder(shape, name="data", dtype=dtype)
...
... with tvm.target.cce():
... res0 = te.lang.cce.vmul(data, data)
... res = te.lang.cce.vadds(res0, bias)
... sch = te.lang.cce.auto_schedule(res)
...
... config = {"print_ir": False,
... "name": kernel_name,
... "tensor_list": [data, res]}
...
... te.lang.cce.cce_build_code(sch, config)
>>>
>>> def test_tbe():
... square_with_bias = ops.Custom(square_with_bias, out_shape=lambda x, _: x, \
... out_dtype=lambda x, _: x, func_type="tbe")
... res = self.square_with_bias(input_x, 1.0)
... return res
>>>
>>> # Example, func_type = "aicpu"
>>> resize_bilinear_op_info = CustomRegOp("ResizeBilinear") \
... .fusion_type("OPAQUE") \
... .input(0, "input", "required") \
... .output(1, "output", "required") \
... .attr("align_corners", "required", "bool") \
... .attr("cust_aicpu", "optional", "str", "aicpu_kernels") \
... .dtype_format(DataType.F32_Default, DataType.F32_Default) \
... .dtype_format(DataType.F16_Default, DataType.F32_Default) \
... .target("Ascend") \
... .get_op_info()
>>>
>>> @custom_info_register(resize_bilinear_op_info)
... def resize_bilinear_aicpu():
... return
>>>
>>> def test_aicpu(x):
... resize_bilinear_op = ops.Custom(resize_bilinear_aicpu, out_shape=[1, 1, 9, 9], \
... out_dtype=mstype.float32, func_type="aicpu")
... res = resize_bilinear_op(x, True, "aicpu_kernels")
... return res
>>>
>>> # Example, func_type = "aot"
>>> def test_aot(x, y, out_shapes, out_types):
... program = ops.Custom("./reorganize.so:CustomReorganize", out_shapes, out_types, "aot")
... out = program(x, y)
... return out
>>>
>>> # Example, func_type = "pyfunc"
>>> def func_multi_output(x1, x2):
... return (x1 + x2), (x1 - x2)
>>>
>>> test_pyfunc = ops.Custom(func_multi_output, lambda x, _: (x, x), lambda x, _: (x, x), "pyfunc")
>>> output = test_pyfunc(input_x, input_y)
>>>
>>> # Example, func_type = "julia"
>>> # julia code:
>>> # add.jl
>>> # module Add
>>> # function add(x, y, z)
>>> # z .= x + y
>>> # return z
>>> # end
>>> # end
>>> def test_julia(x, y, out_shapes, out_types):
... program = ops.Custom("./add.jl:Add:add", out_shapes, out_types, "julia")
... out = program(x, y)
... return out
"""
registered_func = {}
attr_dict = {} # Save input_names and attr_names for func.
def __init__(self, func, out_shape=None, out_dtype=None, func_type="hybrid", bprop=None, reg_info=None):
ops.PrimitiveWithInfer.__init__(self, "Custom")
self.supported_targets = ["Ascend", "GPU", "CPU"]
self.supported_func_type = ["hybrid", "akg", "tbe", "aicpu", "aot", "pyfunc", "julia"]
self.log_prefix = "For '{}', 'func_type': {}, 'func': {}".format(self.name, func_type, func)
self.func = func
self.func_type = func_type
self.func_name = ""
self.uniq_name = ""
self.imply_path = ""
self.func_source_str = ""
self._func_compile_attrs = {}
self._is_ms_kernel = False
self._check_func()
self._update_func_info()
self.add_prim_attr("func_name", self.func_name)
self.add_prim_attr("uniq_name", self.uniq_name)
if self.func_type == "hybrid":
self.add_prim_attr("func_compile_attrs", self._func_compile_attrs)
self.add_prim_attr("imply_path", self.imply_path)
if self.func_type == "pyfunc":
func_id = id(self.func)
add_pyfunc(func_id, self.func)
self.add_prim_attr("fn_id", func_id)
self.out_shape = out_shape
self.out_dtype = out_dtype
self.bprop = bprop
self.fake_output = False
self.single_scalar_output = False
if not self.out_dtype:
self.fake_output = True
elif not self.out_shape:
self.single_scalar_output = True
self.add_prim_attr("fake_output", self.fake_output)
self.add_prim_attr("single_scalar_output", self.single_scalar_output)
# Register info
self._register_info(reg_info)
if func_type == "akg":
self.add_prim_attr('func_source_str', self.func_source_str)
if "ir_builder" in self.func_source_str:
self.func_type = "ir_builder"
elif "compute" in self.func_source_str:
self.func_type = "tvm_compute"
else:
self.func_type = "hybrid"
self._hybrid_func_analyser()
if not self.bprop and self.func_type == "hybrid":
self._hybrid_autodiff(func_type)
self.add_prim_attr("func_type", self.func_type)
self._update_attr()
def __infer__(self, *args):
if callable(self.out_shape):
infer_shape = self.out_shape(*(x["shape"] for x in args))
else:
infer_shape = self.out_shape
if callable(self.out_dtype):
infer_dtype = self.out_dtype(*(x["dtype"] for x in args))
else:
infer_dtype = self.out_dtype
infer_value = None
# deal with the case of ms script
# enable auto infer function if any infer information is missing
if self._is_ms_kernel and (infer_dtype is None or infer_shape is None):
logger.warning("{}, 'out_shape' or 'out_dtype' is None, infer the output shape and output dtype "
"automatically. There might be some Python RuntimeWarning but it wouldn't influence the "
"result.".format(self.log_prefix))
auto_infer_result = self._auto_infer(*args)
# use automatically inferred shape/dtype if the input infer values are null
infer_shape = auto_infer_result[0] if infer_shape is None else infer_shape
infer_dtype = auto_infer_result[1] if infer_dtype is None else infer_dtype
infer_value = auto_infer_result[2]
# deal with case that the custom op is of type pyfunc with empty output
if self.func_type == "pyfunc":
if infer_shape == ():
logger.warning("{}, 'out_shape' is an empty tuple. Add a placeholder instead. "
"Not recommend to use it as it could be any uninitialized data.".format(self.log_prefix))
infer_shape = (1,)
if infer_dtype == ():
logger.warning("{}, 'out_dtype' is an empty tuple. Add a placeholder instead. "
"Not recommend to use it as it could be any uninitialized data.".format(self.log_prefix))
infer_dtype = mstype.int32
# after all automatic infer information fulfillment, throw error if infer_shape/infer_dtype is still None
if not isinstance(infer_shape, (tuple, list)):
raise TypeError("{}, 'out_shape' must be one of [tuple, list, function], but got {}"
.format(self.log_prefix, type(infer_shape)))
if not isinstance(infer_dtype, (typing.Type, tuple, list)):
raise TypeError("{}, 'out_dtype' must be one of [mindspore.dtype, tuple, list, function], but got {}"
.format(self.log_prefix, type(infer_dtype)))
out = {
"shape": infer_shape,
"dtype": infer_dtype,
"value": infer_value,
}
return out
def get_bprop(self):
return self.bprop
def _check_julia_func(self):
"""Check the validity of julia func"""
if not isinstance(self.func, str):
raise TypeError("{}, 'func' must be of type str, but got {}".format(self.log_prefix, type(self.func)))
if self.func.count(':') != 2:
raise ValueError("{}, the format of 'func' must be file:module:func".format(self.log_prefix))
source_file, module, func = self.func.split(':')
with open(source_file, 'r') as f:
jl = f.read()
if 'module ' + module not in jl:
raise Exception("{}, module {} is not found in source file {}!"
.format(self.log_prefix, module, source_file))
if 'function ' + func not in jl:
raise Exception("{}, function {} is not found in source file {}!"
.format(self.log_prefix, func, source_file))
def _check_func(self):
"""Check the validity of func_type and type of func"""
if self.func_type not in self.supported_func_type:
raise ValueError("{}, 'func_type' must be one of {}, but got {}"
.format(self.log_prefix, self.supported_func_type, self.func_type))
if self.func_type == "aot":
if not isinstance(self.func, str):
raise TypeError("{}, 'func' must be of type str, but got {}".format(
self.log_prefix, type(self.func)))
file_name_list = self.func.split(":")
if len(file_name_list) != 2:
raise TypeError(
"{}, 'func' should be like 'file_name:func_name', but got {}".format(
self.log_prefix, self.func))
if not file_name_list[0].endswith("so"):
file_path = _compile_aot(file_name_list[0])
self.func = file_path + ":" + file_name_list[1]
elif self.func_type == "julia":
self._check_julia_func()
elif self.func_type == "hybrid":
if not hasattr(self.func, "ms_kernel_flag"):
raise TypeError("{}, 'func' must be a function decorated by ms_kernel".format(self.log_prefix))
self._is_ms_kernel = True
self._func_compile_attrs = getattr(self.func, "compile_attrs", {})
elif self.func_type == "akg":
if hasattr(self.func, "ms_kernel_flag"):
logger.warning("{}. To have a better user experience, the mode hybrid is suggested "
"for the input function with decorator @ms_kernel. "
"To enable this mode, set the 'func_type' to be \"hybrid\"".format(self.log_prefix))
elif self.func_type == "pyfunc":
if hasattr(self.func, "ms_kernel_flag"):
logger.warning("{}. Now you are using the function with decorator @ms_kernel in the mode pyfunc. "
"The kernel will be executed as a native python function, which might lead to "
"low efficiency. To accelerate the kernel, set the 'func_type' to be \"hybrid\""
.format(self.log_prefix))
else:
if not callable(self.func):
raise TypeError("{}, 'func' must be of type function, but got {}"
.format(self.log_prefix, type(self.func)))
def _update_func_info(self):
"""Update information of func"""
if callable(self.func):
# For the func_type other then hybrid, get the original function if func is decorated
if "__wrapped__" in self.func.__dict__ and self.func_type not in ["hybrid", "pyfunc"]:
self.func = self.func.__dict__["__wrapped__"]
# func name
self.func_name = self.func.__name__
# source code of func, not include the decorator before def
self.func_source_str = inspect.getsource(self.func)
index = self.func_source_str.find("def ")
if index != -1:
self.func_source_str = self.func_source_str[index:]
op_imply_path = os.path.realpath(_get_cache_path() + self.func_name + ".py")
if os.path.exists(op_imply_path):
os.remove(op_imply_path)
with open(op_imply_path, 'at') as file:
if platform.system() != "Windows":
fcntl.flock(file.fileno(), fcntl.LOCK_EX)
file.seek(0, 2)
if file.tell() == 0:
file.write(self.func_source_str)
os.chmod(op_imply_path, stat.S_IRUSR | stat.S_IWUSR)
# path of func
self.imply_path = op_imply_path
if self._is_ms_kernel:
# static check for the Hybrid DSL in hybrid
root = ast.parse(self.func_source_str)
inplace_assign_output = determine_variable_usage(root, self.func_name)
if inplace_assign_output:
self.add_prim_attr("inplace_assign_output",
" ".join((str(j)
for i in inplace_assign_output
for j in i)))
self.add_prim_attr('func_source_str', self.func_source_str)
# unique func name
sha256 = hashlib.sha256()
sha256.update(self.imply_path.encode("utf-8"))
sha256.update(self.func_source_str.encode("utf-8"))
hash_str = sha256.hexdigest()
self.uniq_name = self.name + "_" + self.func_name + "_" + hash_str
elif isinstance(self.func, str):
# func name
self.func_name = self.func
# uniq func name
self.uniq_name = self.name + "_" + self.func_name
else:
raise TypeError("For '{}', 'func' must be of type function or str, but got {}"
.format(self.name, type(self.func)))
def _register_info(self, info):
"""Register reg_info."""
reg_info = info
if reg_info is None and hasattr(self.func, "reg_info"):
reg_info = getattr(self.func, "reg_info")
if self.func_type == "aicpu" and reg_info is None:
raise ValueError("{}, the registration information must be set, but current is None. Use 'CustomRegOp' to "
"generate the registration information, then pass it to 'reg_info'"
.format(self.log_prefix))
reg_info_list = self._get_expanded_list(reg_info)
for reg_info in reg_info_list:
if not isinstance(reg_info, (str, dict)):
continue
if isinstance(reg_info, str):
reg_info = json.loads(reg_info)
if self.fake_output:
reg_info["outputs"].append(dict({"index": 0, "name": "y", "param_type": "required"}))
new_dtype_format = []
for i in reg_info["dtype_format"]:
new_dtype_format.append(i + (DataType.I32_Default,))
reg_info["dtype_format"] = new_dtype_format
for _, item in enumerate(reg_info.get("outputs", [])):
output_name_list = []
if isinstance(item, dict) and item.get("name"):
output_name_list.append(item.get("name"))
self.add_prim_attr("output_names", output_name_list)
if isinstance(reg_info.get("op_name"), str):
self.add_prim_attr("reg_op_name", reg_info.get("op_name"))
target = self._get_target(reg_info)
# Reg info for func is only registered once for a certain target
if self._has_registered(target):
continue
# Register
reg_info = self._reformat_reg_info(reg_info, target)
reg_info_str = json.dumps(reg_info)
op_lib = Oplib()
if not op_lib.reg_op(reg_info_str, self.imply_path):
raise ValueError("{}, the registration information is registered failed. Use 'CustomRegOp' to "
"generate the registration information, then pass it to 'reg_info' or use "
"'custom_info_register' to bind it to 'func' if 'func' is a function."
.format(self.log_prefix))
self._save_attr(reg_info)
self._save_register_status(target)
def _get_expanded_list(self, data):
"""Recursive function to parse elements in list or tuple."""
data_list = []
if isinstance(data, (list, tuple)):
for i in data:
tmp_list = self._get_expanded_list(i)
for ii in tmp_list:
data_list.append(ii)
else:
data_list.append(data)
return data_list
def _get_registered_targets(self):
"""Get the registered targets of func."""
targets = []
if callable(self.func):
targets = getattr(self.func, "registered_targets", [])
elif isinstance(self.func, str):
targets = Custom.registered_func.get(self.func, [])
if not isinstance(targets, list):
targets = [targets]
return targets
def _has_registered(self, target):
"""Check if registration information is registered in target."""
registered_targets = self._get_registered_targets()
return target in registered_targets
def _save_register_status(self, target):
"""Save registration status for target."""
if callable(self.func):
registered_targets = getattr(self.func, "registered_targets", [])
registered_targets.append(target)
setattr(self.func, "registered_targets", registered_targets)
elif isinstance(self.func, str):
if isinstance(Custom.registered_func.get(self.func), list):
Custom.registered_func[self.func].append(target)
else:
Custom.registered_func[self.func] = [target]
def _get_op_name(self, reg_info):
if self.func_type == "aicpu":
self.uniq_name = reg_info["op_name"]
self.add_prim_attr("uniq_name", self.uniq_name)
return self.uniq_name
def _reformat_reg_info(self, reg_info, target):
"""Reformat registration information."""
if not isinstance(reg_info, dict):
raise TypeError("{}, the registration information must be of type dict, but got {} with type {}. Use "
"'CustomRegOp' to generate the registration information, then pass it to 'reg_info' or "
"use 'custom_info_register' to bind it to 'func' if 'func' is a function."
.format(self.log_prefix, reg_info, type(reg_info)))
reg_info["op_name"] = self._get_op_name(reg_info)
reg_info["imply_type"] = self._get_imply_type(reg_info, target)
if not isinstance(reg_info.get("fusion_type"), str) or not reg_info["fusion_type"].strip():
reg_info["fusion_type"] = "OPAQUE"
# Supplement necessary info for TBE if these information is missing in reg_info
if reg_info["imply_type"] == "TBE":
if reg_info.get("attr") is not None and isinstance(reg_info["attr"], list):
for i, item in enumerate(reg_info["attr"]):
if isinstance(item, dict) and item.get("value") is None:
reg_info["attr"][i]["value"] = "all"
reg_info["async_flag"] = reg_info.get("async_flag", False)
reg_info["binfile_name"] = "%s.so" % self.func_name
reg_info["compute_cost"] = reg_info.get("compute_cost", 10)
reg_info["kernel_name"] = self.func_name
reg_info["partial_flag"] = reg_info.get("partial_flag", True)
reg_info["need_check_supported"] = reg_info.get("need_check_supported", False)
# Supplement necessary info for AKG if these information is missing in reg_info
if reg_info["imply_type"] == "AKG":
target_to_processor = {"Ascend": "AiCore", "GPU": "CUDA", "CPU": "CPU"}
reg_info["processor"] = reg_info.get("processor", target_to_processor.get(target))
if self.func_type == "aot":
if reg_info.get("attr") is not None and isinstance(reg_info["attr"], list):
for i, item in enumerate(reg_info["attr"]):
if isinstance(item, dict) and item.get("value") is not None:
self.add_prim_attr(reg_info["attr"][i]["name"], reg_info["attr"][i]["value"])
reg_info["attr"] = []
return reg_info
def _get_target(self, reg_info):
"""Get target information."""
target = None
if isinstance(reg_info, dict):
# Get target from reg_info["target"]
target = reg_info.get("target")
# Infer target from reg_info["processor"], reg_info generated from AkgGpuRegOp or AkgAscendRegOp
# will have the processor information.
if target not in self.supported_targets:
processor_to_target = {"AiCore": "Ascend", "CUDA": "GPU", "CPU": "CPU"}
target = processor_to_target.get(reg_info.get("processor"))
# Infer target from reg_info["imply_type"]
if target not in self.supported_targets:
imply_type_to_target = {"TBE": "Ascend", "GPU": "GPU", "CPU": "CPU"}
target = imply_type_to_target.get(reg_info.get("imply_type"))
# Infer target from func_type
if target not in self.supported_targets:
func_type_to_target = {"tbe": "Ascend", "pyfunc": "CPU"}
target = func_type_to_target.get(self.func_type)
if target not in self.supported_targets:
raise ValueError("{}, target set in registration information must be one of {}, but got {}"
.format(self.log_prefix, self.supported_targets, target))
return target
def _get_imply_type(self, reg_info, target):
"""Get imply_typ information."""
# Get imply_type from reg_info["imply_type"]
if isinstance(reg_info, dict) and isinstance(reg_info.get("imply_type"), str) and \
reg_info["imply_type"].strip():
return reg_info["imply_type"]
# Infer imply_type from func_type
func_type_to_imply_type = {"hybrid": "AKG", "akg": "AKG", "tbe": "TBE", "aicpu": "AiCPU", "aot": target,
"pyfunc": target, "julia": target}
return func_type_to_imply_type.get(self.func_type, "AKG")
def _save_attr(self, reg_info):
"""Save input_names and attr_names of current func."""
if not isinstance(reg_info, dict):
return
tensor_inputs = reg_info.get("inputs", [])
attr = reg_info.get("attr", [])
if not isinstance(tensor_inputs, (list, tuple)):
tensor_inputs = [tensor_inputs]
if not isinstance(attr, (list, tuple)):
attr = [attr]
# input_names include tensor input names and attr input names
input_names = []
# attr_names only includes attr input names
attr_names = []
for item in tensor_inputs:
if isinstance(item, dict) and item.get("name") is not None:
input_names.append(item["name"])
for item in attr:
if isinstance(item, dict) and item.get("name") is not None:
input_names.append(item["name"])
attr_names.append(item["name"])
cur_attr = {"input_names": input_names, "attr_names": attr_names}
# If func does not have attr, save current attr.
# Else, check if current attr is same as previous saved one.
prev_input_names = input_names
prev_attr_names = attr_names
if callable(self.func):
func_attr = getattr(self.func, "func_attr", None)
if not isinstance(func_attr, dict):
setattr(self.func, "func_attr", cur_attr)
else:
prev_input_names = func_attr.get("input_names")
prev_attr_names = func_attr.get("attr_names")
elif isinstance(self.func, str):
func_attr = Custom.attr_dict.get(self.func)
if not isinstance(func_attr, dict):
Custom.attr_dict[self.func] = cur_attr
else:
prev_input_names = func_attr.get("input_names")
prev_attr_names = func_attr.get("attr_names")
if len(input_names) != len(prev_input_names):
raise ValueError("{}, length of input names set in registration information must be the same as previous "
"saved one, but got {} vs {}"
.format(self.log_prefix, len(input_names), len(prev_input_names)))
if attr_names != prev_attr_names:
raise ValueError("{}, attr names set in registration information must be the same as previous saved one, "
"but got {} vs {}".format(self.log_prefix, attr_names, prev_attr_names))
def _add_prim_target(self):
"""Add primitive_target to primitive's attr."""
registered_targets = self._get_registered_targets()
if self.func_type == "pyfunc":
self.add_prim_attr("primitive_target", "CPU")
if registered_targets and registered_targets != ["CPU"]:
logger.warning("{}, only supports CPU platform, but got registered target {}. "
"We will run it on CPU".format(self.log_prefix, registered_targets))
elif self.func_type == "aot":
if len(registered_targets) != 1:
logger.info("{}, target will be set according to context.".format(self.log_prefix))
elif registered_targets == ["GPU"]:
self.add_prim_attr("primitive_target", "GPU")
elif registered_targets == ["CPU"]:
self.add_prim_attr("primitive_target", "CPU")
elif self.func_type == "julia":
self.add_prim_attr("primitive_target", "CPU")
device_target = context.get_context('device_target')
if device_target == "CPU":
pass
elif device_target == "GPU" and registered_targets and registered_targets == ["CPU"]:
logger.warning("{}, only supports CPU platform, but got registered target {}. "
"We will run it on CPU".format(self.log_prefix, registered_targets))
else:
raise ValueError("{}, only supports CPU platform, but got target {}."
.format(self.log_prefix, device_target))
def _update_attr(self):
"""Add input_names, attr_names, primitive_target to primitive's attr."""
# add input_names, attr_names
func_attr = {}
if callable(self.func):
inputs_num = len(inspect.signature(self.func).parameters)
self.add_prim_attr("inputs_num", inputs_num)
func_attr = getattr(self.func, "func_attr", None)
elif isinstance(self.func, str):
func_attr = Custom.attr_dict.get(self.func)
if isinstance(func_attr, dict):
input_names = func_attr.get("input_names")
attr_names = func_attr.get("attr_names")
if input_names:
self.add_prim_attr("input_names", input_names)
if attr_names:
self.add_prim_attr("attr_names", attr_names)
self._add_prim_target()
if callable(self.func) and callable(self.out_shape):
if hasattr(self.out_shape, "type") and getattr(self.out_shape, "type") == "autodiff":
self.add_prim_attr("autodiff", True)
else:
self.add_prim_attr("autodiff", False)
def _hybrid_autodiff(self, input_func_type):
"""generate backward op for a custom hybrid op"""
inputs_num = len(inspect.signature(self.func).parameters)
if inputs_num == 0:
logger.warning("{}, 'func' with no input has no backward operator.".format(self.log_prefix))
elif inputs_num > 10:
logger.warning("{}, currently automatic differentiation for 'func' with more than 10 inputs is not "
"supported.".format(self.log_prefix))
else:
grad_func = autodiff_bprop(inputs_num)
def infer_func(*args):
return args[:inputs_num]
setattr(infer_func, "type", "autodiff")
op = Custom(func=self.func, out_shape=infer_func, out_dtype=infer_func,
func_type=input_func_type, bprop=True)
self.bprop = grad_func(op)
def _hybrid_func_analyser(self):
"""analyze hybrid source string and add corresponding attrs."""
args = {val: idx for idx, val in enumerate(list(inspect.signature(self.func).parameters))}
return_num = self.func_source_str.count('return')
if return_num != 1:
logger.warning("{}, source code of 'func' should have only one 'return' syntax, but got {}"
.format(self.log_prefix, return_num))
else:
sentences = [s for s in self.func_source_str.split('\n') if s.count("return") == 1]
symbols = re.sub(r"return|\s|\[|\]|\(|\)", "", sentences[-1]).split(',')
inplace_assign_output = [[idx, args[val]] if val in args else [idx, -1]
for idx, val in enumerate(symbols)]
if any(i[1] != -1 for i in inplace_assign_output):
self.add_prim_attr("inplace_assign_output", " ".join(
(str(j) for i in inplace_assign_output for j in i)))
def _auto_infer(self, *args):
"""
the automatic infer function for functions with @ms_kernel decorator
"""
fake_input = []
enable_infer_value = True
for arg in args:
if arg["value"] is not None:
fake_input.append(arg["value"].asnumpy())
else:
arg_dtype = arg["dtype"]
# if any value is missing from input, disable infer value
enable_infer_value = False
if isinstance(arg_dtype, mstype.tensor_type):
arg_dtype = arg_dtype.element_type()
fake_arg = np.zeros(arg["shape"]).astype(
mstype.dtype_to_nptype(arg_dtype))
fake_input.append(fake_arg)
fake_output = self.func(*fake_input)
if hasattr(fake_output, 'shape'):
infer_shape = fake_output.shape
infer_dtype = mstype.tensor_type(mstype.pytype_to_dtype(fake_output.dtype))
else:
infer_shape = (1,)
infer_dtype = mstype.pytype_to_dtype(fake_output.dtype)
infer_value = Tensor(fake_output) if enable_infer_value else None
return infer_shape, infer_dtype, infer_value