mindspore.parallel.auto_parallel.AutoParallel

查看源文件
class mindspore.parallel.auto_parallel.AutoParallel(network, parallel_mode='semi_auto')[源代码]

并行配置的基本单元,静态图模式下需封装顶层Cell或函数并指定并行模式。

说明

  • 使用Model高阶接口进行训练或推理时,传入Model的network必须用AutoParallel进行封装。

  • 使用函数式训练或推理接口时,必须在最外层进行AutoParallel进行封装,即对AutoParallel进行编译。

  • 使用函数式训练或推理接口时,暂不支持数据下沉场景。

参数:
  • network (Cell或Function,必选) - 待封装的前向网络的顶层Cell或函数,定义了将被并行化的核心网络结构。

  • parallel_mode (str,可选) - 并行模式,指定并行策略,可选项: semi_autosharding_propagationrecursive_programming。默认为 semi_auto,支持:

    • "semi_auto": 半自动化并行模式,支持数据并行、算子级并行、优化器并行和流水线并行场景,默认开启该模式

    • "sharding_propagation": 策略传播搜索模式

    • "recursive_programming": 递归编程搜索模式

支持平台:

Ascend

样例:

说明

需要使用msrun进行启动。

>>> import os
>>> import mindspore as ms
>>> import mindspore.dataset as ds
>>> from mindspore import nn, ops
>>> from mindspore.communication import init, get_rank
>>> from mindspore.common.initializer import initializer
>>> from mindspore.parallel.auto_parallel import AutoParallel
>>> from mindspore.train import Model
>>> from mindspore.train import LossMonitor
>>> ms.set_context(mode=ms.GRAPH_MODE)
>>> init()
>>> ms.set_seed(1)
>>>
>>> # Create the dataset taking MNIST as an example. Refer to
>>> # https://gitee.com/mindspore/docs/blob/master/docs/mindspore/code/mnist.py
>>>
>>> def create_dataset(batch_size):
...    dataset_path = os.getenv("DATA_PATH")
...    dataset = ds.MnistDataset(dataset_path)
...    image_transforms = [
...        ds.vision.Rescale(1.0 / 255.0, 0),
...        ds.vision.Normalize(mean=(0.1307,), std=(0.3081,)),
...        ds.vision.HWC2CHW()
...    ]
...    label_transform = ds.transforms.TypeCast(ms.int32)
...    dataset = dataset.map(image_transforms, 'image')
...    dataset = dataset.map(label_transform, 'label')
...    dataset = dataset.batch(batch_size)
...    return dataset
>>>
>>> dataset = create_dataset(32)
>>>
>>> from mindspore import nn, ops, Parameter
>>> from mindspore.common.initializer import initializer, HeUniform
>>> import math
>>>
>>> class MatMulCell(nn.Cell):
...     def __init__(self, param=None, shape=None):
...         super().__init__()
...         if shape is None:
...             shape = [28 * 28, 512]
...         weight_init = HeUniform(math.sqrt(5))
...         self.param = Parameter(initializer(weight_init, shape), name="param")
...         if param is not None:
...             self.param = param
...         self.print = ops.Print()
...         self.matmul = ops.MatMul()
...
...     def construct(self, x):
...         out = self.matmul(x, self.param)
...         self.print("out is:", out)
...         return out
>>>
>>> class Network(nn.Cell):
...     def __init__(self):
...         super().__init__()
...         self.flatten = nn.Flatten()
...         self.layer1 = MatMulCell()
...         self.relu1 = nn.ReLU()
...         self.layer2 = nn.Dense(512, 512)
...         self.relu2 = nn.ReLU()
...         self.layer3 = nn.Dense(512, 10)
...
...     def construct(self, x):
...         x = self.flatten(x)
...         x = self.layer1(x)
...         x = self.relu1(x)
...         x = self.layer2(x)
...         x = self.relu2(x)
...         logits = self.layer3(x)
...         return logits
>>>
>>> import mindspore as ms
>>> from mindspore import nn, ops
>>> from mindspore.parallel.nn import Pipeline, PipelineGradReducer
>>> from mindspore.nn.utils import no_init_parameters
>>>
>>> with no_init_parameters():
>>>     net = Network()
>>>     optimizer = nn.SGD(net.trainable_params(), 1e-2)
>>>     pp_grad_reducer = PipelineGradReducer(optimizer.parameters, opt_shard=False)
>>>
>>> loss_fn = nn.CrossEntropyLoss()
>>> net_with_loss = Pipeline(nn.WithLossCell(net, loss_fn), 4, stage_config={"_backbone.flatten":0,
>>>     "_backbone.layer1": 0, "_backbone.relu1": 0, "_backbone.layer2": 1,
>>>     "_backbone.relu2": 1, "_backbone.layer3": 1})
>>> parallel_net = AutoParallel(net_with_loss, parallel_mode="semi_auto")
>>> parallel_net.hsdp()
>>> parallel_net.pipeline(stages=2)
>>> parallel_net.dataset_strategy("data_parallel")
>>> parallel_net.save_param_strategy_file(f"/tmp/param_{get_rank()}.ckpt")
>>> parallel_net.set_group_ckpt_save_file(f"/tmp/comm_group_{get_rank()}.ckpt")
>>> parallel_net.dump_local_norm(f"/tmp/local_norm_{get_rank()}")
>>> parallel_net.disable_strategy_file_only_for_trainable_params()
>>> parallel_net.enable_fp32_communication()
>>> parallel_net.enable_device_local_norm()
>>> parallel_net.enable_gradients_mean()
>>> parallel_net.disable_gradient_fp32_sync()
>>> parallel_net.disable_loss_repeated_mean()
>>>
>>> loss_monitor = LossMonitor(per_print_times=1)
>>> model = Model(network=parallel_net, optimizer=optimizer)
>>> model.train(epoch=2, train_dataset=dataset, callbacks=[loss_monitor])
load_operator_strategy_file(file_path)[源代码]

