# Copyright 2020 Huawei Technologies Co., Ltd
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Communication management API"""
import os
from mindspore import context
from mindspore import log as logger
from mindspore.parallel._ps_context import _is_ps_mode, _is_role_pserver, _is_role_sched, _get_ps_context
from mindspore.communication._comm_helper import Backend, _get_rank_helper, _get_size_helper, \
_get_world_rank_from_group_rank_helper, _get_group_rank_from_world_rank_helper, \
_create_group_helper, _destroy_group_helper, HCCL_WORLD_COMM_GROUP, NCCL_WORLD_COMM_GROUP, \
MCCL_WORLD_COMM_GROUP, DEVICE_TO_BACKEND, _get_local_rank_helper, _get_local_size_helper, GlobalComm, \
_check_mpi_envs, _set_elegant_exit_handle, _get_group_ranks
from mindspore._c_expression import init_hccl, finalize_hccl, init_cluster, MSContext, ms_ctx_param
from mindspore.hal.device import is_initialized
# Public API of this module.
# NOTE(review): the list was cut off mid-literal in the recovered source, so further entries may have
# been lost (the imported *_WORLD_COMM_GROUP constants are candidates) - confirm against the original.
__all__ = ["init", "release", "get_rank", "get_local_rank", "get_group_size",
           "get_local_rank_size", "get_world_rank_from_group_rank",
           "get_group_rank_from_world_rank", "create_group", "destroy_group", "get_process_group_ranks"]
def _set_rank_from_mpi():
"""Set environment variable according to OMPI"""
ompi_rank_id = os.getenv("OMPI_COMM_WORLD_RANK")
ompi_device_id = os.getenv("OMPI_COMM_WORLD_LOCAL_RANK")
ompi_rank_size = os.getenv("OMPI_COMM_WORLD_SIZE")
if ompi_rank_id and os.getenv("MS_ROLE"):
logger.warning("Launching distributed job using both dynamic cluster and OpenMPI at the same time. "
"MindSpore will prioritize the use of dynamic cluster. Do not set env from OpenMPI.")
if ompi_rank_id:
os.environ["RANK_ID"] = ompi_rank_id
if ompi_device_id:
os.environ["DEVICE_ID"] = ompi_device_id
MSContext.get_instance().set_param(ms_ctx_param.device_id, int(ompi_device_id))
if ompi_rank_size:
os.environ["RANK_SIZE"] = ompi_rank_size
def _get_group(group):
    """
    Return the world communication group if the `group` is the default world group, else `group` unchanged.

    Args:
        group (str): Name of a communication group.

    Returns:
        str, ``GlobalComm.WORLD_COMM_GROUP`` when `group` is the default world group name, otherwise `group`.
    """
    # NOTE(review): the guard line was lost in the recovered source, which left two consecutive `return`
    # statements (the second unreachable). The original docstring names `DEFAULT_WORLD_COMM_GROUP`, which
    # is not defined in the visible part of this module; the imported HCCL_WORLD_COMM_GROUP is assumed to
    # be that default name - confirm against the original file.
    if group == HCCL_WORLD_COMM_GROUP:
        return GlobalComm.WORLD_COMM_GROUP
    return group
def _host_distribute():
"""Check whether host distribute needed."""
return os.getenv("MS_ROLE") or _check_mpi_envs()
def _check_parallel_envs():
    """
    Check whether parallel environment variables have been exported or not.

    The check is skipped when ``GlobalComm.CHECK_ENVS`` is falsy or when ``MS_SIMULATION_LEVEL`` is set.

    Raises:
        RuntimeError: If parallel environment variables have not been exported or have been exported to wrong values.
    """
    # NOTE(review): the bodies of the two guards below, the `try:` line and the tail of the final error
    # message were lost in the recovered source. The early returns, the int() probe and the message
    # ending are reconstructions - confirm against the original file.
    if not GlobalComm.CHECK_ENVS:
        return
    compile_level = os.getenv("MS_SIMULATION_LEVEL")
    if compile_level:
        return
    rank_id_str = os.getenv("RANK_ID")
    if not rank_id_str:
        raise RuntimeError("Environment variables RANK_ID has not been exported, please export variables 'RANK_ID'.")
    try:
        int(rank_id_str)
    except ValueError:
        # A non-numeric RANK_ID is only reported, not raised.
        print("Environment variables 'RANK_ID' should be number, but got the type : {}".format(type(rank_id_str)))
    rank_table_file_str = os.getenv("MINDSPORE_HCCL_CONFIG_PATH")
    rank_table_file_str_old = os.getenv("RANK_TABLE_FILE")
    help_cluster = os.getenv("HELP_CLUSTER")
    # Any one of the three variables is enough to pass the check.
    if not rank_table_file_str and not rank_table_file_str_old and not help_cluster:
        raise RuntimeError("Get hccl rank_table_file failed, "
                           "please export MINDSPORE_HCCL_CONFIG_PATH or RANK_TABLE_FILE.")
def _set_envs():
    """
    Some environmental variables must be set after `init` is completed.
    This takes compatibility into account because user scripts may get 'DEVICE_ID' or 'RANK_ID' envs.

    ``RANK_ID`` and ``DEVICE_ID`` are always overwritten; ``RANK_SIZE`` is only filled in when it is not
    already present in the environment.
    """
    os.environ["RANK_ID"] = str(get_rank())
    os.environ["DEVICE_ID"] = str(context.get_context("device_id"))
    if os.getenv("RANK_SIZE") is None:
        os.environ["RANK_SIZE"] = str(get_group_size())
def init(backend_name=None):
    """
    Initialize distributed backends required by communication services, e.g. ``"hccl"`` / ``"nccl"`` / ``"mccl"``.
    It is usually used in distributed parallel scenarios and set before using communication services.

    Note:
        - The full name of ``"hccl"`` is Huawei Collective Communication Library(HCCL).
        - The full name of ``"nccl"`` is NVIDIA Collective Communication Library(NCCL).
        - The full name of ``"mccl"`` is MindSpore Collective Communication Library(MCCL).
        - In Ascend hardware platforms, ``init()`` should be set before the definition of any Tensor and Parameter,
          and the instantiation and execution of any operation and net.

    Args:
        backend_name (str): Backend, using ``"hccl"`` / ``"nccl"`` / ``"mccl"``.
            ``"hccl"`` should be used for Ascend hardware platforms,
            ``"nccl"`` for GPU hardware platforms and ``"mccl"`` for CPU hardware platforms.
            If not set, inference is automatically made based on the hardware
            platform type (device_target). Default: ``None`` .

    Raises:
        TypeError: If `backend_name` is not a string.
        RuntimeError: If device target is invalid, or backend is invalid, or distributed initialization fails,
            or the environment variables RANK_ID/MINDSPORE_HCCL_CONFIG_PATH
            have not been exported when backend is HCCL.

    Supported Platforms:
        ``Ascend`` ``GPU`` ``CPU``

    Examples:
        .. note::
            Before running the following examples, you need to configure the communication environment variables.

            For Ascend/GPU/CPU devices, it is recommended to use the msrun startup method
            without any third-party or configuration file dependencies.
            Please see the msrun start up documentation for more details.

        >>> from mindspore.communication import init
        >>> init()
    """
    # NOTE(review): the recovered source lost several statements of this function (`else:` lines, an
    # early `return`, and the calls that perform the initialisation). Every line tagged "reconstructed"
    # below is inferred from names this module imports or defines but never uses in the visible source,
    # and from the docstrings - confirm each one against the original file.
    host_init = _host_distribute()
    device_target = context.get_context("device_target")

    if backend_name is None:
        # Derive the backend from the configured device target.
        if device_target == "Ascend":
            backend_name = "hccl"
        elif device_target == "GPU":
            backend_name = "nccl"
        elif device_target == "CPU":
            backend_name = "mccl"
        else:
            raise RuntimeError("For 'set_context', the argument 'device_target' {} is not supported in "
                               "parallel initialization, please use Ascend, GPU or CPU.".format(device_target))
    if not isinstance(backend_name, str):
        raise TypeError("For 'init', the argument 'backend_name' must be a string, "
                        "but got the type : {}".format(type(backend_name)))
    # A process started with MS_ROLE=MS_SCHED is forced onto "mccl" regardless of the argument.
    if os.getenv("MS_ROLE") == "MS_SCHED":
        backend_name = "mccl"

    _set_elegant_exit_handle()  # reconstructed; call site is an assumption
    if backend_name == "hccl":
        if _is_ps_mode():
            # Use MindSpore cluster to build network for Parameter Server training.
            init_cluster()  # reconstructed
            if _is_role_sched() or _is_role_pserver():
                raise RuntimeError("Parameter server and scheduler should use 'CPU' as backend instead of 'Ascend'")
            if _get_ps_context("worker_num") == 1:
                GlobalComm.INITED = True
                return  # reconstructed
        if device_target != "Ascend":
            raise RuntimeError("For 'init', the argument 'backend_name' should be '{}' to init '{}', "
                               "but got 'hccl'.".format(DEVICE_TO_BACKEND[device_target], device_target))
        if is_initialized(device_target):
            logger.warning("For 'init' in Ascend backend, the backend is already initialized, please set it before "
                           "the definition of any Tensor and Parameter, and the instantiation and execution of any "
                           "operation and net, otherwise the 'init' may not take effect.")
        if not host_init:
            # The Raises section documents a RuntimeError for missing RANK_ID / rank table on HCCL.
            _check_parallel_envs()  # reconstructed
        GlobalComm.BACKEND = Backend("hccl")
        init_hccl()  # reconstructed
        GlobalComm.WORLD_COMM_GROUP = HCCL_WORLD_COMM_GROUP  # reconstructed
    elif backend_name == "nccl":
        if device_target != "GPU":
            raise RuntimeError("For 'init', the argument 'backend_name' should be '{}' to init '{}', "
                               "but got 'nccl'.".format(DEVICE_TO_BACKEND[device_target], device_target))
        init_cluster()  # reconstructed
        GlobalComm.BACKEND = Backend("nccl")
        GlobalComm.WORLD_COMM_GROUP = NCCL_WORLD_COMM_GROUP  # reconstructed
    elif backend_name == "mccl":
        init_cluster()  # reconstructed
        GlobalComm.BACKEND = Backend("mccl")
        GlobalComm.WORLD_COMM_GROUP = MCCL_WORLD_COMM_GROUP  # reconstructed
    else:
        raise RuntimeError("For 'init', the argument 'backend_name' must be one of 'hccl', 'nccl' and 'mccl', "
                           "but got 'backend_name' : {}".format(backend_name))

    GlobalComm.INITED = True
    # _set_envs is documented as running once `init` has completed.
    _set_envs()  # reconstructed
def release():
    """
    Release distributed resource. e.g. HCCL/NCCL/MCCL.

    Note:
        This method should be used after init(). If not, resource will be released when program ends.

    Raises:
        RuntimeError: If failed to release distributed resource.

    Supported Platforms:
        ``Ascend`` ``GPU`` ``CPU``

    Examples:
        .. note::
            Before running the following examples, you need to configure the communication environment variables.

            For Ascend/GPU/CPU devices, it is recommended to use the msrun startup method
            without any third-party or configuration file dependencies.
            Please see the msrun start up documentation for more details.

        >>> from mindspore.communication import init, release
        >>> init()
        >>> release()
    """
    # NOTE(review): the function body was lost in the recovered source. `finalize_hccl` is imported by
    # this module and used nowhere else in the visible code, so it is assumed to be the release call -
    # confirm against the original file.
    finalize_hccl()
def get_rank(group=GlobalComm.WORLD_COMM_GROUP):
    """
    Get the rank ID for the current device in the specified collective communication group.

    Note:
        This method should be used after init().

    Args:
        group (str): The communication group to work on. Normally, the group should be created by create_group,
            otherwise, using the default group. Default: ``GlobalComm.WORLD_COMM_GROUP`` .

    Returns:
        int, the rank ID of the calling process within the group.

    Raises:
        TypeError: If group is not a string.
        ValueError: If backend is invalid.
        RuntimeError: If HCCL/NCCL/MCCL is not available.

    Supported Platforms:
        ``Ascend`` ``GPU`` ``CPU``

    Examples:
        .. note::
            Before running the following examples, you need to configure the communication environment variables.

            For Ascend/GPU/CPU devices, it is recommended to use the msrun startup method
            without any third-party or configuration file dependencies.
            Please see the msrun start up documentation for more details.

        >>> from mindspore.communication import init, get_rank
        >>> init()
        >>> rank_id = get_rank()
        >>> print(rank_id)
        >>> # the result is the rank_id in world_group
    """
    if not isinstance(group, str):
        raise TypeError("For 'get_rank', the argument 'group' must be type of string, "
                        "but got 'group' type : {}.".format(type(group)))
    return _get_rank_helper(group=_get_group(group))
def get_local_rank(group=GlobalComm.WORLD_COMM_GROUP):
    """
    Gets local rank ID for current device in specified collective communication group.

    Note:
        This method isn't supported in GPU and CPU versions of MindSpore.
        This method should be used after init().

    Args:
        group (str): The communication group to work on. Normally, the group should be created by create_group,
            otherwise, using the default group. Default: ``GlobalComm.WORLD_COMM_GROUP``.

    Returns:
        int, the local rank ID of the calling process within the group.

    Raises:
        TypeError: If group is not a string.
        ValueError: If backend is invalid.
        RuntimeError: If HCCL is not available or MindSpore is GPU/CPU version.

    Supported Platforms:
        ``Ascend`` ``GPU`` ``CPU``

    Examples:
        .. note::
            Before running the following examples, you need to configure the communication environment variables.

            For Ascend/GPU/CPU devices, it is recommended to use the msrun startup method
            without any third-party or configuration file dependencies.
            Please see the msrun start up documentation for more details.

        >>> import mindspore as ms
        >>> from mindspore.communication import init, get_rank, get_local_rank
        >>> ms.set_device(device_target="Ascend")
        >>> ms.set_auto_parallel_context(device_num=16) # 2 server, each server with 8 NPU.
        >>> init()
        >>> world_rank = get_rank()
        >>> local_rank = get_local_rank()
        >>> print("local_rank is: {}, world_rank is {}".format(local_rank, world_rank))
        local_rank is: 1, world_rank is 9
    """
    if not isinstance(group, str):
        raise TypeError("For 'get_local_rank', the argument 'group' must be type of string, "
                        "but got 'group' type : {}.".format(type(group)))
    return _get_local_rank_helper(group=_get_group(group))
def get_group_size(group=GlobalComm.WORLD_COMM_GROUP):
    """
    Get the rank size of the specified collective communication group.

    Note:
        This method should be used after init().

    Args:
        group (str): The communication group to work on. Normally, the group should be created by create_group,
            otherwise, using the default group. Default: ``GlobalComm.WORLD_COMM_GROUP``.

    Returns:
        int, the rank size of the group.

    Raises:
        TypeError: If group is not a string.
        ValueError: If backend is invalid.
        RuntimeError: If HCCL/NCCL/MCCL is not available.

    Supported Platforms:
        ``Ascend`` ``GPU`` ``CPU``

    Examples:
        .. note::
            Before running the following examples, you need to configure the communication environment variables.

            For Ascend/GPU/CPU devices, it is recommended to use the msrun startup method
            without any third-party or configuration file dependencies.
            Please see the msrun start up documentation for more details.

        >>> import mindspore as ms
        >>> from mindspore.communication import init, get_group_size
        >>> ms.set_auto_parallel_context(device_num=8)
        >>> init()
        >>> group_size = get_group_size()
        >>> print("group_size is: ", group_size)
        group_size is: 8
    """
    if not isinstance(group, str):
        raise TypeError("For 'get_group_size', the argument 'group' must be type of string, "
                        "but got 'group' type : {}.".format(type(group)))
    return _get_size_helper(group=_get_group(group))
def get_local_rank_size(group=GlobalComm.WORLD_COMM_GROUP):
    """
    Gets local rank size of the specified collective communication group.

    Note:
        This method isn't supported in GPU and CPU versions of MindSpore.
        This method should be used after init().

    Args:
        group (str): The communication group to work on. The group is created by create_group
            or the default world communication group. Default: ``GlobalComm.WORLD_COMM_GROUP`` .

    Returns:
        int, the local rank size where the calling process is within the group.

    Raises:
        TypeError: If group is not a string.
        ValueError: If backend is invalid.
        RuntimeError: If HCCL is not available or MindSpore is GPU/CPU version.

    Supported Platforms:
        ``Ascend`` ``GPU`` ``CPU``

    Examples:
        .. note::
            Before running the following examples, you need to configure the communication environment variables.

            For Ascend/GPU/CPU devices, it is recommended to use the msrun startup method
            without any third-party or configuration file dependencies.
            Please see the msrun start up documentation for more details.

        >>> import mindspore as ms
        >>> from mindspore.communication import init, get_local_rank_size
        >>> ms.set_device(device_target="Ascend")
        >>> ms.set_auto_parallel_context(device_num=16) # 2 server, each server with 8 NPU.
        >>> init()
        >>> local_rank_size = get_local_rank_size()
        >>> print("local_rank_size is: ", local_rank_size)
        local_rank_size is: 8
    """
    if not isinstance(group, str):
        raise TypeError("For 'get_local_rank_size', the argument 'group' must be type of string, "
                        "but got 'group' type : {}.".format(type(group)))
    return _get_local_size_helper(group=_get_group(group))
def get_world_rank_from_group_rank(group, group_rank_id):
    """
    Get the rank ID in the world communication group corresponding to
    the rank ID in the specified user communication group.

    Note:
        This method isn't supported in GPU and CPU versions of MindSpore.
        The parameter group should not be "hccl_world_group".
        This method should be used after init().

    Args:
        group (str): The communication group to work on. The group is created by create_group.
        group_rank_id (int): A rank ID in the communication group.

    Returns:
        int, the rank ID in world communication group.

    Raises:
        TypeError: If `group_rank_id` is not an integer or the group is not a string.
        ValueError: If group is 'hccl_world_group' or backend is invalid.
        RuntimeError: If HCCL is not available or MindSpore is GPU/CPU version.

    Supported Platforms:
        ``Ascend`` ``GPU`` ``CPU``

    Examples:
        .. note::
            Before running the following examples, you need to configure the communication environment variables.

            For Ascend/GPU/CPU devices, it is recommended to use the msrun startup method
            without any third-party or configuration file dependencies.
            Please see the msrun start up documentation for more details.

        >>> import mindspore as ms
        >>> from mindspore import set_context
        >>> from mindspore.communication import init, create_group, get_world_rank_from_group_rank, get_rank
        >>> set_context(mode=ms.GRAPH_MODE)
        >>> ms.set_device(device_target="Ascend")
        >>> init()
        >>> group = "0-4"
        >>> rank_ids = [0,4]
        >>> if get_rank() in rank_ids:
        ...     create_group(group, rank_ids)
        ...     world_rank_id = get_world_rank_from_group_rank(group, 1)
        ...     print("world_rank_id is: ", world_rank_id)
        world_rank_id is: 4
    """
    # Only `group` is validated here; `group_rank_id` is passed to the helper unchecked.
    if not isinstance(group, str):
        raise TypeError("For 'get_world_rank_from_group_rank', the argument 'group' must be type of string, "
                        "but got 'group' type : {}.".format(type(group)))
    return _get_world_rank_from_group_rank_helper(group=group, group_rank_id=group_rank_id)
def get_group_rank_from_world_rank(world_rank_id, group):
    """
    Get the rank ID in the specified user communication group corresponding to
    the rank ID in the world communication group.

    Note:
        This method isn't supported in GPU and CPU versions of MindSpore.
        The parameter group should not be "hccl_world_group".
        This method should be used after init().

    Args:
        world_rank_id (int): A rank ID in the world communication group.
        group (str): The communication group to work on. The group is created by create_group.

    Returns:
        int, the rank ID in the user communication group.

    Raises:
        TypeError: If world_rank_id is not an integer or the group is not a string.
        ValueError: If group is 'hccl_world_group' or backend is invalid.
        RuntimeError: If HCCL is not available or MindSpore is GPU/CPU version.

    Supported Platforms:
        ``Ascend`` ``GPU`` ``CPU``

    Examples:
        .. note::
            Before running the following examples, you need to configure the communication environment variables.

            For Ascend/GPU/CPU devices, it is recommended to use the msrun startup method
            without any third-party or configuration file dependencies.
            Please see the msrun start up documentation for more details.

        >>> import mindspore as ms
        >>> from mindspore import set_context
        >>> from mindspore.communication import init, create_group, get_group_rank_from_world_rank, get_rank
        >>> set_context(mode=ms.GRAPH_MODE)
        >>> ms.set_device(device_target="Ascend")
        >>> init()
        >>> group = "0-4"
        >>> rank_ids = [0,4]
        >>> if get_rank() in rank_ids:
        ...     create_group(group, rank_ids)
        ...     group_rank_id = get_group_rank_from_world_rank(4, group)
        ...     print("group_rank_id is: ", group_rank_id)
        group_rank_id is: 1
    """
    # Only `group` is validated here; `world_rank_id` is passed to the helper unchecked.
    if not isinstance(group, str):
        raise TypeError("For 'get_group_rank_from_world_rank', the argument 'group' must be type of string, "
                        "but got 'group' type : {}.".format(type(group)))
    return _get_group_rank_from_world_rank_helper(world_rank_id=world_rank_id, group=group)
def create_group(group, rank_ids):
    """
    Create a user collective communication group.

    Note:
        This method isn't supported in GPU and CPU versions of MindSpore.
        The size of rank_ids should be larger than 1, rank_ids should not have duplicate data.
        This method should be used after init().
        Only support global single communication group in PyNative mode if you do not start with mpirun.

    Args:
        group (str): The name of the communication group to be created.
        rank_ids (list): A list of device IDs.

    Raises:
        TypeError: If group is not a string or `rank_ids` is not a list.
        ValueError: If `rank_ids` size is not larger than 1, or `rank_ids` has duplicate data, or backend is invalid.
        RuntimeError: If HCCL is not available or MindSpore is GPU/CPU version.

    Supported Platforms:
        ``Ascend`` ``GPU`` ``CPU``

    Examples:
        .. note::
            Before running the following examples, you need to configure the communication environment variables.

            For Ascend/GPU/CPU devices, it is recommended to use the msrun startup method
            without any third-party or configuration file dependencies.
            Please see the msrun start up documentation for more details.

        >>> import mindspore as ms
        >>> from mindspore import set_context
        >>> from mindspore import ops
        >>> from mindspore.communication import init, create_group, get_rank
        >>> set_context(mode=ms.GRAPH_MODE)
        >>> ms.set_device(device_target="Ascend")
        >>> init()
        >>> group = "0-7"
        >>> rank_ids = [0,7]
        >>> if get_rank() in rank_ids:
        ...     create_group(group, rank_ids)
        ...     allreduce = ops.AllReduce(group)
    """
    # Only `group` is validated here; `rank_ids` is passed to the helper unchecked.
    if not isinstance(group, str):
        raise TypeError("For 'create_group', the argument 'group' must be type of string, "
                        "but got 'group' type : {}.".format(type(group)))
    _create_group_helper(group, rank_ids)
def destroy_group(group):
    """
    Destroy the user collective communication group.

    Note:
        This method isn't supported in GPU and CPU versions of MindSpore.
        The parameter group should not be "hccl_world_group".
        This method should be used after init().

    Args:
        group (str): The communication group to destroy, the group should be created by create_group.

    Raises:
        TypeError: If group is not a string.
        ValueError: If group is "hccl_world_group" or backend is invalid.
        RuntimeError: If HCCL is not available or MindSpore is GPU/CPU version.

    Supported Platforms:
        ``Ascend`` ``GPU`` ``CPU``

    Examples:
        .. note::
            Before running the following examples, you need to configure the communication environment variables.

            For Ascend/GPU/CPU devices, it is recommended to use the msrun startup method
            without any third-party or configuration file dependencies.
            Please see the msrun start up documentation for more details.

        >>> import mindspore as ms
        >>> from mindspore import set_context
        >>> from mindspore import ops
        >>> from mindspore.communication import init, create_group, destroy_group, get_rank
        >>> set_context(mode=ms.GRAPH_MODE)
        >>> ms.set_device(device_target="Ascend")
        >>> init()
        >>> group = "0-2"
        >>> rank_ids = [0,2]
        >>> if get_rank() in rank_ids:
        ...     create_group(group, rank_ids)
        ...     destroy_group(group)
    """
    if not isinstance(group, str):
        raise TypeError("For 'destroy_group', the argument 'group' must be type of string, "
                        "but got 'group' type : {}.".format(type(group)))
    # NOTE(review): the recovered source ended after the type check, so the function destroyed nothing.
    # `_destroy_group_helper` is imported by this module and unused elsewhere in the visible code, and
    # `create_group` delegates to `_create_group_helper` the same way - confirm against the original file.
    _destroy_group_helper(group)
def get_process_group_ranks(group=GlobalComm.WORLD_COMM_GROUP):
    """
    Gets the ranks of the specific group and returns the process ranks in the communication group as a list.

    Args:
        group (str, optional): The communication group to work on. Default: ``GlobalComm.WORLD_COMM_GROUP`` , which
            means ``"hccl_world_group"`` in Ascend, and ``"nccl_world_group"`` in GPU.

    Returns:
        List (List[int]), List of process ranks in the specified communication group.

    Raises:
        TypeError: If the `group` is not a str.
        RuntimeError: If device target is invalid, or backend is invalid, or distributed initialization fails.

    Supported Platforms:
        ``Ascend`` ``GPU``

    Examples:
        .. note::
            Before running the following examples, you need to configure the communication environment variables.

            For Ascend/GPU/CPU devices, it is recommended to use the msrun startup method
            without any third-party or configuration file dependencies.
            Please see the msrun start up documentation for more details.

            This example should be run with 4 devices.

        >>> import numpy as np
        >>> from mindspore.communication import init, get_process_group_ranks
        >>> init()
        >>> output = get_process_group_ranks()
        >>> print(output)
        [0, 1, 2, 3]
    """
    # Enforce the documented TypeError here, in the same way as the other public functions of this module.
    if not isinstance(group, str):
        raise TypeError("For 'get_process_group_ranks', the argument 'group' must be type of string, "
                        "but got 'group' type : {}.".format(type(group)))
    return _get_group_ranks(group)