分布式并行训练Transformer模型

查看源文件

概述

近年来,基于Transformer的预训练模型参数量越来越大,而Ascend 910、GPU等设备内存的增长显著小于模型大小增长的速度。因此,将Transformer模型进行并行训练已经一个非常迫切的需求。MindSpore提供了一个分布式的Transformer接口mindspore.nn.transformer.transformer,将Transformer内部用到的每个算子都配置了并行策略,而用户只需要配置全局的data_parallelmodel_parallel属性,即可完成分布式并行策略的配置。可以极大地方便用户应用Transformer进行分布式训练。目前分布式训练支持Ascend 910和GPU环境,总结如下:

  • Transformer提供了简单的并行配置,即可实现算子级别并行和流水线并行。

目录结构如下:

└─sample_code
    ├─distribute_training_transformer
        ├── dataset.py
        ├── model.py
        ├── parallel_recover_train.py
        ├── parallel_save_ckpt_train.py
        ├── preprocess.py
        ├── rank_table_16pcs.json
        ├── rank_table_2pcs.json
        ├── rank_table_8pcs.json
        ├── run_cluster.sh
        ├── run_parallel_recover_ckpt.sh
        ├── run_parallel_save_ckpt.sh
        ├── run.sh
        └── train.py

其中,rank_table_8pcs.jsonrank_table_2pcs.json是配置当前多卡环境的组网信息文件。model.pydataset.pytrain.py三个文件是定义数据导入,网络结构的脚本和训练文件。run.sh是执行脚本。

使用mindspore.parallel中的Transformer库,用户需要决定并行配置和模型这两个部分的入参,即可完成分布式配置。分布式配置仅在半自动和自动并行模式下生效

并行配置定义

针对Transformer中网络的定义和实现,我们为每个算子设置了对应的切分策略。用户根据自己的需求,设置全局的并行配置可以实现Transformer网络的并行配置。Transformer目前定义的并行配置主要有三个类别TransformerOpParallelConfigOpParallelConfigEmbeddingOpParallelConfigTransformerOpParallelConfig的导入路径为mindspore.nn.transformer,它可以配置的属性如下所示:

  • data_parallel (int): 设置数据并行数,默认值为1。

  • model_parallel (int): 设置模型并行数,默认值为1。

  • pipeline_stage (int): 设置Pipeline Stage数目,默认值为 1。

  • micro_batch_num (int): 设置输入Batch的切分个数,即将一个Batch切分成多个小batch,默认值为1。

  • optimizer_shard (bool): 是否开启优化器并行,默认值为False。

  • gradient_aggregation_group (int): 优化器并行对应梯度聚合个数,默认值为4。

  • recompute (bool): 是否开启重计算,默认值为False。

  • vocab_emb_dp (bool): 是否配置Embedding为数据并行,默认值为True。

我们会在接下来讨论他们的区别。现在以单机八卡训练一个Transformer模型为例,我们根据目前的卡数8设置Transformer模型的并行配置。我们可以设置data_parallel=1,model_parallel=8作为并行的基本配置。注意并行配置的情况下,data_parallel*model_parallel*pipeline_stages<=总卡数。对应的代码中的并行配置如下。

import mindspore as ms
from mindspore.nn.transformer import TransformerOpParallelConfig
ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL)
parallel_config = TransformerOpParallelConfig(data_parallel=1, model_parallel=8)

模型定义

在定义好配置之后,我们可以开始构造一个网络。由于MindSpore已经提供了Transformer,用户只需要额外增加Embedding层、输出层和损失函数即可。下面依次介绍各个模块的配置。

Embedding层

Tranformer中的Embeding层主要由词向量嵌入和位置向量嵌入两部分组成。我们提供了VocabEmbedding作为并行的Embedding层,需要传入EmbeddingOpParallelConfig进行初始化。和OpParallelConfig不同的是,EmbeddingOpParallelConfig拥有的属性如下

  • data_parallel: 设置数据并行数,默认值为1。

  • model_parallel: 设置模型并行数,默认值为1。

  • vocab_emb_dp: 是否配置Embedding为数据并行,默认值为True。

