Data Parallel
Overview
Data parallelism is the most commonly used parallel training approach for accelerating model training and handling large-scale datasets. In data parallel mode, the training data is split into multiple shards, and each shard is assigned to a different compute node, such as a card or a device. Each node processes its own subset of the data independently, runs forward and backward propagation on the same model, and updates the model parameters after the gradients of all nodes have been synchronized.
Data parallelism is supported on the Ascend, GPU and CPU hardware platforms, in both PyNative and Graph modes.
Related interfaces are as follows:
mindspore.set_auto_parallel_context(parallel_mode=ParallelMode.DATA_PARALLEL): Set the data parallel mode.
mindspore.nn.DistributedGradReducer(): Perform multi-card gradient aggregation.
Overall Process
Environmental dependencies
Before starting parallel training, initialize the communication resources by calling the mindspore.communication.init interface, which automatically creates the global communication group WORLD_COMM_GROUP. A communication group lets communication operators exchange messages between cards and machines. The global communication group is the largest one and includes all devices in the current training job. Set the current mode to data parallel by calling mindspore.set_auto_parallel_context(parallel_mode=ParallelMode.DATA_PARALLEL).
Data distribution
The core of data parallel lies in splitting the dataset along the sample dimension and sending the pieces to different cards. All dataset loading interfaces provided by the mindspore.dataset module have num_shards and shard_id parameters, which split the dataset into multiple shards. Each card cycles through the samples of its shard to collect batch-sized data, and starts again from the beginning when the data runs short.
Network composition
A data parallel network is written in the same way as a single-card network, because during forward and backward propagation the model on each card runs independently of the others and only the network structure is shared. The one point that needs special attention is that, to keep training synchronized between cards, the network parameters must be initialized to the same values on every card. In DATA_PARALLEL mode, this can be achieved either by setting the seed with mindspore.set_seed or by enabling parameter_broadcast in mindspore.set_auto_parallel_context.
Gradient aggregation
Data parallel should in theory achieve the same training result as a single card. To keep the computational logic consistent, gradients are aggregated between cards by calling the mindspore.nn.DistributedGradReducer() interface, which automatically inserts the AllReduce operator after the gradient computation is completed. DistributedGradReducer() provides the mean switch, which lets the user choose whether to average the summed gradient values. This choice can also be treated as a hyperparameter.
Parameter update
Because of the gradient aggregation step, the models on all cards enter the parameter update step with the same gradient values.
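The five steps above can be sketched with a toy simulation in plain Python. This is only an illustration under stated assumptions, not MindSpore code: the one-parameter model, the helper names local_gradient, all_reduce_mean and train_step, and the four "cards" are all hypothetical, and the aggregation is an ordinary function standing in for AllReduce.

```python
# Toy simulation of data parallel training in plain Python (no MindSpore).
# Each "card" holds its own copy of a one-parameter model y = w * x.

def local_gradient(w, shard):
    """Mean gradient of the squared error 0.5 * (w*x - y)^2 over one shard."""
    return sum((w * x - y) * x for x, y in shard) / len(shard)

def all_reduce_mean(values):
    """Stand-in for AllReduce(SUM) followed by division by the group size."""
    total = sum(values)
    return [total / len(values)] * len(values)

def train_step(weights, shards, lr):
    """One synchronous step: local gradients, aggregation, identical updates."""
    grads = [local_gradient(w, shard) for w, shard in zip(weights, shards)]
    grads = all_reduce_mean(grads)
    return [w - lr * g for w, g in zip(weights, grads)]

# Samples drawn from y = 3x, split across 4 hypothetical cards by sample index.
samples = [(float(x), 3.0 * x) for x in range(1, 9)]
num_cards = 4
shards = [samples[rank::num_cards] for rank in range(num_cards)]

# Same initial value on every card (the role of set_seed / parameter_broadcast).
weights = [0.0] * num_cards
for _ in range(200):
    weights = train_step(weights, shards, lr=0.01)

print(weights)
```

Because every card starts from the same value and applies the same aggregated gradient, the per-card weights stay identical after every step, which is the property the real framework relies on.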
Operation Practice
The following illustrates data parallel training, using a single machine with 8 Ascend or GPU cards as an example:
Sample Code Description
You can download the full sample code here:
https://gitee.com/mindspore/docs/tree/r2.4.1/docs/sample_code/distributed_data_parallel.
The directory structure is as follows:
└─ sample_code
    ├─ distributed_data_parallel
       ├── distributed_data_parallel.py
       └── run.sh
    ...
Among them, distributed_data_parallel.py is the script that defines the network structure and training process, and run.sh is the execution script.
Configuring Distributed Environments
The context interface lets you specify the run mode, the run device and the card number. Unlike single-card scripts, parallel scripts also need to set parallel_mode to data parallel mode and to initialize HCCL, NCCL or MCCL communication through init, depending on the device target. In data parallel mode, you can also set gradients_mean to specify the gradient aggregation method. If device_target is not set here, it defaults to the backend hardware device corresponding to the installed MindSpore package.
import mindspore as ms
from mindspore.communication import init
ms.set_context(mode=ms.GRAPH_MODE)
ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.DATA_PARALLEL, gradients_mean=True)
init()
ms.set_seed(1)
gradients_mean=True means that, during backward computation, the framework aggregates the gradient values of the data parallel parameters scattered across the cards into global gradient values before passing them to the optimizer for the update. The framework first sums the gradients with AllReduce(op=ReduceOp.SUM) and then decides, according to the value of gradients_mean, whether to compute the mean: if it is set to True, the mean is computed; otherwise it is not. Default: False.
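The difference between the two settings can be shown with a small plain-Python sketch. It only mimics the behaviour described above and is not the framework's implementation: the functions all_reduce_sum and aggregate and the gradient values are hypothetical, chosen for illustration.

```python
def all_reduce_sum(per_card_grads):
    """Element-wise sum of the gradient lists held by each card."""
    return [sum(column) for column in zip(*per_card_grads)]

def aggregate(per_card_grads, gradients_mean):
    """Always sum across cards, then optionally divide by the card count."""
    summed = all_reduce_sum(per_card_grads)
    if gradients_mean:
        group_size = len(per_card_grads)
        return [g / group_size for g in summed]
    return summed

# Hypothetical gradients for two parameters on four cards.
per_card_grads = [[0.5, -1.0], [1.5, -3.0], [1.0, -2.0], [1.0, -2.0]]

print(aggregate(per_card_grads, gradients_mean=False))  # [4.0, -8.0]
print(aggregate(per_card_grads, gradients_mean=True))   # [1.0, -2.0]
```

With summation only, the aggregated gradient grows with the number of cards, so the effective step size is larger for the same learning rate. This is one reason the choice interacts with other hyperparameters.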
Data Parallel Mode Loads Datasets
The biggest difference between data parallel mode and other modes is the way data is loaded: the data is imported in parallel. The following uses the MNIST dataset as an example to show how to load a dataset in data parallel mode. dataset_path is the path of the dataset.
import mindspore.dataset as ds
from mindspore.communication import get_rank, get_group_size
rank_id = get_rank()
rank_size = get_group_size()
dataset = ds.MnistDataset(dataset_path, num_shards=rank_size, shard_id=rank_id)
Unlike the single-card case, the num_shards and shard_id parameters need to be passed to the dataset interface. They correspond to the number of cards and the logical rank of the current card, respectively, and it is recommended to obtain them through the mindspore.communication interfaces:
get_rank: Obtain the ID of the current device in the cluster.
get_group_size: Obtain the number of devices in the cluster.
When loading datasets for data parallel scenarios, it is recommended that the same dataset file be specified for each card. If different datasets are loaded for each card, the computational accuracy may be affected.
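To make the roles of num_shards and shard_id concrete, the following plain-Python sketch shows one simple way to assign sample indices to cards. It is an illustrative model only and does not claim to reproduce the sampler MindSpore actually uses: the function shard_indices, the round-robin assignment and the wrap-around padding are assumptions.

```python
def shard_indices(num_samples, num_shards, shard_id):
    """Indices seen by one card under a simple round-robin split.

    Assumption: every shard is padded to the same length by wrapping around
    to the start of the dataset when the samples run short.
    """
    per_shard = -(-num_samples // num_shards)  # ceiling division
    return [(shard_id + i * num_shards) % num_samples for i in range(per_shard)]

# 10 samples on 4 hypothetical cards: ranks 2 and 3 wrap around to index 0 and 1.
for rank in range(4):
    print(rank, shard_indices(10, 4, rank))
# 0 [0, 4, 8]
# 1 [1, 5, 9]
# 2 [2, 6, 0]
# 3 [3, 7, 1]
```

In this model every card receives the same number of samples per epoch, so all cards run the same number of steps and stay aligned at each gradient aggregation.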
The complete data processing code:
import os
import mindspore as ms
import mindspore.dataset as ds
from mindspore.communication import get_rank, get_group_size

def create_dataset(batch_size):
    # The dataset path is read from the DATA_PATH environment variable.
    dataset_path = os.getenv("DATA_PATH")
    # Each card loads only the shard that matches its rank.
    rank_id = get_rank()
    rank_size = get_group_size()
    dataset = ds.MnistDataset(dataset_path, num_shards=rank_size, shard_id=rank_id)
    image_transforms = [
        ds.vision.Rescale(1.0 / 255.0, 0),
        ds.vision.Normalize(mean=(0.1307,), std=(0.3081,)),
        ds.vision.HWC2CHW()
    ]
    label_transform = ds.transforms.TypeCast(ms.int32)
    dataset = dataset.map(image_transforms, 'image')
    dataset = dataset.map(label_transform, 'label')
    dataset = dataset.batch(batch_size)
    return dataset

data_set = create_dataset(32)
Defining Network
In data parallel mode, the network is defined in the same way as single-card network, and the main structure of the network is as follows:
from mindspore import nn

class Network(nn.Cell):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.dense_relu_sequential = nn.SequentialCell(
            nn.Dense(28*28, 512, weight_init="normal", bias_init="zeros"),
            nn.ReLU(),
            nn.Dense(512, 512, weight_init="normal", bias_init="zeros"),
            nn.ReLU(),
            nn.Dense(512, 10, weight_init="normal", bias_init="zeros")
        )

    def construct(self, x):
        x = self.flatten(x)
        logits = self.dense_relu_sequential(x)
        return logits

net = Network()
Training Network
In this step, we need to define the loss function, the optimizer, and the training process. The difference from the single-card model is that data parallel mode also requires the mindspore.nn.DistributedGradReducer() interface to aggregate the gradients of all cards. The first parameter of this interface is the network parameters to be updated:
from mindspore import nn
import mindspore as ms

loss_fn = nn.CrossEntropyLoss()
optimizer = nn.SGD(net.trainable_params(), 1e-2)

def forward_fn(data, label):
    logits = net(data)
    loss = loss_fn(logits, label)
    return loss, logits

grad_fn = ms.value_and_grad(forward_fn, None, net.trainable_params(), has_aux=True)
# Aggregates the gradients of all cards before the optimizer step.
grad_reducer = nn.DistributedGradReducer(optimizer.parameters)

for epoch in range(10):
    i = 0
    for data, label in data_set:
        (loss, _), grads = grad_fn(data, label)
        grads = grad_reducer(grads)
        optimizer(grads)
        if i % 10 == 0:
            print("epoch: %s, step: %s, loss is %s" % (epoch, i, loss))
        i += 1
The network can also be trained using Model.train.
Running Single-machine Eight-card Script
Next, launch the script from the command line. The following uses the mpirun startup method with the 8-card distributed training script as an example:
bash run.sh
After training, the log files are saved to the log_output directory. Part of the directory structure is as follows:
└─ log_output
    └─ 1
        ├─ rank.0
        |   └─ stdout
        ├─ rank.1
        |   └─ stdout
...
The loss results are saved in log_output/1/rank.*/stdout. An example is as follows:
epoch: 0 step: 0, loss is 2.3084016
epoch: 0 step: 10, loss is 2.3107638
epoch: 0 step: 20, loss is 2.2864391
epoch: 0 step: 30, loss is 2.2938071
...
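To compare the loss across ranks, the log lines can be parsed with a few lines of Python. The following is a minimal sketch that assumes the log format shown above. The regular expression and the helper parse_losses are hypothetical and not part of the sample code.

```python
import re

# Matches lines such as "epoch: 0 step: 10, loss is 2.3107638". The comma
# after the epoch number is treated as optional.
LOSS_LINE = re.compile(
    r"epoch:\s*(\d+),?\s*step:\s*(\d+),\s*loss is\s*"
    r"([-+]?\d+(?:\.\d+)?(?:[eE][-+]?\d+)?)"
)

def parse_losses(lines):
    """Return (epoch, step, loss) tuples for every matching log line."""
    records = []
    for line in lines:
        match = LOSS_LINE.search(line)
        if match:
            epoch, step, loss = match.groups()
            records.append((int(epoch), int(step), float(loss)))
    return records

sample = [
    "epoch: 0 step: 0, loss is 2.3084016",
    "epoch: 0 step: 10, loss is 2.3107638",
    "some unrelated log line",
]
print(parse_losses(sample))
```

For a real run, the same function can be applied to the lines of a file such as log_output/1/rank.0/stdout, opened with the built-in open function.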
Other startup methods, such as dynamic cluster and rank table startup, can be found in startup methods.