Dynamic Cluster Startup

View Source On Gitee

Overview

For reliability requirements during training, MindSpore provides dynamic cluster features that enable users to start Ascend/GPU/CPU distributed training tasks without relying on any third-party library (OpenMPI) and without any modification to the training script. We recommend users to use this startup method in preference.

The MindSpore Dynamic Cluster feature replaces the OpenMPI capability by reusing the Parameter Server mode training architecture, which can be found in the Parameter Server Mode training tutorial.

The Dynamic Cluster feature starts multiple MindSpore training processes as Workers, and starts an additional Scheduler for cluster and disaster recovery, thus, distributed training can be achieved without the need for OpenMPI's message passing mechanism. The user only needs to make a few changes to the startup script to perform distributed training.

Dynamic cluster supports Ascend, GPU and CPU, so the dynamic cluster startup script can be quickly migrated between multiple hardware platforms without additional modifications.

The relevant environment variables:

Environment Variables Function Type Value Description
MS_ROLE Specifies the role of this process. String
  • MS_SCHED: represents the Scheduler process. A training task starts only one Scheduler, which is responsible for networking, disaster recovery, etc., and does not execute training code.
  • MS_WORKER: Represents the Worker process, which generally sets up the distributed training process for this role.
  • MS_PSERVER: represents the Parameter Server process. Only in Parameter Server mode this role is effective. Please refer to Parameter Server Mode.
The Worker and Parameter Server processes register with the Scheduler process to complete the networking.
MS_SCHED_HOST Specifies the IP address of the Scheduler. String Legal IP address. IPv6 addresses are only supported on `Ascend` platform in current version. In IPv6 case, environment variable MS_HCCL_CM_INIT must be set to true.
MS_SCHED_PORT Specifies the Scheduler binding port number. Integer Port number in the range of 1024 to 65535.
MS_NODE_ID Specifies the ID of this process, unique within the cluster. String Represents the unique ID of this process, which is automatically generated by MindSpore by default. MS_NODE_ID needs to be set in the following cases. Normally it does not need to be set and is automatically generated by MindSpore:
  • Enable Disaster Recovery Scenario: Disaster recovery requires obtaining the current process ID and thus re-registering with the Scheduler.
  • Enable GLOG log redirection scenario: In order to ensure that the logs of each training process are saved independently, it is necessary to set the process ID, which is used as the log saving path suffix.
  • Specify process rank id scenario: users can specify the rank id of this process by setting MS_NODE_ID to some integer.
MS_WORKER_NUM Specifies the number of processes with the role MS_WORKER. Integer Integer greater than 0. The number of Worker processes started by the user should be equal to the value of this environment variable. If it is less than this value, the networking fails. If it is greater than this value, the Scheduler process will complete the networking according to the order of Worker registration, and the redundant Worker processes will fail to start.
MS_SERVER_NUM Specifies the number of processes with the role MS_PSERVER. Integer Integer greater than 0. Only set in Parameter Server training mode.
MS_WORKER_IP Specifies the IP address used for communication and networking between processes. String Legitimate IP address. This environment variable must be set when using IPv6. But when MS_SCHED_HOST is set to ::1(Representing local loopback interface in IPv6), there's no need to set MS_WORKER_IP because MindSpore will use local loopback interface to communicate by default.
MS_ENABLE_RECOVERY Turn on disaster recovery. Integer 1 for on, 0 for off. The default is 0.
MS_RECOVERY_PATH Persistent path folder. String Legal user directory. The Worker and Scheduler processes perform the necessary persistence during execution, such as node information for restoring the networking and training the intermediate state of the service, and are saved via files.
MS_HCCL_CM_INIT Whether to use the CM method to initialize the HCCL. Integer 1 for yes, other values for no. The default is no. This environment variable is only recommended to be turned on for Ascend hardware platforms with a large number of communication domains. Turning on this environment variable reduces the memory footprint of the HCCL collection of communication libraries, and the training tasks are executed in the same way as rank table startup method.
When it is set to 1, the graph compilation level can not be O0, otherwise Environment variable settings conflict exception will be thrown by MindSpore.
MS_ENABLE_LCCL Whether to use LCCL as communication library. Integer 1 for yes, other values for no. The default is no. The LCCL communication library currently only supports single-machine multi-card scenario and must be executed when the graph compilation level is O0. LCCL does not support creating sub communication groups.
MS_TOPO_TIMEOUT Cluster networking phase timeout time in seconds. Integer The default is 30 minutes. This value represents that all nodes can register to the scheduler within this time window. If the time window is exceeded, registration will fail and if the number of nodes does not meet the requirements, cluster networking will fail. We suggest users to configure this environment variable when the cluster is in large-scale.
MS_NODE_TIMEOUT Node heartbeat timeout in seconds。 Integer The default is 300 seconds. This value represents the heartbeat timeout time between the scheduler and the worker. If there are no heartbeat messages within this time window, the cluster will exit abnormally.
MS_RECEIVE_MSG_TIMEOUT Node timeout for receiving messages in seconds. Integer The default is 300 seconds. This value represents the timeout window for the node to receive messages from the other end. If there is no message response within the time window, an empty message is returned.
MS_RETRY_INTERVAL_LOWER Lower limit of message retry interval between nodes in seconds. Integer The default is 3 seconds. This value represents the lower limit of the time interval between each retry of sending a message by a node. MindSpore randomly selects the value between MS_RETRY_INTERVAL_LOWER and MS_RETRY_INTERVAL_UPPER as the interval time. This variable is used to control the message concurrency of the Scheduler.
MS_RETRY_INTERVAL_UPPER Upper limit of message retry interval between nodes in seconds Integer The default is 5 seconds. This value represents the upper limit of the time interval between each retry of sending a message by a node. MindSpore randomly selects the value between MS_RETRY_INTERVAL_LOWER and MS_RETRY_INTERVAL_UPPER as the interval time. This variable is used to control the message concurrency of the Scheduler.
MS_DISABLE_HEARTBEAT Disable the heartbeat feature between nodes in the cluster. Integer Heartbeat feature is enabled by default. If set to 1, the heartbeat between cluster nodes will be disabled. In this scenario, Scheduler will not detect Workers' exception and will not control the cluster to exit. This variable can reduce the message concurrency of the Scheduler.