vocab_emb_dp用来区分embedding_lookup操作的两种并行模式数据并行行切分并行。当vocab_emb_dpTrue时,embedding查找的过程将会被设置为并行度为data_parallel的数据并行。当vocab_emb_dpFalse时,embedding的权重将会在第0维度按model_parallel进行均分,可以减少变量的存储。

在此我们定义了一个EmbeddingLayer,将查询的词向量和位置向量进行相加求和。注意,我们在此设置了adddropout操作。由于输入的tensor大小为[batch_size, seq_length, hidden_size],并且词向量的查找过程为数据并行,所以我们根据OpParallelConfig中的数据并行值data_parallel,调用算子的shard方法分别设置这两个算子的并行策略。如果用户不设置shard方法,那么默认的算子并行策略为并行度为卡数的数据并行。对应的代码如下所示:

import mindspore.nn as nn
import mindspore.ops as ops
from mindspore.nn.transformer import VocabEmbedding
class EmbeddingLayer(nn.Cell):
    def __init__(self, vocab_size, position_size, embedding_size,
                 parallel_config, dropout_rate=0.1):
        super(EmbeddingLayer, self).__init__()
        self.word_embedding = VocabEmbedding(vocab_size=vocab_size,
                                             embedding_size=embedding_size,
                                             parallel_config=parallel_config)
        self.position_embedding = VocabEmbedding(vocab_size=position_size,
                                                 embedding_size=embedding_size,
                                                 parallel_config=parallel_config)
        self.add = ops.Add().shard(((parallel_config.data_parallel, 1, 1), (parallel_config.data_parallel, 1, 1)))
        self.dropout = nn.Dropout(1 - dropout_rate)
        self.dropout.dropout.shard(((parallel_config.data_parallel, 1, 1),))

    def construct(self, input_ids, input_position):
        word_embedding, word_table = self.word_embedding(input_ids)
        position_embedding, _ = self.position_embedding(input_position)
        embed = self.add(word_embedding, position_embedding)
        embed = self.dropout(embed)
        return embed, word_table

注意我们还将词嵌入的embedding_table作为返回值返回了。

Transformer层

用户可以调用三个接口作为主要的构建API:TransformerTransformerEncoderTransformerDecoder。它们都需要传入TransformerOpParallelConfig作为并行设置的配置。我们根据TransformerOpParallelConfig中配置的并行配置,对Transformer内部使用的算子设置对应的并行策略。

pipeline_func这个方法可以设置transformer中每个block属于的stage、是否开启重计算和优化器切分的融合标记。例如下面的例子中,我们根据传入的layer_idoffset(在Transformer接口中,在实例化Encoder时传入的offset为0, Decoder中传入的offset的值为Encoder的层数), Encoder_layerDecoder_layer的总层数,和指定的pipeline_stage数目,按照均分的配置计算出当前的block对应的stage。在默认情况下,即用户不传入lambda_func的情况下,也是按照层数进行均分的设置。

def pipeline_func(network, layer_id, offset, parallel_config, layers):
    layers_per_stage = 2
    pp_id = max(int(layer_id + offset) / layers_per_stage, 1)
    network.pipeline_stage = int(pp_id)
    print(f"pipeline id is:{pp_id}", flush=True)

在下面的代码中,我们实例化了上述定义的EmbeddingLayer,并且调用set_comm_fusion将其对应的反向梯度融合标记为第0组,调用pipeline_stage方法设置对应embedding的权重为第0个stage。将最后的Head类,一个简单的Linear层,放置于最后一个stage。在用户不设置Linear中的算子并行策略的情况下,默认是当前stage内的数据并行。

