mindspore_lite.ModelParallelRunner

class mindspore_lite.ModelParallelRunner[source]

The ModelParallelRunner class defines a MindSpore Lite Runner that supports model parallelism. Unlike Model, which does not support parallelism, ModelParallelRunner performs inference in parallel. A Runner contains multiple workers, and a worker is the unit that actually executes parallel inference. In a typical scenario, multiple clients send inference tasks to a server; the server runs these tasks in parallel to shorten the inference time and then returns the inference results to the clients.

Examples:

>>> # Use case: serving inference.
>>> # precondition 1: Build the MindSpore Lite serving package with export MSLITE_ENABLE_SERVER_INFERENCE=on.
>>> # precondition 2: Install the wheel package of MindSpore Lite built in precondition 1.
>>> import mindspore_lite as mslite
>>> model_parallel_runner = mslite.ModelParallelRunner()
>>> print(model_parallel_runner)
model_path: .
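
The following condensed sketch ties the methods documented below into the serving flow described above. It is only an illustration, assuming a mobilenetv2.mindir model file and an input.bin file whose contents match the model input (the same file names used in the method examples below):

>>> import numpy as np
>>> import mindspore_lite as mslite
>>> # Configure a CPU context with two parallel workers (illustrative value).
>>> context = mslite.Context()
>>> context.target = ["cpu"]
>>> context.parallel.workers_num = 2
>>> # Build the Runner, fill the first input Tensor, and run parallel inference.
>>> model_parallel_runner = mslite.ModelParallelRunner()
>>> model_parallel_runner.build_from_file(model_path="mobilenetv2.mindir", context=context)
>>> inputs = model_parallel_runner.get_inputs()
>>> inputs[0].set_data_from_numpy(np.fromfile("input.bin", dtype=np.float32))
>>> outputs = model_parallel_runner.predict(inputs)
>>> print(outputs[0].get_data_to_numpy().shape)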
build_from_file(model_path, context=None)[source]

Builds a model parallel Runner from a model path so that it can run on a device.

Parameters:
  • model_path (str) - Path of the model file.

  • context (Context, optional) - Configuration used to pass the context and options during model pool initialization. Default: None. None means a Context whose target is cpu, with the default parallel attributes (see the sketch after the example below).

Raises:
  • TypeError - model_path is not a str.

  • TypeError - context is neither a Context nor None.

  • RuntimeError - The file path given by model_path does not exist.

  • RuntimeError - Failed to initialize the model parallel Runner.

Examples:

>>> # Use case: serving inference.
>>> # precondition 1: Build the MindSpore Lite serving package with export MSLITE_ENABLE_SERVER_INFERENCE=on.
>>> # precondition 2: Install the wheel package of MindSpore Lite built in precondition 1.
>>> import mindspore_lite as mslite
>>> context = mslite.Context()
>>> context.target = ["cpu"]
>>> context.parallel.workers_num = 4
>>> model_parallel_runner = mslite.ModelParallelRunner()
>>> model_parallel_runner.build_from_file(model_path="mobilenetv2.mindir", context=context)
>>> print(model_parallel_runner)
model_path: mobilenetv2.mindir.
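
For reference, a minimal sketch of building with the default context=None; per the parameter description above, this behaves like a Context whose target is cpu with default parallel attributes (the mobilenetv2.mindir path is carried over from the example above purely as an assumption):

>>> # Build with the default context (context=None).
>>> runner = mslite.ModelParallelRunner()
>>> runner.build_from_file(model_path="mobilenetv2.mindir")
>>> # Roughly equivalent explicit configuration (assumption based on the default described above):
>>> # context = mslite.Context()
>>> # context.target = ["cpu"]
>>> # runner.build_from_file(model_path="mobilenetv2.mindir", context=context)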
get_inputs()[source]

Obtains all input Tensors of the model.

Returns:

list[Tensor], the input Tensor list of the model.

Examples:

>>> # Use case: serving inference.
>>> # precondition 1: Build the MindSpore Lite serving package with export MSLITE_ENABLE_SERVER_INFERENCE=on.
>>> # precondition 2: Install the wheel package of MindSpore Lite built in precondition 1.
>>> import mindspore_lite as mslite
>>> context = mslite.Context()
>>> context.target = ["cpu"]
>>> context.parallel.workers_num = 4
>>> model_parallel_runner = mslite.ModelParallelRunner()
>>> model_parallel_runner.build_from_file(model_path="mobilenetv2.mindir", context=context)
>>> inputs = model_parallel_runner.get_inputs()
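
A typical next step, sketched here under the assumption that input.bin holds data matching the model input (the same file used in the predict example below), is to fill the returned Tensors and run inference:

>>> import numpy as np
>>> # Copy the input data into the first input Tensor, then run parallel inference.
>>> in_data = np.fromfile("input.bin", dtype=np.float32)
>>> inputs[0].set_data_from_numpy(in_data)
>>> outputs = model_parallel_runner.predict(inputs)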
predict(inputs)[source]

Performs inference with the model parallel Runner.

Parameters:
  • inputs (list[Tensor]) - A list containing all input Tensors in order.

Returns:

list[Tensor], the output Tensor list of the model.

Raises:
  • TypeError - inputs is not a list.

  • TypeError - inputs is a list, but its elements are not Tensors.

  • RuntimeError - Failed to run inference on the model.

Examples:

>>> # Use case: serving inference.
>>> # Precondition 1: Download the MindSpore Lite serving package, or build it by
>>> #                 export MSLITE_ENABLE_SERVER_INFERENCE=on.
>>> # Precondition 2: Install the wheel package of MindSpore Lite built in precondition 1.
>>> # The result can be found in the tutorial of runtime_parallel_python.
>>> import time
>>> from threading import Thread
>>> import numpy as np
>>> import mindspore_lite as mslite
>>> # The number of threads used by one worker.
>>> # WORKERS_NUM * THREAD_NUM should not exceed the number of cores of the machine.
>>> THREAD_NUM = 1
>>> # In parallel inference, the number of workers in one `ModelParallelRunner` on the server.
>>> # If you want to compare the time difference between parallel inference and serial inference,
>>> # you can set WORKERS_NUM = 1 to run serial inference.
>>> WORKERS_NUM = 3
>>> # Simulate 5 clients, and each client sends 2 inference tasks to the server at the same time.
>>> PARALLEL_NUM = 5
>>> TASK_NUM = 2
>>>
>>>
>>> def parallel_runner_predict(parallel_runner, parallel_id):
...     # One Runner with WORKERS_NUM workers: set the model input, execute inference, and get the output.
...     task_index = 0
...     while True:
...         if task_index == TASK_NUM:
...             break
...         task_index += 1
...         # Set model input
...         inputs = parallel_runner.get_inputs()
...         in_data = np.fromfile("input.bin", dtype=np.float32)
...         inputs[0].set_data_from_numpy(in_data)
...         once_start_time = time.time()
...         # Execute inference
...         outputs = parallel_runner.predict(inputs)
...         once_end_time = time.time()
...         print("parallel id: ", parallel_id, " | task index: ", task_index, " | run once time: ",
...               once_end_time - once_start_time, " s")
...         # Get output
...         for output in outputs:
...             tensor_name = output.name.rstrip()
...             data_size = output.data_size
...             element_num = output.element_num
...             print("tensor name is:%s tensor size is:%s tensor elements num is:%s" % (tensor_name,
...                                                                                      data_size,
...                                                                                      element_num))
...
...             data = output.get_data_to_numpy()
...             data = data.flatten()
...             print("output data is:", end=" ")
...             for j in range(5):
...                 print(data[j], end=" ")
...             print("")
...
>>> # Init the context, and add CPU device info.
>>> context = mslite.Context()
>>> context.target = ["cpu"]
>>> context.cpu.enable_fp16 = False
>>> context.cpu.thread_num = THREAD_NUM
>>> context.cpu.inter_op_parallel_num = THREAD_NUM
>>> context.parallel.workers_num = WORKERS_NUM
>>> # Build ModelParallelRunner from file
>>> model_parallel_runner = mslite.ModelParallelRunner()
>>> model_parallel_runner.build_from_file(model_path="mobilenetv2.mindir", context=context)
>>> # The server creates 5 threads to handle the inference tasks of the 5 clients.
>>> threads = []
>>> total_start_time = time.time()
>>> for i in range(PARALLEL_NUM):
...     threads.append(Thread(target=parallel_runner_predict, args=(model_parallel_runner, i,)))
...
>>> # Start threads to perform parallel inference.
>>> for th in threads:
...     th.start()
...
>>> for th in threads:
...     th.join()
...
>>> total_end_time = time.time()
>>> print("total run time: ", total_end_time - total_start_time, " s")