mindspore_lite.ModelParallelRunner
- class mindspore_lite.ModelParallelRunner[source]
The ModelParallelRunner class defines a MindSpore Lite Runner that supports model parallelism. Compared with Model, which does not support parallelism, ModelParallelRunner runs inference in parallel. A Runner contains multiple workers, and each worker is the unit that actually performs parallel inference. A typical scenario: when multiple clients send inference tasks to the server, the server runs them in parallel to shorten the inference time and then returns the inference results to the clients.
Note
Initialize a ModelParallelRunner with the init method before calling its other methods.
Examples:
>>> # Use case: serving inference.
>>> # Precondition 1: Build the MindSpore Lite serving package with export MSLITE_ENABLE_SERVER_INFERENCE=on.
>>> # Precondition 2: Install the wheel package of MindSpore Lite built in precondition 1.
>>> import mindspore_lite as mslite
>>> model_parallel_runner = mslite.ModelParallelRunner()
>>> print(model_parallel_runner)
model_path: .
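The example above only constructs the runner. As a complement to the note, the following is a minimal end-to-end sketch of the intended call order: initialize with init first, then query inputs and run inference. The model path, input file, and worker count here are placeholders for illustration, not values prescribed by the API.
>>> # Minimal sketch, not part of the official examples; init must come before get_inputs/predict.
>>> import numpy as np
>>> import mindspore_lite as mslite
>>> context = mslite.Context()
>>> context.append_device_info(mslite.CPUDeviceInfo())
>>> runner_config = mslite.RunnerConfig(context=context, workers_num=2)  # placeholder worker count
>>> runner = mslite.ModelParallelRunner()
>>> runner.init(model_path="mobilenetv2.ms", runner_config=runner_config)  # placeholder model path
>>> inputs = runner.get_inputs()
>>> # Placeholder input file; the data must match the model's expected input size and dtype.
>>> inputs[0].set_data_from_numpy(np.fromfile("input.bin", dtype=np.float32))
>>> outputs = []  # empty container, filled by predict in order
>>> runner.predict(inputs, outputs)
>>> data = outputs[0].get_data_to_numpy()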
- get_inputs()[source]
Get all input Tensors of the model.
- Returns:
list[Tensor], the list of the model's input Tensors.
Examples:
>>> # Use case: serving inference.
>>> # Precondition 1: Build the MindSpore Lite serving package with export MSLITE_ENABLE_SERVER_INFERENCE=on.
>>> # Precondition 2: Install the wheel package of MindSpore Lite built in precondition 1.
>>> import mindspore_lite as mslite
>>> context = mslite.Context()
>>> context.append_device_info(mslite.CPUDeviceInfo())
>>> runner_config = mslite.RunnerConfig(context=context, workers_num=4)
>>> model_parallel_runner = mslite.ModelParallelRunner()
>>> model_parallel_runner.init(model_path="mobilenetv2.ms", runner_config=runner_config)
>>> inputs = model_parallel_runner.get_inputs()
- get_outputs()[source]
Get all output Tensors of the model.
- Returns:
list[Tensor], the list of the model's output Tensors.
Examples:
>>> # Use case: serving inference.
>>> # Precondition 1: Build the MindSpore Lite serving package with export MSLITE_ENABLE_SERVER_INFERENCE=on.
>>> # Precondition 2: Install the wheel package of MindSpore Lite built in precondition 1.
>>> import mindspore_lite as mslite
>>> context = mslite.Context()
>>> context.append_device_info(mslite.CPUDeviceInfo())
>>> runner_config = mslite.RunnerConfig(context=context, workers_num=4)
>>> model_parallel_runner = mslite.ModelParallelRunner()
>>> model_parallel_runner.init(model_path="mobilenetv2.ms", runner_config=runner_config)
>>> outputs = model_parallel_runner.get_outputs()
- init(model_path, runner_config=None)[source]
Build a model parallel Runner from the model path so that it can run on a device.
- Parameters:
model_path (str) - Path of the model file.
runner_config (RunnerConfig, optional) - Configuration used to pass the context and options during model pool initialization. Default: None.
- Raises:
TypeError - model_path is not a str.
TypeError - runner_config is neither a RunnerConfig nor None.
RuntimeError - the file at model_path does not exist.
RuntimeError - initializing the model parallel Runner failed.
Examples:
>>> # Use case: serving inference.
>>> # Precondition 1: Build the MindSpore Lite serving package with export MSLITE_ENABLE_SERVER_INFERENCE=on.
>>> # Precondition 2: Install the wheel package of MindSpore Lite built in precondition 1.
>>> import mindspore_lite as mslite
>>> context = mslite.Context()
>>> context.append_device_info(mslite.CPUDeviceInfo())
>>> runner_config = mslite.RunnerConfig(context=context, workers_num=4)
>>> model_parallel_runner = mslite.ModelParallelRunner()
>>> model_parallel_runner.init(model_path="mobilenetv2.ms", runner_config=runner_config)
>>> print(model_parallel_runner)
model_path: mobilenetv2.ms.
- predict(inputs, outputs)[source]
Run inference with the model parallel Runner.
- Parameters:
inputs (list[Tensor]) - An ordered list containing all input Tensors.
outputs (list[Tensor]) - Container into which the model outputs are filled in order; see the sketch after the exception list below.
- Raises:
TypeError - inputs is not a list.
TypeError - inputs is a list, but one of its elements is not a Tensor.
TypeError - outputs is not a list.
TypeError - outputs is a list, but one of its elements is not a Tensor.
RuntimeError - running inference failed.
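A minimal sketch (not part of the official examples) of calling predict with an empty output container and guarding against the errors listed above; it assumes model_parallel_runner and inputs were prepared as in the init and get_inputs examples.
>>> outputs = []  # empty container; predict fills it in order
>>> try:
...     model_parallel_runner.predict(inputs, outputs)
... except (TypeError, RuntimeError) as e:
...     print("predict failed:", e)
...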
Examples:
>>> # Use case: serving inference.
>>> # Precondition 1: Download the MindSpore Lite serving package, or build it with
>>> # export MSLITE_ENABLE_SERVER_INFERENCE=on.
>>> # Precondition 2: Install the wheel package of MindSpore Lite built in precondition 1.
>>> import time
>>> from threading import Thread
>>> import numpy as np
>>> import mindspore_lite as mslite
>>>
>>> # The number of threads of one worker.
>>> # WORKERS_NUM * THREAD_NUM should not exceed the number of cores of the machine.
>>> THREAD_NUM = 1
>>> # In parallel inference, the number of workers in one `ModelParallelRunner` on the server.
>>> # To compare the time difference between parallel and serial inference,
>>> # set WORKERS_NUM = 1 for serial inference.
>>> WORKERS_NUM = 3
>>> # Simulate 5 clients, each sending 2 inference tasks to the server at the same time.
>>> PARALLEL_NUM = 5
>>> TASK_NUM = 2
>>>
>>>
>>> def parallel_runner_predict(parallel_runner, parallel_id):
...     # One Runner with 3 workers: set model input, execute inference and get output.
...     task_index = 0
...     while True:
...         if task_index == TASK_NUM:
...             break
...         task_index += 1
...         # Set model input
...         inputs = parallel_runner.get_inputs()
...         in_data = np.fromfile("./model/input.bin", dtype=np.float32)
...         inputs[0].set_data_from_numpy(in_data)
...         once_start_time = time.time()
...         # Execute inference
...         outputs = []
...         parallel_runner.predict(inputs, outputs)
...         once_end_time = time.time()
...         print("parallel id: ", parallel_id, " | task index: ", task_index, " | run once time: ",
...               once_end_time - once_start_time, " s")
...         # Get output
...         for output in outputs:
...             tensor_name = output.get_tensor_name().rstrip()
...             data_size = output.get_data_size()
...             element_num = output.get_element_num()
...             print("tensor name is:%s tensor size is:%s tensor elements num is:%s" % (tensor_name,
...                                                                                      data_size,
...                                                                                      element_num))
...             data = output.get_data_to_numpy()
...             data = data.flatten()
...             print("output data is:", end=" ")
...             for j in range(5):
...                 print(data[j], end=" ")
...             print("")
...
>>> # Init RunnerConfig and context, and add CPU device info
>>> cpu_device_info = mslite.CPUDeviceInfo(enable_fp16=False)
>>> context = mslite.Context(thread_num=THREAD_NUM, inter_op_parallel_num=THREAD_NUM)
>>> context.append_device_info(cpu_device_info)
>>> parallel_runner_config = mslite.RunnerConfig(context=context, workers_num=WORKERS_NUM)
>>> # Build ModelParallelRunner from file
>>> model_parallel_runner = mslite.ModelParallelRunner()
>>> model_parallel_runner.init(model_path="./model/mobilenetv2.ms", runner_config=parallel_runner_config)
>>> # The server creates 5 threads to store the inference tasks of 5 clients.
>>> threads = []
>>> total_start_time = time.time()
>>> for i in range(PARALLEL_NUM):
...     threads.append(Thread(target=parallel_runner_predict, args=(model_parallel_runner, i,)))
...
>>> # Start threads to perform parallel inference.
>>> for th in threads:
...     th.start()
...
>>> for th in threads:
...     th.join()
...
>>> total_end_time = time.time()
>>> print("total run time: ", total_end_time - total_start_time, " s")