import mindspore.nn as nn
import mindspore.ops as ops
from mindspore.nn.transformer import Transformer, AttentionMask, CrossEntropyLoss
from mindspore.nn import Dense as Linear
class Net(nn.Cell):
    """
      Single Transformer Model
    """
    def __init__(self, batch, src_len, tgt_len, hidden_size, vocab_size,
                 en_layer, de_layer, parallel_config, return_loss=False):
        super(Net, self).__init__()
        self.src_embedding = EmbeddingLayer(vocab_size=vocab_size, embedding_size=hidden_size,
                                            position_size=src_len,
                                            parallel_config=parallel_config.embedding_dp_mp_config)
        self.tgt_embedding = EmbeddingLayer(vocab_size=vocab_size, embedding_size=hidden_size,
                                            position_size=tgt_len,
                                            parallel_config=parallel_config.embedding_dp_mp_config)
        total_layers = en_layer + de_layer + 2
        layers_per_stage = total_layers // parallel_config.pipeline_stage
        self.src_embedding.pipeline_stage = 0
        self.tgt_embedding.pipeline_stage = 0
        self.return_loss = return_loss

        def pipeline_func(network, layer_id, offset, parallel_config, layers):
            pp_id = max(int(layer_id + offset) / layers_per_stage, 1)
            network.pipeline_stage = int(pp_id)
            gradient_aggregation_group = 4
            dis = max(int((layer_id + offset) / gradient_aggregation_group), 1)
            network.set_comm_fusion(int((layer_id + offset) / dis) + 1)
            print(f"pipeline id is:{pp_id}", flush=True)

        self.base1 = Transformer(encoder_layers=en_layer,
                                 decoder_layers=de_layer,
                                 batch_size=batch,
                                 src_seq_length=src_len,
                                 tgt_seq_length=tgt_len,
                                 hidden_size=hidden_size,
                                 num_heads=8,
                                 attention_dropout_rate=0.0,
                                 hidden_dropout_rate=0.0,
                                 lambda_func=pipeline_func,
                                 ffn_hidden_size=hidden_size,
                                 parallel_config=parallel_config)

        self.attention_mask = AttentionMask(seq_length=tgt_len)
        self.head = Linear(in_channels=hidden_size, out_channels=vocab_size, has_bias=False)
        self.head.matmul.shard(((1, 1), (1, 1)))
        self.head.pipeline_stage = parallel_config.pipeline_stage - 1
        self.loss = CrossEntropyLoss(parallel_config=parallel_config.dp_mp_config)
        self.no_equal = ops.NotEqual().shard(((1, 1), ()))

定义损失函数

MindSpore还提供了一个支持并行的交叉商损失函数mindspore.nn.transformer.CrossEntroyLoss。这个函数接收一个OpParallelConfig来配置并行属性。OpParallelConfig实际包含了两个属性data_parallelmodel_parallel。 通过这两个属性可以配置损失函数的并行配置。

from mindspore.nn.transformer import CrossEntropyLoss, TransformerOpParallelConfig
parallel_config = TransformerOpParallelConfig()
loss = CrossEntropyLoss(parallel_config=parallel_config.dp_mp_config)

端到端流程

在定义并行配置、模型和损失函数之后,我们将上述代码进一步整合。在启动训练之前,我们调用auto_parallel_context设置并行选项,设置并行模式为SEMI_AUTO_PARALLEL。在流水线并行的情况下,MindSpore提供了额外的配置,将梯度累积变量进一步切分到数据并行维度的卡上,以节省内存占用。其过程如下:首先开启优化器切分(enable_parallel_optimizer=True), 然后设置parallel_optimizer_config= {"gradient_accumulation_shard":True}将流水线并行训练时的累积变量进一步切分,以达到节省内存的目的,同时会在每个micro_step之间引入通信算子进行梯度的同步。注意gradient_accumulation_shard默认对应的值为True,如果用户为了提高性能,可以将此参数设置为False。

import mindspore as ms
ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL, gradients_mean=False, full_batch=True, loss_repeated_mean=True, device_num=device_num, enable_parallel_optimizer=True, parallel_optimizer_config = {"gradient_accumulation_shard": gradient_accumulation_shard})

