Functional Operator Sharding

Overview

PyNative mode supports richer syntax and more flexible use, but MindSpore PyNative mode currently does not support the various features of automatic parallelism. In addition, static graph mode currently supports the different parallel modes only by configuring strategies operator by operator.

To address these problems, we designed the functional operator sharding interface shard. Unlike the existing Primitive.shard() method, this function sets the parallel strategy for a Cell or a function.

In PyNative mode, this function specifies that a part of the model is compiled and executed in graph mode, where the various parallel operations are performed. The shard function can also be used in graph mode to specify the sharding strategy of a certain module; the strategies of the remaining, unspecified modules are then derived automatically through sharding propagation.

Currently, functional operator sharding supports only the "auto_parallel" and "semi_auto_parallel" modes and automatically sets the strategy search algorithm to "sharding_propagation".

Related interfaces:

def shard(fn, in_strategy, out_strategy=None, parameter_plan=None, device="Ascend", level=0):
    return shard_fn(fn, in_strategy, out_strategy, device, level)

in_strategy(tuple): Specify the sharding strategy of the input Tensors. Each element is a tuple describing the sharding strategy of the corresponding input Tensor; the length of each tuple should equal the dimension of that Tensor and indicates how each dimension is sliced. Each element in the tuple can be a tuple(int) or a mindspore.Layout.

out_strategy(None, tuple): Specify the sharding strategy of the output Tensors, used in the same way as in_strategy. Default: None. This parameter is not yet enabled and will be supported in a later version. In the current implementation, the output arrangement is instead determined by the value of full_batch in set_auto_parallel_context: data parallel when full_batch=False, and repeated computation when full_batch=True.

parameter_plan(None, dict): Specify the sharding strategy of individual parameters. When a dict is passed in, the key is the parameter name of type str, and the value is a 1-D tuple(int) or a 1-D tuple(mindspore.Layout) indicating the corresponding sharding strategy. If the parameter name is wrong or the parameter already has a sharding strategy set, the setting of that parameter is skipped. Default: None, meaning no strategy is set.

device(string): Specify the device on which to execute. The optional values are "Ascend", "GPU", and "CPU". Default: "Ascend". This is a reserved parameter that is not currently used.

level(int): Specify the search objective. The sharding strategies of the input and output Tensors are specified by the user, and the strategies of the remaining operators are searched by the framework; this parameter specifies the objective function of that search. The optional values are 0, 1, and 2, representing maximizing the computation-to-communication ratio, minimizing memory consumption, and maximizing execution speed, respectively. Default: 0. This is a reserved parameter that is not currently used.
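
As a minimal sketch of how these parameters fit together (assuming 8 devices, the parallel context configured as in the sections below, and a small illustrative Block cell; the parameter name used in parameter_plan is illustrative and must match the parameter's actual name in the network, otherwise the entry is simply skipped):

import mindspore as ms
import mindspore.nn as nn

class Block(nn.Cell):
    def __init__(self):
        super(Block, self).__init__()
        self.dense1 = nn.Dense(128, 256)
        self.dense2 = nn.Dense(256, 128)

    def construct(self, x):
        return self.dense2(self.dense1(x))

block = Block()
# Slice the single input Tensor along its second dimension into 8 parts and,
# via parameter_plan, additionally request a row-wise slice of dense2's weight.
# The parameter name below is illustrative; a wrong name is skipped, not an error.
block_shard = ms.shard(block, in_strategy=((1, 8),),
                       parameter_plan={'self.dense2.weight': (8, 1)})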

Basic Principle

In MindSpore PyNative mode, you can use the @jit decorator to specify that a certain section of code is compiled and executed in graph mode. During the forward pass, the executed operators and subgraphs are recorded into a graph; when the forward pass completes, the recorded whole graph is automatically differentiated to obtain the backward graph. The process is shown in the following figure:

Figure 1: Schematic diagram of the execution of the @jit decorator
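
For reference, here is a minimal sketch of the @jit pattern described above: plain PyNative code in which one decorated function is compiled and executed as a graph and, when gradients are requested, differentiated as a whole graph.

import numpy as np

import mindspore as ms
import mindspore.ops as ops
from mindspore import Tensor

ms.set_context(mode=ms.PYNATIVE_MODE)

@ms.jit  # this function is compiled and executed in graph mode
def matmul_relu(x, w):
    return ops.relu(ops.matmul(x, w))

x = Tensor(np.ones((4, 8)), ms.float32)
w = Tensor(np.ones((8, 4)), ms.float32)
out = matmul_relu(x, w)           # executed as a graph inside the PyNative flow
grad = ms.grad(matmul_relu)(x, w) # the recorded whole graph is differentiated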

The shard function follows this pattern; the difference is that operator-level model parallelism can be performed during the graph-mode compilation and execution phase.

In MindSpore's graph mode, the shard function is similar to Primitive.shard(): it sets a distributed layout for a module's inputs and parameters.
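
As a rough illustration of the difference (a sketch, assuming an 8-device setup with the auto-parallel context configured): Primitive.shard() pins the strategy of a single operator, while Cell.shard() or mindspore.shard() pins the layout of a whole module's inputs and parameters, leaving the operators inside the module to sharding propagation.

import mindspore as ms
import mindspore.nn as nn
import mindspore.ops as ops

# Operator level: fix the strategy of one MatMul primitive.
matmul = ops.MatMul().shard(((2, 4), (4, 1)))

# Module level: fix the input layout of a whole Cell; the strategies of the
# operators inside the Cell are then derived by sharding propagation.
dense = nn.Dense(128, 256)
dense_shard = dense.shard(in_strategy=((2, 4),))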

Operation Practice

Example Code Description

The directory structure is as follows:

└─sample_code
    ├─shard_function_parallel
        ├── rank_table_8pcs.json
        ├── run_shard_function_example.sh
        ├── run_mpirun_shard_function_example.sh
        └── shard_function_example.py

The function of each file is as follows:

  • shard_function_example.py: Sample code for the shard function, which describes how to use the shard function to specify part of the code to be executed in parallel.

  • rank_table_8pcs.json: RANK_TABLE_FILE configuration file for 8 cards.

  • run_shard_function_example.sh: Startup script for shard function example.

  • run_mpirun_shard_function_example.sh: shard function example startup script launched with mpirun.

Importing the Relevant Packages and Setting the Execution Mode

As mentioned above, the shard function supports graph mode and PyNative mode. The following takes PyNative mode as an example:

import mindspore as ms
from mindspore.communication import init

ms.set_context(mode=ms.PYNATIVE_MODE)
ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.AUTO_PARALLEL,
                             search_mode="sharding_propagation", device_num=8)
init()
ms.set_seed(1)

Specifying Output Arrangement

Currently, the output arrangement can be specified as either data parallel or repeated computation. This is controlled through dataset_strategy or full_batch, set as follows:

# Set via dataset_strategy, recommended
ms.set_auto_parallel_context(dataset_strategy="full_batch")  # The dataset is not sliced and the output tensor of the shard is not sliced; (default configuration)
ms.set_auto_parallel_context(dataset_strategy="data_parallel")  # The dataset is sliced in a data-parallel fashion, and the output tensor of the shard is also sliced in a data-parallel fashion

# Set via full_batch; this attribute will be deprecated in a future version
ms.set_auto_parallel_context(full_batch=True)   # The dataset is not sliced and the output tensor of the shard is not sliced; (default configuration)
ms.set_auto_parallel_context(full_batch=False)  # The dataset is sliced in a data-parallel fashion, and the output tensor of the shard is also sliced in a data-parallel fashion

Cell Uses Functional Sharding

There are currently two ways to use the shard function. The following network is used as an example to introduce both.

import mindspore.nn as nn
class BasicBlock(nn.Cell):
    def __init__(self):
        super(BasicBlock, self).__init__()
        self.dense1 = nn.Dense(128, 256)
        self.gelu = nn.GELU()
        self.dense2 = nn.Dense(256, 128)
    def construct(self, x):
        x = self.dense1(x)
        x = self.gelu(x)
        x = self.dense2(x)
        return x

class Net(nn.Cell):
    def __init__(self):
        super(Net, self).__init__()
        self.block1 = BasicBlock()
        self.block2 = BasicBlock()
        self.block3 = BasicBlock()
    def construct(self, x):
        # Here all blocks are executed in PyNative mode
        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)
        return x

  • Call shard as a member method of the Cell itself; it returns the distributed function fn.

    class Net1(Net):
        def __init__(self):
            super(Net1, self).__init__()
            self.flatten = nn.Flatten()
            self.layer1 = nn.Dense(28*28, 128)
            self.layer2 = nn.Dense(128, 10)
            # Slicing along the second dimension of the input makes the output into data parallel arrangement
            self.block1_shard = self.block1.shard(in_strategy=((1, 8),),
                                                  parameter_plan={'self.block1.dense2.weight': (8, 1)})
    
        def construct(self, x):
            x = self.flatten(x)
            x = self.layer1(x)
            # block1 is executed in graph mode
            x = self.block1_shard(x)
            # block2 and block3 are executed in PyNative mode
            x = self.block2(x)
            x = self.block3(x)
            x = self.layer2(x)
            return x
    
    net = Net1()
    
  • Using the functional interface mindspore.shard. Since the return value of shard is a function, when using the functional interface you cannot assign the return value of shard to an attribute that already holds a Cell instance, because MindSpore does not support changing an attribute from a Cell instance to another type.

    class NetError(Net):
        def __init__(self):
            super(NetError, self).__init__()
            self.block1 = ms.shard(self.block1, in_strategy=((8, 1),),
                                   parameter_plan={'self.block1.dense2.weight': (8, 1)})
    
        def construct(self, x):
            x = self.block1(x)
            x = self.block2(x)
            x = self.block3(x)
            return x
    

    Such an execution will encounter an error:

    TypeError: For 'Cell', the type of block1 should be cell, but got function.
    

    The right usage is as follows:

    class Net2(Net):
        def __init__(self):
            super(Net2, self).__init__()
            # Assign the function returned by ms.shard to an attribute with a different name
            self.block1_shard = ms.shard(self.block1, in_strategy=((8, 1),),
                                         parameter_plan={'self.block1.dense2.weight': (8, 1)})
            self.block2_shard = self.block2.shard(in_strategy=((1, 8),))
    
        def construct(self, x):
            # block1 is executed in graph mode and along the first dimensional slice
            x = self.block1_shard(x)
            # block2 is also executed in graph mode
            x = self.block2_shard(x)
            # block3 is executed in PyNative mode
            x = self.block3(x)
            return x
    

Function Uses Functional Sharding

A function can be sharded functionally via mindspore.shard. Take a matmul + bias_add + relu function as an example:

import numpy as np

import mindspore as ms
import mindspore.ops as ops
from mindspore import Tensor

ms.set_auto_parallel_context(dataset_strategy="full_batch") # This is an example where the dataset is not sliced and the output tensor of the shard is not sliced.

def dense_relu(x, weight, bias):
    x = ops.matmul(x, weight)
    x = ops.bias_add(x, bias)
    x = ops.relu(x)
    return x

x = Tensor(np.random.uniform(0, 1, (32, 128)), ms.float32)
weight = Tensor(np.random.uniform(0, 1, (128, 10)), ms.float32)
bias = Tensor(np.random.uniform(0, 1, (10,)), ms.float32)

# Specify the sharding strategy for x to be (4, 2) and for the weight to be (4, 2); the bias is not sliced.
result = ms.shard(dense_relu, in_strategy=((4, 2), (4, 2), (1,)))(x, weight, bias)
print('result.shape:', result.shape)

Note that parameter initialization depends on Cell parameter management. When the fn passed to shard is a plain function, its definition should not contain operations that hold parameters (e.g., Conv2d, Dense, and other such layers).
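
As a sketch of this constraint: a function that constructs parameterized layers inside its body should not be passed to shard; keep the parameters outside the function and pass them in as Tensors (as dense_relu does above), or wrap the computation in a Cell and use Cell.shard.

import mindspore.nn as nn

# Not recommended for shard: the function body itself creates a layer that
# holds parameters (weight and bias), bypassing Cell parameter management.
def bad_fn(x):
    dense = nn.Dense(128, 10)
    return dense(x)

# Recommended: keep parameters outside the function, as in dense_relu above,
# or wrap the computation in a Cell and call Cell.shard on it.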

Running the Code

Currently, MindSpore can launch distributed parallel tasks through either rank table startup or mpirun.

rank table Startup

Distributed parallel tasks can be started via rank table when executing on Ascend and there is no sub-group communication.

Model parallelism generates sub-group communication when a tensor has a dimension that is not sliced across all devices, or when at least two of its dimensions are sliced.

That is, when started in this way, the communication generated by model parallelism inside shard can only occur within the world group, so the specified sharding strategy can currently only slice a single dimension.
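
As an illustration of this restriction (a sketch, assuming 8 devices and the BasicBlock defined above): a strategy that slices a single dimension across all devices keeps communication inside the world group and can be launched via rank table, while a strategy that slices two dimensions creates sub-group communication and requires the mpirun startup described below.

# Slices only one dimension, across all 8 devices: communication stays in the
# world group, so rank table startup can be used.
block_world = ms.shard(BasicBlock(), in_strategy=((1, 8),))

# Slices two dimensions (4 x 2): model parallelism creates sub-group
# communication, which rank table startup does not support.
block_subgroup = ms.shard(BasicBlock(), in_strategy=((4, 2),))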

The above code needs to be configured with distributed variables before it can run. The Ascend environment needs to be configured with RANK_TABLE_FILE, RANK_ID, and DEVICE_ID. The configuration procedure can be found here.

Ascend Distributed related environment variables are:

  • RANK_TABLE_FILE: Path to the grouping information file. rank_table_file files can be generated using hccl_tools.py in the models code repository, which can be obtained from here.

  • DEVICE_ID: the actual serial number of the current card on the machine.

  • RANK_ID: logical serial number of the current card.

#!/bin/bash
set -e
echo "=============================================================================================================="
echo "Please run the script as: "
echo "bash run_shard_function_example.sh RANK_SIZE RANK_TABLE_FILE"
echo "For example: bash run_fusion_example.sh 8 ../rank_table_8pcs.json"
echo "It is better to use the absolute path."
echo "This example is expected to run on the Ascend environment."
echo "=============================================================================================================="
if [ $# != 2 ]
then
    echo "Usage: bash run_shard_function_example.sh RANK_SIZE RANK_TABLE_FILE"
    exit 1
fi
EXEC_PATH=$(pwd)

if [ ! -d "${EXEC_PATH}/MNIST_Data" ]; then
    if [ ! -f "${EXEC_PATH}/MNIST_Data.zip" ]; then
        wget http://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/MNIST_Data.zip
    fi
    unzip MNIST_Data.zip
fi
export DATA_PATH=${EXEC_PATH}/MNIST_Data/train/
RANK_SIZE=$1
RANK_TABLE_FILE=$2
test_dist_8pcs()
{
    export RANK_TABLE_FILE=${RANK_TABLE_FILE}
    export RANK_SIZE=8
}
test_dist_${RANK_SIZE}pcs

for((i=0;i<${RANK_SIZE};i++))
do
    rm -rf device$i
    mkdir device$i
    cp ./shard_function_example.py ./device$i
    cd ./device$i
    export DEVICE_ID=$i
    export RANK_ID=$i
    echo "start training for device $i"
    env > env$i.log
    python ./shard_function_example.py > train.log$i 2>&1 &
    cd ../
done
echo "The program launch succeed, the log is under device0/train.log0."

After placing the configured RANK_TABLE_FILE in the current directory, run the following command. It requires 8 Atlas training series devices:

bash run_shard_function_example.sh 8 ../rank_table_8pcs.json

During execution, the framework automatically performs operator-level model parallelism on the function passed to shard; the parallel strategy of each operator is found by the framework's search, and the whole process is transparent to the user. The computation graphs can be saved as follows:

ms.set_context(save_graphs=2)

The parallelization strategy for each specific operator can be seen in step_parallel_end.ir.

Startup via mpirun

On Ascend and GPU, distributed parallel tasks can be started via mpirun, and this startup method supports the creation of sub-group communication. The run command is as follows:

mpirun -n ${DEVICE_NUM} --output-filename log_output --allow-run-as-root python ${PYTHON_SCRIPT_PATH}

Taking the sample code as an example, the command to start 8 cards is:

bash run_mpirun_shard_function_example.sh

Note that when starting via mpirun on Ascend and a large number of sub-groups is created, creating a communication domain may fail with the error "Ascend collective Error: HcclCommInitRootInfo failed. | Error Number 2". You can reduce max_device_memory in the context to reserve enough memory for HCCL to create the communication domains.
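
For example (the value below is illustrative and depends on the device's total memory), the reserved device memory can be lowered via set_context:

import mindspore as ms

# Leave headroom for HCCL to create communication domains;
# "28GB" is an illustrative value, not a recommendation.
ms.set_context(max_device_memory="28GB")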

Running Results

After the run completes, part of the loss results are as follows:

epoch: 0, step: 0, loss is 2.3093076
epoch: 0, step: 100, loss is 2.299726
epoch: 0, step: 200, loss is 2.3076267
epoch: 0, step: 300, loss is 2.288056
epoch: 0, step: 400, loss is 2.2772775
epoch: 0, step: 500, loss is 2.1903486
epoch: 0, step: 600, loss is 1.2501067
epoch: 0, step: 700, loss is 0.7540306
...

Usage Limitations

  • The execution mode needs to be set to PYNATIVE_MODE, the parallel configuration to AUTO_PARALLEL, and the search_mode to sharding_propagation.

  • Nested use with vmap is supported, with shard on the outside and vmap on the inside (see the sketch after this list).

  • Nested use of shard is not supported.

  • Nesting ms.jit with shard is not supported; shard will not take effect in that scenario.
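
The following is a minimal sketch of the supported vmap nesting (assuming 8 devices and the parallel context configured as above): vmap on the inside maps over the batch dimension, and shard on the outside sets the sharding strategy of the vmapped function's inputs.

import numpy as np

import mindspore as ms
import mindspore.ops as ops
from mindspore import Tensor

def single_sample(x, w):
    return ops.relu(ops.matmul(x, w))

# vmap (inside) maps single_sample over the leading batch dimension of x;
# shard (outside) slices the batched input across 8 devices along that dimension.
vmapped = ms.vmap(single_sample, in_axes=(0, None))
sharded = ms.shard(vmapped, in_strategy=((8, 1, 1), (1, 1)))

x = Tensor(np.ones((8, 16, 32)), ms.float32)  # batched input
w = Tensor(np.ones((32, 16)), ms.float32)
out = sharded(x, w)
print('out.shape:', out.shape)  # (8, 16, 16)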