The environment variables MS_SCHED_HOST, MS_SCHED_PORT, and MS_WORKER_NUM need to be consistent in their contents, or else the networking will fail due to the inconsistency in the configurations of the processes.

Operation Practice

Dynamic cluster startup scripts are consistent across hardware platforms. The following is an example of how to write a startup script for Ascend:

You can download the full sample code here: startup_method.

The directory structure is as follows:

└─ sample_code
    ├─ startup_method
       ├── net.py
       ├── run_dynamic_cluster.sh
       ├── run_dynamic_cluster_1.sh
       ├── run_dynamic_cluster_2.sh
    ...

net.py is defining the network structure and training process, and run_dynamic_cluster.sh, run_dynamic_cluster_1.sh and run_dynamic_cluster_2.sh are executing scripts.

1. Preparing Python Training Scripts

Here, as an example of data parallel, a recognition network is trained for the MNIST dataset.

First specify the operation mode, hardware device, etc. Unlike single card scripts, parallel scripts also need to specify configuration items such as parallel mode and initialize HCCL, NCCL or MCCL communication via init(). If you don't set device_target here, it will be automatically specified as the backend hardware device corresponding to the MindSpore package.

import mindspore as ms
from mindspore.communication import init

ms.set_context(mode=ms.GRAPH_MODE)
ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.DATA_PARALLEL, gradients_mean=True)
init()
ms.set_seed(1)

Then build the following network:

from mindspore import nn

class Network(nn.Cell):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.fc = nn.Dense(28*28, 10, weight_init="normal", bias_init="zeros")
        self.relu = nn.ReLU()

    def construct(self, x):
        x = self.flatten(x)
        logits = self.relu(self.fc(x))
        return logits
net = Network()

Finally, the dataset is processed and the training process is defined:

import os
from mindspore import nn
import mindspore as ms
import mindspore.dataset as ds
from mindspore.communication import get_rank, get_group_size

def create_dataset(batch_size):
    dataset_path = os.getenv("DATA_PATH")
    rank_id = get_rank()
    rank_size = get_group_size()
    dataset = ds.MnistDataset(dataset_path, num_shards=rank_size, shard_id=rank_id)
    image_transforms = [
        ds.vision.Rescale(1.0 / 255.0, 0),
        ds.vision.Normalize(mean=(0.1307,), std=(0.3081,)),
        ds.vision.HWC2CHW()
    ]
    label_transform = ds.transforms.TypeCast(ms.int32)
    dataset = dataset.map(image_transforms, 'image')
    dataset = dataset.map(label_transform, 'label')
    dataset = dataset.batch(batch_size)
    return dataset

data_set = create_dataset(32)
loss_fn = nn.CrossEntropyLoss()
optimizer = nn.SGD(net.trainable_params(), 1e-2)

def forward_fn(data, label):
    logits = net(data)
    loss = loss_fn(logits, label)
    return loss, logits

grad_fn = ms.value_and_grad(forward_fn, None, net.trainable_params(), has_aux=True)
grad_reducer = nn.DistributedGradReducer(optimizer.parameters)