关于stage_num的说明如下,MindSpore通过stage_num来判断是否进入流水线并行训练。

  • 在设置stage_num=1的情况下,进行算子级别的并行。用户可以通过设置TransformerOpParallelConfig中的model_paralleldata_parallel属性配置并行策略。

  • 在设置stage_num>1的情况下,会进入流水线并行模式。流水线并行模式下,需要设置每个cellpipeline_stage属性,将cell指定到对应的设备上执行。另外,在实例化网络后,我们需要再调用PipelineCell来封装定义好的网络。这个Cell的作用是将网络的输入切分成mirco_batch_num个数的小数据,以最大利用计算资源。值得注意的是,我们需要调用net.infer_param_pipeline_stage()而不是net.trainable_params()来获取当前设备stage对应的训练权重。注意,pipeline的stage内的卡数至少为8。pipeline的详细教程可以参考这里

整合后的主文件代码如下。注意在此省略一些参数的定义,完整的参数列表可以参考用例源代码,代码地址在本文开始的部分已经给出。

import argparse
import mindspore as ms
from mindspore.nn.transformer import TransformerOpParallelConfig
import mindspore.communication as D
from mindspore.nn import PipelineCell
from mindspore.nn import AdamWeightDecay
from dataset import ToyDataset, Tokenzier
from model import Net


def set_weight_decay(params):
    decay_filter = lambda x: 'layernorm' not in x.name.lower() and "bias" not in x.name.lower()
    decay_params = list(filter(decay_filter, params))
    other_params = list(filter(lambda x: not decay_filter(x), params))
    group_params = [{
        'params': decay_params,
        'weight_decay': 1e-1
    }, {
        'params': other_params,
        'weight_decay': 0.0
    }, {
        'order_params': params
    }]
    return group_params


def main():
    parser = argparse.ArgumentParser(description="Transformer training")
    parser.add_argument("--distribute",
                        type=str,
                        default="false",
                        choices=["true", "false"],
                        help="Run distribute, default is true.")
    parser.add_argument("--micro_batch_num",
                        type=int,
                        default=1,
                        help="The micro batch num.")
    parser.add_argument('--pipeline_stage',
                        required=False,
                        type=int,
                        default=1,
                        help='The pipeline stage number.')
    parser.add_argument('--mp',
                        required=False,
                        type=int,
                        default=1,
                        help='The model parallel way.')
    args_opt = parser.parse_args()

    if args_opt.distribute == 'true':
        D.init()
        device_num = D.get_group_size()
        rank_id = D.get_rank()
        dp = device_num // args_opt.mp // args_opt.pipeline_stage
        print("rank_id is {}, device_num is {}, dp is {}".format(rank_id, device_num, dp))
        gradient_accumulation_shard = dp > 1 and args_opt.pipeline_stage > 1
        ms.reset_auto_parallel_context()
        ms.set_auto_parallel_context(
               parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL, gradients_mean=False,
               full_batch=True, loss_repeated_mean=True,
               device_num=device_num, enable_parallel_optimizer=True,
               parallel_optimizer_config={"gradient_accumulation_shard": gradient_accumulation_shard})
    else:
        dp = 1

    parallel_config = TransformerOpParallelConfig(pipeline_stage=args_opt.pipeline_stage,
                                                  micro_batch_num=args_opt.micro_batch_num,
                                                  model_parallel=args_opt.mp,
                                                  data_parallel=dp)

    net = Net(batch=args_opt.batch_size // args_opt.micro_batch_num if args_opt.pipeline_stage else args_opt.batch_size,
              src_len=args_opt.src_len, tgt_len=args_opt.tgt_len,
              vocab_size=args_opt.vocab_size,
              hidden_size=args_opt.d_model,
              en_layer=args_opt.encoder_layer,
              de_layer=args_opt.decoder_layer,
              parallel_config=parallel_config, return_loss=args_opt.train)

    tokenizer = Tokenzier()
    task = ToyDataset(file_path=args_opt.file_path,
                      tokenizer=tokenizer,
                      seq_length=(args_opt.src_len, args_opt.tgt_len))
    dataset = task.get_dataset(batch_size=args_opt.batch_size)

    if args_opt.pipeline_stage > 1:
        net = PipelineCell(net, args_opt.micro_batch_num)
        param = net.infer_param_pipeline_stage()
        print(f"params is:{param}", flush=True)
        group_params = set_weight_decay(param)
        opt = AdamWeightDecay(group_params, learning_rate=args_opt.lr)
    else:
        group_params = set_weight_decay(net.trainable_params())
        opt = AdamWeightDecay(group_params, learning_rate=args_opt.lr)

    if not args_opt.train:
        model = ms.Model(net)
    else:
        model = ms.Model(net, optimizer=opt)

    callback_size = 1
    ckpt_config = ms.CheckpointConfig(save_checkpoint_steps=callback_size, keep_checkpoint_max=4,
                                      integrated_save=False)
    ckpoint_cb = ms.ModelCheckpoint(prefix="test",
                                    config=ckpt_config)
    callback = [ms.TimeMonitor(callback_size), ms.LossMonitor(callback_size), ckpoint_cb]
    model.train(1, dataset, callbacks=callback, dataset_sink_mode=False)

