Dynamic Cluster Startup
Overview
To meet reliability requirements during training, MindSpore provides the dynamic cluster feature, which enables users to start Ascend/GPU/CPU distributed training tasks without relying on any third-party library (OpenMPI) and without any modification to the training script. We recommend that users prefer this startup method.
The MindSpore Dynamic Cluster feature replaces the OpenMPI capability by reusing the Parameter Server mode training architecture, which can be found in the Parameter Server Mode training tutorial.
The Dynamic Cluster feature starts multiple MindSpore training processes as Workers, and starts an additional Scheduler process responsible for cluster networking and disaster recovery. The user only needs to make a few changes to the startup script to perform distributed training.
Dynamic cluster supports Ascend, GPU and CPU, so the dynamic cluster startup script can be quickly migrated between multiple hardware platforms without additional modifications. In addition, dynamic cluster needs to run in Graph mode.
The relevant environment variables:
Environment Variables | Function | Type | Value | Description |
---|---|---|---|---|
MS_ROLE | Specifies the role of this process. | String | MS_SCHED: the Scheduler process. MS_WORKER: the Worker process. MS_PSERVER: the Parameter Server process. | The Worker and Parameter Server processes register with the Scheduler process to complete the networking. |
MS_SCHED_HOST | Specifies the IP address of the Scheduler. | String | Legal IP address. | IPv6 addresses are only supported on the `Ascend` platform in the current version. In the IPv6 case, the environment variable MS_HCCL_CM_INIT must be set to 1. |
MS_SCHED_PORT | Specifies the Scheduler binding port number. | Integer | Port number in the range of 1024 to 65535. | |
MS_NODE_ID | Specifies the ID of this process, unique within the cluster. | String | Represents the unique ID of this process, which is automatically generated by MindSpore by default. | Normally this variable does not need to be set and is automatically generated by MindSpore; it only needs to be set manually in certain cases. |
MS_WORKER_NUM | Specifies the number of processes with the role MS_WORKER. | Integer | Integer greater than 0. | The number of Worker processes started by the user should be equal to the value of this environment variable. If it is less than this value, the networking fails. If it is greater than this value, the Scheduler process will complete the networking according to the order of Worker registration, and the redundant Worker processes will fail to start. |
MS_SERVER_NUM | Specifies the number of processes with the role MS_PSERVER. | Integer | Integer greater than 0. | Only set in Parameter Server training mode. |
MS_WORKER_IP | Specifies the IP address used for communication and networking between processes. | String | Legal IP address. | This environment variable must be set when using IPv6. However, when MS_SCHED_HOST is set to ::1 (the local loopback interface in IPv6), MS_WORKER_IP does not need to be set, because MindSpore uses the local loopback interface for communication by default. |
MS_ENABLE_RECOVERY | Turn on disaster recovery. | Integer | 1 for on, 0 for off. The default is 0. | |
MS_RECOVERY_PATH | Persistent path folder. | String | Legal user directory. | The Worker and Scheduler processes perform the necessary persistence during execution, such as the node information used for restoring the networking and the intermediate state of the training service, which are saved as files. |
MS_HCCL_CM_INIT | Whether to use the CM method to initialize the HCCL. | Integer | 1 for yes, other values for no. The default is no. | This environment variable is only recommended for Ascend hardware platforms with a large number of communication domains. Turning it on reduces the memory footprint of the HCCL collective communication library, and training tasks are executed in the same way as with the rank table startup method. |
MS_TOPO_TIMEOUT | Timeout of the cluster networking phase, in seconds. | Integer | The default is 30 minutes. | All nodes must register to the Scheduler within this time window; if the window is exceeded, registration fails, and if the number of registered nodes does not meet the requirement, cluster networking fails. We suggest that users configure this environment variable for large-scale clusters. |
MS_CLUSTER_RETRY_NUM | Number of registration retries for a node during the cluster networking phase. | Integer | The default is 610 times. | This value represents the number of times a node retries registering with the Scheduler, with an interval of 3 seconds between retries. |
MS_NODE_TIMEOUT | Node heartbeat timeout in seconds. | Integer | The default is 600 seconds. | This value represents the heartbeat timeout time between the Scheduler and the Worker. If there are no heartbeat messages within this time window, the cluster will exit abnormally. |
MS_RECEIVE_MSG_TIMEOUT | Node timeout for receiving messages in seconds. | Integer | The default is 600 seconds. | This value represents the timeout window for the node to receive messages from the other end. If there is no message response within the time window, an empty message is returned. |
The environment variables MS_SCHED_HOST, MS_SCHED_PORT, and MS_WORKER_NUM need to be consistent across all processes; otherwise, the networking will fail due to inconsistent process configurations.
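As a minimal sketch (with illustrative values; the full startup scripts are shown in the Operation Practice section below), these variables are typically exported identically for every process before launching, with only MS_ROLE differing:

# Shared configuration, identical for the Scheduler and every Worker
export MS_WORKER_NUM=2           # total number of Worker processes in the cluster
export MS_SCHED_HOST=127.0.0.1   # Scheduler IP address
export MS_SCHED_PORT=8118        # Scheduler port
# Start the Scheduler process
MS_ROLE=MS_SCHED python ./net.py > scheduler.log 2>&1 &
# Start the Worker processes (2 in this sketch)
MS_ROLE=MS_WORKER python ./net.py > worker_0.log 2>&1 &
MS_ROLE=MS_WORKER python ./net.py > worker_1.log 2>&1 &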
Operation Practice
Dynamic cluster startup scripts are consistent across hardware platforms. The following is an example of how to write a startup script for Ascend:
You can download the full sample code here: startup_method.
The directory structure is as follows:
└─ sample_code
├─ startup_method
├── net.py
├── run_dynamic_cluster.sh
├── run_dynamic_cluster_1.sh
├── run_dynamic_cluster_2.sh
...
net.py defines the network structure and the training process, while run_dynamic_cluster.sh, run_dynamic_cluster_1.sh and run_dynamic_cluster_2.sh are the startup scripts.
1. Preparing Python Training Scripts
Here, taking data parallelism as an example, a recognition network is trained on the MNIST dataset; the network structure and training process are consistent with those of the data parallel network.
First specify the operation mode, hardware device, etc. Unlike single-card scripts, parallel scripts also need to specify configuration items such as the parallel mode, and initialize HCCL or NCCL communication via init. If device_target is not set here, it is automatically set to the backend hardware device corresponding to the installed MindSpore package.
import mindspore as ms
from mindspore.communication import init
ms.set_context(mode=ms.GRAPH_MODE)
ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.DATA_PARALLEL, gradients_mean=True)
init()
ms.set_seed(1)
Then build the following network:
from mindspore import nn
class Network(nn.Cell):
def __init__(self):
super().__init__()
self.flatten = nn.Flatten()
self.fc = nn.Dense(28*28, 10, weight_init="normal", bias_init="zeros")
self.relu = nn.ReLU()
def construct(self, x):
x = self.flatten(x)
logits = self.relu(self.fc(x))
return logits
net = Network()
Finally, the dataset is processed and the training process is defined:
import os
from mindspore import nn
import mindspore as ms
import mindspore.dataset as ds
from mindspore.communication import get_rank, get_group_size
def create_dataset(batch_size):
dataset_path = os.getenv("DATA_PATH")
rank_id = get_rank()
rank_size = get_group_size()
dataset = ds.MnistDataset(dataset_path, num_shards=rank_size, shard_id=rank_id)
image_transforms = [
ds.vision.Rescale(1.0 / 255.0, 0),
ds.vision.Normalize(mean=(0.1307,), std=(0.3081,)),
ds.vision.HWC2CHW()
]
label_transform = ds.transforms.TypeCast(ms.int32)
dataset = dataset.map(image_transforms, 'image')
dataset = dataset.map(label_transform, 'label')
dataset = dataset.batch(batch_size)
return dataset
data_set = create_dataset(32)
loss_fn = nn.CrossEntropyLoss()
optimizer = nn.SGD(net.trainable_params(), 1e-2)
def forward_fn(data, label):
logits = net(data)
loss = loss_fn(logits, label)
return loss, logits
grad_fn = ms.value_and_grad(forward_fn, None, net.trainable_params(), has_aux=True)
grad_reducer = nn.DistributedGradReducer(optimizer.parameters)
for epoch in range(10):
i = 0
for data, label in data_set:
(loss, _), grads = grad_fn(data, label)
grads = grad_reducer(grads)
optimizer(grads)
if i % 10 == 0:
print("epoch: %s, step: %s, loss is %s" % (epoch, i, loss))
i += 1
2. Preparing the Startup Script
Single-Machine Multi-Card
The content of the single-machine multi-card startup script run_dynamic_cluster.sh is as follows, taking single-machine 8-card training as an example:
EXEC_PATH=$(pwd)
if [ ! -d "${EXEC_PATH}/MNIST_Data" ]; then
if [ ! -f "${EXEC_PATH}/MNIST_Data.zip" ]; then
wget http://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/MNIST_Data.zip
fi
unzip MNIST_Data.zip
fi
export DATA_PATH=${EXEC_PATH}/MNIST_Data/train/
rm -rf device
mkdir device
echo "start training"
# Start 8 Worker training processes in a loop
for((i=0;i<8;i++));
do
export MS_WORKER_NUM=8 # Set the number of Worker processes in the cluster to 8
export MS_SCHED_HOST=127.0.0.1 # Set the Scheduler IP address to the local loop address
export MS_SCHED_PORT=8118 # Set Scheduler port
export MS_ROLE=MS_WORKER # Set the started process to the MS_WORKER role
export MS_NODE_ID=$i # Set process id, optional
python ./net.py > device/worker_$i.log 2>&1 & # Start training script
done
# Start 1 Scheduler process
export MS_WORKER_NUM=8 # Set the number of Worker processes in the cluster to 8
export MS_SCHED_HOST=127.0.0.1 # Set the Scheduler IP address to the local loop address
export MS_SCHED_PORT=8118 # Set Scheduler port
export MS_ROLE=MS_SCHED # Set the started process to the MS_SCHED role
python ./net.py > device/scheduler.log 2>&1 & # Start training script
The training script content and the way it is started are identical for the Scheduler and Worker processes, because MindSpore handles the internal flow of the two roles differently. Users simply launch the processes in the normal training manner, without modifying the Python code per role. This is one of the reasons why dynamic cluster startup scripts can stay consistent across multiple hardware platforms.
Single-machine 8-card distributed training can be started by executing the following command:
bash run_dynamic_cluster.sh
The script runs in the background; the log files are saved in the device directory, and the results are saved in worker_*.log, as follows:
epoch: 0, step: 0, loss is 2.3499548
epoch: 0, step: 10, loss is 1.6682479
epoch: 0, step: 20, loss is 1.4237018
epoch: 0, step: 30, loss is 1.0437132
epoch: 0, step: 40, loss is 1.0643986
epoch: 0, step: 50, loss is 1.1021575
epoch: 0, step: 60, loss is 0.8510884
epoch: 0, step: 70, loss is 1.0581372
epoch: 0, step: 80, loss is 1.0076828
epoch: 0, step: 90, loss is 0.88950706
...
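The logs of the other Worker processes can be checked in the same way; for example, to view the tail of another Worker's log:

tail -n 20 device/worker_1.log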
Multi-Machine Multi-Card
The startup script needs to be split in the multi-machine training scenario. The following is an example of 2-machine 8-card training, with each machine starting 4 Worker processes:
The script run_dynamic_cluster_1.sh starts 1 Scheduler and Worker1 to Worker4 on node 1:
EXEC_PATH=$(pwd)
if [ ! -d "${EXEC_PATH}/MNIST_Data" ]; then
if [ ! -f "${EXEC_PATH}/MNIST_Data.zip" ]; then
wget http://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/MNIST_Data.zip
fi
unzip MNIST_Data.zip
fi
export DATA_PATH=${EXEC_PATH}/MNIST_Data/train/
rm -rf device
mkdir device
echo "start training"
# Start Worker1 to Worker4, 4 Worker training processes in a loop
for((i=0;i<4;i++));
do
export MS_WORKER_NUM=8 # Set the total number of Worker processes in the cluster to 8 (including other node processes)
export MS_SCHED_HOST=<node_1 ip address> # Set the Scheduler IP address to the Node 1 IP address
export MS_SCHED_PORT=8118 # Set the Scheduler port
export MS_ROLE=MS_WORKER # Set the startup process to the MS_WORKER role
export MS_NODE_ID=$i # Set process id, optional
python ./net.py > device/worker_$i.log 2>&1 & # Start training script
done
# Start 1 Scheduler process on node 1
export MS_WORKER_NUM=8 # Set the total number of Worker processes in the cluster to 8 (including other node processes)
export MS_SCHED_HOST=<node_1 ip address> # Set the Scheduler IP address to the Node 1 IP address
export MS_SCHED_PORT=8118 # Set the Scheduler port
export MS_ROLE=MS_SCHED # Set the startup process to the MS_SCHED role
python ./net.py > device/scheduler.log 2>&1 & # Start training script
The script run_dynamic_cluster_2.sh starts Worker5 to Worker8 on node 2 (without starting the Scheduler):
EXEC_PATH=$(pwd)
if [ ! -d "${EXEC_PATH}/MNIST_Data" ]; then
if [ ! -f "${EXEC_PATH}/MNIST_Data.zip" ]; then
wget http://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/MNIST_Data.zip
fi
unzip MNIST_Data.zip
fi
export DATA_PATH=${EXEC_PATH}/MNIST_Data/train/
rm -rf device
mkdir device
echo "start training"
# Start Worker5 to Worker8, 4 Worker training processes in a loop
for((i=4;i<8;i++));
do
export MS_WORKER_NUM=8 # Set the total number of Worker processes in the cluster to 8 (including other node processes)
export MS_SCHED_HOST=<node_1 ip address> # Set the Scheduler IP address to the Node 1 IP address
export MS_SCHED_PORT=8118 # Set the Scheduler port
export MS_ROLE=MS_WORKER # Set the startup process to the MS_WORKER role
export MS_NODE_ID=$i # Set process id, optional
python ./net.py > device/worker_$i.log 2>&1 & # Start training script
done
In a multi-machine task, MS_WORKER_NUM should be the total number of Worker processes in the cluster. To keep the inter-node network connected, use the telnet <scheduler ip> <scheduler port> command to test whether this node can connect to the started Scheduler node.
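For example, with the scripts above (the Scheduler listening on port 8118 of node 1), the connectivity check to run from node 2 would be:

telnet <node_1 ip address> 8118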
Execute on Node 1:
bash run_dynamic_cluster_1.sh
Execute on Node 2:
bash run_dynamic_cluster_2.sh
With this, the 2-machine 8-card distributed training task can be executed.
Disaster Recovery
Dynamic cluster supports disaster recovery under data parallelism. In a multi-card data parallel training scenario, if a process exits abnormally, training can be continued by restarting the script for the corresponding process, and accuracy convergence will not be affected. Disaster recovery configuration and samples can be found in the Disaster Recovery in Dynamic Cluster Scenarios tutorial.
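As a minimal sketch based on the environment variables listed in the table above (the persistence directory here is a hypothetical example; see the tutorial for the full configuration), disaster recovery is enabled by exporting the recovery variables before launching each process, and a failed Worker is brought back by re-running its startup command with the same configuration:

# Exported for the Scheduler and every Worker, in addition to the variables in the startup scripts above
export MS_ENABLE_RECOVERY=1                # turn on disaster recovery
export MS_RECOVERY_PATH=/path/to/recovery  # hypothetical persistence directory
# If, for example, the Worker started with MS_NODE_ID=3 exits abnormally,
# re-launch it with the same configuration to continue training
# (MS_WORKER_NUM, MS_SCHED_HOST and MS_SCHED_PORT must also be exported as before)
export MS_ROLE=MS_WORKER
export MS_NODE_ID=3
python ./net.py > device/worker_3.log 2>&1 &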
Security Authentication
Dynamic cluster also supports the secure encrypted channel feature, which supports the TLS/SSL protocol to satisfy users' security needs. By default, the secure encrypted channel is turned off. If you need to turn it on, call init() only after the secure encrypted channel has been configured correctly via set_ps_context; otherwise, the networking initialization will fail. To use the secure encrypted channel, configure it as follows:
import mindspore as ms
ms.set_ps_context(config_file_path="/path/to/config_file.json", enable_ssl=True, client_password="123456", server_password="123456")
The config.json configuration file specified by config_file_path needs to include the following fields:
{
"server_cert_path": "server.p12",
"crl_path": "",
"client_cert_path": "client.p12",
"ca_cert_path": "ca.crt",
"cipher_list": "ECDHE-R SA-AES128-GCM-SHA256:ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-AES256-GCM-SHA384:DHE-RSA-AES128-GCM-SHA256:DHE-DSS-AES128-GCM-SHA256:DHE-RSA-AES256-GCM-SHA384:DHE-DSS-AES256-GCM-SHA384:DHE-PSK-AES128-GCM-SHA256:DHE-PSK-AES256-GCM-SHA384:DHE-PSK-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:ECDHE-PSK-CHACHA20-POLY1305:DHE-RSA-AES128-CCM:DHE-RSA-AES256-CCM:DHE-RSA-CHACHA20-POLY1305:DHE-PSK-AES128-CCM:DHE-PSK-AES256-CCM:ECDHE-ECDSA-AES128-CCM:ECDHE-ECDSA-AES256-CCM:ECDHE-ECDSA-CHACHA20-POLY1305",
"cert_expire_warning_time_in_day": 90
}
- server_cert_path: The path to the p12 file (SSL-specific certificate file) that contains the cipher text of the certificate and the secret key on the server side.
- crl_path: The file path of the revocation list (used to distinguish invalid, untrusted certificates from valid, trusted ones).
- client_cert_path: The path to the p12 file (SSL-specific certificate file) that contains the cipher text of the certificate and the secret key on the client side.
- ca_cert_path: The path to the root certificate.
- cipher_list: The cipher suite (list of supported SSL encryption types).
- cert_expire_warning_time_in_day: The warning time before certificate expiration, in days.
The secret key in the p12 file is stored in cipher text, and the password needs to be passed in at startup. Please refer to the Python API mindspore.set_ps_context for the client_password and server_password fields.