优化器并行

Ascend 分布式并行

查看源文件

概述

在进行数据并行训练时,模型的参数更新部分在各卡间存在冗余计算,优化器并行通过将优化器的计算量分散到数据并行维度的卡上,在大规模网络上(比如Bert、GPT)可以有效减少内存消耗并提升网络性能。

在数据并行模式下使能优化器并行,框架会将需要更新的参数进行分散到不同卡上,各自更新后再通过Broadcast算子在集群间做权重共享。需要注意的是参数量应当大于机器数,当前只支持Lamb和AdamWeightDecay优化器。

在auto_parallel或者semi_auto_parallel模式下使能优化器并行,如果经过策略切分后的参数在机器间存在重复切片,并且shape的最高维可以被重复切片的卡数整除,框架会以最小切片的方式保存参数并在优化器中更新。该模式下支持所有优化器。

并行模式

参数更新方式

优化器支持

数据并行

参数分组更新,然后广播到所有卡

Lamb和AdamWeightDecay

全/半自动并行

参数按数据并行度切分成N份,每张卡更新当前卡上的参数

所有优化器

无论是哪种模式,优化器并行不会影响原有正反向网络的计算图,只会影响参数更新的计算量和计算逻辑。

目录结构如下:

└─sample_code
    ├─distributed_optimizer_parallel
        ├── fusion_example.py
        ├── rank_table_2pcs.json
        ├── rank_table_8pcs.json
        └── run_fusion_example.sh

其中每个文件的作用如下:

  • fusion_example.py:优化器融合的示例代码,阐述了如何配置优化器的融合标记。

  • rank_table_2pcs.json:RANK_TABLE_FILE的2卡配置文件。

  • rank_table_8pcs.json:RANK_TABLE_FILE的8卡配置文件。

  • run_fusion_example.sh:优化器融合代码的启动脚本。

开启优化器并行

mindspore.context.set_auto_parallel_context中提供了enable_parallel_optimizer选项,将其配置为True后,即可使能优化器并行,默认对所有参数进行优化器切分。

from mindspore import context
context.set_auto_parallel_context(enable_parallel_optimizer=True)

配置参数优化器并行

此外,用户还可以自定义某些参数是否优化器切分。Parameter提供了一个parallel_optimizer的参数,用来配置当前的参数是否进行优化器切分。因此用户单独针对每个参数配置是否开启优化器并行,如下所示

import numpy as np
from mindspore import Parameter, Tensor
param = Parameter(Tensor(np.ones((10, 2))), name='weight1', parallel_optimizer=True)

# Another way to set the parallel_optimizer attribute
param2 = Parameter(Tensor(np.ones((10, 2))), name='weight2')
param2.parallel_optimizer = False

配置通信融合

在设置参数优化器并行一节中,我们阐述了如何配置每个参数的优化器并行属性。在全/半自动模式下,每个参数都会产生一个对应的AllGather操作和ReduceScatter操作。这些通信算子是自动并行框架自动插入的。然而,随着参数量增多,对应的通信算子也会增多,通信操作产生的算子调度和启动都会产生更多的开销。因此,可以通过cell提供的set_comm_fusion方法,对每个cell内的参数对应的AllGather和ReduceScatter操作配置融合标记。

如下述的代码所示,针对实例化后的DenseLayer,调用set_comm_fusion方法,为每一层设置fusion值。

"""Parallel Optimizer Fusion Example"""
from mindspore.communication import init
from mindspore import nn
from mindspore import context, ParallelMode
init()
context.set_auto_parallel_context(parallel_mode=ParallelMode.SEMI_AUTO_PARALLEL, enable_parallel_optimizer=True)

class DenseLayer(nn.Cell):
    """A base layer with two dense layer"""
    def __init__(self):
        super().__init__()
        self.input_mapping = nn.Dense(10, 10)
        self.output_mapping = nn.Dense(10, 10)
    def construct(self, x):
        x = self.input_mapping(x)
        return self.output_mapping(x)

class Net(nn.Cell):
    """An network with many dense layers"""
    def __init__(self):
        super().__init__()
        self.layer1 = DenseLayer()
        self.layer2 = DenseLayer()
        self.layer3 = DenseLayer()
        self.layer1.set_comm_fusion(0)
        self.layer2.set_comm_fusion(1)
        self.layer3.set_comm_fusion(2)
    def construct(self, x):
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        return x