在使用策略传播模式时,设置加载策略JSON文件的路径。

说明

  • 只在策略传播并行模式下生效。

  • 在分布式训练场景,用户可以首先在单卡下用Dryrun设置策略,然后再加载策略进行分布式训练。

警告

  • 实验性接口,未来可能变更或移除。

  • 暂不支持加载策略时使用Layout格式。

参数:
  • file_path (dict) - 加载并行策略JSON文件的路径,必须是绝对路径。

异常:
  • TypeError - 文件路径类型非字符串。

  • KeyError - 文件路径非绝对路径。

  • KeyError - 文件路径非JSON文件后缀结尾。

样例:

>>> # Define the network structure of ParallelNetwork. Refer to
>>> the example of 'AutoParallel.save_operator_strategy_file(file_path)'
>>> from mindspore.parallel.auto_parallel import AutoParallel
>>> init(backend_name='hccl')
>>> strategy = ((1, 1), (1, 2))
>>> net = ParallelNetwork(strategy)
>>> parallel_net = AutoParallel(net, parallel_mode='sharding_propagation')
>>> parallel_net.load_operator_strategy_file("/tmp/strategy.json")
save_operator_strategy_file(file_path)[源代码]

在使用策略传播模式时,设置保存策略JSON文件的路径。

说明

  • 只在策略传播并行模式下生效。

  • 在分布式训练场景,用户可以首先在单卡下用Dryrun设置策略,然后再加载策略进行分布式训练。

警告

  • 实验性接口,未来可能变更或移除。

  • 暂不支持加载策略时使用Layout格式。

参数:
  • file_path (str) - 保存并行策略JSON文件的路径,必须是绝对路径。

异常:
  • TypeError - 文件路径类型非字符串。

  • KeyError - 文件路径非绝对路径。

  • KeyError - 文件路径非JSON文件后缀结尾。

样例:

>>> import math
>>> import mindspore as ms
>>> import numpy as np
>>> from mindspore import nn, ops
>>> from mindspore.communication.management import init
>>> from mindspore.parallel.auto_parallel import AutoParallel
>>> from mindspore.common.initializer import initializer, HeUniform
>>>
>>> class ParallelNetwork(nn.Cell):
...     def __init__(self, strategy=None):
...         super().__init__()
...         self.flatten = ops.Flatten()
...         self.fc1_weight = ms.Parameter(initializer(HeUniform(math.sqrt(5)), shape=[
...             16, 10], dtype=ms.float32), name="fc1")
...         self.matmul1 = ops.MatMul().shard(strategy)
...         self.relu1 = ops.ReLU()
...
...     def construct(self, x):
...         x = self.flatten(x)
...         x = self.matmul1(x, self.fc1_weight)
...         x = self.relu1(x)
...         return x
>>>
>>> init(backend_name='hccl')
>>> strategy = ((1, 1), (1, 2))
>>> net = ParallelNetwork(strategy)
>>> parallel_net = AutoParallel(net, parallel_mode='sharding_propagation')
>>> parallel_net.save_operator_strategy_file("/tmp/strategy.json")
dataset_strategy(config)[源代码]