if __name__ == "__main__":
    main()

准备环节

下载数据集

使用newstest2014-fren-ref.en.sgm作为该任务的训练集合,合并且清洗该数据集。将数据集解压至docs/sample_code/distributed_training_transformer目录下。

预处理流程

执行下述代码进行数据的预处理过程,将会在当前目录下产生output目录,目录下将会生成wmt14.en_fr.txtwmt14.fr_en.txt两个文件,文件中每行是一个法语和英语的句子对。我们将采用wmt14.fr_en.txt作为训练数据。

python preprocess.py

配置分布式环境变量

在裸机环境(对比云上环境,即本地有Ascend 910 AI 处理器)进行分布式训练时,需要配置当前多卡环境的组网信息文件。如果使用华为云环境,因为云服务本身已经做好了配置,可以跳过本小节。

以Ascend 910 AI处理器为例,1个8卡环境的json配置文件示例如下,本样例将该配置文件命名为rank_table_8pcs.json。2卡环境配置可以参考样例代码中的rank_table_2pcs.json文件。

{
    "version": "1.0",
    "server_count": "1",
    "server_list": [
        {
            "server_id": "10.*.*.*",
            "device": [
                {"device_id": "0","device_ip": "192.1.27.6","rank_id": "0"},
                {"device_id": "1","device_ip": "192.2.27.6","rank_id": "1"},
                {"device_id": "2","device_ip": "192.3.27.6","rank_id": "2"},
                {"device_id": "3","device_ip": "192.4.27.6","rank_id": "3"},
                {"device_id": "4","device_ip": "192.1.27.7","rank_id": "4"},
                {"device_id": "5","device_ip": "192.2.27.7","rank_id": "5"},
                {"device_id": "6","device_ip": "192.3.27.7","rank_id": "6"},
                {"device_id": "7","device_ip": "192.4.27.7","rank_id": "7"}],
             "host_nic_ip": "reserve"
        }
    ],
    "status": "completed"
}

其中需要根据实际训练环境修改的参数项有:

  • server_count表示参与训练的机器数量。

  • server_id表示当前机器的IP地址。

  • device_id表示卡物理序号,即卡所在机器中的实际序号。

  • device_ip表示集成网卡的IP地址,可以在当前机器执行指令cat /etc/hccn.confaddress_x的键值就是网卡IP地址。

  • rank_id表示卡逻辑序号,固定从0开始编号。

调用集合通信库

MindSpore分布式并行训练的通信使用了华为集合通信库Huawei Collective Communication Library(以下简称HCCL),可以在Ascend AI处理器配套的软件包中找到。同时mindspore.communication.management中封装了HCCL提供的集合通信接口,方便用户配置分布式信息。

HCCL实现了基于Ascend AI处理器的多机多卡通信,有一些使用限制,我们列出使用分布式服务常见的,详细的可以查看HCCL对应的使用文档。

  • 单机场景下支持1、2、4、8卡设备集群,多机场景下支持8*n卡设备集群。

  • 每台机器的0-3卡和4-7卡各为1个组网,2卡和4卡训练时卡必须相连且不支持跨组网创建集群。

  • 组建多机集群时需要保证各台机器使用同一交换机。

  • 服务器硬件架构及操作系统需要是SMP(Symmetrical Multi-Processing,对称多处理器)处理模式。