net = Net()
for item in net.trainable_params():
    print(f"The parameter {item.name}'s fusion id is {item.comm_fusion}")

对应的输出如下,表示了每层特定dense的fusion值:

The parameter layer1.input_mapping.weight's fusion id is 0
The parameter layer1.input_mapping.bias's fusion id is 0
The parameter layer1.output_mapping.weight's fusion id is 0
The parameter layer1.output_mapping.bias's fusion id is 0
The parameter layer2.input_mapping.weight's fusion id is 1
The parameter layer2.input_mapping.bias's fusion id is 1
The parameter layer2.output_mapping.weight's fusion id is 1
The parameter layer2.output_mapping.bias's fusion id is 1
The parameter layer3.input_mapping.weight's fusion id is 2
The parameter layer3.input_mapping.bias's fusion id is 2
The parameter layer3.output_mapping.weight's fusion id is 2
The parameter layer3.output_mapping.bias's fusion id is 2

在编译图的流程中,相同融合标记并且是相同的通信操作,会被融合成一个通信操作。从而减少通信操作的数量。对于融合标记为0的通信算子时,优化流程中不会对它们进行融合。

开启优化器切分时,网络中每个参数都会产生一个相应的通信算子,然而频繁地调用通信算子将造成较多的算子启动消耗。将这些通信算子融合成一个通信算子,是最有效减少通信算子个数的办法。MindSpore提供了但这样会导致计算资源的浪费。例如,将所有的通信算子融合成一个算子后,在当前训练迭代中,NPU需要等待切分的参数汇聚完成后才能进行网络的前向计算。这样会造成设备的等待。

为了避免上述问题,可以将网络参数进行分组融合:在上一组参数进行的计算的同时,进行下组参数的通信,使得计算和通信能够互相隐藏。这就是上述代码将layer2layer3设置不同fusion值的原因。

运行代码

上述代码需要在配置分布式变量后才可以运行。Ascend环境需要配置RANK_TABLE_FILE、RANK_ID和DEVICE_ID。配置的过程请参考此处,GPU环境需要配置OpenMPI、NCCL和HOST_FILE,配置的过程请参考此处

Ascend分布式相关的环境变量有:

  • RANK_TABLE_FILE:组网信息文件的路径。rank_table_file文件可以使用models代码仓中的hccl_tools.py生成,可以从此处获取。

  • DEVICE_ID:当前卡在机器上的实际序号。

  • RANK_ID:当前卡的逻辑序号。

GPU分布式相关的环境变量:

  • HOST_FILE: 描述多卡训练时的设备IP和个数。文件每一行格式为[hostname] slots=[slotnum],hostname可以是ip或者主机名。需要注意的是,不同机器上的用户名需要相同,但是hostname不可以相同。

用户可以通过此处获取上述的此文档中的脚本。执行下述的bash脚本即可运行程序,输出日志在device0/train.log0文件。

#!/bin/bash
set -e
echo "=============================================================================================================="
echo "Please run the script as: "
echo "bash run_fusion_example.sh DATA_PATH RANK_SIZE"
echo "For example: bash run_fusion_example.sh 8"
echo "It is better to use the absolute path."
echo "This example is expected to run on the Ascend environment."
echo "=============================================================================================================="
RANK_SIZE=$1

EXEC_PATH=$(pwd)

test_dist_8pcs()
{
    export RANK_TABLE_FILE=${EXEC_PATH}/rank_table_8pcs.json
    export RANK_SIZE=8
}

test_dist_2pcs()
{
    export RANK_TABLE_FILE=${EXEC_PATH}/rank_table_2pcs.json
    export RANK_SIZE=2
}

test_dist_${RANK_SIZE}pcs

for((i=0;i<${RANK_SIZE};i++))
do
    rm -rf device$i
    mkdir device$i
    cp ./fusion_example.py ./device$i
    cd ./device$i
    export DEVICE_ID=$i
    export RANK_ID=$i
    echo "start training for device $i"
    env > env$i.log
    pytest -s -v ./fusion_example.py > train.log$i 2>&1 &
    cd ../
done
echo "The program launch succeed, the log is under device0/train.log0."

在当前目录下配置完RANK_TABLE_FILE之后,下述的命令要求用户拥有8张Ascend 910设备。运行命令如下:

bash run_fusion_example.sh 8