mindspore_lite.ModelParallelRunner
- class mindspore_lite.ModelParallelRunner[source]
The ModelParallelRunner class defines a MindSpore Lite runner that supports model parallelism. Unlike model, which does not support parallel inference, ModelParallelRunner does. A runner contains multiple workers, and each worker is the unit that actually performs parallel inference. A typical scenario: multiple clients send inference tasks to a server, the server runs them in parallel to shorten inference time, and then returns the inference results to the clients.
Examples:
>>> # Use case: serving inference.
>>> # Precondition 1: Build the MindSpore Lite serving package with export MSLITE_ENABLE_SERVER_INFERENCE=on.
>>> # Precondition 2: Install the wheel package of MindSpore Lite built in precondition 1.
>>> import mindspore_lite as mslite
>>> model_parallel_runner = mslite.ModelParallelRunner()
>>> print(model_parallel_runner)
model_path: .
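The flow below is a minimal, single-threaded sketch of how the runner is typically used end to end (build once, then feed inputs and read outputs). It assumes the same "mobilenetv2.mindir" model and "input.bin" float32 input file used in the examples on this page; it is an illustrative sketch, not part of the official example.

import numpy as np
import mindspore_lite as mslite

# Configure a CPU context with several parallel workers.
context = mslite.Context()
context.target = ["cpu"]
context.parallel.workers_num = 2

# Build the parallel runner once; it can then serve many requests.
runner = mslite.ModelParallelRunner()
runner.build_from_file(model_path="mobilenetv2.mindir", context=context)

# Fill the input Tensors with data and run inference.
inputs = runner.get_inputs()
inputs[0].set_data_from_numpy(np.fromfile("input.bin", dtype=np.float32))
outputs = runner.predict(inputs)

# Read the first output back as a numpy array.
print(outputs[0].get_data_to_numpy().flatten()[:5])

In a real serving setup, predict is usually called from multiple client threads on the same runner, as shown in the predict example further below.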
- build_from_file(model_path, context=None)[source]
Build a model parallel runner from a model file path so that it can run on a device.
- Parameters:
model_path (str) - Path of the model file.
context (Context, optional) - The configuration used to pass context and options during model pool initialization. Default: None. None means a Context with target set to cpu, using the Context's default parallel attributes; the default case is sketched after the example below.
- Raises:
TypeError - model_path is not a str.
TypeError - context is neither a Context nor None.
RuntimeError - The model_path file path does not exist.
RuntimeError - Failed to initialize the model parallel runner.
Examples:
>>> # Use case: serving inference.
>>> # Precondition 1: Build the MindSpore Lite serving package with export MSLITE_ENABLE_SERVER_INFERENCE=on.
>>> # Precondition 2: Install the wheel package of MindSpore Lite built in precondition 1.
>>> import mindspore_lite as mslite
>>> context = mslite.Context()
>>> context.target = ["cpu"]
>>> context.parallel.workers_num = 4
>>> model_parallel_runner = mslite.ModelParallelRunner()
>>> model_parallel_runner.build_from_file(model_path="mobilenetv2.mindir", context=context)
>>> print(model_parallel_runner)
model_path: mobilenetv2.mindir.
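If context is omitted, the documented default applies: a Context with target cpu and the default parallel attributes. A minimal sketch of that default case, assuming the same "mobilenetv2.mindir" model:

import mindspore_lite as mslite

# No Context is passed, so the runner is built with the documented default:
# a cpu-target Context with the default parallel attributes.
runner = mslite.ModelParallelRunner()
runner.build_from_file(model_path="mobilenetv2.mindir")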
- get_inputs()[source]
Get all input Tensors of the model.
- Returns:
list[Tensor], the input Tensor list of the model.
Examples:
>>> # Use case: serving inference.
>>> # Precondition 1: Build the MindSpore Lite serving package with export MSLITE_ENABLE_SERVER_INFERENCE=on.
>>> # Precondition 2: Install the wheel package of MindSpore Lite built in precondition 1.
>>> import mindspore_lite as mslite
>>> context = mslite.Context()
>>> context.target = ["cpu"]
>>> context.parallel.workers_num = 4
>>> model_parallel_runner = mslite.ModelParallelRunner()
>>> model_parallel_runner.build_from_file(model_path="mobilenetv2.mindir", context=context)
>>> inputs = model_parallel_runner.get_inputs()
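The Tensors returned by get_inputs() are normally filled with numpy data before calling predict(). The sketch below continues the example above; it assumes "input.bin" holds float32 data matching the first input, and it reads the same Tensor attributes (name, data_size, element_num) that the predict example below reads from the output Tensors.

import numpy as np

# Inspect the input Tensors obtained above, then fill the first one with data.
for tensor in inputs:
    print("input name:", tensor.name, "| data size:", tensor.data_size,
          "| element num:", tensor.element_num)
in_data = np.fromfile("input.bin", dtype=np.float32)
inputs[0].set_data_from_numpy(in_data)
outputs = model_parallel_runner.predict(inputs)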
- predict(inputs)[source]
Run inference with the model parallel runner.
- Parameters:
inputs (list[Tensor]) - A list that includes all input Tensors in order.
- Returns:
list[Tensor], the output Tensor list of the model.
- Raises:
TypeError - inputs is not a list.
TypeError - inputs is a list, but the elements are not Tensor.
RuntimeError - Failed to run inference on the model.
Examples:
>>> # Use case: serving inference.
>>> # Precondition 1: Download the MindSpore Lite serving package or build it with
>>> # export MSLITE_ENABLE_SERVER_INFERENCE=on.
>>> # Precondition 2: Install the wheel package of MindSpore Lite built in precondition 1.
>>> # The result can be found in the tutorial of runtime_parallel_python.
>>> import time
>>> from threading import Thread
>>> import numpy as np
>>> import mindspore_lite as mslite
>>> # The number of threads of one worker.
>>> # WORKERS_NUM * THREAD_NUM should not exceed the number of cores of the machine.
>>> THREAD_NUM = 1
>>> # In parallel inference, the number of workers in one `ModelParallelRunner` in the server.
>>> # If you want to compare the time difference between parallel inference and serial inference,
>>> # you can set WORKERS_NUM = 1 for serial inference.
>>> WORKERS_NUM = 3
>>> # Simulate 5 clients, and each client sends 2 inference tasks to the server at the same time.
>>> PARALLEL_NUM = 5
>>> TASK_NUM = 2
>>>
>>>
>>> def parallel_runner_predict(parallel_runner, parallel_id):
...     # One runner with 3 workers: set model input, execute inference and get output.
...     task_index = 0
...     while True:
...         if task_index == TASK_NUM:
...             break
...         task_index += 1
...         # Set model input
...         inputs = parallel_runner.get_inputs()
...         in_data = np.fromfile("input.bin", dtype=np.float32)
...         inputs[0].set_data_from_numpy(in_data)
...         once_start_time = time.time()
...         # Execute inference
...         outputs = parallel_runner.predict(inputs)
...         once_end_time = time.time()
...         print("parallel id: ", parallel_id, " | task index: ", task_index, " | run once time: ",
...               once_end_time - once_start_time, " s")
...         # Get output
...         for output in outputs:
...             tensor_name = output.name.rstrip()
...             data_size = output.data_size
...             element_num = output.element_num
...             print("tensor name is:%s tensor size is:%s tensor elements num is:%s" % (tensor_name,
...                                                                                      data_size,
...                                                                                      element_num))
...             data = output.get_data_to_numpy()
...             data = data.flatten()
...             print("output data is:", end=" ")
...             for j in range(5):
...                 print(data[j], end=" ")
...             print("")
...
>>> # Init the context and add CPU device info.
>>> context = mslite.Context()
>>> context.target = ["cpu"]
>>> context.cpu.enable_fp16 = False
>>> context.cpu.thread_num = THREAD_NUM
>>> context.cpu.inter_op_parallel_num = THREAD_NUM
>>> context.parallel.workers_num = WORKERS_NUM
>>> # Build ModelParallelRunner from file.
>>> model_parallel_runner = mslite.ModelParallelRunner()
>>> model_parallel_runner.build_from_file(model_path="mobilenetv2.mindir", context=context)
>>> # The server creates 5 threads to hold the inference tasks of 5 clients.
>>> threads = []
>>> total_start_time = time.time()
>>> for i in range(PARALLEL_NUM):
...     threads.append(Thread(target=parallel_runner_predict, args=(model_parallel_runner, i,)))
...
>>> # Start the threads to perform parallel inference.
>>> for th in threads:
...     th.start()
...
>>> for th in threads:
...     th.join()
...
>>> total_end_time = time.time()
>>> print("total run time: ", total_end_time - total_start_time, " s")