设置数据集分片策略。

参数:
  • config (Union[str, tuple(tuple), tuple(Layout)]) - 数据集切分策略配置。默认模式:"data_parallel",若需将数据集拆分到不同设备上并行处理,可设置此策略。若需要加载全批量数据集,则必须将数据集分片策略设为 "full_batch"。若通过模型并行策略(如ds_stra ((1, 8), (1, 8)))将数据集加载到网络中,需调用 AutoParallel.dataset_strategy(ds_stra) 方法。此外,数据集策略也支持Layout形式。

异常:
  • TypeError - config不是字符串或元组类型。

  • TypeError - config是元组类型时,元组中的元素不是"元组类型"或者"Layout类型"中的一个。

  • TypeError - config是元组类型时,且元组中的元素是元组类型时,子元组的元素类型不是"int"。

  • ValueError - 输入config为空。

  • ValueError - config为字符串类型时,取值不是"full_batch"或"data_parallel"中的一个。

print_local_norm()[源代码]

开启后打印local norm值。

样例:

>>> from mindspore.parallel.auto_parallel import AutoParallel
>>> # Define the network structure of LeNet5. Refer to
>>> # https://gitee.com/mindspore/docs/blob/master/docs/mindspore/code/lenet.py
>>> net = LeNet5()
>>> parallel_net = AutoParallel(net, parallel_mode="semi_auto")
>>> parallel_net.print_local_norm()
dump_local_norm(file_path)[源代码]

指定local norm值的保存路径。

参数:
  • file_path (str) - 保存路径,默认值为 ""

异常:
  • TypeError - 文件路径类型非字符串

enable_device_local_norm()[源代码]

开启后打印device local norm值。

no_init_parameters_in_compile()[源代码]

开启后,在编译过程中,不进行模型权重参数初始化。

警告

实验性接口,未来可能变更或移除。

样例:

>>> from mindspore.parallel.auto_parallel import AutoParallel
>>> # Define the network structure of LeNet5. Refer to
>>> # https://gitee.com/mindspore/docs/blob/master/docs/mindspore/code/lenet.py
>>> net = LeNet5()
>>> parallel_net = AutoParallel(net, parallel_mode="semi_auto")
>>> parallel_net.no_init_parameters_in_compile()
comm_fusion(config)[源代码]

用于设置并行通信算子的融合配置。

参数:
  • config (dict) - 输入格式为{"通信类型": {"mode":str, "config": None int 或者 list}}, 每种通信算子的融合配置有两个键:"mode"和"config"。支持以下通信类型的融合类型和配置:

    • openstate:是否开启通信融合功能。通过 TrueFalse 来开启或关闭通信融合功能。默认值: True,开启通信融合功能。

    • allreduce:进行AllReduce算子的通信融合。"mode"包含"auto"、"size"和"index"。在"auto"模式下,融合梯度变量的大小,默认值阈值为"64"MB,"config"对应的值为None。在"size"模式下,需要用户在config的字典中指定梯度大小阈值,这个值必须大于"0"MB。在"mode"为"index"时,它与"all_reduce_fusion_config"相同,用户需要给"config"传入一个列表,里面每个值表示梯度的索引。

    • allgather:进行AllGather算子的通信融合。"mode"包含"auto"、"size"。在 'auto' 模式下,AllGather融合由梯度值决定,其默认融合配置阈值为 '64' MB。在 'size' 模式下,手动设置AllGather算子融合的梯度阈值,并且其融合阈值必须大于 '0' MB。

    • reducescatter:进行ReduceScatter算子的通信融合。"mode"包含"auto"、"size","auto" 和 "size"模式的配置方式与allgather相同。

异常:
  • TypeError - 配置项非字典类型。

样例:

>>> from mindspore.parallel.auto_parallel import AutoParallel
>>> # Define the network structure of LeNet5. Refer to
>>> # https://gitee.com/mindspore/docs/blob/master/docs/mindspore/code/lenet.py
>>> parallel_net = AutoParallel(net, parallel_mode="semi_auto")
>>> comm_config = {"openstate": True, "allreduce": {"mode": "auto", "config": None}}
>>> net.comm_fusion(config=comm_config)
disable_strategy_file_only_for_trainable_params()[源代码]

