分布式并行

下载Notebook下载样例代码查看源文件

背景

随着深度学习的快速发展,为了提升神经网络的精度和泛化能力,数据集和参数量都在呈指数级向上攀升。分布式并行训练成为一种解决超大规模网络性能瓶颈的发展趋势。MindSpore支持了当前主流的分布式训练范式并开发了一套自动混合并行解决方案。本篇设计文档将会集中介绍几种并行训练方式的设计原理,同时指导用户进行自定义开发。

概念

集合通信

集合通信指在一组进程间通信,组内所有进程满足一定规则的发送和接收数据。MindSpore通过集合通信的方式进行并行训练过程中的数据传输工作,在Ascend芯片上它依赖于华为集合通信库HCCL完成,在GPU上它依赖于英伟达集合通信库NCCL完成。

同步模式

在同步模式下,所有的设备同时开始训练,并且当反向传播算法完成之后同步更新参数的取值。MindSpore目前采用的是同步训练模式。

数据并行

这个小节介绍了在MindSpore中ParallelMode.DATA_PARALLEL数据并行模式是如何工作的。

数据并行原理

数据并行图解

  1. 环境依赖

    每次开始进行并行训练前,通过调用mindspore.communication.init接口初始化通信资源,并自动创建全局通信组WORLD_COMM_GROUP

  2. 数据分发(Data distribution)

    数据并行的核心在于将数据集在样本维度拆分并下发到不同的卡上。在mindspore.dataset模块提供的所有数据集加载接口中都有num_shardsshard_id两个参数,它们用于将数据集拆分为多份并循环采样的方式,采集batch大小的数据到各自的卡上,当出现数据量不足的情况时将会从头开始采样。

  3. 网络构图

    数据并行网络的书写方式与单机网络没有差别,这是因为在正反向传播(Forward propagation & Backward Propagation)过程中各卡的模型间是独立执行的,只是保持了相同的网络结构。唯一需要特别注意的是为了保证各卡间训练同步,相应的网络参数初始化值应当是一致的,在DATA_PRALLELHYBRID_PARALLEL模式下建议通过使能parameter_broadcast达到权重广播的目的;在AUTO_PRALLELSEMI_AUTO_PARALLEL模式下,框架内部会自动分析参数的并行度,并设置相应的随机数种子,保证在数据并行维度的设备上参数初始化值一致。

  4. 梯度聚合(Gradient aggregation)

    数据并行理论上应该实现和单机一致的训练效果,为了保证计算逻辑的一致性,在梯度计算完成后插入AllReduce算子实现各卡间的梯度聚合操作。MindSpore设置了mean开关,用户可以选择是否要对求和后的梯度值进行求平均操作,也可以将其视为超参项,打开开关等价于学习率倍数缩小。

  5. 参数更新(Parameter update)

    因为引入了梯度聚合操作,所以各卡的模型会以相同的梯度值一起进入参数更新步骤。因此MindSpore实现的是一种同步数据并行训练方式。理论上最终每卡训练出来的模型是相同的,如果网络中含有在样本维度的归约类型操作,网络的输出可能会有所差别,这是由数据并行的切分性质决定的。

数据并行代码

  1. 集合通信

    • management.py:这个文件中涵盖了集合通信过程中常用的helper函数接口,例如获取集群数量和卡的序号等。当在Ascend芯片上执行时,框架会加载环境上的libhccl.so库文件,通过它来完成从Python层到底层的通信接口调用。

    • comm_ops.py:MindSpore将支持的集合通信操作都封装为算子的形式放在这个文件下,包括AllReduceAllGatherReduceScatterBroadcast等。PrimitiveWithInfer中除了定义算子所需属性外,还包括构图过程中输入到输出的shapedtype推导。

  2. 梯度聚合

    • grad_reducer.py:这个文件实现了梯度聚合的过程。对入参gradsHyperMap展开后插入AllReduce算子,这里采用的是全局通信组,用户也可以根据自己网络的需求仿照这个模块进行自定义开发。MindSpore中单机和分布式执行共用一套网络封装接口,在Cell内部通过ParallelMode来区分是否要对梯度做聚合操作,网络封装接口建议参考TrainOneStepCell代码实现。

自动并行

