基于Pipeline实现多图调度

Linux Ascend Serving 中级 高级

查看源文件

概述

MindSpore支持一个模型可以生成多张子图,通过调度多张子图实现性能的提升。例如,在GPT3场景,会将模型拆成两个阶段的图,第一阶段为初始化图,只需执行一次,第二阶段的推理图,根据输入句子长度N决定需要执行N次。这样,相对于优化之前两张图合并在一起都需要执行N次,实现了推理服务性能的5-6倍提升。为此,MindSpore Serving提供了Pineline功能,实现多张图之间的调度,提升特定场景的推理服务性能。

当前对Pipeline使用有以下限制:

  • 当前仅支持batchsize为1场景。

  • 有Pipeline存在场景,服务仅是以Pineline的形式对外体现,即客户端调用的模型方法需为注册的Pipeline方法。

下面以一个简单的分布式场景为例,演示Pipeline部署流程。

环境准备

运行示例前,需确保已经正确安装了MindSpore Serving,并配置了环境变量。MindSpore Serving和安装和配置可以参考MindSpore Serving安装页面

导出多图模型

导出分布式模型需要的文件可以参考export_model目录,需要如下文件列表:

export_model
├── distributed_inference.py
├── export_model.sh
├── net.py
└── rank_table_8pcs.json
  • net.py为MatMul网络定义。

  • distributed_inference.py配置分布式相关的参数。

  • export_model.sh在当前机器上创建device目录并且导出每个device对应的模型文件。

  • rank_table_8pcs.json为配置当前多卡环境的组网信息的json文件,可以参考rank_table

使用net.py,构造一个包含MatMul、Neg算子的网络。

import numpy as np
from mindspore import Tensor, Parameter, ops
from mindspore.nn import Cell


class Net(Cell):
    def __init__(self, matmul_size, init_val, transpose_a=False, transpose_b=False, strategy=None):
        super().__init__()
        matmul_np = np.full(matmul_size, init_val, dtype=np.float32)
        self.matmul_weight = Parameter(Tensor(matmul_np))
        self.matmul = ops.MatMul(transpose_a=transpose_a, transpose_b=transpose_b)
        self.neg = ops.Neg()
        if strategy is not None:
            self.matmul.shard(strategy)

    def construct(self, inputs):
        x = self.matmul(inputs, self.matmul_weight)
        x = self.neg(x)
        return x

使用distributed_inference.py,生成多图模型。可以参考分布式推理

import numpy as np
from net import Net
from mindspore import context, Model, Tensor, export
from mindspore.communication import init


def test_inference():
    """distributed inference after distributed training"""
    context.set_context(mode=context.GRAPH_MODE)
    init(backend_name="hccl")
    context.set_auto_parallel_context(full_batch=True, parallel_mode="semi_auto_parallel",
                                      device_num=8, group_ckpt_save_file="./group_config.pb")

    predict_data = create_predict_data()
    network = Net(matmul_size=(96, 16), init_val = 0.5)
    model = Model(network)
    model.infer_predict_layout(Tensor(predict_data))
    export(model._predict_network, Tensor(predict_data), file_name="matmul_0", file_format="MINDIR")

    network_1 = Net(matmul_size=(96, 16), init_val = 1.5)
    model_1 = Model(network)
    model_1.infer_predict_layout(Tensor(predict_data))
    export(model_1._predict_network, Tensor(predict_data), file_name="matmul_1", file_format="MINDIR")


def create_predict_data():
    """user-defined predict data"""
    inputs_np = np.random.randn(128, 96).astype(np.float32)
    return Tensor(inputs_np)

使用export_model.sh,导出多图模型。执行成功后会在上一级目录创建model目录,结构如下:

model
├── device0
│   ├── group_config.pb
│   └── matmul.mindir
├── device1
├── device2
├── device3
├── device4
├── device5
├── device6
└── device7

每个device目录都包含两个文件group_config.pbmatmul_0.mindirmatmul_1.mindir,分别表示模型分组配置文件与两张图对应的模型文件。

部署分布式推理服务

启动分布式推理服务,可以参考pipeline_distributed,需要如下文件列表:

matmul_distributed
├── serving_agent.py
├── serving_server.py
├── matmul
│   └── servable_config.py
├── model
└── rank_table_8pcs.json
  • model为存放模型文件的目录。

  • serving_server.py为启动服务脚本,包括MainDistributed Worker进程。

  • serving_agent.py为启动Agent脚本。

  • servable_config.py模型配置文件,通过distributed.declare_servable声明了一个rank_size为8、stage_size为1的分布式模型,同时定义了一个Pipeline的方法predict

模型配置文件内容如下:

import numpy as np
from mindspore_serving.server import distributed
from mindspore_serving.server import register
from mindspore_serving.server.register import PipelineServable

distributed.declare_servable(rank_size=8, stage_size=1, with_batch_dim=False)

def add_preprocess(x):
    """define preprocess, this example has one input and one output"""
    x = np.add(x, x)
    return x

