Manually Parallelism

View Source On Gitee

Overview

In addition to the automatic and semi-automatic parallelism provided by MindSpore, users can manually slice the model to parallelize it on multiple nodes by encoding the parallel process based on communication primitives. In this manual parallel mode, the user needs to perceive graph slicing, operator slicing, and cluster topology to achieve optimal performance.

Basic Principle

MindSpore aggragation communication operators include AllReduce, AllGather, ReduceScatter, Broadcast, NeighborExchange, NeighborExchangeV2, and AlltoAll, which are the basic building blocks of aggragation communication in distributed training. The so-called aggragation communication refers to the data interaction between different model slices through aggragation communication operators after model slicing. Users can manually call these operators for data transfer to realize distributed training.

For a detailed description of the aggragation communication operator, see Distributed Set Communication Primitive.

Operation Practice

The following is an illustration of manual data parallel operation using an Ascend or GPU stand-alone 8-card as an example:

Example Code Description

Download the complete example code: manual_parallel.

The directory structure is as follows:

└─ sample_code
    ├─ manual_parallel
       ├── train.py
       └── run.sh
    ...

train.py is the script that defines the network structure and the training process. run.sh is the execution script.

Configuring a Distributed Environment

Initialize HCCL or NCCL communication with init and set the random seed. No parallel mode is specified here as it is manually parallelized. device_target is automatically specified as the backend hardware device corresponding to the MindSpore package. get_group_size() interface gets the number of devices in the current communication group, which is by default a global communication group containing all devices.

import mindspore as ms
from mindspore.communication import init, get_rank, get_group_size

ms.set_context(mode=ms.GRAPH_MODE)
init()
cur_rank = get_rank()
batch_size = 32
device_num = get_group_size()
shard_size = batch_size // device_num

Network Definition

The slices of the input data is added to the single-card network:

from mindspore import nn
from mindspore.communication import get_rank, get_group_size

class Network(nn.Cell):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.layer1 = nn.Dense(28*28, 512)
        self.relu1 = nn.ReLU()
        self.layer2 = nn.Dense(512, 512)
        self.relu2 = nn.ReLU()
        self.layer3 = nn.Dense(512, 10)

    def construct(self, x):
        x = x[cur_rank*shard_size:cur_rank*shard_size + shard_size]
        x = self.flatten(x)
        x = self.layer1(x)
        x = self.relu1(x)
        x = self.layer2(x)
        x = self.relu2(x)
        logits = self.layer3(x)
        return logits

net = Network()

Loading the Dataset

Datasets are loaded in a manner consistent with a single-card network:

import os
import mindspore.dataset as ds

def create_dataset():
    dataset_path = os.getenv("DATA_PATH")
    dataset = ds.MnistDataset(dataset_path)
    image_transforms = [
        ds.vision.Rescale(1.0 / 255.0, 0),
        ds.vision.Normalize(mean=(0.1307,), std=(0.3081,)),
        ds.vision.HWC2CHW()
    ]
    label_transform = ds.transforms.TypeCast(ms.int32)
    dataset = dataset.map(image_transforms, 'image')
    dataset = dataset.map(label_transform, 'label')
    dataset = dataset.batch(batch_size)
    return dataset

data_set = create_dataset()

Loss Function Definition

In the loss function, it is necessary to add a slice of the labels and the communication primitive operator ops.AllReduce to aggregate the losses of the cards:

from mindspore import nn, ops
from mindspore.communication import get_rank, get_group_size

class ReduceLoss(nn.Cell):
    def __init__(self):
        super().__init__()
        self.loss = nn.CrossEntropyLoss()
        self.all_reduce = ops.AllReduce()

    def construct(self, data, label):
        label = label[cur_rank*shard_size:cur_rank*shard_size + shard_size]
        loss_value = self.loss(data, label)
        loss_value = self.all_reduce(loss_value) / device_num
        return loss_value

loss_fn = ReduceLoss()

Training Process Definition

The optimizer, training process is consistent with a single-card network:

import mindspore as ms
from mindspore import nn, train

optimizer = nn.SGD(net.trainable_params(), 1e-2)
loss_cb = train.LossMonitor(20)
model = ms.Model(net, loss_fn=loss_fn, optimizer=optimizer)
model.train(10, data_set, callbacks=[loss_cb])

Running Stand-alone 8-card Script

Next, the corresponding script is called by the command. Take the mpirun startup method, the 8-card distributed training script as an example, and perform the distributed training:

bash run.sh

After training, the log files are saved to the log_output directory, and by setting the environment variable MS_DEV_SAVE_GRAPHS to 2 in train.py, you can print out the IR graphs of the compilation process, where some of the file directories are structured as follows:

└─ log_output
    └─ 1
        ├─ rank.0
        |   └─ stdout
        ├─ rank.1
        |   └─ stdout
        ...

The results on the Loss section are saved in log_output/1/rank.*/stdout, and the example is as below:

epoch: 1 step: 20, loss is 2.241283893585205
epoch: 1 step: 40, loss is 2.1842331886291504
epoch: 1 step: 60, loss is 2.0627782344818115
epoch: 1 step: 80, loss is 1.9561686515808105
epoch: 1 step: 100, loss is 1.8991656303405762
epoch: 1 step: 120, loss is 1.6239635944366455
epoch: 1 step: 140, loss is 1.465965747833252
epoch: 1 step: 160, loss is 1.3662006855010986
epoch: 1 step: 180, loss is 1.1562917232513428
epoch: 1 step: 200, loss is 1.116426944732666
...