Distributed Training
Overview
MindSpore supports the DATA_PARALLEL and AUTO_PARALLEL modes. Automatic parallel (AUTO_PARALLEL) is a distributed parallel mode that integrates data parallel, model parallel, and hybrid parallel. It automatically builds cost models and selects a parallel mode for users.
The related concepts are:
Data parallel: a parallel mode that divides data into batches.
Layerwise parallel: a parallel mode that divides parameters by channel.
Hybrid parallel: a parallel mode that covers both data parallel and model parallel.
Cost model: a model built from memory, computation, and communication costs. An efficient algorithm uses it to find the parallel strategy with the shortest training time.
In this tutorial, we will learn how to train the ResNet-50 network in DATA_PARALLEL or AUTO_PARALLEL mode on MindSpore.
For the sample code, see
The current sample is for the Ascend AI processor.
Preparations
Configuring Distributed Environment Variables
When distributed training is performed in the lab environment, you need to configure the networking information file for the current multi-card environment. If HUAWEI CLOUD is used, skip this section.
The Ascend 910 AI processor and 1980 AIServer are used as an example. The JSON configuration file of a two-card environment is as follows. In this example, the configuration file is named rank_table.json.
{
    "board_id": "0x0000",
    "chip_info": "910",
    "deploy_mode": "lab",
    "group_count": "1",
    "group_list": [
        {
            "device_num": "2",
            "server_num": "1",
            "group_name": "",
            "instance_count": "2",
            "instance_list": [
                {"devices":[{"device_id":"0","device_ip":"192.1.27.6"}],"rank_id":"0","server_id":"10.*.*.*"},
                {"devices":[{"device_id":"1","device_ip":"192.2.27.6"}],"rank_id":"1","server_id":"10.*.*.*"}
            ]
        }
    ],
    "para_plane_nic_location": "device",
    "para_plane_nic_name": [
        "eth0", "eth1"
    ],
    "para_plane_nic_num": "2",
    "status": "completed"
}
The following parameters need to be modified based on the actual training environment:
server_num indicates the number of hosts, and server_id indicates the IP address of the local host.
device_num, para_plane_nic_num, and instance_count indicate the number of cards.
rank_id indicates the logical sequence number of a card, which always starts from 0.
device_id indicates the physical sequence number of a card, that is, its actual index on the host where the card is located.
device_ip indicates the IP address of the NIC. You can run the cat /etc/hccn.conf command on the current host to obtain it.
para_plane_nic_name indicates the names of the corresponding NICs.
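With many cards it is easy to let server_num, device_num, instance_count, and para_plane_nic_num drift out of sync. The following is a minimal sketch of a hypothetical helper, build_rank_table (not a MindSpore tool), that emits a single-server file with the same layout as rank_table.json above. It assumes one server, one device per instance, rank_id equal to device_id, and NIC names eth0, eth1, and so on. The IP addresses in the example are placeholders.

```python
import json


def build_rank_table(server_ip, device_ips, nic_names=None):
    """Hypothetical helper: build a single-server rank table.

    device_ips[i] is the NIC IP of physical device i (see /etc/hccn.conf).
    Assumes a single server where rank_id == device_id.
    """
    num_cards = len(device_ips)
    if nic_names is None:
        # Assumption: NICs are named eth0, eth1, ... in device order.
        nic_names = ["eth%d" % i for i in range(num_cards)]
    instance_list = [
        {
            "devices": [{"device_id": str(i), "device_ip": ip}],
            "rank_id": str(i),
            "server_id": server_ip,
        }
        for i, ip in enumerate(device_ips)
    ]
    return {
        "board_id": "0x0000",
        "chip_info": "910",
        "deploy_mode": "lab",
        "group_count": "1",
        "group_list": [
            {
                # device_num and instance_count both equal the number of cards.
                "device_num": str(num_cards),
                "server_num": "1",
                "group_name": "",
                "instance_count": str(num_cards),
                "instance_list": instance_list,
            }
        ],
        "para_plane_nic_location": "device",
        "para_plane_nic_name": nic_names,
        # One NIC per card in this layout.
        "para_plane_nic_num": str(num_cards),
        "status": "completed",
    }


if __name__ == "__main__":
    # Placeholder addresses: replace them with the values of your environment.
    table = build_rank_table("10.0.0.1", ["192.1.27.6", "192.2.27.6"])
    print(json.dumps(table, indent=4))
```

Redirect the printed JSON to rank_table.json. Generating every count from len(device_ips) keeps the count fields consistent by construction.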
After the networking information file is ready, add its path to the environment variable MINDSPORE_HCCL_CONFIG_PATH. In addition, the device_id information must be passed to the script. In this example, it is passed through the environment variable DEVICE_ID.
export MINDSPORE_HCCL_CONFIG_PATH="./rank_table.json"
export DEVICE_ID=0
Invoking the Collective Communication Library
Enable the distributed API enable_hccl in the context.set_context() API, set the device_id parameter, and call init() to complete the initialization.
The sample runs in graph mode. On the Ascend AI processor, the Huawei Collective Communication Library (HCCL) is used.
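import os
from mindspore import context
from mindspore.communication.management import init
# Initialize at module level rather than under `if __name__ == "__main__":`,
# because the launch script runs this file through pytest, which skips that block.
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend", enable_hccl=True, device_id=int(os.environ["DEVICE_ID"]))
init()
...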
mindspore.communication.management encapsulates the collective communication API provided by HCCL to help users obtain distributed information. The most commonly used functions are get_rank and get_group_size, which return the ID of the current card in the cluster and the number of cards, respectively.
HCCL implements multi-device, multi-card communication on Da Vinci architecture chips. The distributed service has the following restrictions:
In a single-node system, clusters of 1, 2, 4, or 8 cards are supported. In a multi-node system, clusters of 8 x N cards are supported.
Each server has eight NICs: NICs 0 to 3 and NICs 4 to 7 are deployed on two different networks. When training with two or four cards, the NICs used must be connected to each other, because a cluster cannot span the two networks.
The operating system needs to use the symmetric multiprocessing (SMP) mode.
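These restrictions can be checked before launching a job. The following is a minimal sketch of a hypothetical pre-flight check, check_hccl_cluster (not part of MindSpore or HCCL). It encodes only the rules listed above. It assumes every server in a multi-node cluster uses the same device IDs, and that devices 0 to 3 and 4 to 7 map to the two NIC networks.

```python
def check_hccl_cluster(device_ids, num_servers=1):
    """Hypothetical check of the HCCL restrictions listed above.

    device_ids: physical device IDs (0-7) used on each server.
    Returns a list of problems; an empty list means no listed rule is violated.
    """
    problems = []
    n = len(device_ids)
    if len(set(device_ids)) != n:
        problems.append("duplicate device IDs")
    if any(d < 0 or d > 7 for d in device_ids):
        problems.append("device IDs must be in 0-7")
    if num_servers == 1:
        if n not in (1, 2, 4, 8):
            problems.append("single node supports 1, 2, 4 or 8 cards, got %d" % n)
    elif n != 8:
        # 8 x N cards: every server contributes all of its 8 cards.
        problems.append("multi-node clusters need 8 cards per server, got %d" % n)
    if n in (2, 4):
        # NICs 0-3 and 4-7 sit on two different networks.
        networks = {d // 4 for d in device_ids}
        if len(networks) > 1:
            problems.append("2- or 4-card clusters cannot span NICs 0-3 and 4-7")
    return problems


print(check_hccl_cluster([0, 1]))  # []
print(check_hccl_cluster([3, 4]))  # ['2- or 4-card clusters cannot span NICs 0-3 and 4-7']
```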
Loading Datasets
During distributed training, data is imported in data parallel mode. The following example uses Cifar10Dataset to show how to load the CIFAR-10 dataset in parallel mode; data_path is the path of the dataset.
Unlike single-node training, distributed training must pass the num_shards and shard_id parameters to the dataset API. They correspond to the number of cards and the logical sequence number (rank) of the current card, respectively. You are advised to obtain both through the HCCL API.
import mindspore.common.dtype as mstype
import mindspore.dataset as ds
import mindspore.dataset.transforms.c_transforms as C
import mindspore.dataset.transforms.vision.c_transforms as vision
from mindspore.communication.management import get_rank, get_group_size
def create_dataset(repeat_num=1, batch_size=32):
    # data_path: path of the CIFAR-10 dataset, defined elsewhere in the script
    resize_height = 224
    resize_width = 224
    rescale = 1.0 / 255.0
    shift = 0.0
    # get rank_id and rank_size from HCCL instead of passing them in
    rank_id = get_rank()
    rank_size = get_group_size()
    # each card reads only its own shard of the dataset
    data_set = ds.Cifar10Dataset(data_path, num_shards=rank_size, shard_id=rank_id)
    # define map operations
    random_crop_op = vision.RandomCrop((32, 32), (4, 4, 4, 4))
    random_horizontal_op = vision.RandomHorizontalFlip()
    resize_op = vision.Resize((resize_height, resize_width))
    rescale_op = vision.Rescale(rescale, shift)
    normalize_op = vision.Normalize((0.4465, 0.4822, 0.4914), (0.2010, 0.1994, 0.2023))
    changeswap_op = vision.HWC2CHW()
    type_cast_op = C.TypeCast(mstype.int32)
    c_trans = [random_crop_op, random_horizontal_op]
    c_trans += [resize_op, rescale_op, normalize_op, changeswap_op]
    # apply map operations on labels and images
    data_set = data_set.map(input_columns="label", operations=type_cast_op)
    data_set = data_set.map(input_columns="image", operations=c_trans)
    # apply repeat operations
    data_set = data_set.repeat(repeat_num)
    # apply shuffle operations
    data_set = data_set.shuffle(buffer_size=10)
    # apply batch operations
    data_set = data_set.batch(batch_size=batch_size, drop_remainder=True)
    return data_set
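The num_shards and shard_id arguments give each card a disjoint slice of the samples, so together the cards cover the dataset once per pass. The sketch below illustrates the idea with a simple round-robin assignment. The helper shard_indices is hypothetical, and MindSpore's own sampler may assign and shuffle samples differently.

```python
def shard_indices(num_samples, num_shards, shard_id):
    """Illustrative round-robin sharding: sample i goes to shard i % num_shards.

    This only illustrates what num_shards/shard_id achieve; it is not
    MindSpore's actual sampling implementation.
    """
    if not 0 <= shard_id < num_shards:
        raise ValueError("shard_id must be in [0, num_shards)")
    return list(range(shard_id, num_samples, num_shards))


# Two cards (num_shards=2) splitting a toy dataset of 10 samples.
for rank in range(2):
    print(rank, shard_indices(10, 2, rank))
# 0 [0, 2, 4, 6, 8]
# 1 [1, 3, 5, 7, 9]
```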
Defining the Network
In DATA_PARALLEL and AUTO_PARALLEL modes, the network is defined the same way as on a single-node system. For the sample code, see https://gitee.com/mindspore/docs/blob/r0.1/tutorials/tutorial_code/resnet/resnet.py.
Defining the Loss Function and Optimizer
Defining the Loss Function
In this loss function, SoftmaxCrossEntropyWithLogits is expanded into several small operators according to its mathematical formula.
Compared with a fused loss operator, this allows AUTO_PARALLEL mode to search for the optimal parallel strategy of each operator individually.
from mindspore.ops import operations as P
from mindspore import Tensor
import mindspore.ops.functional as F
import mindspore.common.dtype as mstype
import mindspore.nn as nn
class SoftmaxCrossEntropyExpand(nn.Cell):
    def __init__(self, sparse=False):
        super(SoftmaxCrossEntropyExpand, self).__init__()
        self.exp = P.Exp()
        self.sum = P.ReduceSum(keep_dims=True)
        self.onehot = P.OneHot()
        self.on_value = Tensor(1.0, mstype.float32)
        self.off_value = Tensor(0.0, mstype.float32)
        self.div = P.Div()
        self.log = P.Log()
        self.sum_cross_entropy = P.ReduceSum(keep_dims=False)
        self.mul = P.Mul()
        self.mul2 = P.Mul()
        self.mean = P.ReduceMean(keep_dims=False)
        self.sparse = sparse
        self.max = P.ReduceMax(keep_dims=True)
        self.sub = P.Sub()

    def construct(self, logit, label):
        logit_max = self.max(logit, -1)
        exp = self.exp(self.sub(logit, logit_max))
        exp_sum = self.sum(exp, -1)
        softmax_result = self.div(exp, exp_sum)
        if self.sparse:
            label = self.onehot(label, F.shape(logit)[1], self.on_value, self.off_value)
        softmax_result_log = self.log(softmax_result)
        loss = self.sum_cross_entropy((self.mul(softmax_result_log, label)), -1)
        loss = self.mul2(F.scalar_to_array(-1.0), loss)
        loss = self.mean(loss, -1)
        return loss
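To see what the chain of small operators in SoftmaxCrossEntropyExpand computes, the following pure-Python sketch mirrors the same sequence for sparse=True: ReduceMax, Sub, Exp, ReduceSum, Div, OneHot, Log, Mul, ReduceSum, multiplication by -1.0, and ReduceMean. The function name is hypothetical, and it operates on plain lists rather than MindSpore tensors. It is a reference for checking the math, not a replacement for the cell.

```python
import math


def softmax_cross_entropy_expand(logits, labels, num_classes):
    """Pure-Python mirror of SoftmaxCrossEntropyExpand(sparse=True).

    logits: list of rows (batch x num_classes); labels: list of class indices.
    """
    losses = []
    for row, label in zip(logits, labels):
        row_max = max(row)                              # ReduceMax
        exps = [math.exp(x - row_max) for x in row]     # Sub + Exp
        exp_sum = sum(exps)                             # ReduceSum
        softmax = [e / exp_sum for e in exps]           # Div
        onehot = [1.0 if c == label else 0.0 for c in range(num_classes)]  # OneHot
        log_softmax = [math.log(p) for p in softmax]    # Log
        cross = sum(lp * y for lp, y in zip(log_softmax, onehot))  # Mul + ReduceSum
        losses.append(-1.0 * cross)                     # Mul by -1.0
    return sum(losses) / len(losses)                    # ReduceMean


# Uniform logits over 10 classes give a loss of about ln(10) = 2.3026.
print(softmax_cross_entropy_expand([[0.0] * 10], [3], 10))
```

Subtracting the row maximum before Exp does not change the softmax but keeps the exponentials from overflowing. This is why the cell computes logit_max first.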
Defining the Optimizer
The Momentum optimizer is used as the parameter update tool. The definition is the same as that of a single-node system.
from mindspore.nn.optim.momentum import Momentum
lr = 0.01
momentum = 0.9
# net is the ResNet-50 network defined above, for example net = resnet50(32, 10)
opt = Momentum(filter(lambda x: x.requires_grad, net.get_parameters()), lr, momentum)
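The filter on requires_grad passes only trainable parameters to the optimizer. The sketch below shows the standard heavy-ball momentum rule on plain floats. This is assumed here to match the non-Nesterov behavior of Momentum. The function momentum_step is hypothetical, and the constant gradient is illustrative.

```python
def momentum_step(params, grads, accums, lr=0.01, momentum=0.9):
    """One heavy-ball momentum update on plain float lists.

    accum = momentum * accum + grad
    param = param - lr * accum
    Returns new (params, accums); the inputs are not modified.
    """
    new_accums = [momentum * a + g for a, g in zip(accums, grads)]
    new_params = [p - lr * a for p, a in zip(params, new_accums)]
    return new_params, new_accums


params, accums = [1.0], [0.0]
for _ in range(2):
    # Constant gradient of 1.0 for illustration.
    params, accums = momentum_step(params, [1.0], accums)
print(params, accums)  # accum is about 1.9, param about 0.971
```

In data parallel training, every card applies this same update to the all-reduced gradient, so the parameter copies stay identical across cards.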
Training the Network
context.set_auto_parallel_context() is an API for setting parallel parameters. Its parameters are as follows:
parallel_mode: the distributed parallel mode. The options are ParallelMode.DATA_PARALLEL and ParallelMode.AUTO_PARALLEL.
mirror_mean: during backward computation, the framework collects the gradients of data-parallel parameters across multiple machines, obtains the global gradient value, and passes it to the optimizer for the update. True applies an allreduce_mean operation, and False applies an allreduce_sum operation.
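The difference between the two mirror_mean settings is plain arithmetic. The following sketch simulates the reduction for one parameter across cards. The function allreduce is hypothetical and models only the result every card receives, not HCCL's communication pattern. With N cards, the summed gradient is N times the averaged one, so with mirror_mean=False the effective update scale grows with the number of cards.

```python
def allreduce(per_card_grads, mean=True):
    """Simulate the gradient all-reduce for one parameter.

    per_card_grads: one gradient list per card (all the same length).
    mean=True mimics allreduce_mean (mirror_mean=True);
    mean=False mimics allreduce_sum (mirror_mean=False).
    Every card receives the same reduced gradient.
    """
    num_cards = len(per_card_grads)
    summed = [sum(vals) for vals in zip(*per_card_grads)]
    reduced = [s / num_cards for s in summed] if mean else summed
    return [list(reduced) for _ in range(num_cards)]


grads = [[0.5, -1.0], [1.5, 0.0]]  # two cards, a 2-element parameter
print(allreduce(grads, mean=True)[0])   # [1.0, -0.5]
print(allreduce(grads, mean=False)[0])  # [2.0, -1.0]
```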
In the following example, the parallel mode is set to AUTO_PARALLEL. dataset_sink_mode=False indicates that the non-sink mode is used. LossMonitor prints the loss value through the callback mechanism.
from mindspore import context
from mindspore.nn.optim.momentum import Momentum
from mindspore.train.callback import LossMonitor
from mindspore.train.model import Model, ParallelMode
from resnet import resnet50
# create_dataset and SoftmaxCrossEntropyExpand are defined earlier in the same script
def test_train_cifar(num_classes=10, epoch_size=10):
    context.set_auto_parallel_context(parallel_mode=ParallelMode.AUTO_PARALLEL, mirror_mean=True)
    loss_cb = LossMonitor()
    dataset = create_dataset(epoch_size)
    # 32 matches the default batch size used by create_dataset
    net = resnet50(32, num_classes)
    loss = SoftmaxCrossEntropyExpand(sparse=True)
    opt = Momentum(filter(lambda x: x.requires_grad, net.get_parameters()), 0.01, 0.9)
    model = Model(net, loss_fn=loss, optimizer=opt)
    model.train(epoch_size, dataset, callbacks=[loss_cb], dataset_sink_mode=False)
Running Test Cases
Currently, MindSpore distributed execution uses a single-card, single-process mode, so the number of processes must equal the number of cards used. Each process runs in its own folder, where its logs and build information are saved. The following is an example running script for two-card distributed training:
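#!/bin/bash
EXEC_PATH=$(pwd)
# use an absolute path, because each process runs inside its own deviceN folder
export MINDSPORE_HCCL_CONFIG_PATH=${EXEC_PATH}/rank_table.json
export RANK_SIZE=2
for((i=0;i<$RANK_SIZE;i++))
do
    rm -rf device$i
    mkdir device$i
    # the training script imports resnet.py, so copy it as well
    cp ./resnet50_distributed_training.py ./resnet.py ./device$i
    cd ./device$i
    export RANK_ID=$i
    export DEVICE_ID=$i
    echo "start training for device $i"
    env > env$i.log
    pytest -s -v ./resnet50_distributed_training.py > log$i 2>&1 &
    cd ../
done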
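If you prefer to drive the launch from Python, the following hypothetical sketch mirrors the shell script above. It starts one pytest process per card, each in its own deviceN folder with its own RANK_ID and DEVICE_ID. It assumes rank i runs on physical device i, that rank_table.json and the training files are in the current directory, and that pytest is on the PATH. It also copies resnet.py, because the training script imports it. The names build_rank_envs and launch are illustrative only.

```python
import os
import shutil
import subprocess


def build_rank_envs(rank_size, hccl_config, base_env=None):
    """Build one environment per process, as the shell script exports them."""
    base = dict(os.environ if base_env is None else base_env)
    envs = []
    for i in range(rank_size):
        env = dict(base)
        env["MINDSPORE_HCCL_CONFIG_PATH"] = hccl_config
        env["RANK_SIZE"] = str(rank_size)
        env["RANK_ID"] = str(i)
        env["DEVICE_ID"] = str(i)  # assumption: rank i uses physical device i
        envs.append(env)
    return envs


def launch(rank_size=2, script="resnet50_distributed_training.py",
           extra_files=("resnet.py",), rank_table="rank_table.json"):
    """Start one pytest process per card, each in its own deviceN folder."""
    hccl_config = os.path.abspath(rank_table)  # absolute: cwd changes per process
    procs = []
    for i, env in enumerate(build_rank_envs(rank_size, hccl_config)):
        workdir = "device%d" % i
        shutil.rmtree(workdir, ignore_errors=True)  # start from a clean folder
        os.makedirs(workdir)
        for name in (script,) + tuple(extra_files):
            shutil.copy(name, workdir)
        with open(os.path.join(workdir, "log%d" % i), "w") as log:
            # The child process keeps its own copy of the log file descriptor.
            procs.append(subprocess.Popen(
                ["pytest", "-s", "-v", os.path.basename(script)],
                cwd=workdir, env=env,
                stdout=log, stderr=subprocess.STDOUT))
    return procs
```

To use it, call launch() and then call wait() on each returned process. Unlike the shell script, this keeps handles to the child processes, so their exit codes can be checked.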