默认情况,MindSpore仅保存/加载可训练参数的策略信息,调用此接口后,可支持加载和保存模型的非可训练参数。

enable_fp32_communication()[源代码]

开启之后,通信期间Reduce类算子(AllReduce、ReduceScatter)强制使用fp32数据类型进行通信。

load_param_strategy_file(file_path)[源代码]

设置加载并行策略checkpoint的路径,默认仅加载可训练参数的策略信息。

参数:
  • file_path (str) - 加载路径。

异常:
  • TypeError - 文件路径类型非字符串。

样例:

>>> from mindspore.parallel.auto_parallel import AutoParallel
>>> parallel_net = AutoParallel(net)
>>> parallel_net.load_param_strategy_file(file_path="./train_strategy.ckpt")
save_param_strategy_file(file_path)[源代码]

设置保存并行策略checkpoint的路径,默认仅保存可训练参数的策略信息。

参数:
  • file_path (str) - 保存路径。

异常:
  • TypeError - 文件路径类型非字符串。

set_group_ckpt_save_file(file_path)[源代码]

指定图编译过程中所创建group的保存路径。

参数:
  • file_path (str) - 保存路径。

异常:
  • TypeError - 文件路径类型非字符串。

transformer_opt(file_path)[源代码]

并行加速配置文件,配置项可以参考 parallel_speed_up.json。当设置为None时,表示不启用。

参数:
  • file_path (Union[str, None]): 并行加速配置文件,配置项可以参考 parallel_speed_up.json 。 当设置为None时,表示不启用。

    • recomputation_communication_overlap (bool): 为 True 时表示开启反向重计算和通信掩盖。默认值: False

    • grad_matmul_communication_overlap (bool): 为 True 时表示开启反向Matmul和通信掩盖。默认值: False

    • grad_fa_allgather_overlap (bool):为 True 时表示在序列并行和开启FlashAttentionScoreGrad算子时,开启重计算以掩盖重复的AllGather。默认值: False

    • enable_communication_fusion (bool): 为 True 时表示开启通信融合进行通信算子task数量优化。默认值: False

    • grad_computation_allreduce_overlap (bool): 为 True 时表示开启梯度dx计算与数据并行梯度通信的掩盖,暂时不支持 O2 编译模式下开启。注意在数据并行梯度通信和计算掩盖良好的情况下,开启该选项后性能不一定有提升,请根据实际场景确定是否开启。默认值: False

    • computation_allgather_overlap (bool): 为 True 时表示开启正向计算与优化器并行的AllGather通信的掩盖,暂时不支持 O2 编译模式下开启。注意在权重聚合通信和计算掩盖良好的情况下,开启该选项后性能不一定有提升,请根据实际场景确定是否开启。默认值: False

    • enable_concat_eliminate_opt (bool): 为 True 时表示开启Concat消除优化,当前在开启细粒度双副本优化时有收益。默认值: False

    • enable_begin_end_inline_opt (bool): 为 True 时表示开启首尾micro_batch子图的内联,用于半自动并行子图模式,流水线并行场景,一般需要和其他通信计算掩盖优化一起使用。默认值: False

    • computation_communication_fusion_level (int): 控制通算融合的级别。默认值: 0 。注:此功能需要配套Ascend Training Solution 24.0.RC2以上版本使用。

      • 0: 不启用通算融合。

      • 1: 仅对前向节点使能通算融合。

      • 2: 仅对反向节点使能通算融合。

      • 3: 对所有节点使能通算融合。

    • dataset_broadcast_opt_level (int): 数据集读取的优化级别, 目前只支持O0/O1模式,O2模式下不生效。默认值: 0

      • 0: 不启用数据集读取优化。

      • 1: 优化流水线并行中,Stage间的数据读取。

      • 2: 优化模型并行维度数据的读取。

      • 3: 同时优化场景1和2。

    • allreduce_and_biasadd_swap (bool): 为 True 时表示开启matmul-add结构下,通信算子与Add算子执行顺序互换。当前仅支持bias为一维的情况。默认值: False

    • enable_allreduce_slice_to_reducescatter (bool): 为 True 时,表示开启AllReduce优化。在batchmatmul模型并行引入AllReduce的场景中,如果后续节点是配置了模型并行的StridedSlice算子,在已识别可优化的模式中,将AllReduce优化为ReduceScatter。典型的用在开启了groupwise alltoall的MoE模块。默认值: False

    • enable_interleave_split_concat_branch (bool): 为 True 时,表示针对带enable_interleave属性的Split和Concat算子形成的分支,开启通信计算并行优化。典型的使用场景为MoE模块并行场景,对输入数据进行split后,对各切片数据进行MoE模块运算,再对分支结果进行Concat,开启后各分支的MoE模块进行通信计算并行优化。默认值: False

    • enable_interleave_parallel_branch (bool): 为 True 时,表示针对可并行的分支,如果分支汇聚点带parallel_branch属性,开启通信计算并行优化。典型的使用场景为MoE模块带路由专家和共享专家分支的并行场景,开启后并行分支进行通信计算并行优化。默认值: False