for epoch in range(10):
    i = 0
    for data, label in data_set:
        (loss, _), grads = grad_fn(data, label)
        grads = grad_reducer(grads)
        optimizer(grads)
        if i % 10 == 0:
            print("epoch: %s, step: %s, loss is %s" % (epoch, i, loss))
        i += 1

2. Preparing the Startup Script

Single-Machine Multi-Card

The content of the single-machine multi-card startup script run_dynamic_cluster.sh is as follows. Taking the single-machine 8-card as an example:

EXEC_PATH=$(pwd)
if [ ! -d "${EXEC_PATH}/MNIST_Data" ]; then
    if [ ! -f "${EXEC_PATH}/MNIST_Data.zip" ]; then
        wget http://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/MNIST_Data.zip
    fi
    unzip MNIST_Data.zip
fi
export DATA_PATH=${EXEC_PATH}/MNIST_Data/train/

rm -rf device
mkdir device
echo "start training"

# Start 8 Worker training processes in a loop
for((i=0;i<8;i++));
do
    export MS_WORKER_NUM=8          # Set the number of Worker processes in the cluster to 8
    export MS_SCHED_HOST=127.0.0.1  # Set the Scheduler IP address to the local loop address
    export MS_SCHED_PORT=8118       # Set Scheduler port
    export MS_ROLE=MS_WORKER        # Set the started process to the MS_WORKER role
    export MS_NODE_ID=$i                      # Set process id, optional
    python ./net.py > device/worker_$i.log 2>&1 &                            # Start training script
done

# Start 1 Scheduler process
export MS_WORKER_NUM=8              # Set the number of Worker processes in the cluster to 8
export MS_SCHED_HOST=127.0.0.1      # Set the Scheduler IP address to the local loop address
export MS_SCHED_PORT=8118           # Set Scheduler port
export MS_ROLE=MS_SCHED             # Set the started process to the MS_SCHED role
python ./net.py > device/scheduler.log 2>&1 &     # Start training script

The training scripts for the Scheduler and Worker processes are identical in content and startup method, because the internal processes of the two roles are handled differently in MindSpore. Users simply pull up the process in the normal training manner, without modifying the Python code by role. This is one of the reasons why dynamic cluster startup scripts can be consistent across multiple hardware platforms.

A single-machine 8-card distributed training can be executed by executing the following command:

bash run_dynamic_cluster.sh

The script will run in the background, the log file will be saved to the device directory and the result will be saved in the worker_*.log and is as follows:

epoch: 0, step: 0, loss is 2.3499548
epoch: 0, step: 10, loss is 1.6682479
epoch: 0, step: 20, loss is 1.4237018
epoch: 0, step: 30, loss is 1.0437132
epoch: 0, step: 40, loss is 1.0643986
epoch: 0, step: 50, loss is 1.1021575
epoch: 0, step: 60, loss is 0.8510884
epoch: 0, step: 70, loss is 1.0581372
epoch: 0, step: 80, loss is 1.0076828
epoch: 0, step: 90, loss is 0.88950706
...

Multi-Machine Multi-Card

The startup script needs to be split in the multi-machine training scenario. The following is an example of performing 2-machine 8-card training, with each machine executing the startup 4 Worker:

The script run_dynamic_cluster_1.sh starts 1 Scheduler process and 4 Worker processes on node 1:

EXEC_PATH=$(pwd)
if [ ! -d "${EXEC_PATH}/MNIST_Data" ]; then
    if [ ! -f "${EXEC_PATH}/MNIST_Data.zip" ]; then
        wget http://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/MNIST_Data.zip
    fi
    unzip MNIST_Data.zip
fi
export DATA_PATH=${EXEC_PATH}/MNIST_Data/train/

rm -rf device
mkdir device
echo "start training"

# Start Worker1 to Worker4, 4 Worker training processes in a loop
for((i=0;i<4;i++));
do
    export MS_WORKER_NUM=8                    # Set the total number of Worker processes in the cluster to 8 (including other node processes)
    export MS_SCHED_HOST=<node_1 ip address>  # Set the Scheduler IP address to the Node 1 IP address
    export MS_SCHED_PORT=8118                 # Set the Scheduler port
    export MS_ROLE=MS_WORKER                  # Set the startup process to the MS_WORKER role
    export MS_NODE_ID=$i                      # Set process id, optional
    python ./net.py > device/worker_$i.log 2>&1 &                                    # Start training script
done