@register.register_method(output_names=["y"])
def fun1(x):
    x = register.call_preprocess(add_preprocess, x)
    y = register.call_servable(x, subgraph=0)
    return y

@register.register_method(output_names=["y"])
def fun2(x):
    y = register.call_servable(x, subgraph=1)
    return y

servable1 = PipelineServable(servable_name="matmul", method="fun1", version_number=0)
servable2 = PipelineServable(servable_name="matmul", method="fun2", version_number=0)

@register.register_pipeline(output_names=["x", "z"])
def predict(x, y):
    x = servable1.run(x)
    for i in range(10):
        print(i)
        z = servable2.run(y)
    return x, z

其中,call_servable方法的subgraph参数指定图的标号,从0开始,此编号在为图加载的序号,单机场景与declare_servable接口的servable_file的参数列表序号对应,分布式场景与startup_agents接口的model_files的参数列表序号对应。 PipelineServable类声明模型的服务函数,servable_name指定模型名, method指定函数方法, version_number指定版本号,register_pipeline实现对Pipeline函数的注册,入参output_names指定输出列表。

启动Serving服务器

使用serving_server.py,调用distributed.start_servable方法部署分布式Serving服务器。

import os
import sys
from mindspore_serving import server
from mindspore_serving.server import distributed


def start():
    servable_dir = os.path.dirname(os.path.realpath(sys.argv[0]))
    distributed.start_servable(servable_dir, "matmul",
                               rank_table_json_file="rank_table_8pcs.json",
                               version_number=1,
                               distributed_address="127.0.0.1:6200")

    server.start_grpc_server("127.0.0.1:5500")
    server.start_restful_server("127.0.0.1:1500")


if __name__ == "__main__":
    start()
  • servable_dir为servable存放的目录。

  • servable_name为servable的名称,对应一个存放模型配置文件的目录。

  • rank_table_json_file为配置当前多卡环境的组网信息的json文件。

  • distributed_addressDistributed Worker的地址。

  • wait_agents_time_in_seconds设置等待所有Agent注册完成的时限,默认为0表示会一直等待。

启动Agent

使用serving_agent.py,调用startup_agents方法会在当前机器上启动的8个Agent进程。Agent会从Distributed Worker获取rank_table,这样Agent之间才能利用HCCL进行通信。

from mindspore_serving.server import distributed


def start_agents():
    """Start all the agents in current machine"""
    model_files = []
    group_configs = []
    for i in range(8):
        model_files.append([f"model/device{i}/matmul_0.mindir", f"model/device{i}/matmul_1.mindir"])
        group_configs.append([f"model/device{i}/group_config.pb"])

    distributed.startup_agents(distributed_address="127.0.0.1:6200", model_files=model_files,
                               group_config_files=group_configs)

if __name__ == '__main__':
    start_agents()

  • distributed_addressDistributed Worker的地址。

  • model_files为模型文件路径的列表,传入多个模型文件表示支持多图,文件传输顺序号决定call_servable方法的subgraph参数对应的图号。

  • group_config_files为模型分组配置文件路径的列表。

  • agent_start_port表示Agent占用的起始端口,默认为7000。

  • agent_ipAgent的ip地址,默认为None。AgentDistributed Worker通信的ip默认会从rank_table获取,如果该ip地址不可用,则需要同时设置agent_iprank_start

  • rank_start为当前机器起始的rank_id,默认为None。

执行推理

通过gRPC访问推理服务,client需要指定gRPC服务器的ip地址和port。运行serving_client.py,调用matmul分布式模型的predict方法,该方法对应注册的pipeline方法,执行推理。

import numpy as np
from mindspore_serving.client import Client


def run_matmul():
    """Run client of distributed matmul"""
    client = Client("localhost:5500", "matmul", "predict")
    instance = {"x": np.ones((128, 96), np.float32), "y": np.ones((128, 96), np.float32)}
    result = client.infer(instance)
    print("result:\n", result)

if __name__ == '__main__':
    run_matmul()

执行后显示如下返回值,说明Serving分布式推理服务已正确执行Pipeline的多图推理:

result:
[{'x': array([[-96., -96., -96., ..., -96., -96., -96.],
      [-96., -96., -96., ..., -96., -96., -96.],
      [-96., -96., -96., ..., -96., -96., -96.],
      ...,
      [-96., -96., -96., ..., -96., -96., -96.],
      [-96., -96., -96., ..., -96., -96., -96.],
      [-96., -96., -96., ..., -96., -96., -96.]], dtype=float32),'z': array([[-48., -48., -48., ..., -48., -48., -48.],
      [-48., -48., -48., ..., -48., -48., -48.],
      [-48., -48., -48., ..., -48., -48., -48.],
      ...,
      [-48., -48., -48., ..., -48., -48., -48.],
      [-48., -48., -48., ..., -48., -48., -48.],
      [-48., -48., -48., ..., -48., -48., -48.]], }]