临终Checkpoint保存

查看源文件

概述

MindSpore临终CKPT功能基于MindIO TTP,主要针对大模型训练过程中故障恢复加速,临终Checkpoint特性通过在训练过程中发生故障后,校验中间状态数据的完整性和一致性,生成一次临时CheckPoint数据,恢复训练时能够通过该CheckPoint数据恢复,减少故障造成的训练迭代损失。

下面以一个4卡数据并行网络训练为例,介绍如何配置临终CKPT功能。 配置完成后,在训练中如遇到功能故障(主要包括:训练进程异常,训练进程异常退出),MindSpore和MindIO会停止所有卡的训练,检查最新的训练状态,并基于训练卡间的副本关系,确认是否存在可用的副本卡(好卡),如果存在则将对好卡进行临终CKPT的保存, 否则按异常退出处理。如果发生故障后,能保存第n个step的CKPT文件, 则下一次训练可从第n+1个step开始。

使用约束

  1. 仅支持Ascend后端的静态图模式。

  2. 仅支持sink_size=1, 用于保证step的正确性。

  3. 仅支持父类类型为MindSpore Optimizer的优化器。

  4. 仅支持数据并行度大于1的网络,以确保模型参数存在副本关系。

  5. 如果网络开启优化器并行,必须使能optimizer_weight_shard_size:2,并确保其生效,以使优化器参数存在副本关系,详细可以参考优化器并行

样例代码说明

您可以在这里下载完整的样例代码:

https://gitee.com/mindspore/docs/tree/r2.4.10/docs/sample_code/mindio_ttp

目录结构如下:

└─ sample_code
    ├─ mindio_ttp
       ├── mindio_ttp_case.py
       ├── msrun-resume.sh
       └── msrun.sh
    ...

其中,mindio_ttp_case.py是定义网络结构和训练过程的脚本。msrun.sh是训练脚本。msrun-resume.sh是续训脚本。

环境准备

临终CKPT功能开启需要先安装MindIO TTP, 详情参见MindIO TTP

准备数据

下载MNIST数据集,并解压数据集到项目目录。

EXEC_PATH=$(pwd)

if [ ! -d "${EXEC_PATH}/MNIST_Data" ]; then
    if [ ! -f "${EXEC_PATH}/MNIST_Data.zip" ]; then
        wget http://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/MNIST_Data.zip
    fi
    unzip MNIST_Data.zip
fi
export DATA_PATH=${EXEC_PATH}/MNIST_Data/train/

模型定义

如下代码定义一个包含5层的网络结构。其中设置并行模式为数据并行,让每张卡都互为备份关系,以便发生异常时,临终Checkpoint功能找到有效的副本进行保存。


import os
import math
import argparse
import mindspore as ms
import mindspore.dataset as ds
from mindspore import nn, ops, Parameter, train
from mindspore.communication import init, get_rank
from mindspore.common.initializer import initializer, HeUniform

parser = argparse.ArgumentParser(description="Mindio TTP test arguments")
parser.add_argument("--is_recover",
                    type=int,
                    default=0,
                    choices=[1, 0],
                    help="Only used for resume from Mindio TTP checkpoint, default false.")
args_opt = parser.parse_args()

ms.set_context(mode=ms.GRAPH_MODE, jit_level='O1', device_target="Ascend")

ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.DATA_PARALLEL)
init()
ms.set_auto_parallel_context(strategy_ckpt_config={"save_file": "./src_pipeline_strategy/src_strategy_{}.ckpt".format(get_rank())})

class MatMulCell(nn.Cell):
    """
    MatMulCell definition.
    """
    def __init__(self, param=None, shape=None):
        super().__init__()
        if shape is None:
            shape = [28 * 28, 512]
        weight_init = HeUniform(math.sqrt(5))
        self.param = Parameter(initializer(weight_init, shape), name="param")
        if param is not None:
            self.param = param
        self.matmul = ops.MatMul()

    def construct(self, x):
        out = self.matmul(x, self.param)
        return out


class Network(nn.Cell):
    """
    Network definition.
    """
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.layer1 = MatMulCell()
        self.relu1 = nn.ReLU()
        self.layer2 = nn.Dense(512, 5120)
        self.relu2 = nn.ReLU()
        self.layer3 = nn.Dense(5120, 5120)
        self.relu3 = nn.ReLU()
        self.layer4 = nn.Dense(5120, 512)
        self.relu4 = nn.ReLU()
        self.layer5 = nn.Dense(512, 10)

    def construct(self, x):
        x = self.flatten(x)
        x = self.layer1(x)
        x = self.relu1(x)
        x = self.layer2(x)
        x = self.relu2(x)
        x = self.layer3(x)
        x = self.relu3(x)
        x = self.layer4(x)
        x = self.relu4(x)
        logits = self.layer5(x)
        return logits

net = Network()

数据集定义