自动并行作为MindSpore的关键特性,用于实现自动的数据并行加模型并行的混合并行训练方式,旨在帮助用户以单机的脚本表达并行算法逻辑,降低分布式训练难度,提高算法研发效率,同时又能保持训练的高性能。这个小节介绍了在MindSpore中ParallelMode.AUTO_PARALLEL自动并行模式及ParallelMode.SEMI_AUTO_PARALLEL半自动并行模式是如何工作的。

自动并行原理

自动并行图解

  1. 分布式算子和张量排布模型

    在上面的架构图中,自动并行流程会对单机的正向计算图(ANF Graph)进行遍历,以分布式算子(Distributed Operator)为单位对张量进行切分建模,表示一个算子的输入输出张量如何分布到集群各个卡上(Tensor Layout)。这种模型充分地表达了张量和设备间的映射关系,用户无需感知模型各切片放到哪个设备上运行,框架会自动调度分配。

    为了得到张量的排布模型,每个算子都具有切分策略(Shard Strategy),它表示算子的各个输入在相应维度的切分情况。通常情况下只要满足以2为基、均匀分配的原则,张量的任意维度均可切分。以下图为例,这是一个三维矩阵乘(BatchMatMul)操作,它的切分策略由两个元组构成,分别表示inputweight的切分形式。其中元组中的元素与张量维度一一对应,2^N为切分份数,1表示不切。当用户想表示一个数据并行切分策略时,即inputbatch维度切分,其他维度不切,可以表达为strategy=((2^N, 1, 1),(1, 1, 1));当表示一个模型并行切分策略时,即weight的非batch维度切分,这里以channel维度切分为例,其他维度不切,可以表达为strategy=((1, 1, 1),(1, 1, 2^N));当表示一个混合并行切分策略时,其中一种切分策略为strategy=((2^N, 1, 1),(1, 1, 2^N))

    算子切分定义

    依据切分策略,分布式算子中定义了推导算子输入张量和输出张量的排布模型的方法。这个排布模型由device_matrixtensor_shapetensor map组成,分别表示设备矩阵形状、张量形状、设备和张量维度间的映射关系。分布式算子会进一步根据张量排布模型判断是否要在图中插入额外的计算、通信操作,以保证算子运算逻辑正确。

  2. 张量排布变换

    当前一个算子的输出张量模型和后一个算子的输入张量模型不一致时,就需要引入计算、通信操作的方式实现张量排布间的变化。自动并行流程引入了张量重排布算法(Tensor Redistribution),可以推导得到任意排布的张量间通信转换方式。下面三个样例表示公式Z=(X×W)×V的并行计算过程, 即两个二维矩阵乘操作,体现了不同并行方式间如何转换。 在样例一中,第一个数据并行矩阵乘的输出在行方向上存在切分,而第二个模型并行矩阵乘的输入需要全量张量,框架将会自动插入AllGather算子实现排布变换。

    tensor-redistribution1

    在样例二中,第一个模型并行矩阵乘的输出在列方向上存在切分,而第二个数据并行矩阵乘的输入在行方向上存在切分,框架将会自动插入等价于集合通信中AlltoAll操作的通信算子实现排布变换。

    tensor-redistribution2

    在样例三中,第一个混合并行矩阵乘的输出切分方式和第二个混合并行矩阵乘的输入切分方式一致,所以不需要引入重排布变换。但由于第二个矩阵乘操作中,两个输入的相关维度存在切分,所以需要插入AllReduce算子保证运算正确性。

    tensor-redistribution3

    综上,1、2两点是自动并行实现的基础,总体来说这种分布式表达打破了数据并行和模型并行的边界,轻松实现混合并行。从脚本层面上,用户仅需构造单机网络,即可表达并行算法逻辑,框架将自动实现对整图切分。

  3. 切分策略搜索算法

    当用户熟悉了算子的切分表达,并手动对算子配置切分策略,这就是SEMI_AUTO_PARALLEL半自动并行模式。这种方式对手动调优有帮助,但还是具有一定的调试难度,用户需要掌握并行原理,并根据网络结构、集群拓扑等计算分析得到高性能的并行方案。为了帮助用户加速并行网络训练过程,在半自动并行模式的基础上,AUTO_PARALLEL自动并行模式支持并行策略传播(Sharding Propagation),能够有效地降低用户手配算子切分策略的工作量,算法将切分策略由用户配置的算子向未配置的算子传播。为进一步降低用户手配算子切分策略的工作量,支持切分策略完全自动搜索。为此,围绕硬件平台构建相应的代价函数模型(Cost Model),计算出一定数据量、一定算子在不同切分策略下的计算开销(Computation Cost),内存开销(Memory Cost)及通信开销(Communication Cost)。然后通过动态规划算法(Dynamic Programming)或者递归规划算法(Recursive Programming),以单卡的内存上限为约束条件,高效地搜索出性能较优的切分策略。

    策略搜索这一步骤代替了用户手动指定模型切分,在短时间内可以得到较高性能的切分方案,极大降低了并行训练的使用门槛。

  4. 分布式自动微分

    传统的手动模型切分除了需要关注正向网络通信还需要考虑网络反向的并行运算,MindSpore通过将通信操作包装为算子,并利用框架原有的自动微分操作自动生成通信算子反向,所以即便在进行分布式训练时,用户同样只需关注网络的前向传播,真正实现训练的全自动并行。

