参数服务器

查看源文件

概述

Parameter Server(参数服务器)是分布式训练中一种广泛使用的架构,相较于同步的AllReduce训练方法,Parameter Server具有更好的灵活性、可扩展性。具体来讲,参数服务器既支持同步SGD(Stochastic Gradient Descent,随机梯度下降),也支持异步SGD的训练算法;在扩展性上,将模型的计算与模型的更新分别部署在Worker和Server两类进程中,使得Worker和Server的资源可以独立地横向扩缩(新增或者删除Worker和Server资源);另外,在大规模数据中心的环境下,计算设备、网络以及存储经常会出现各种故障而导致部分节点异常,而在参数服务器的架构下,能够较为容易地处理此类故障而不会对训练中的任务产生影响。

参数服务器支持的硬件平台包括Ascend、GPU,不支持PyNative模式。

相关接口:

  1. mindspore.set_ps_context(enable_ps=True)开启Parameter Server训练模式。

    • 此接口需在mindspore.communication.init()之前调用。

    • 若没有调用此接口,下面的环境变量设置则不会生效。

    • 调用mindspore.reset_ps_context()可以关闭Parameter Server训练模式。

  2. 在本训练模式下,有以下两种调用接口方式以控制训练参数是否通过Parameter Server进行更新,并且可以控制参数初始化位置:

    • 通过mindspore.nn.Cell.set_param_ps()nn.Cell中所有权重递归设置。

    • 通过mindspore.Parameter.set_param_ps()mindspore.Parameter权重进行设置。

    • 被设置为通过Parameter Server更新的单个权重大小不得超过INT_MAX(2^31 - 1)字节。

    • 接口set_param_ps可接收一个bool型参数:init_in_server,表示该训练参数是否在Server端初始化,init_in_server默认值为False,表示在Worker上初始化该训练参数;当前仅支持EmbeddingLookup算子的训练参数embedding_table在Server端初始化,以解决超大shape的embedding_table在Worker上初始化导致内存不足的问题,该算子的target属性需要设置为’CPU’。在Server端初始化的训练参数将不再同步到Worker上,如果涉及到多Server训练并保存CheckPoint,则训练结束后每个Server均会保存一个CheckPoint。

  3. [可选配置]针对超大shape的embedding_table,由于设备上存放不下全量的embedding_table,可以配置EmbeddingLookup算子vocab_cache_size,用于开启Parameter Server训练模式下EmbeddingLookup的cache功能,该功能使用vocab_cache_size大小的embedding_table在设备上训练,全量embedding_table存储在Server,将下批次训练用到的embedding_table提前换入到cache上,当cache放不下时则将过期的embedding_table放回到Server,以达到提升训练性能的目的;训练结束后,可在Server上导出CheckPoint,保存训练后的全量embedding_table。Embedding cache支持sparse模式,需要将所有开启cache的EmbeddingLookup算子的sparse参数都设为True,sparse模式会对该算子输入的特征id做去重处理,以降低计算与通信量。详细网络训练脚本参考https://gitee.com/mindspore/models/tree/r2.2/official/recommend/Wide_and_Deep

Parameter Server模式暂时不支持控制流,因此在train.py中,需要将model = Model(network, loss_fn, optimizer, metrics={"Accuracy": Accuracy()}, amp_level="O2")修改为model = Model(network, loss_fn, optimizer, metrics={"Accuracy": Accuracy()}),将混合精度amp_level选项关闭,消除控制流的影响。

相关环境变量配置:

MindSpore通过读取环境变量,控制Parameter Server训练,环境变量包括以下选项(所有脚本中的MS_SCHED_HOSTMS_SCHED_PORT值需保持一致):

export MS_SERVER_NUM=1                # Server number
export MS_WORKER_NUM=1                # Worker number
export MS_SCHED_HOST=XXX.XXX.XXX.XXX  # Scheduler IP address
export MS_SCHED_PORT=XXXX             # Scheduler port
export MS_ROLE=MS_SCHED               # The role of this process: MS_SCHED represents the scheduler, MS_WORKER represents the worker, MS_PSERVER represents the Server

更多详细说明请查看动态组网环境变量

基本原理

MindSpore的参数服务器采用了自研的通信框架作为基础架构,基于该框架提供的远程通信能力以及抽象的Send/Broadcast等原语,实现了同步SGD的分布式训练算法,另外结合Ascend和GPU中的高性能集合通信库(HCCL和NCCL),MindSpore还提供了Parameter Server和AllReduce的混合训练模式,支持将部分权重通过参数服务器进行存储和更新,其余权重仍然通过AllReduce算法进行训练。

在参数服务器的架构设计中,一共包含三个独立的组件,分别是Server、Worker和Scheduler,作用分别是:

  • Server:保存模型的权重和反向计算的梯度值,并使用优化器通过Worker上传的梯度值对模型进行更新。

  • Worker:执行网络的正反向计算,反向计算的梯度值通过Push接口上传至Server中,通过Pull接口把Server更新好的模型下载到Worker本地。

  • Scheduler:用于建立Server和Worker的通信关系。

操作实践

参数服务器支持GPU和Ascend,下面以Ascend为例进行操作说明:

样例代码说明

下载完整的样例代码:parameter_server

目录结构如下:

└─ sample_code
    ├─ parameter_server
       ├── train.py
       └── run.sh
    ...

其中,train.py是定义网络结构和训练过程的脚本。run.sh是执行脚本。

配置分布式环境

通过context接口指定运行模式、运行设备、运行卡号等,与单卡脚本不同,并行脚本还需指定并行模式parallel_mode,使能enable_ps开启参数服务器训练模式,并通过init初始化HCCL或NCCL通信。device_target会自动指定为MindSpore包对应的后端硬件设备。