def create_dataset(batch_size):
    """create dataset"""
    dataset_path = os.getenv("DATA_PATH")
    dataset = ds.MnistDataset(dataset_path)
    image_transforms = [
        ds.vision.Rescale(1.0 / 255.0, 0),
        ds.vision.Normalize(mean=(0.1307,), std=(0.3081,)),
        ds.vision.HWC2CHW()
    ]
    label_transform = ds.transforms.TypeCast(ms.int32)
    dataset = dataset.map(image_transforms, 'image')
    dataset = dataset.map(label_transform, 'label')
    dataset = dataset.batch(batch_size)
    return dataset

dataset = create_dataset(32)

优化器定义与封装

开启临终CKPT功能需要设置TFT优化器, 设置后可在梯度计算完成后,优化器更新前向MindIO TFT上报状态。TFT优化器用OptTFTWrapper来配置, 详情参见OptTFTWrapper

optimizer = nn.SGD(net.trainable_params(), 1e-2)
#配置TFT优化器
optimizer_wrapper = nn.OptTFTWrapper(optimizer)

创建loss函数并配置model对象

loss_fn = nn.CrossEntropyLoss()
net.set_train()
model = ms.Model(net,  optimizer=optimizer_wrapper)

Callback配置

开启临终CKPT功能需要设置 TFTRegister Callback对象,并传入参数来配置,详情参见TFTRegister

time_monitor = train.TimeMonitor(data_size=1)
loss_cb = train.LossMonitor(1)

# 设置TFT callback对象
tft_cb = train.TFTRegister(0, "127.0.0.1", 30051, "./ttp_checkpoints/")

续训配置

续训可从临终Chckpoint恢复,由于临终Checkpoint对于多个副本只会保存一份Checkpoint文件,因此需要查看生成的Checkpoint文件,并配置相应的Checkpoint文件进行续训。

init_epoch = 0

if bool(args_opt.is_recover):
    cur_epoch = 2 # 设置成异常保存的epoch值
    cur_step = 1215 # 设置成异常保存的step值
    ckpt_step = (cur_epoch - 1) * dataset.get_dataset_size() + cur_step
    if context.get_auto_parallel_context("parallel_mode") == "data_parallel":
        cur_rank = 0
        new_ckpt_file = f"./ttp_checkpoints/tft_saved_checkpoints-step_{ckpt_step}/rank_{cur_rank}/ttp_rank_{cur_rank}-{cur_epoch}_{cur_step}.ckpt"
    else:
        cur_rank = get_rank()
        ckpt_file = f"./ttp_checkpoints/tft_saved_checkpoints-step_{ckpt_step}/rank_{cur_rank}/ttp_rank_{cur_rank}-{cur_epoch}_{cur_step}.ckpt"
        strategy_file = f"./src_pipeline_strategy/src_strategy_{cur_rank}.ckpt"
        new_ckpt_file = get_ckpt_path_with_strategy(ckpt_file, strategy_file)
    param_dict = ms.load_checkpoint(new_ckpt_file)
    ms.load_param_into_net(net, param_dict)
    dataset.set_init_step(int(param_dict["step_num"]))
    init_epoch = int(param_dict["epoch_num"]) - 1

启动训练

model.train(5, dataset, callbacks=[time_monitor, loss_cb, tft_cb])

配置环境变量并启动训练

开启临终Checkpoint功能,需要设置环境变量 MS_ENABLE_TFT='{TTP:1}'。此外还需要设置环境变量 MINDIO_FOR_MINDSPORE=1, 使能 MindIO 适配 MindSpore。

使用 msrun 命令启动训练。

export MS_ENABLE_TFT='{TTP:1}'
export MINDIO_FOR_MINDSPORE=1
export DATA_PATH=${EXEC_PATH}/MNIST_DATA/train/

msrun --worker_num=4 --local_worker_num=4 --master_port=10970 --join=False --log_dir=msrun_log --cluster_time_out=300  mindio_ttp_case.py

异常注入

常见的异常注入为查看训练的进程,并直接杀掉相应的进程来检验是否有临终Checkpoint文件生成。 注意: 由于MindIo的Controller控制器默认在0卡启动,因此杀死rank0的进程并不会生成Checkpoint文件。

npu-smi info # 查看训练进程
kill -9 pid  # 杀死对应的训练进程

配置环境变量并恢复训练

export MS_ENABLE_TFT='{TTP:1}'
export MINDIO_FOR_MINDSPORE=1
export DATA_PATH=${EXEC_PATH}/MNIST_DATA/train/

msrun --worker_num=4 --local_worker_num=4 --master_port=10970 --join=False --log_dir=msrun_log --cluster_time_out=300  mindio_ttp_case.py --is_recover=1

临终Checkpoint文件生成说明

└─ sample_code
    ├─ mindio_ttp
       ├── ttp_checkpoints
           ├── tft_saved_checkpoints-step_{global_step}
               ├── rank_0
                   └── ttp_rank_0-{cur_epoch}_{cur_step}.ckpt
               ├── rank_1
                   └── ttp_rank_1-{cur_epoch}_{cur_step}.ckpt
    ...