Dynamic Cluster Startup Method

Overview

To meet reliability requirements during training, MindSpore provides the dynamic cluster feature, which enables users to start Ascend/GPU/CPU distributed training tasks without relying on any third-party library (such as OpenMPI) and without any modification to the training script. We recommend that users prefer this startup method. Users can click Multi-Card Startup Method to check the support for multi-card startup methods on different platforms.

In a distributed training scenario, OpenMPI is responsible for synchronizing data and building the cluster between processes on the Host side. The MindSpore dynamic cluster feature replaces this OpenMPI capability by reusing the Parameter Server mode training architecture, which can be found in the Parameter Server Mode training tutorial.

The dynamic cluster feature starts multiple MindSpore training processes as Workers, plus an additional Scheduler responsible for cluster building and disaster recovery. The user only needs to make a few changes to the startup script to perform distributed training.

Dynamic cluster startup scripts can be quickly migrated between multiple hardware platforms without additional modifications.

Precautions

  • Dynamic cluster does not currently support PyNative mode.

Environment Variables

Before the training scripts can be started with dynamic cluster, several environment variables need to be exported, as described below:

MS_ROLE (String): Specifies the role of this process.
  • MS_SCHED: represents a Scheduler process. Only one Scheduler is started per training task; it is responsible for cluster building, disaster recovery, etc., and does not execute training code.
  • MS_WORKER: represents a Worker process, which is the role generally assigned to distributed training processes.
  • MS_PSERVER: represents a Parameter Server process. This role is only available in Parameter Server mode; for details, refer to Parameter Server Mode.
  Worker and Parameter Server processes register with the Scheduler process to complete cluster building.

MS_SCHED_HOST (String): Specifies the IP address of the Scheduler. Must be a legitimate IP address; the current version does not support IPv6 addresses.

MS_SCHED_PORT (Integer): Specifies the port number the Scheduler binds to, in the range 1024 to 65535.

MS_NODE_ID (String): Specifies the ID of this process, unique within the cluster. It is automatically generated by MindSpore by default and generally does not need to be set, except in the following cases:
  • Disaster recovery scenario: the current process ID is needed during disaster recovery in order to re-register with the Scheduler.
  • GLOG log redirection scenario: to ensure that the log of each training process is saved independently, the process ID is used as the suffix of the log saving path.
  • Specified rank id scenario: users can specify the rank id of this process by setting MS_NODE_ID to an integer.

MS_WORKER_NUM (Integer): Specifies the number of processes whose role is MS_WORKER. Must be an integer greater than 0. The number of Worker processes started by the user should equal this value; if fewer are started, cluster building fails, while if more are started, the Scheduler completes cluster building according to the Worker registration order and the extra Worker processes fail to start.

MS_SERVER_NUM (Integer): Specifies the number of processes whose role is MS_PSERVER. Must be an integer greater than 0. Set only in Parameter Server training mode.

MS_ENABLE_RECOVERY (Integer): Turns disaster recovery on or off. 1 means on, 0 means off. The default is 0.

MS_RECOVERY_PATH (String): Specifies the persistence path (a directory). Must be a legitimate user directory. Worker and Scheduler processes perform the necessary persistence during execution, such as saving the node information used to recover the cluster and the intermediate state of the training job to files under this path.

MS_HCCL_CM_INIT (Integer): Whether to initialize HCCL using the CM method. 1 means on, 0 means off. The default is 0. It is recommended to turn this on only on Ascend hardware platforms with a large number of communication domains. Turning it on reduces the memory footprint of the HCCL collective communication library, and training tasks are executed in the same way as with `rank table` startup.

The above environment variables should be set before each process starts, and the values of MS_SCHED_HOST, MS_SCHED_PORT and MS_WORKER_NUM must be consistent across processes; otherwise, cluster building will fail due to inconsistent configuration.
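
As a quick sanity check before launching, the following is a minimal Python sketch (not part of the tutorial scripts, purely illustrative) that verifies the required variables are set and fall within the ranges described above:

import os

# Verify the dynamic cluster environment variables before calling init().
required = ["MS_ROLE", "MS_SCHED_HOST", "MS_SCHED_PORT", "MS_WORKER_NUM"]
missing = [name for name in required if name not in os.environ]
if missing:
    raise RuntimeError(f"Missing dynamic cluster environment variables: {missing}")

port = int(os.environ["MS_SCHED_PORT"])
if not 1024 <= port <= 65535:
    raise RuntimeError(f"MS_SCHED_PORT must be in [1024, 65535], got {port}")

if int(os.environ["MS_WORKER_NUM"]) <= 0:
    raise RuntimeError("MS_WORKER_NUM must be an integer greater than 0")

if os.environ["MS_ROLE"] not in ("MS_SCHED", "MS_WORKER", "MS_PSERVER"):
    raise RuntimeError("MS_ROLE must be MS_SCHED, MS_WORKER or MS_PSERVER")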

Executing Training Tasks

Since the dynamic cluster startup script can be kept consistent across hardware platforms, the following takes 8-card distributed training on the GPU hardware platform as an example of how to write a startup script:

The running directory of sample: distributed_training

1. Preparing Python Training Scripts

import mindspore as ms
from mindspore.train import CheckpointConfig, ModelCheckpoint
from mindspore.communication import init

if __name__ == "__main__":
    ms.set_context(mode=ms.GRAPH_MODE, device_target="GPU")
    init()
    ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.DATA_PARALLEL, gradients_mean=True)
    ...

where

  • mode=GRAPH_MODE: To use distributed training, the running mode needs to be set to graph mode (the current version of the dynamic cluster feature does not support PyNative mode).

  • init(): Initializes the cluster. The collective communication library (NCCL in this case) is initialized according to the backend specified in the set_context interface, completing the distributed training initialization (a minimal usage sketch follows this list).

  • ms.ParallelMode.DATA_PARALLEL: Sets the training mode to data parallel mode.
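
As a minimal usage sketch of init(): after it returns, each Worker can query its own rank and the total cluster size, which is typically used to shard the dataset for data parallelism. The CIFAR-10 dataset path below is a placeholder:

import mindspore as ms
import mindspore.dataset as ds
from mindspore.communication import init, get_rank, get_group_size

ms.set_context(mode=ms.GRAPH_MODE, device_target="GPU")
init()
rank_id = get_rank()          # rank of this Worker within the cluster
rank_size = get_group_size()  # total number of Workers in the cluster
# Each Worker reads a different shard of the dataset (path is a placeholder)
dataset = ds.Cifar10Dataset("/path/to/cifar-10-batches-bin",
                            num_shards=rank_size, shard_id=rank_id)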

Dynamic cluster also supports the secure encrypted channel feature with the TLS/SSL protocol to meet users' security needs. The secure encrypted channel is off by default. If you need to turn it on, configure it correctly through set_ps_context before calling init(); otherwise cluster initialization will fail. To use the secure encrypted channel, configure:

set_ps_context(config_file_path="/path/to/config_file.json", enable_ssl=True, client_password="123456", server_password="123456")

For the detailed parameter configuration descriptions, refer to mindspore.set_ps_context and the Security Authentication section of this document.
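
Putting these pieces together, the order of calls matters: the secure encrypted channel must be configured before the cluster is initialized. A minimal sketch (the config file path and passwords are placeholders, as above):

import mindspore as ms
from mindspore.communication import init

ms.set_context(mode=ms.GRAPH_MODE, device_target="GPU")
# Configure the secure encrypted channel first (placeholder path and passwords)
ms.set_ps_context(config_file_path="/path/to/config_file.json", enable_ssl=True,
                  client_password="123456", server_password="123456")
# Only then initialize the cluster; otherwise cluster initialization fails
init()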

2. Preparing the Startup Script

Single-Machine Multi-Card

The content of the single-machine multi-card startup script run_gpu_cluster.sh is as follows. Before starting the Worker and Scheduler, you need to add the relevant environment variable settings:

#!/bin/bash

echo "=========================================="
echo "Please run the script as: "
echo "bash run_gpu_cluster.sh DATA_PATH"
echo "For example: bash run_gpu_cluster.sh /path/dataset"
echo "It is better to use the absolute path."
echo "==========================================="
DATA_PATH=$1
export DATA_PATH=${DATA_PATH}

rm -rf device
mkdir device
cp ./resnet50_distributed_training_gpu.py ./resnet.py ./device
cd ./device
echo "start training"

# Start 8 Worker training processes in a loop
for((i=0;i<8;i++));
do
    export MS_WORKER_NUM=8          # Set the number of Worker processes in the cluster to 8
    export MS_SCHED_HOST=127.0.0.1  # Set the Scheduler IP address to the local loop address
    export MS_SCHED_PORT=8118       # Set Scheduler port
    export MS_ROLE=MS_WORKER        # Set the started process to the MS_WORKER role
    export MS_NODE_ID=$i                      # Set process id, optional
    pytest -s -v ./resnet50_distributed_training_gpu.py > worker_$i.log 2>&1 &                             # Start training script
done

# Start 1 Scheduler process
export MS_WORKER_NUM=8              # Set the number of Worker processes in the cluster to 8
export MS_SCHED_HOST=127.0.0.1      # Set the Scheduler IP address to the local loop address
export MS_SCHED_PORT=8118           # Set Scheduler port
export MS_ROLE=MS_SCHED             # Set the started process to the MS_SCHED role
pytest -s -v ./resnet50_distributed_training_gpu.py > scheduler.log 2>&1 &     # Start training script

The training scripts for the Scheduler and Worker processes are identical in content and startup method because MindSpore handles the internal flows of the two roles differently. The user simply pulls up the processes in the normal training manner, without modifying the Python code per role. This is one of the reasons why dynamic cluster startup scripts can be kept consistent across multiple hardware platforms.

Single-machine 8-card distributed training can be started by executing the following command:

./run_gpu_cluster.sh /path/to/dataset/

Multi-Machine Multi-Card

The startup script needs to be split in the multi-machine training scenario. The following example performs 2-machine 8-card training, with each machine starting 4 Worker processes:

The script run_gpu_cluster_1.sh starts 1 Scheduler and Worker1 to Worker4 on node 1:

#!/bin/bash

echo "=========================================="
echo "Please run the script as: "
echo "bash run_gpu_cluster.sh DATA_PATH"
echo "For example: bash run_gpu_cluster.sh /path/dataset"
echo "It is better to use the absolute path."
echo "==========================================="
DATA_PATH=$1
export DATA_PATH=${DATA_PATH}

rm -rf device
mkdir device
cp ./resnet50_distributed_training_gpu.py ./resnet.py ./device
cd ./device
echo "start training"

# Start Worker1 to Worker4, 4 Worker training processes in a loop
for((i=0;i<4;i++));
do
    export MS_WORKER_NUM=8                    # Set the total number of Worker processes in the cluster to 8 (including other node processes)
    export MS_SCHED_HOST=<node_1 ip address>  # Set the Scheduler IP address to the Node 1 IP address
    export MS_SCHED_PORT=8118                 # Set the Scheduler port
    export MS_ROLE=MS_WORKER                  # Set the startup process to the MS_WORKER role
    export MS_NODE_ID=$i                      # Set process id, optional
    pytest -s -v ./resnet50_distributed_training_gpu.py > worker_$i.log 2>&1 &                                       # Start training script
done

# Start 1 Scheduler process on node 1
export MS_WORKER_NUM=8                        # Set the total number of Worker processes in the cluster to 8 (including other node processes)
export MS_SCHED_HOST=<node_1 ip address>      # Set the Scheduler IP address to the Node 1 IP address
export MS_SCHED_PORT=8118                     # Set the Scheduler port
export MS_ROLE=MS_SCHED                       # Set the startup process to the MS_SCHED role
pytest -s -v ./resnet50_distributed_training_gpu.py > scheduler.log 2>&1 &     # Start training script

The script run_gpu_cluster_2.sh starts Worker5 to Worker8 on node 2 (without executing Scheduler):

#!/bin/bash

echo "=========================================="
echo "Please run the script as: "
echo "bash run_gpu_cluster.sh DATA_PATH"
echo "For example: bash run_gpu_cluster.sh /path/dataset"
echo "It is better to use the absolute path."
echo "==========================================="
DATA_PATH=$1
export DATA_PATH=${DATA_PATH}

rm -rf device
mkdir device
cp ./resnet50_distributed_training_gpu.py ./resnet.py ./device
cd ./device
echo "start training"

# Start Worker5 to Worker8, 4 Worker training processes in a loop
for((i=4;i<8;i++));
do
    export MS_WORKER_NUM=8                    # Set the total number of Worker processes in the cluster to 8 (including other node processes)
    export MS_SCHED_HOST=<node_1 ip address>  # Set the Scheduler IP address to the Node 1 IP address
    export MS_SCHED_PORT=8118                 # Set the Scheduler port
    export MS_ROLE=MS_WORKER                  # Set the startup process to the MS_WORKER role
    export MS_NODE_ID=$i                      # Set process id, optional
    pytest -s -v ./resnet50_distributed_training_gpu.py > worker_$i.log 2>&1 &                                       # Start training script
done

For multi-machine tasks, MS_WORKER_NUM should be the total number of Worker processes in the cluster. To ensure inter-node network connectivity, use the telnet <scheduler ip> <scheduler port> command to test whether the current node can reach the started Scheduler node.
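
If telnet is not available on a node, a short Python sketch can perform the same connectivity check (the Scheduler address below is a placeholder; use the values of MS_SCHED_HOST and MS_SCHED_PORT):

import socket

sched_host, sched_port = "192.168.0.1", 8118  # placeholder Scheduler address
try:
    # Attempt a TCP connection to the Scheduler and close it immediately
    with socket.create_connection((sched_host, sched_port), timeout=5):
        print(f"Scheduler {sched_host}:{sched_port} is reachable")
except OSError as err:
    print(f"Cannot reach Scheduler {sched_host}:{sched_port}: {err}")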

Execute on Node 1:

./run_gpu_cluster_1.sh /path/to/dataset/

Execute on Node 2:

./run_gpu_cluster_2.sh /path/to/dataset/

This launches the 2-machine 8-card distributed training task.

The above startup scripts can be reused as-is on the Ascend and CPU hardware platforms; only hardware-related settings in the Python training script, such as device_target, need to be modified to run dynamic cluster distributed training.

3. Execution Results

The scripts run in the background, and the log files are saved to the current directory. A total of 10 epochs are run, each with 234 steps. The loss results are saved in worker_*.log. After grepping the loss values, example output is as follows:

epoch: 1 step: 234, loss is 2.0084016
epoch: 2 step: 234, loss is 1.6407638
epoch: 3 step: 234, loss is 1.6164391
epoch: 4 step: 234, loss is 1.6838071
epoch: 5 step: 234, loss is 1.6320667
epoch: 6 step: 234, loss is 1.3098773
epoch: 7 step: 234, loss is 1.3515002
epoch: 8 step: 234, loss is 1.2943741
epoch: 9 step: 234, loss is 1.2316195
epoch: 10 step: 234, loss is 1.1533381

Disaster Recovery

Model training places high demands on the reliability and serviceability of the distributed training architecture. MindSpore supports disaster recovery under data parallelism: in a multi-card data parallel training scenario, after processes in the cluster (multiple Workers and 1 Scheduler) exit abnormally and are pulled up again, the training task continues to execute normally.

Scenario constraints: graph mode with MindData data sink mode training, data parallel mode enabled, and Worker processes started with the non-OpenMPI approach described above.

In the above scenario, if a node is interrupted during training, training is guaranteed to continue after the corresponding process is pulled up again with its corresponding script and the same environment variables (MS_ENABLE_RECOVERY and MS_RECOVERY_PATH), and accuracy convergence is not affected.

  1. Enable disaster recovery:

    Enabling disaster recovery through environment variables:

    export MS_ENABLE_RECOVERY=1                 # Enable disaster recovery
    export MS_RECOVERY_PATH=/path/to/recovery/  # Configure the persistence path file
    
  2. Configure the checkpoint save interval. The sample is as follows:

    from mindspore.train import ModelCheckpoint, CheckpointConfig
    from mindspore.communication import get_rank

    ckptconfig = CheckpointConfig(save_checkpoint_steps=100, keep_checkpoint_max=5)
    ckpoint_cb = ModelCheckpoint(prefix='train', directory="./ckpt_of_rank_/"+str(get_rank()), config=ckptconfig)
    

Checkpoint saving is enabled on each Worker, and different Workers use different directories (e.g., in the above sample the directory is set using the rank id) to prevent conflicts between checkpoints with the same name. Checkpoints are used for abnormal process recovery and normal process rollback. Rolling back training means that each Worker in the cluster is restored to the state of the latest checkpoint, and the data side is rolled back to the corresponding step, after which training continues. The checkpoint save interval is configurable and determines the granularity of disaster recovery: the smaller the interval, the fewer steps are rolled back to the last saved checkpoint, but frequent checkpoint saving may affect training efficiency; the larger the interval, the opposite. keep_checkpoint_max should be set to at least 2 (to prevent checkpoint save failures).
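
The callback is then attached to training in the usual way. The following minimal sketch assumes a compiled Model named model and a sharded training dataset named dataset have already been built elsewhere in the training script; data sink mode is enabled because it is one of the disaster recovery scenario constraints above:

from mindspore.communication import get_rank
from mindspore.train import ModelCheckpoint, CheckpointConfig

ckptconfig = CheckpointConfig(save_checkpoint_steps=100, keep_checkpoint_max=5)
ckpoint_cb = ModelCheckpoint(prefix='train', directory="./ckpt_of_rank_/" + str(get_rank()),
                             config=ckptconfig)
# `model` and `dataset` are assumed to be built elsewhere in the script (after init())
model.train(10, dataset, callbacks=[ckpoint_cb], dataset_sink_mode=True)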

The running directory of sample: distributed_training.

The scripts involved are run_gpu_cluster_recovery.sh, resnet50_distributed_training_gpu_recovery.py, and resnet.py. The contents of run_gpu_cluster_recovery.sh are as follows:

#!/bin/bash

echo "=========================================="
echo "Please run the script as: "
echo "bash run_gpu_cluster_recovery.sh DATA_PATH"
echo "For example: bash run_gpu_cluster_recovery.sh /path/dataset"
echo "It is better to use the absolute path."
echo "==========================================="
DATA_PATH=$1
export DATA_PATH=${DATA_PATH}

export MS_ENABLE_RECOVERY=1                 # Enable disaster recovery
export MS_RECOVERY_PATH=/path/to/recovery/  # Configure the persistence path file

rm -rf device
mkdir device
cp ./resnet50_distributed_training_gpu_recovery.py ./resnet.py ./device
cd ./device
echo "start training"

# Start 1 Scheduler process
export MS_WORKER_NUM=8              # Set the number of Worker processes in the cluster to 8
export MS_SCHED_HOST=127.0.0.1      # Set the Scheduler IP address to the local loop address
export MS_SCHED_PORT=8118           # Set Scheduler port
export MS_ROLE=MS_SCHED             # Set the started process to the MS_SCHED role
export MS_NODE_ID=sched             # Set Node ID as 'sched'
pytest -s -v ./resnet50_distributed_training_gpu_recovery.py > scheduler.log 2>&1 &

# Start 8 Worker training process in a loop
for((i=0;i<8;i++));
do
    export MS_WORKER_NUM=8              # Set the number of Worker processes in the cluster to 8
    export MS_SCHED_HOST=127.0.0.1      # Set the Scheduler IP address to the local loop address
    export MS_SCHED_PORT=8118           # Set Scheduler port
    export MS_ROLE=MS_WORKER            # Set the started process to the MS_WORKER role
    export MS_NODE_ID=worker_$i         # Set Node ID as 'worker_$i'
    pytest -s -v ./resnet50_distributed_training_gpu_recovery.py > worker_$i.log 2>&1 &
done

Before starting the Worker and Scheduler processes, the relevant environment variable settings need to be added, such as the Scheduler IP and port, and whether the role of the current process is Worker or Scheduler.

Execute the following command to start single-machine 8-card data parallel training:

bash run_gpu_cluster_recovery.sh /path/to/dataset/

After distributed training starts, if an abnormal situation is encountered, such as a process exiting abnormally, training can be recovered by restarting the corresponding process. For example, if the Scheduler process exits abnormally during training, the following commands can be executed to restart the Scheduler:

export DATA_PATH=YOUR_DATA_PATH
export MS_ENABLE_RECOVERY=1                # Enable disaster recovery
export MS_RECOVERY_PATH=/path/to/recovery/ # Configure the persistence path file

cd ./device

# Start 1 Scheduler process
export MS_WORKER_NUM=8              # Set the number of Worker processes in the cluster to 8
export MS_SCHED_HOST=127.0.0.1      # Set the Scheduler IP address to the local loop address
export MS_SCHED_PORT=8118           # Set Scheduler port
export MS_ROLE=MS_SCHED             # Set the started process to the MS_SCHED role
export MS_NODE_ID=sched             # Set Node ID as 'sched'
pytest -s -v ./resnet50_distributed_training_gpu_recovery.py > scheduler.log 2>&1 &

The cluster of Workers and the Scheduler is then automatically recovered.

Worker processes that exit abnormally are handled in a similar way (Note: a Worker process that exits abnormally needs to wait 30s before being pulled up again to resume training; before that, the Scheduler rejects re-registration of a Worker with the same node ID, in order to guard against network jitter and malicious registration).

Security Authentication

Dynamic cluster supports SSL security authentication between nodes/processes. To enable it, configure enable_ssl=True via the Python API mindspore.set_ps_context (it defaults to False when not passed in, meaning SSL security authentication is not enabled). The config.json configuration file specified by config_file_path needs to add the following fields:

{
  "server_cert_path": "server.p12",
  "crl_path": "",
  "client_cert_path": "client.p12",
  "ca_cert_path": "ca.crt",
  "cipher_list": "ECDHE-R SA-AES128-GCM-SHA256:ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-AES256-GCM-SHA384:DHE-RSA-AES128-GCM-SHA256:DHE-DSS-AES128-GCM-SHA256:DHE-RSA-AES256-GCM-SHA384:DHE-DSS-AES256-GCM-SHA384:DHE-PSK-AES128-GCM-SHA256:DHE-PSK-AES256-GCM-SHA384:DHE-PSK-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:ECDHE-PSK-CHACHA20-POLY1305:DHE-RSA-AES128-CCM:DHE-RSA-AES256-CCM:DHE-RSA-CHACHA20-POLY1305:DHE-PSK-AES128-CCM:DHE-PSK-AES256-CCM:ECDHE-ECDSA-AES128-CCM:ECDHE-ECDSA-AES256-CCM:ECDHE-ECDSA-CHACHA20-POLY1305",
  "cert_expire_warning_time_in_day": 90
}
  • server_cert_path: The path to the p12 file (SSL-specific certificate file) that contains the cipher text of the certificate and secret key on the server side.

  • crl_path: The path to the certificate revocation list file (used to distinguish invalid, untrusted certificates from valid, trusted ones).

  • client_cert_path: The path to the p12 file (SSL-specific certificate file) that contains the cipher text of the certificate and secret key on the client side.

  • ca_cert_path: The path to the root certificate.

  • cipher_list: The cipher suite (list of supported SSL cipher types).

  • cert_expire_warning_time_in_day: The warning time before certificate expiration, in days.

The secret key in the p12 file is stored as cipher text, and the corresponding password needs to be passed in at startup. Refer to the client_password and server_password fields of the Python API mindspore.set_ps_context.