Data Parallel

View Source On Gitee

Overview

Data parallel is the most commonly used parallel training approach for accelerating model training and handling large-scale datasets. In data parallel mode, the training data is divided into multiple copies and then each copy is assigned to a different compute node, such as multiple cards or multiple devices. Each node processes its own subset of data independently and uses the same model for forward and backward propagation, and ultimately performs model parameter updates after synchronizing the gradients of all nodes.

The following is an illustration of data parallel operation using the Ascend single-machine 8-card as an example:

Sample Code Description

The directory structure is as follows:

└─ sample_code
    ├─ distributed_data_parallel
       ├── distributed_data_parallel.py
       └── run.sh
    ...

Among them, distributed_data_parallel.py is the script that defines the network structure and training process. run.sh is the execution script.

Configuring Distributed Environments

The context interface allows you to specify the run mode, run device, run card number. Unlike single-card scripts, parallel scripts also need to specify the parallel mode parallel_mode for data parallel mode and initialize HCCL, NCCL or MCCL communication through init according to different device targets. In data parallel mode, you also can set gradients_mean to specify the gradient aggregation method. If device_target is not set here, it is automatically specified as the backend hardware device corresponding to the MindSpore package.

import mindspore as ms
from mindspore.communication import init

ms.set_context(mode=ms.GRAPH_MODE)
ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.DATA_PARALLEL, gradients_mean=True)
init()
ms.set_seed(1)

gradients_mean=True is for the fact that during the backward computation, the framework internally aggregates the gradient values of the data parallel parameters scattered across multiple machines to get the global gradient values before passing them into the optimizer for updating. Framework will do the reduce sum for gradient by using AllReduce(op=ReduceOp.SUM), then calculate the mean value in terms of the value of gradients_mean. (If it is set to true, the mean value is calculated, otherwise, the mean value is not calculated. Default: False).

Loading Datasets

The biggest difference between the data parallel mode and other modes is the different data loading way. The data is imported in a parallel way. Below we take the MNIST dataset as an example to introduce the method of importing MNIST dataset in data parallel mode. dataset_path is the path of the dataset.

import mindspore.dataset as ds
from mindspore.communication import get_rank, get_group_size

rank_id = get_rank()
rank_size = get_group_size()
dataset = ds.MnistDataset(dataset_path, num_shards=rank_size, shard_id=rank_id)

Unlike single-card, the num_shards and shard_id parameters need to be passed in the dataset interface, corresponding to the number of cards and the logical serial number, respectively, and it is recommended to obtain them through the mindspore.communication interface:

  • get_rank: Obtain the ID of the current device in the cluster.

  • get_group_size: Obtain the number of clusters.

When loading datasets for data parallel scenarios, it is recommended that the same dataset file be specified for each card. If different datasets are loaded for each card, the computational accuracy may be affected.

The complete data processing code:

import os
import mindspore.dataset as ds
from mindspore.communication import get_rank, get_group_size

def create_dataset(batch_size):
    dataset_path = os.getenv("DATA_PATH")
    rank_id = get_rank()
    rank_size = get_group_size()
    dataset = ds.MnistDataset(dataset_path, num_shards=rank_size, shard_id=rank_id)
    image_transforms = [
        ds.vision.Rescale(1.0 / 255.0, 0),
        ds.vision.Normalize(mean=(0.1307,), std=(0.3081,)),
        ds.vision.HWC2CHW()
    ]
    label_transform = ds.transforms.TypeCast(ms.int32)
    dataset = dataset.map(image_transforms, 'image')
    dataset = dataset.map(label_transform, 'label')
    dataset = dataset.batch(batch_size)
    return dataset

data_set = create_dataset(32)

Defining Network

In data parallel mode, the network is defined in the same way as single-card network, and the main structure of the network is as follows:

from mindspore import nn

class Network(nn.Cell):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.dense_relu_sequential = nn.SequentialCell(
            nn.Dense(28*28, 512, weight_init="normal", bias_init="zeros"),
            nn.ReLU(),
            nn.Dense(512, 512, weight_init="normal", bias_init="zeros"),
            nn.ReLU(),
            nn.Dense(512, 10, weight_init="normal", bias_init="zeros")
        )

    def construct(self, x):
        x = self.flatten(x)
        logits = self.dense_relu_sequential(x)
        return logits

net = Network()

Training Network

In this step, we need to define the loss function, the optimizer, and the training process. The difference with single-card model is that the data parallel mode also requires the addition of the mindspore.nn.DistributedGradReducer() interface to aggregate the gradients of all cards. The first parameter of the network is the network parameter to be updated:

from mindspore import nn
import mindspore as ms

loss_fn = nn.CrossEntropyLoss()
optimizer = nn.SGD(net.trainable_params(), 1e-2)

def forward_fn(data, label):
    logits = net(data)
    loss = loss_fn(logits, label)
    return loss, logits

grad_fn = ms.value_and_grad(forward_fn, None, net.trainable_params(), has_aux=True)
grad_reducer = nn.DistributedGradReducer(optimizer.parameters)

for epoch in range(10):
    i = 0
    for data, label in data_set:
        (loss, _), grads = grad_fn(data, label)
        grads = grad_reducer(grads)
        optimizer(grads)
        if i % 10 == 0:
            print("epoch: %s, step: %s, loss is %s" % (epoch, i, loss))
        i += 1

This can also be trained using Model.train.

Running Single-machine Eight-card Script

Next, the corresponding scripts are invoked by commands, using the mpirun startup method and the 8-card distributed training script as an example of distributed training:

bash run.sh

After training, the log files are saved to the log_output directory, and the part of the file directory structure is as follows:

└─ log_output
    └─ 1
        ├─ rank.0
        |   └─ stdout
        ├─ rank.1
        |   └─ stdout
...

The part results of the Loss are saved in log_output/1/rank.*/stdout. The example is as follows:

epoch: 0 step: 0, loss is 2.3084016
epoch: 0 step: 10, loss is 2.3107638
epoch: 0 step: 20, loss is 2.2864391
epoch: 0 step: 30, loss is 2.2938071
...

Other startup methods such as dynamic network and rank table startup can be found in startup methods.