Source code for mindspore_serving.server.register.method

# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Method registration interface"""

import inspect
import ast
from functools import wraps

from mindspore_serving._mindspore_serving import ServableRegister_
from mindspore_serving._mindspore_serving import MethodSignature_
from mindspore_serving import log as logger
from mindspore_serving.server.common import check_type, deprecated
from .utils import get_func_name, get_servable_dir
from .stage_function import register_stage_function, check_stage_function
from .model import g_declared_models, Model

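# Module-level state for the method currently being defined: register_method resets this state
# before executing the decorated function, and each add_stage / call_xxx invocation records one
# stage into method_def_context_.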
method_def_context_ = MethodSignature_()
cur_stage_index_ = 0
has_called_preprocess_ = False
has_called_servable_ = False
has_called_postprocess_ = False

method_def_ast_meta_ = []


class _TensorDef:
    """Data flow item, for definitions of data flow in a method"""

    def __init__(self, tag, tensor_index):
        self.tag = tag
        self.tensor_index = tensor_index

    def as_pair(self):
        return self.tag, self.tensor_index


def _create_tensor_def_outputs(tag, outputs_cnt):
    """Create data flow item for output"""
    result = [_TensorDef(tag, i) for i in range(outputs_cnt)]
    if len(result) == 1:
        return result[0]
    return tuple(result)


def _wrap_fun_to_batch(fun, input_count):
    """wrap preprocess and postprocess to pipeline"""
    argspec_len = len(inspect.signature(fun).parameters)
    if argspec_len != input_count:
        raise RuntimeError(f"function {fun.__name__} input args count {argspec_len} not match the count {input_count} "
                           f"registered in method")

    @wraps(fun)
    def call_func(instances):
        for instance in instances:
            inputs = []
            for i in range(input_count):
                inputs.append(instance[i])
            yield fun(*inputs)

    return call_func
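
# For reference, a minimal sketch of how the wrapper above behaves (the 'add_ints' function and the
# sample instances below are hypothetical, not part of this module):
#
#     def add_ints(x1, x2):
#         return x1 + x2
#
#     batched = _wrap_fun_to_batch(add_ints, 2)
#     list(batched([(1, 2), (3, 4)]))  # yields one result per instance: [3, 7]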


def _get_stage_outputs_count(call_name):
    global method_def_ast_meta_
    method_name = method_def_context_.method_name
    if call_name not in method_def_ast_meta_:
        raise RuntimeError(
            f"Failed to parse method '{method_name}', complex statements such as conditions and loops are not supported"
            f" in register_method when the interface '{call_name}' is used, use 'add_stage' to replace '{call_name}'")
    _, outputs_count = method_def_ast_meta_[call_name]
    return outputs_count


@deprecated("1.5.0", "mindspore_serving.server.register.add_stage")
def call_preprocess(preprocess_fun, *args):
    r"""For method registration, define the preprocessing function and its' parameters.

    .. warning::
        'call_preprocess' is deprecated from version 1.5.0 and will be removed in a future version, use
        :class:`mindspore_serving.server.register.add_stage` instead.

    Note:
        The length of 'args' should be equal to the number of inputs of preprocess_fun.

    Args:
        preprocess_fun (function): Python function for preprocess.
        args: Preprocess inputs. The length of 'args' should be equal to the number of input parameters
            of the implemented python function.

    Raises:
        RuntimeError: The type or value of the parameters are invalid, or other error happened.

    Examples:
        >>> from mindspore_serving.server import register
        >>> import numpy as np
        >>> def add_trans_datatype(x1, x2):
        ...     return x1.astype(np.float32), x2.astype(np.float32)
        >>>
        >>> register.declare_servable(servable_file="tensor_add.mindir", model_format="MindIR", with_batch_dim=False)
        >>>
        >>> @register.register_method(output_names=["y"]) # register add_cast method in add
        >>> def add_cast(x1, x2):
        ...     x1, x2 = register.call_preprocess(add_trans_datatype, x1, x2)  # cast input to float32
        ...     y = register.call_servable(x1, x2)
        ...     return y
    """
    global method_def_context_
    global has_called_preprocess_, has_called_servable_, has_called_postprocess_
    if has_called_preprocess_:
        raise RuntimeError(f"Check failed in method '{method_def_context_.method_name}', "
                           f"call_preprocess or call_preprocess_pipeline should not be invoked more than once")
    if has_called_servable_:
        raise RuntimeError(f"Check failed in method '{method_def_context_.method_name}', "
                           f"call_servable should be invoked after call_preprocess")
    if has_called_postprocess_:
        raise RuntimeError(f"Check failed in method '{method_def_context_.method_name}', "
                           f"call_postprocess or call_postprocess_pipeline should be invoked after call_preprocess")
    has_called_preprocess_ = True
    outputs_count = _get_stage_outputs_count('call_preprocess')
    return add_stage(preprocess_fun, *args, outputs_count=outputs_count, tag="Preprocess")


@deprecated("1.5.0", "mindspore_serving.server.register.add_stage")
def call_preprocess_pipeline(preprocess_fun, *args):
    r"""For method registration, define the preprocessing pipeline function and its' parameters.

    .. warning::
        'call_preprocess_pipeline' is deprecated from version 1.5.0 and will be removed in a future version, use
        :class:`mindspore_serving.server.register.add_stage` instead.

    A single request can include multiple instances, and multiple queued requests will also contain multiple
    instances. If you need to process multiple instances through multithreading or other parallel processing
    capabilities in `preprocess` or `postprocess`, such as using MindData's concurrency ability to process multiple
    input images in `preprocess`, MindSpore Serving provides 'call_preprocess_pipeline' and 'call_postprocess_pipeline'
    to register such preprocessing and postprocessing. For more detail,
    please refer to `Resnet50 model configuration example
    <https://gitee.com/mindspore/serving/blob/r2.0/example/resnet/resnet50/servable_config.py>`_.

    Args:
        preprocess_fun (function): Python pipeline function for preprocess.
        args: Preprocess inputs. The length of 'args' should be equal to the number of input parameters
            of the implemented python function.

    Raises:
        RuntimeError: The type or value of the parameters are invalid, or other error happened.

    Examples:
        >>> from mindspore_serving.server import register
        >>> import numpy as np
        >>> def add_trans_datatype(instances):
        ...     for instance in instances:
        ...         x1 = instance[0]
        ...         x2 = instance[1]
        ...         yield x1.astype(np.float32), x2.astype(np.float32)
        >>>
        >>> register.declare_servable(servable_file="tensor_add.mindir", model_format="MindIR", with_batch_dim=False)
        >>>
        >>> @register.register_method(output_names=["y"]) # register add_cast method in add
        >>> def add_cast(x1, x2):
        ...     x1, x2 = register.call_preprocess_pipeline(add_trans_datatype, x1, x2)  # cast input to float32
        ...     y = register.call_servable(x1, x2)
        ...     return y
    """
    global method_def_context_
    global has_called_preprocess_, has_called_servable_, has_called_postprocess_
    if has_called_preprocess_:
        raise RuntimeError(f"Check failed in method '{method_def_context_.method_name}', "
                           f"call_preprocess or call_preprocess_pipeline should not be invoked more than once")
    if has_called_servable_:
        raise RuntimeError(f"Check failed in method '{method_def_context_.method_name}', "
                           f"call_servable should be invoked after call_preprocess_pipeline")
    if has_called_postprocess_:
        raise RuntimeError(f"Check failed in method '{method_def_context_.method_name}', call_postprocess "
                           f"or call_postprocess_pipeline should be invoked after call_preprocess_pipeline")
    has_called_preprocess_ = True
    outputs_count = _get_stage_outputs_count('call_preprocess_pipeline')
    return add_stage(preprocess_fun, *args, outputs_count=outputs_count, batch_size=0, tag="Preprocess")


@deprecated("1.5.0", "mindspore_serving.server.register.add_stage")
def call_postprocess(postprocess_fun, *args):
    r"""For method registration, define the postprocessing function and its' parameters.

    .. warning::
        'call_postprocess' is deprecated from version 1.5.0 and will be removed in a future version, use
        :class:`mindspore_serving.server.register.add_stage` instead.

    Note:
        The length of 'args' should be equal to the number of inputs of postprocess_fun.

    Args:
        postprocess_fun (function): Python function for postprocess.
        args: Postprocess inputs. The length of 'args' should be equal to the number of input parameters
            of the implemented python function.

    Raises:
        RuntimeError: The type or value of the parameters are invalid, or other error happened.
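
    Examples:
        A minimal illustrative sketch mirroring the `call_preprocess` example above (the `tensor_add.mindir`
        servable and the `postprocess_cast` function are assumptions for illustration only):

        >>> from mindspore_serving.server import register
        >>> import numpy as np
        >>> def postprocess_cast(y):  # hypothetical postprocess function
        ...     return y.astype(np.int32)
        >>>
        >>> register.declare_servable(servable_file="tensor_add.mindir", model_format="MindIR", with_batch_dim=False)
        >>>
        >>> @register.register_method(output_names=["y"]) # register add_cast method in add
        >>> def add_cast(x1, x2):
        ...     y = register.call_servable(x1, x2)
        ...     y = register.call_postprocess(postprocess_cast, y)  # cast output to int32
        ...     return y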
    """
    global method_def_context_
    global has_called_postprocess_
    if has_called_postprocess_:
        raise RuntimeError(f"Check failed in method '{method_def_context_.method_name}', "
                           f"call_postprocess or call_postprocess_pipeline should not be invoked more than once")
    has_called_postprocess_ = True
    outputs_count = _get_stage_outputs_count('call_postprocess')
    return add_stage(postprocess_fun, *args, outputs_count=outputs_count, tag="Postprocess")


@deprecated("1.5.0", "mindspore_serving.server.register.add_stage")
def call_postprocess_pipeline(postprocess_fun, *args):
    r"""For method registration, define the postprocessing pipeline function and its' parameters.

    .. warning::
        'call_postprocess_pipeline' is deprecated from version 1.5.0 and will be removed in a future version, use
        :class:`mindspore_serving.server.register.add_stage` instead.

    A single request can include multiple instances, and multiple queued requests will also contain multiple
    instances. If you need to process multiple instances through multithreading or other parallel processing
    capabilities in `preprocess` or `postprocess`, such as using MindData's concurrency ability to process multiple
    input images in `preprocess`, MindSpore Serving provides 'call_preprocess_pipeline' and 'call_postprocess_pipeline'
    to register such preprocessing and postprocessing. For more detail,
    please refer to `Resnet50 model configuration example
    <https://gitee.com/mindspore/serving/blob/r2.0/example/resnet/resnet50/servable_config.py>`_.

    Args:
        postprocess_fun (function): Python pipeline function for postprocess.
        args: Postprocess inputs. The length of 'args' should be equal to the number of input parameters
            of the implemented python function.

    Raises:
        RuntimeError: The type or value of the parameters are invalid, or other error happened.
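
    Examples:
        A minimal illustrative sketch mirroring the `call_preprocess_pipeline` example above (the
        `tensor_add.mindir` servable and the `postprocess_cast` pipeline function are assumptions
        for illustration only):

        >>> from mindspore_serving.server import register
        >>> import numpy as np
        >>> def postprocess_cast(instances):  # hypothetical pipeline postprocess function
        ...     for instance in instances:
        ...         y = instance[0]
        ...         yield y.astype(np.int32)
        >>>
        >>> register.declare_servable(servable_file="tensor_add.mindir", model_format="MindIR", with_batch_dim=False)
        >>>
        >>> @register.register_method(output_names=["y"]) # register add_cast method in add
        >>> def add_cast(x1, x2):
        ...     y = register.call_servable(x1, x2)
        ...     y = register.call_postprocess_pipeline(postprocess_cast, y)  # cast output to int32
        ...     return y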
    """
    global method_def_context_
    global has_called_postprocess_
    if has_called_postprocess_:
        raise RuntimeError(f"Check failed in method '{method_def_context_.method_name}', "
                           f"call_postprocess or call_postprocess_pipeline should not be invoked more than once")
    has_called_postprocess_ = True
    outputs_count = _get_stage_outputs_count('call_postprocess_pipeline')
    return add_stage(postprocess_fun, *args, outputs_count=outputs_count, batch_size=0, tag="Postprocess")


@deprecated("1.5.0", "mindspore_serving.server.register.add_stage")
def call_servable(*args):
    r"""For method registration, define the inputs data of model inference.

    .. warning::
        'call_servable' is deprecated from version 1.5.0 and will be removed in a future version, use
        :class:`mindspore_serving.server.register.add_stage` instead.

    Note:
        The length of 'args' should be equal to the number of inputs of the model.

    Args:
        args: Model's inputs. The length of 'args' should be equal to the number of inputs of the model.

    Raises:
        RuntimeError: The type or value of the parameters are invalid, or other error happened.

    Examples:
        >>> from mindspore_serving.server import register
        >>> register.declare_servable(servable_file="tensor_add.mindir", model_format="MindIR", with_batch_dim=False)
        >>>
        >>> @register.register_method(output_names=["y"]) # register add_common method in add
        >>> def add_common(x1, x2):
        ...     y = register.call_servable(x1, x2)
        ...     return y
    """
    global method_def_context_
    global has_called_servable_, has_called_postprocess_
    method_name = method_def_context_.method_name
    if has_called_servable_:
        raise RuntimeError(f"Check failed in method '{method_name}', "
                           f"call_servable should not be invoked more than once")
    if has_called_postprocess_:
        raise RuntimeError(f"Check failed in method '{method_name}', "
                           f"call_postprocess or call_postprocess_pipeline should be invoked after call_servable")
    has_called_servable_ = True

    if not g_declared_models:
        raise RuntimeError(f"There is no model declared, you can use declare_model to declare models.")
    outputs_count = _get_stage_outputs_count("call_servable")
    if len(g_declared_models) == 1:
        model = g_declared_models[0]
    else:
        raise RuntimeError(
            f"There are more than one servable declared when the interface 'call_servable' is used, use 'add_stage'"
            f" to replace 'call_servable'")
    return add_stage(model, *args, outputs_count=outputs_count)


def add_stage(stage, *args, outputs_count, batch_size=None, tag=None):
    r"""In the `servable_config.py` file of one servable, we use `register_method` to wrap a Python function to
    define a `method` of the servable, and `add_stage` is used to define a stage of this `method`, which can be a
    Python function or a model.

    Note:
        The length of 'args' should be equal to the number of inputs of the function or model.

    Args:
        stage (Union(function, Model)): User-defined python function or `Model` object returned by declare_model.
        outputs_count (int): Outputs count of the user-defined python function or model.
        batch_size (int, optional): This parameter is valid only when stage is a function and the function can
            process multiple instances at a time. Default: None.

            - None, the input of the function will be the inputs of one instance.
            - 0, the input of the function will be a tuple object of instances, and the maximum number of the
              instances is determined by the server based on the batch size of models.
            - int value >= 1, the input of the function will be a tuple object of instances, and the maximum
              number of the instances is the value specified by 'batch_size'.

        args: Stage input placeholders, which come from the inputs of the function wrapped by register_method or
            the outputs of add_stage. The length of 'args' should be equal to the number of inputs of the function
            or model.
        tag (str, optional): Customized flag of the stage, such as "Preprocess". Default: None.

    Raises:
        RuntimeError: The type or value of the parameters are invalid, or other error happened.

    Examples:
        >>> import numpy as np
        >>> from mindspore_serving.server import register
        >>> add_model = register.declare_model(model_file="tensor_add.mindir", model_format="MindIR")
        >>>
        >>> def preprocess(x1, x2):
        ...     return x1.astype(np.float32), x2.astype(np.float32)
        >>>
        >>> @register.register_method(output_names=["y"]) # register add_common method in add
        >>> def add_common(x1, x2):
        ...     x1, x2 = register.add_stage(preprocess, x1, x2, outputs_count=2)  # call preprocess in stage 1
        ...     y = register.add_stage(add_model, x1, x2, outputs_count=1)  # call add model in stage 2
        ...     return y
    """
    global method_def_context_
    global cur_stage_index_
    method_name = method_def_context_.method_name
    if tag is not None:
        check_type.check_str("tag", tag)
    else:
        tag = ""
    for item in args:
        if not isinstance(item, _TensorDef):
            raise RuntimeError(f"Each value of parameter *args is a placeholder for data and must come from the method"
                               f" inputs or the outputs of add_stage")
    func_inputs = [item.as_pair() for item in args]
    inputs_count = len(args)

    if isinstance(stage, Model):
        if stage not in g_declared_models:
            raise RuntimeError(
                f"Check failed in method '{method_name}', the parameter 'stage' of add_stage must be function "
                f"or Model returned by declare_model, and ensure that interface 'declare_model' can take effect "
                f"when importing servable_config.py by the serving server")
        model = stage
        model_key = model.model_key
        ServableRegister_.register_model_input_output_info(model_key, inputs_count, outputs_count, 0)
        method_def_context_.add_stage_model(model_key, func_inputs, 0, tag)
    elif inspect.isfunction(stage):
        if batch_size is None:
            register_stage_function(method_name, _wrap_fun_to_batch(stage, inputs_count), inputs_count=inputs_count,
                                    outputs_count=outputs_count, use_with_size=False)
            batch_size = 0
        else:
            check_type.check_int("batch_size", batch_size, 0)
            register_stage_function(method_name, stage, inputs_count=inputs_count, outputs_count=outputs_count,
                                    use_with_size=True)
        func_name = get_servable_dir() + "." + get_func_name(stage)
        method_def_context_.add_stage_function(func_name, func_inputs, batch_size, tag)
    else:
        if not isinstance(stage, str):
            raise RuntimeError(
                f"Check failed in method '{method_name}', the parameter 'stage' of add_stage must be function "
                f"or Model returned by declare_model, now is {type(stage)}")
        func_name = stage
        check_stage_function(method_name, func_name, inputs_count=inputs_count, outputs_count=outputs_count)
        method_def_context_.add_stage_function(func_name, func_inputs, 0, tag)
    cur_stage_index_ += 1  # stage index starts from 1
    return _create_tensor_def_outputs(cur_stage_index_, outputs_count)


_call_servable_name = call_servable.__name__
_call_stage_names = [call_preprocess.__name__, call_postprocess.__name__]
_call_stage_batch_names = [call_preprocess_pipeline.__name__, call_postprocess_pipeline.__name__]


def _ast_node_info(method_def_func, ast_node):
    """Ast node code info"""
    func_name = method_def_func.__name__
    func_codes = inspect.getsource(method_def_func)
    func_codes_lines = func_codes.split("\n")
    _, start_lineno = inspect.findsource(method_def_func)
    codes = ""
    if hasattr(ast_node, "end_lineno"):
        end_lineno = ast_node.end_lineno
    else:
        end_lineno = ast_node.lineno
    for line in range(ast_node.lineno, end_lineno + 1):
        codes += func_codes_lines[line - 1] + "\n"
    lineno = ast_node.lineno + start_lineno
    end_lineno = end_lineno + start_lineno
    if lineno != end_lineno:
        line_info = f"{lineno}~{end_lineno}"
    else:
        line_info = f"{lineno}"
    return f"line {line_info} in {func_name}, code: \n" + codes


def _get_method_def_stage_meta(method_def_func):
    """Parse register_method func, and get the input and output count of preprocess, servable and postprocess"""
    source = inspect.getsource(method_def_func)
    method_name = method_def_func.__name__
    call_list = ast.parse(source).body[0].body
    func_meta = {}
    code_infos = []
    code_other = None

    def update_other_code(code):
        nonlocal code_other
        if not code_other:
            code_other = code

    for call_item in call_list:
        if isinstance(call_item, ast.Return):
            continue
        if isinstance(call_item, ast.Expr):
            continue
        if not isinstance(call_item, ast.Assign):
            update_other_code(call_item)
            continue
        target = call_item.targets[0]
        if isinstance(target, ast.Name):
            outputs_count = 1
        elif isinstance(target, ast.Tuple):
            outputs_count = len(target.elts)
        else:
            continue
        call = call_item.value
        if not isinstance(call, ast.Call):
            continue
        func = call.func
        if isinstance(func, ast.Attribute):
            func_name = func.attr
        elif isinstance(func, ast.Name):
            func_name = func.id
        else:
            update_other_code(call_item)
            continue
        inputs_count = len(call.args)
        if func_name in _call_stage_names or func_name in _call_stage_batch_names:
            inputs_count -= 1
        elif func_name == _call_servable_name:
            pass
        else:
            update_other_code(call_item)
            continue
        if inputs_count <= 0:
            raise RuntimeError(f"Invalid '{func_name}' invoke args")
        logger.info(f"stage {len(func_meta) + 1} call type '{func_name}', inputs count {inputs_count}, "
                    f"outputs count {outputs_count}")
        func_meta[func_name] = [inputs_count, outputs_count]
        code_infos.append([call_item, func_name])

    if code_infos and code_other:
        call_names = [item[1] for item in code_infos]
        call_names = ";".join(call_names)
        raise RuntimeError(
            f"Failed to parse method '{method_name}', complex statements such as conditions and loops are not supported"
            f" in register_method when the interface '{call_names}' is used, use 'add_stage' to replace '{call_names}',"
            f" code {type(code_other)}: {_ast_node_info(method_def_func, code_other)}")
    if code_infos and _call_servable_name not in func_meta:
        raise RuntimeError(f"Not find the invoke of '{_call_servable_name}'")
    return func_meta


def register_method(output_names):
    """Define a method of the servable when importing servable_config.py of one servable. One servable can include
    one or more methods, and each method provides different services based on models. A client needs to specify the
    servable name and the method name when accessing one service. MindSpore Serving supports a service consisting of
    multiple python functions and multiple models.

    Note:
        This interface should take effect when importing servable_config.py by the serving server. Therefore, it's
        recommended that this interface be used globally in servable_config.py.

        This interface will define the signatures and the pipeline of the method.

        The signatures include the method name and the input and output names of the method. When accessing a
        service, the client needs to specify the servable name, the method name, and provide one or more inference
        instances. Each instance specifies the input data by the input names and obtains the output data by the
        output names.

        The pipeline consists of one or more stages; each stage can be a python function or a model. That is, a
        pipeline can include one or more python functions and one or more models. In addition, the interface also
        defines the data flow of these stages.

    Args:
        output_names (Union[str, tuple[str], list[str]]): The output names of the method. The input names are the
            argument names of the registered function.

    Raises:
        RuntimeError: The type or value of the parameters are invalid, or other error happened.

    Examples:
        >>> from mindspore_serving.server import register
        >>> add_model = register.declare_model(model_file="tensor_add.mindir", model_format="MindIR")
        >>> sub_model = register.declare_model(model_file="tensor_sub.mindir", model_format="MindIR")
        >>>
        >>> @register.register_method(output_names=["y"]) # register predict method in servable
        >>> def predict(x1, x2, x3): # x1+x2-x3
        ...     y = register.add_stage(add_model, x1, x2, outputs_count=1)
        ...     y = register.add_stage(sub_model, y, x3, outputs_count=1)
        ...     return y
    """
    output_names = check_type.check_and_as_str_tuple_list('output_names', output_names)

    def register(func):
        name = get_func_name(func)
        sig = inspect.signature(func)
        input_names = []
        for k, v in sig.parameters.items():
            if v.kind == inspect.Parameter.VAR_POSITIONAL:
                raise RuntimeError(f"'{name}' input {k} cannot be VAR_POSITIONAL !")
            if v.kind == inspect.Parameter.VAR_KEYWORD:
                raise RuntimeError(f"'{name}' input {k} cannot be VAR_KEYWORD !")
            if v.kind == inspect.Parameter.KEYWORD_ONLY:
                raise RuntimeError(f"'{name}' input {k} cannot be KEYWORD_ONLY !")
            input_names.append(k)

        input_tensors = []
        for i in range(len(input_names)):
            input_tensors.append(_TensorDef(0, i))

        servable_name = get_servable_dir()
        global method_def_context_
        method_def_context_ = MethodSignature_()
        method_def_context_.servable_name = servable_name
        method_def_context_.method_name = name
        method_def_context_.inputs = input_names
        method_def_context_.outputs = output_names

        global method_def_ast_meta_
        method_def_ast_meta_ = _get_method_def_stage_meta(func)
        global cur_stage_index_
        cur_stage_index_ = 0
        global has_called_preprocess_, has_called_servable_, has_called_postprocess_
        has_called_preprocess_ = False
        has_called_servable_ = False
        has_called_postprocess_ = False

        output_tensors = func(*tuple(input_tensors))
        if method_def_ast_meta_ and cur_stage_index_ != len(method_def_ast_meta_):
            raise RuntimeError(f"Failed to parse method {name}, the number of stages obtained through the AST "
                               f"{len(method_def_ast_meta_)} is inconsistent with the running result {cur_stage_index_}"
                               f". Condition and loop statements are not supported in methods currently.")
        if isinstance(output_tensors, _TensorDef):
            output_tensors = (output_tensors,)
        for item in output_tensors:
            if not isinstance(item, _TensorDef):
                raise RuntimeError(f"Each value returned is a placeholder for data and must come from the method"
                                   f" inputs or the outputs of add_stage")
        if len(output_tensors) != len(output_names):
            raise RuntimeError(
                f"Method return output size {len(output_tensors)} not match registered {len(output_names)}")

        return_inputs = [item.as_pair() for item in output_tensors]
        method_def_context_.set_return(return_inputs)
        logger.info(f"Register method: method_name {method_def_context_.method_name}, "
                    f"inputs: {input_names}, outputs: {output_names}")
        ServableRegister_.register_method(method_def_context_)
        return func

    return register