自动并行代码

  1. 张量排布模型

    • tensor_layout:这个目录下包含了张量排布模型相关功能的定义及实现。其中tensor_layout.h中声明了一个张量排布模型需要具备的成员变量tensor_map_origin_tensor_shape_device_arrangement_等。在tensor_redistribution.h中声明了实现张量排布间from_origin_to_origin_变换的相关方法,将推导得到的重排布操作保存在operator_list_中返回,并计算得到重排布所需的通信开销comm_cost_, 内存开销memory_cost_及计算开销computation_cost_

  2. 分布式算子

    • ops_info:这个目录下包含了分布式算子的具体实现。在operator_info.h中定义了分布式算子实现的基类OperatorInfo,开发一个分布式算子需要继承于这个基类并显式实现相关的虚函数。其中InferTensorInfoInferTensorMapInferDevMatrixShape函数定义了推导该算子输入、输出张量排布模型的算法。InferForwardCommunicationInferMirrorOps等函数定义了切分该算子需要插入的额外计算、通信操作。CheckStrategyGenerateStrategies函数定义了算子切分策略校验和生成。根据切分策略SetCostUnderStrategy将会产生该策略下分布式算子的并行开销值operator_cost_

  3. 策略搜索算法

    • auto_parallel:这个目录下实现了切分策略搜索的算法。graph_costmodel.h定义了构图信息,其中每个点表示一个算子OperatorInfo,有向边edge_costmodel.h表示算子的输入输出关系及重排布的代价。operator_costmodel.h中定义了每个算子的代价模型,包括计算代价、通信代价和内存代价。dp_algorithm_costmodel.h主要描述了动态规划算法的主要流程,由一系列图操作组成。在costmodel.h中定义了cost和图操作的数据结构。

  4. 设备管理

    • device_manager.h:这个文件实现了集群设备通信组的创建及管理。其中设备矩阵模型由device_matrix.h定义,通信域由group_manager.h管理。

  5. 整图切分

    • step_auto_parallel.h, step_parallel.h:这两个文件包含了自动并行流程的核心实现。首先由step_auto_parallel.h调用策略搜索流程并产生分布式算子的OperatorInfo,然后在step_parallel.h中处理算子切分和张量重排布等流程,对单机计算图进行分布式改造。

  6. 通信算子反向

    • grad_comm_ops.py:这个文件定义了AllReduceAllGather等通信算子的反向操作。

异构并行

异构并行训练方法是通过分析图上算子内存占用和计算密集度,将内存消耗巨大或适合CPU逻辑处理的算子切分到CPU子图,将内存消耗较小计算密集型算子切分到硬件加速器子图,框架协同不同子图进行网络训练,使得处于不同硬件且无依赖关系的子图能够并行进行执行的过程。

计算流程

MindSpore异构并行训练典型的计算流程如下图所示:

heterogeneous-heter

  1. 用户设置网络执行的后端

[ ]:
import mindspore as ms
ms.set_context(device_target="GPU")
  1. 用户设置特定算子执行后端

[1]:
from mindspore import ops

prim = ops.Add()

prim.set_device("CPU")
  1. 框架根据计算图算子标志进行切图

  2. 框架调度不同后端执行子图