下面是调用集合通信库样例代码:

import os
from mindspore.communication import init
import mindspore as ms

if __name__ == "__main__":
    ms.set_context(mode=ms.GRAPH_MODE, device_target="Ascend", device_id=int(os.environ["DEVICE_ID"]))
    init()
    ...

其中,

  • mode=GRAPH_MODE:使用分布式训练需要指定运行模式为图模式(PyNative模式不支持并行)。

  • device_id:卡的物理序号,即卡所在机器中的实际序号。

  • init:使能HCCL通信,并完成分布式训练初始化操作。

运行脚本

上述已将训练所需的脚本编辑好了,接下来通过命令调用对应的脚本。

目前MindSpore分布式执行采用单卡单进程运行方式,即每张卡上运行1个进程,进程数量与使用的卡的数量一致。其中,0卡在前台执行,其他卡放在后台执行。每个进程创建1个目录,用来保存日志信息以及算子编译信息。下面以使用8张卡的分布式训练脚本为例,演示如何运行脚本:

#!/bin/bash
# applicable to Ascend

echo "=============================================================================================================="
echo "Please run the script as: "
echo "bash run.sh DATA_PATH RANK_SIZE"
echo "For example: bash run.sh /path/dataset 8"
echo "It is better to use the absolute path."
echo "=============================================================================================================="
set -e
DATA_PATH=$1
export DATA_PATH=${DATA_PATH}
RANK_SIZE=$2

EXEC_PATH=$(pwd)

test_dist_8pcs()
{
    export RANK_TABLE_FILE=${EXEC_PATH}/rank_table_8pcs.json
    export RANK_SIZE=8
}

test_dist_2pcs()
{
    export RANK_TABLE_FILE=${EXEC_PATH}/rank_table_2pcs.json
    export RANK_SIZE=2
}

test_dist_${RANK_SIZE}pcs

for((i=1;i<${RANK_SIZE};i++))
do
    rm -rf device$i
    mkdir device$i
    cp ./train.py ./model.py ./dataset.py ./device$i
    cd ./device$i
    export DEVICE_ID=$i
    export RANK_ID=$i
    echo "start training for device $i"
    env > env$i.log
    python ./train.py --distribute=true --file_path=${DATA_PATH} --mp=${RANK_SIZE} > train.log$i 2>&1 &
    cd ../
done
rm -rf device0
mkdir device0
cp ./train.py ./model.py ./dataset.py ./device0
cd ./device0
export DEVICE_ID=0
export RANK_ID=0
echo "start training for device 0"
env > env0.log
python ./train.py --distribute=true --file_path=${DATA_PATH} --mp=${RANK_SIZE} > train.log0 2>&1 &
if [ $? -eq 0 ];then
    echo "training success"
else
    echo "training failed"
    exit 2
fi
cd ../

脚本需要传入变量DATA_PATHRANK_SIZE,分别表示wmt14.fr_en.txt数据集绝对路径和卡的数量。

分布式相关的环境变量有,

  • RANK_TABLE_FILE:组网信息文件的路径。

  • DEVICE_ID:当前卡在机器上的实际序号。

  • RANK_ID:当前卡的逻辑序号。

其余环境变量请参考安装教程中的配置项。

运行时间大约在5分钟内,主要时间是用于算子的编译,实际训练时间在20秒内。用户可以通过ps -ef | grep python来监控任务进程。

日志文件保存到rank所对应的device0device1……目录下,env.log中记录了环境变量的相关信息,关于Loss部分结果保存在train.log中,示例如下:

epoch: 1 step: 1, loss is 9.9034
epoch: 1 step: 2, loss is 9.9033
epoch: 1 step: 3, loss is 9.9031
epoch: 1 step: 4, loss is 9.9025
epoch: 1 step: 5, loss is 9.9022

总结

分布式并行训练可以显著的提升网络训练的性能,从实际的实验来看,Transformer 8卡分布式训练的性能超过单卡的5倍。网络分布式并行化的过程会引入一些代码和配置的复杂性,但相比性能上带来的收益是值得的。