import mindspore as ms
from mindspore.communication import init

ms.set_context(mode=ms.GRAPH_MODE)
ms.set_auto_parallel_context(full_batch=True, parallel_mode=ms.ParallelMode.AUTO_PARALLEL)
ms.set_ps_context(enable_ps=True)
init()
ms.set_seed(1)
  • full_batch:是否全量导入数据集,为True时表示全量导入,每卡的数据相同,在多Worker场景中必须设置为True

  • parallel_mode:并行模式,多Worker场景需要开启自动并行模式,设置parallel_mode=ParallelMode.AUTO_PARALLEL

网络定义

参数服务器模式的网络定义是在单卡模式的基础上配置net.set_param_ps():

from mindspore import nn

class Network(nn.Cell):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.fc1 = nn.Dense(28*28, 10, weight_init="normal", bias_init="zeros")
        self.relu = nn.ReLU()
        self.fc2 = nn.Dense(10, 1, weight_init="normal", bias_init="zeros")

    def construct(self, x):
        x = self.flatten(x)
        logits = self.fc2(self.relu(self.fc1(x)))
        return logits

net = Network()
net.set_param_ps()

数据集加载

数据集加载方式与单卡模型一致,代码如下:

import os
import mindspore.dataset as ds

def create_dataset(batch_size):
    dataset_path = os.getenv("DATA_PATH")
    dataset = ds.MnistDataset(dataset_path)
    image_transforms = [
        ds.vision.Rescale(1.0 / 255.0, 0),
        ds.vision.Normalize(mean=(0.1307,), std=(0.3081,)),
        ds.vision.HWC2CHW()
    ]
    label_transform = ds.transforms.TypeCast(ms.int32)
    dataset = dataset.map(image_transforms, 'image')
    dataset = dataset.map(label_transform, 'label')
    dataset = dataset.batch(batch_size)
    return dataset

data_set = create_dataset(32)

训练网络

在这一部分,定义优化器,损失函数和训练网络,此处采用函数式写法来定义网络,代码与单卡模式一致:

import mindspore as ms
from mindspore import nn

optimizer = nn.SGD(net.trainable_params(), 1e-2)
loss_fn = nn.MSELoss()

def forward_fn(data, target):
    logits = net(data)
    loss = loss_fn(logits, target)
    return loss, logits

grad_fn = ms.value_and_grad(forward_fn, None, net.trainable_params(), has_aux=True)

@ms.jit
def train_step(inputs, targets):
    (loss_value, _), grads = grad_fn(inputs, targets)
    optimizer(grads)
    return loss_value

for epoch in range(10):
    i = 0
    for image, label in data_set:
        loss_output = train_step(image, label)
        if i % 10 == 0:
            print("epoch: %s, step: %s, loss is %s" % (epoch, i, loss_output))
        i += 1

运行单机8卡脚本

接下来通过命令调用对应的脚本,以8卡的分布式训练脚本为例,进行分布式训练,Scheduler、Server和Worker三个角色分别启动对应数量的进程。命令如下:

EXEC_PATH=$(pwd)

if [ ! -d "${EXEC_PATH}/MNIST_Data" ]; then
    if [ ! -f "${EXEC_PATH}/MNIST_Data.zip" ]; then
        wget http://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/MNIST_Data.zip
    fi
    unzip MNIST_Data.zip
fi
export DATA_PATH=${EXEC_PATH}/MNIST_Data/train/

rm -rf output
mkdir output

# run Scheduler process
export MS_SERVER_NUM=8
export MS_WORKER_NUM=8
export MS_SCHED_HOST=127.0.0.1
export MS_SCHED_PORT=8118
export MS_ROLE=MS_SCHED
python train.py > output/scheduler.log 2>&1 &

# run Server processes
export MS_SERVER_NUM=8
export MS_WORKER_NUM=8
export MS_SCHED_HOST=127.0.0.1
export MS_SCHED_PORT=8118
export MS_ROLE=MS_PSERVER
for((server_id=0;server_id<${MS_SERVER_NUM};server_id++))
do
    python train.py > output/server_${server_id}.log 2>&1 &
done

# run Wroker processes
export MS_SERVER_NUM=8
export MS_WORKER_NUM=8
export MS_SCHED_HOST=127.0.0.1
export MS_SCHED_PORT=8118
export MS_ROLE=MS_WORKER
for((worker_id=0;worker_id<${MS_WORKER_NUM};worker_id++))
do
    python train.py > output/worker_${worker_id}.log 2>&1 &
done

或者直接执行:

bash run.sh

每个进程的输出结果保存在output文件夹中,可以在output/scheduler.log中查看Server与Worker通信日志:

...
Assign rank id of node id: 2fa9d1ab-10b8-4a61-9acf-217a04439287, role: MS_WORKER, with host ip: 127.0.0.1, old rank id: 6, new rank id: 0
...
Assign rank id of node id: 02fb1169-edc3-465e-b307-ccaf62d1f0b3, role: MS_PSERVER, with host ip: 127.0.0.1, old rank id: 4, new rank id: 0
...
Cluster is successfully initialized.

训练结果保存在output/worker_0.log中,示例如下:

epoch: 0, step: 0, loss is 26.743706
epoch: 0, step: 10, loss is 17.507723
epoch: 0, step: 20, loss is 9.616591
epoch: 0, step: 30, loss is 8.589715
epoch: 0, step: 40, loss is 8.23479
epoch: 0, step: 50, loss is 10.431321
epoch: 0, step: 60, loss is 7.7080607
epoch: 0, step: 70, loss is 8.599786
epoch: 0, step: 80, loss is 7.669814
epoch: 0, step: 90, loss is 8.584343
epoch: 0, step: 100, loss is 8.803712