当前典型使用异构并行计算的场景有:优化器异构、Embedding异构、PS异构。

优化器异构

在盘古或GPT3大模型训练过程中,优化器状态占用了大量内存,进而限制了可训练的模型规模。使用优化器异构,将优化器指定到CPU上执行,可以极大扩展可训练模型规模:

heterogeneous-heter-opt

如图所示,将Adam算子配置到CPU执行同时指定加速器进行FP16计算,可以将参数内存占用降低到原始的1/3。

  1. 配置优化器算子到CPU执行

  2. 初始化FP16的权重参数以及FP32的优化器状态变量

  3. 将输入优化器的梯度转为FP16(如果本来就是FP16梯度,可忽略这步)

  4. 权重和梯度转为FP32参与优化器运算

  5. 更新后的FP32权重赋值给FP16的权重

优化器异构代码样例如下:

[2]:
import numpy as np
import mindspore as ms
import mindspore.ops as ops
from mindspore.common.initializer import initializer
from mindspore.nn import Optimizer
_adam_opt = ops.MultitypeFuncGraph("adam_opt")
host_assign = ops.Assign()
host_assign.set_device("CPU")
host_cast = ops.Cast()
host_cast.set_device("CPU")
device_cast = ops.Cast()

@_adam_opt.register("Function", "Tensor", "Tensor", "Tensor", "Tensor", "Number", "Tensor", "Tensor", "Tensor",
                    "Tensor", "Bool", "Bool")
def _update_run_kernel(opt, beta1, beta2, eps, lr, weight_decay, param, m, v, gradient, decay_flags, optim_filter):
    """
    Update parameters by AdamWeightDecay op.
    """
    success = True
    if optim_filter:
        param32 = host_cast(param, ms.float32)
        gradient = device_cast(gradient, ms.float32)
        if decay_flags:
            next_param = opt(param32, m, v, lr, beta1, beta2, eps, weight_decay, gradient)
        else:
            next_param = opt(param32, m, v, lr, beta1, beta2, eps, 0.0, gradient)
        ret = host_assign(param, host_cast(ops.depend(param32, next_param), ops.dtype(param)))
        return ops.depend(success, ret)
    return success

class AdamWeightDecayOp(Optimizer):
    def __init__(self, params, learning_rate=1e-3, beta1=0.9, beta2=0.999, eps=1e-6, weight_decay=0.0):
        super(AdamWeightDecayOp, self).__init__(learning_rate, params, weight_decay)
        self.beta1 = ms.Tensor(np.array([beta1]).astype(np.float32))
        self.beta2 = ms.Tensor(np.array([beta2]).astype(np.float32))
        self.eps = ms.Tensor(np.array([eps]).astype(np.float32))
        self.moments1 = self.clone_param32(prefix="adam_m", init='zeros')
        self.moments2 = self.clone_param32(prefix="adam_v", init='zeros')
        self.opt = ops.AdamWeightDecay()
        self.hyper_map = ops.HyperMap()
        self.opt.set_device("CPU")

    def construct(self, gradients):
        """AdamWeightDecayOp"""
        lr = self.get_lr()
        if self.is_group:
            if self.is_group_lr:
                optim_result = self.map_reverse(ops.partial(_adam_opt, self.opt, self.beta1, self.beta2, self.eps),
                                                lr, self.weight_decay, self.parameters, self.moments1, self.moments2,
                                                gradients, self.decay_flags, self.optim_filter)
            else:
                optim_result = self.map_reverse(ops.partial(_adam_opt, self.opt, self.beta1, self.beta2, self.eps, lr),
                                                self.weight_decay, self.parameters, self.moments1, self.moments2,
                                                gradients, self.decay_flags, self.optim_filter)
        else:
            optim_result = self.map_reverse(ops.partial(_adam_opt, self.opt, self.beta1, self.beta2, self.eps, lr,
                                                        self.weight_decay), self.parameters, self.moments1, self.moments2,
                                            gradients, self.decay_flags, self.optim_filter)
        return optim_result

    def clone_param32(self, prefix, init=None):
        new = []
        for old_param in self.parameters:
            param_init = init
            if init is None:
                param_init = old_param.init
            new_state = old_param.clone()
            new_state.set_dtype(ms.float32)
            new_state.set_data(initializer(param_init, shape=old_param.shape, dtype=ms.float32))
            new_state.name = prefix + '.' + new_state.name
            new.append(new_state)
        return ms.ParameterTuple(new)