样例:

>>> from mindspore.parallel.auto_parallel import AutoParallel
>>>
>>> # Define the network structure of LeNet5. Refer to
>>> # https://gitee.com/mindspore/docs/blob/master/docs/mindspore/code/lenet.py
>>> net = LeNet5()
>>> net = AutoParallel(net, parallel_mode="semi_auto")
>>> net.transformer_opt("./parallel_speed_up.json")
pipeline(stages=1, output_broadcast=False, interleave=False, scheduler='1f1b')[源代码]

配置流水线阶段的数量,stage的结果是否广播,是否启用interleave调度,配置流水线并行时配置调度策略。

参数:
  • stages (int,可选) - 设置流水线并行的阶段信息。默认值: 1

  • output_broadcast (bool,可选) - 在执行流水线并行推理时,是否将最后阶段的结果广播到其他阶段。默认值: False

  • interleave (bool,可选) - 是否启用交错调度。默认值: False

  • scheduler (str,可选) - 调度器的类型。默认值: 1f1b

异常:
  • TypeError - stages 的类型非int。

  • ValueError - stages <= 0。

  • TypeError - output_broadcast 的类型非bool。

  • TypeError - interleave 的类型非bool。

  • TypeError - scheduler 的类型非str。

  • ValueError - scheduler 的类型非支持。

enable_gradients_mean()[源代码]

开启后,在并行模式下,对梯度执行allreduce操作后的mean操作。

样例:

>>> parallel_net = AutoParallel(net, parallel_mode="semi_auto")
>>> parallel_net.enable_gradients_mean()
disable_gradient_fp32_sync()[源代码]

开启后,关闭梯度间的fp32通信。

样例:

>>> net = Network()
>>> parallel_net = AutoParallel(net, parallel_mode="semi_auto")
>>> parallel_net.disable_gradient_fp32_sync()
disable_loss_repeated_mean()[源代码]

开启后,loss在多卡重复计算时,均值运算符不会向后执行。

样例:

>>> net = Network()
>>> parallel_net = AutoParallel(net, parallel_mode="semi_auto")
>>> parallel_net.disable_loss_repeated_mean()
hsdp(shard_size=- 1, threshold=64, optimizer_level='level1')[源代码]

设置优化器并行配置。

参数:
  • shard_size (int, 可选) - 指定优化器权重跨设备切分通信域的大小,数值范围可为 (0, 设备数量]。默认值: -1 ,表明优化器权重分片组大小将采用每个参数的数据并行组。

  • threshold (int, 可选) - 切分参数时,要求目标参数所占内存的最小值,小于该阈值的参数不会在设备间进行分片。Parameter size = shape[0] * … *shape[n] * size(dtype)。取值范围:非负数,单位:KB。。默认值: 64

  • optimizer_level (str, 可选) - 配置用于指定优化器切分的切分级别,静态图下的优化器分片实现与动态图(如 Megatron)不一致,但内存优化效果相同。默认为 "level1"

    • "level1": 对权重、优化器状态进行切分。

    • "level2": 对权重、优化器状态以及梯度进行切分。

    • "level3": 对权重、优化器状态、梯度进行切分,并且在反向开始前会对权重额外展开一次 allgather 通信,以释放前向 allgather 的显存。

异常:
  • ValueError - shard_size 不是正整数或-1。

  • ValueError - threshold 不是正整数或0。

  • ValueError - optimizer_level 取值不是 "level1" , "level2" 或 "level3" 中的一个。

样例:

>>> parallel_net = AutoParallel(net, parallel_mode="semi_auto")
>>> parallel_net.hsdp(shard_size=-1, threshold=64, optimizer_level="level1")