mindspore.ops.CollectiveGather
- class mindspore.ops.CollectiveGather(dest_rank, group=GlobalComm.WORLD_COMM_GROUP)[源代码]
对通信组的输入张量进行聚合。操作会将每张卡的输入Tensor的第0维度上进行聚合,发送到对应卡上。
说明
只有目标为dest_rank的进程(全局的进程编号)才会收到聚合操作后的输出。其他进程只得到一个形状为[1]的张量,且该张量没有数学意义。
- 参数:
dest_rank (int) - 表示发送目标的进程编号。只有该进程会接收汇聚张量。
group (str,可选) - 表示通信域。默认值:
GlobalComm.WORLD_COMM_GROUP
。
- 输入:
input_x (Tensor) - 输入待聚合的Tensor,Tensor的shape为 \((x_1, x_2, ..., x_R)\) 。
- 输出:
Tensor,Tensor第0维等于各输入数据第0维的累加,其他shape维度相同 即 \((\sum x_1, x_2, ..., x_R)\) 。
- 异常:
TypeError - 首个输入的数据类型不为Tensor,op 或 group 不是str。
RuntimeError - 如果目标设备无效,或者后端无效,或者分布式初始化失败。
ValueError - 调用进程的rank id大于本通信组的rank大小。
- 支持平台:
Ascend
样例:
>>> import numpy as np >>> import mindspore as ms >>> import mindspore.nn as nn >>> from mindspore.communication import init >>> from mindspore import Tensor >>> from mindspore import ops >>> # Launch 2 processes. >>> >>> ms.set_context(mode=ms.GRAPH_MODE) >>> init() >>> class CollectiveGatherNet(nn.Cell): ... def __init__(self): ... super(CollectiveGatherNet, self).__init__() ... self.collective_gather = ops.CollectiveGather(dest_rank=0) ... ... def construct(self, x): ... return self.collective_gather(x) ... >>> input = Tensor(np.arange(4).reshape([2, 2]).astype(np.float32)) >>> net = CollectiveGatherNet() >>> output = net(input) >>> print(output) Process with rank 0: [[0. 1.], [2. 3.], [0. 1.], [2. 3.]] Process with rank 1: [0.]