步骤4、5也可以直接融合到优化器算子中做进一步优化,完整的优化器异构训练流程可以参考: https://gitee.com/mindspore/models/tree/r2.0/official/nlp/Pangu_alpha

Embedding异构

在一些需要查Embedding大表的网络中,Embedding表往往有上百G的规模,受加速器内存大小限制,无法直接将整表加载到加速器上执行。通过将与权重表相连的算子放到CPU上执行,避免加速器由于内存限制而无法训练网络的问题。

heterogeneous-heter-embed

  1. 配置EmbeddingLookup算子到CPU执行

[3]:
import mindspore.nn as nn
import mindspore.ops as ops
import mindspore as ms
from mindspore.common.initializer import initializer
class EmbeddingLookupNet(nn.Cell):
    def __init__(self, vocab_size, embedding_size, param_init='normal'):
        super(EmbeddingLookupNet, self).__init__()
        self.embeddinglookup = ops.EmbeddingLookup().set_device('CPU')
        self.embedding_table = ms.Parameter(initializer(param_init, [vocab_size, embedding_size]), name='embedding_table')

    def construct(self, indices):
        out = self.embeddinglookup(self.embedding_table, indices, 0)
        return out
  1. 配置EmbeddingLookup关联稀疏优化器到CPU执行

[4]:
from mindspore.nn.optim import LazyAdam
net = EmbeddingLookupNet(1000, 100)
params = net.trainable_params()
optimizer = LazyAdam(params)
optimizer.target = "CPU"

EmbeddingLookup算子设置代码样例如下:

[5]:
import mindspore.nn as nn
import mindspore.ops as ops
import mindspore as ms
from mindspore.common.initializer import initializer

class EmbeddingLookup(nn.Cell):
    def __init__(self, vocab_size, embedding_size, param_init='normal',
                 target='CPU', sparse=True):
        """Initialize EmbeddingLookup."""
        super(EmbeddingLookup, self).__init__()
        validator.check_value_type('sparse', sparse, [bool], self.cls_name)
        self.vocab_size = validator.check_positive_int(vocab_size, 'vocab_size')
        self.target = target
        self.sparse = sparse
        if target not in ('CPU', 'DEVICE'):
            raise ValueError('Attr \'target\' of \'EmbeddingLookup\' Op passed '
                             + str(target) + ', should be one of values in \'CPU\', \'DEVICE\'.')
        if not sparse and target == 'CPU':
            raise ValueError('When target is CPU, embedding_lookup must be sparse.')
        if sparse:
            self.gatherv2 = ops.SparseGatherV2()
        else:
            self.gatherv2 = ops.Gather()
        self.embeddinglookup = ops.EmbeddingLookup().set_device('CPU')
        self.embedding_size = validator.check_positive_int(embedding_size, 'embedding_size')
        self.embedding_table = ms.Parameter(initializer(param_init, [self.vocab_size, self.embedding_size]),
                                            name='embedding_table')

    def construct(self, indices):
        if self.target == "CPU":
            out = self.embeddinglookup(self.embedding_table, indices, 0)
        else:
            out = self.gatherv2(self.embedding_table, indices, 0)
        return out

当前nn目录下的EmbeddingLookup、FTRL、LazyAdam等算子已经封装好异构接口,用户只需设置target属性为CPU或DEVICE即可切换执行后端。

整体调用流程可以参考:https://gitee.com/mindspore/models/tree/r2.0/official/recommend/Wide_and_Deep

PS异构

在EmbeddingTable达到T级别,单机内存无法放下时,使用Parameter Server,通过异构的Pull/Push算子进行权重的拉取和更新。

heterogeneous-heter-ps

Parameter Server封装异构流程,用户只需配置参数使用PS即可,具体配置流程请参考Parameter Server训练流程

此外,wide&deep网络中也有使用PS的流程,可参考:https://gitee.com/mindspore/models/tree/r2.0/official/recommend/Wide_and_Deep

约束

当前需要用户指定算子执行的后端,不支持根据网络进行自动化配置。