# Start 1 Scheduler process on node 1
export MS_WORKER_NUM=8                        # Set the total number of Worker processes in the cluster to 8 (including other node processes)
export MS_SCHED_HOST=<node_1 ip address>      # Set the Scheduler IP address to the Node 1 IP address
export MS_SCHED_PORT=8118                     # Set the Scheduler port
export MS_ROLE=MS_SCHED                       # Set the startup process to the MS_SCHED role
python ./net.py > device/scheduler.log 2>&1 &    # Start training script

The script run_dynamic_cluster_2.sh starts Worker5 to Worker8 on node 2 (without executing Scheduler):

EXEC_PATH=$(pwd)
if [ ! -d "${EXEC_PATH}/MNIST_Data" ]; then
    if [ ! -f "${EXEC_PATH}/MNIST_Data.zip" ]; then
        wget http://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/MNIST_Data.zip
    fi
    unzip MNIST_Data.zip
fi
export DATA_PATH=${EXEC_PATH}/MNIST_Data/train/

rm -rf device
mkdir device
echo "start training"

# Start Worker5 to Worker8, 4 Worker training processes in a loop
for((i=4;i<8;i++));
do
    export MS_WORKER_NUM=8                    # Set the total number of Worker processes in the cluster to 8 (including other node processes)
    export MS_SCHED_HOST=<node_1 ip address>  # Set the Scheduler IP address to the Node 1 IP address
    export MS_SCHED_PORT=8118                 # Set the Scheduler port
    export MS_ROLE=MS_WORKER                  # Set the startup process to the MS_WORKER role
    export MS_NODE_ID=$i                      # Set process id, optional
    python ./net.py > device/worker_$i.log 2>&1 &                             # Start training script
done

The multi-machine task MS_WORKER_NUM should be the total number of Worker nodes in the cluster. To keep the inter-node network connected, use the telnet <scheduler ip> <scheduler port> command to test whether this node is connected to the started Scheduler node.

Execute on Node 1:

bash run_dynamic_cluster_1.sh

Execute on Node 2:

bash run_dynamic_cluster_2.sh

That is, you can perform 2-machine 8-card distributed training tasks.

Disaster Recovery

Dynamic cluster supports disaster recovery under data parallel. In a parallel training scenario with multi-card data, if a process quits abnormally, the training can be continued after pulling up the corresponding script of the corresponding process again, and the accuracy convergence will not be affected. Disaster recovery configuration and samples can be found in the Disaster Recovery in Dynamic Cluster Scenarios tutorial.

Security Authentication

Dynamic cluster also supports the Secure Encrypted Channel feature, which supports the TLS/SSL protocol to satisfy users security needs. By default, the secure encrypted channel is turned off. If you need to turn it on, call init() only after configuring the secure encrypted channel correctly via set_ps_context, otherwise the initialization of the networking will fail. If you want to use the secure Encrypted channel, please configure it:

set_ps_context(config_file_path="/path/to/config_file.json", enable_ssl=True, client_password="123456", server_password="123456")

The config.json configuration file specified by config_file_path needs to add the following fields:

{
  "server_cert_path": "server.p12",
  "crl_path": "",
  "client_cert_path": "client.p12",
  "ca_cert_path": "ca.crt",
  "cipher_list": "ECDHE-R SA-AES128-GCM-SHA256:ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-AES256-GCM-SHA384:DHE-RSA-AES128-GCM-SHA256:DHE-DSS-AES128-GCM-SHA256:DHE-RSA-AES256-GCM-SHA384:DHE-DSS-AES256-GCM-SHA384:DHE-PSK-AES128-GCM-SHA256:DHE-PSK-AES256-GCM-SHA384:DHE-PSK-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:ECDHE-PSK-CHACHA20-POLY1305:DHE-RSA-AES128-CCM:DHE-RSA-AES256-CCM:DHE-RSA-CHACHA20-POLY1305:DHE-PSK-AES128-CCM:DHE-PSK-AES256-CCM:ECDHE-ECDSA-AES128-CCM:ECDHE-ECDSA-AES256-CCM:ECDHE-ECDSA-CHACHA20-POLY1305",
  "cert_expire_warning_time_in_day": 90
}
  • server_cert_path: The path to the p12 file (SSL-specific certificate file) that contains the cipher text of the certificate and the secret key on the server side.

  • crl_path: The file path to the revocation list (used to distinguish invalid untrusted certificates from valid trusted certificates).

  • client_cert_path: The client contains the path to the p12 file (SSL-specific certificate file) with the cipher text of the certificate and secret key.

  • ca_cert_path: The path to root certificate

  • cipher_list: Cipher suite (list of supported SSL encrypted types)

  • cert_expire_warning_time_in_da: The warning time of certificate expiration.

The secret key in the p12 file is stored in cipher text, and the password needs to be passed in when starting. Please refer to the Python API mindspore.set_ps_context for the client_password and server_password fields.