# Source code for mindspore_gl.graph.ops

# Copyright 2022 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Operations for Graph."""
from typing import List, Union, Tuple
import math
from enum import Enum
import numpy as np

import mindspore_gl.array_kernel as array_kernel
import mindspore_gl.dataloader.shared_numpy as shared_numpy
from .graph import BatchMeta, MindHomoGraph
from .utils import SharedArrayPool, ArrayPool


class BatchHomoGraph:
    """
    Merge a list of MindHomoGraph into one MindHomoGraph carrying batch_meta information.

    The COO edge arrays of the inputs are placed side by side in one array. The
    node ids of every graph are shifted by the number of nodes of the graphs in
    front of it, and the running node/edge totals are stored in a ``BatchMeta``.

    Inputs:
        - **graph_list** (List[MindHomoGraph]) - A list of MindHomoGraph.

    Supported Platforms:
        ``Ascend`` ``GPU``

    Examples:
        >>> from mindspore_gl.graph.ops import BatchHomoGraph
        >>> import numpy as np
        >>> from mindspore_gl.graph import MindHomoGraph
        >>> graph_list = []
        >>> for _ in range(5):
        ...     graph = MindHomoGraph()
        ...     edges = np.array([[0, 2, 2, 3, 4, 5, 5, 6], [1, 0, 1, 5, 3, 4, 6, 4]])
        ...     graph.set_topo_coo(edges)
        ...     graph.node_count = 7
        ...     graph.edge_count = 8
        ...     graph_list.append(graph)
        >>> batch_fn = BatchHomoGraph()
        >>> batch_graph = batch_fn(graph_list)
        >>> print(batch_graph.edge_count)
        40
    """

    def __init__(self):
        # Scratch COO buffer; allocated on first use and kept between calls.
        self.res_coo = None

    def __call__(self, graph_list: List[MindHomoGraph], **kwargs) -> MindHomoGraph:
        """
        Batch ``graph_list`` into a single graph.

        Args:
            graph_list (List[MindHomoGraph]): graphs to merge, in order.
            kwargs (dict): accepted but not used.

        Returns:
            MindHomoGraph, the batched graph. Its edge array is a copy of the
            internal scratch buffer.
        """
        n_graphs = len(graph_list)
        graph_nodes = np.zeros([n_graphs + 1], dtype=np.int32)
        graph_edges = np.zeros([n_graphs + 1], dtype=np.int32)

        # Entry 0 stays 0; entry i holds the totals over the first i graphs.
        total_node_count = 0
        total_edge_count = 0
        for slot, item in enumerate(graph_list, start=1):
            total_edge_count += item.edge_count
            total_node_count += item.node_count
            graph_edges[slot] = total_edge_count
            graph_nodes[slot] = total_node_count

        # Grow the scratch buffer only when it cannot hold every edge.
        needs_new_buffer = self.res_coo is None or self.res_coo.shape[1] < total_edge_count
        if needs_new_buffer:
            # Drop the old buffer first so both are not alive at the same time.
            del self.res_coo
            self.res_coo = np.zeros([2, total_edge_count], dtype=np.int32)

        # Copy each edge array into its column range, shifting the node ids.
        node_shift = 0
        column = 0
        for item in graph_list:
            stop = item.edge_count + column
            self.res_coo[:, column: stop] = item.adj_coo + node_shift
            node_shift += item.node_count
            column += item.edge_count

        # Pack the result.
        batched = MindHomoGraph()
        batched.set_topo_coo(np.copy(self.res_coo[:, :total_edge_count]))
        batched.node_count = total_node_count
        batched.edge_count = total_edge_count
        batched.batch_meta = BatchMeta(graph_nodes=graph_nodes, graph_edges=graph_edges)
        return batched
class UnBatchHomoGraph:
    """
    Split a batched MindHomoGraph back into a list of MindHomoGraph.

    Inputs:
        - **graph** (MindHomoGraph) - a batched graph.

    Supported Platforms:
        ``Ascend`` ``GPU``

    Examples:
        >>> from mindspore_gl.graph.ops import BatchHomoGraph, UnBatchHomoGraph
        >>> import numpy as np
        >>> from mindspore_gl.graph import MindHomoGraph
        >>> graph_list = []
        >>> for _ in range(5):
        ...     graph = MindHomoGraph()
        ...     edges = np.array([[0, 2, 2, 3, 4, 5, 5, 6], [1, 0, 1, 5, 3, 4, 6, 4]])
        ...     graph.set_topo_coo(edges)
        ...     graph.node_count = 7
        ...     graph.edge_count = 8
        ...     graph_list.append(graph)
        >>> batch_fn = BatchHomoGraph()
        >>> batch_graph = batch_fn(graph_list)
        >>> unbatch_fn = UnBatchHomoGraph()
        >>> unbatch_graph = unbatch_fn(batch_graph)
        >>> print(unbatch_graph[0].edge_count)
        8
    """

    def __init__(self):
        """Create the operator; it keeps no state."""

    def __call__(self, graph: MindHomoGraph, **kwargs) -> List[MindHomoGraph]:
        """
        Return the member graphs of ``graph`` in batch order.

        Args:
            graph (MindHomoGraph): a batched graph (``graph.is_batched`` must be true).
            kwargs (dict): accepted but not used.

        Returns:
            List[MindHomoGraph], ``graph[i]`` for every index below
            ``graph.batch_meta.graph_count``.

        Raises:
            AssertionError: if ``graph`` is not batched.
        """
        assert graph.is_batched, "UnBatchHomoGraph can only be operated on batched_graph"
        return [graph[position] for position in range(graph.batch_meta.graph_count)]
class PadMode(Enum):
    """
    Padding mode, used for graphs and 2D arrays.

    - PadMode.CONST: pad to a shape specified by the user.
    - PadMode.AUTO: derive the padded shape automatically.

    Supported Platforms:
        ``Ascend`` ``GPU``

    Examples:
        >>> from mindspore_gl.graph import PadMode
        >>> const = PadMode.CONST
        >>> print(const.name, const.value)
        CONST 1
        >>> auto = PadMode.AUTO
        >>> print(auto.name, auto.value)
        AUTO 2
    """

    # Pad to the user-specified shape.
    CONST = 1
    # Let the operator choose the padded shape.
    AUTO = 2
class PadDirection(Enum):
    """
    Padding direction, used for 2D arrays only.

    - PadDirection.ROW: pad in the direction of the row.
    - PadDirection.COL: pad in the direction of the column.

    Supported Platforms:
        ``Ascend`` ``GPU``

    Examples:
        >>> from mindspore_gl.graph import PadDirection
        >>> row = PadDirection.ROW
        >>> print(row.name, row.value)
        ROW 1
        >>> col = PadDirection.COL
        >>> print(col.name, col.value)
        COL 2
    """

    # Pad in the direction of the row.
    ROW = 1
    # Pad in the direction of the column.
    COL = 2
class PadArray2d:
    r"""
    PadArray2d, specific pad operator for 2D array.

    .. warning::
        PadArray2d will reuse memory buffer to speedup pad operation. The array
        returned by a call is also handed back to the internal buffer pool.

    Args:
        dtype(numpy.dtype): To determine result's data type.
        direction(PadDirection): Pad direction for array, PadDirection.ROW means we will pad along axis=1,
            PadDirection.COL means we will pad along axis=0.
        fill_value(Union[float, int, optional]): Fill value for padded region. Default: None.
            In PadMode.AUTO, None means the padded region is filled with
            :math:`2^{bucket\_length} - 1`; any other value, including 0, is used as given.
        reset_with_fill_value(bool, optional): PadArray2d will reuse memory buffer, you can set this value to False
            if you dont care about the padded value. Default: True.
        mode(PadMode, optional): Pad mode for array, if PadMode.CONST, this op will pad array to user-specific size.
            If PadMode.AUTO, this will choose padded result length according to input's length. The expected length
            can be calculated as :math:`length=2^{ceil\left ( \log_{2}{input\_length} \right ) }`
            Default: mindspore_gl.graph.PadMode.AUTO.
        size(Union[List, Tuple, optional]): User specific size for padding result. Default: None.
        use_shared_numpy(bool, optional): If we use SharedNDArray for speeding up inter process communication.
            This is recommended if you do feature collection and feature padding in child process and
            need inter process communication for graph feature. Default: False.

    Inputs:
        - **input_array** (numpy.array) - input numpy array for pad.

    Raises:
        AssertionError: if mode is PadMode.CONST and any of size, dtype or fill_value is None.

    Supported Platforms:
        ``Ascend`` ``GPU``

    Examples:
        >>> import numpy as np
        >>> from mindspore_gl.graph.ops import PadArray2d, PadMode, PadDirection
        >>> pad_op = PadArray2d(dtype=np.float32, mode=PadMode.CONST, direction=PadDirection.COL,
        ...                     size=(3, 1), fill_value=0)
        >>> node_list = np.array([[1]])
        >>> res = pad_op(node_list)
        >>> print(res)
        [[1.]
         [0.]
         [0.]]
    """

    def __init__(self, dtype, direction, fill_value=None, reset_with_fill_value=True, mode=PadMode.AUTO,
                 size=None, use_shared_numpy=False):
        if mode == PadMode.CONST:
            assert size is not None and dtype is not None and fill_value is not None, \
                "pad size should be provided when padding mode is PadMode.CONST"
        self.pad_mode = mode
        self.pad_direction = direction
        self.fill_value = fill_value
        self.dtype = dtype
        self.homo_batch = BatchHomoGraph()
        self.reset_with_fill_value = reset_with_fill_value
        self.use_shared_numpy = use_shared_numpy
        self.size = size
        if self.use_shared_numpy:
            self.array_pool = SharedArrayPool()
        else:
            self.array_pool = ArrayPool()
        if mode == PadMode.CONST:
            # The target shape is known up front, so seed the pool with one buffer.
            if self.use_shared_numpy:
                memory_buffer = shared_numpy.SharedNDArray.from_shape(size, dtype=dtype)
            else:
                memory_buffer = np.zeros(size, dtype=dtype)
            self.array_pool.put(size, memory_buffer)

    def _auto_fill_value(self, fill_value, bucket_length):
        """
        Resolve the fill value used in PadMode.AUTO.

        Precedence: the per-call ``fill_value``, then the constructor's
        ``fill_value``, then ``2**bucket_length - 1``. Only ``None`` counts as
        "not given", so an explicit 0 is honoured.
        """
        if fill_value is not None:
            return fill_value
        if self.fill_value is not None:
            return self.fill_value
        return (1 << bucket_length) - 1

    def __call__(self, input_array, **kwargs):
        """
        Pad ``input_array`` and return the padded buffer.

        Args:
            input_array (numpy.array): 2D array to pad.
            kwargs (dict): config dict

                - **fill_value** (Union[int, float]): fill value for the padded region.
                  Only consulted in PadMode.AUTO; PadMode.CONST always uses the
                  constructor's fill_value.

        Returns:
            The pooled buffer holding ``input_array`` followed by the padded region.
        """
        fill_value = kwargs.get("fill_value", None)
        if self.pad_mode == PadMode.CONST:
            memory_buffer = self.array_pool.pop(self.size)
            # A pool miss is answered with a SharedNDArray (original author's note:
            # a miss here can only happen for the shared pool).
            if memory_buffer is None:
                memory_buffer = shared_numpy.SharedNDArray.from_shape(self.size, dtype=self.dtype)
                self.array_pool.put(self.size, memory_buffer)

            if self.pad_direction == PadDirection.ROW:
                memory_buffer[:, :input_array.shape[1]] = input_array
                if self.reset_with_fill_value:
                    memory_buffer[:, input_array.shape[1]:] = self.fill_value
            else:
                if input_array.dtype == np.float32 and self.dtype == np.float32:
                    # float32 to float32 copies go through the dedicated kernel.
                    array_kernel.float_2d_array_col_copy(memory_buffer, input_array)
                else:
                    memory_buffer[:input_array.shape[0]] = input_array
                if self.reset_with_fill_value:
                    memory_buffer[input_array.shape[0]:] = self.fill_value

            # Put Back To Memory Buffer
            self.array_pool.put(self.size, memory_buffer)
            return memory_buffer

        memory_buffer = None
        target_size = None
        if self.pad_direction == PadDirection.ROW:
            # Round the column count up to the next power of two.
            bucket_length = math.ceil(math.log2(input_array.shape[1]))
            target_size = [input_array.shape[0], 1 << bucket_length]
            fill_value = self._auto_fill_value(fill_value, bucket_length)

            memory_buffer = self.array_pool.pop(target_size)
            # NOTE(review): a pool miss allocates a SharedNDArray even when
            # use_shared_numpy is False - confirm against ArrayPool.pop.
            if memory_buffer is None:
                memory_buffer = shared_numpy.SharedNDArray.from_shape(target_size, self.dtype)
                self.array_pool.put(target_size, memory_buffer)

            memory_buffer[:, :input_array.shape[1]] = input_array
            if self.reset_with_fill_value:
                memory_buffer[:, input_array.shape[1]:] = fill_value
        else:
            # Round the row count up to the next power of two.
            bucket_length = math.ceil(math.log2(input_array.shape[0]))
            target_size = [1 << bucket_length, input_array.shape[1]]
            fill_value = self._auto_fill_value(fill_value, bucket_length)

            memory_buffer = self.array_pool.pop(target_size)
            if memory_buffer is None:
                memory_buffer = shared_numpy.SharedNDArray.from_shape(target_size, self.dtype)
                self.array_pool.put(target_size, memory_buffer)

            memory_buffer[:input_array.shape[0]] = input_array
            if self.reset_with_fill_value:
                memory_buffer[input_array.shape[0]:] = fill_value

        # Put Back To Memory Buffer
        self.array_pool.put(target_size, memory_buffer)
        return memory_buffer

    def lazy(self, shape: Union[List, Tuple], **kwargs):
        """
        Lazy Array Pad, this will just determine padded result shape and return
        a buffer with the target shape without copying any input data.

        When reset_with_fill_value is set, the region past ``shape`` is still
        filled with the fill value.

        Args:
            shape(Union[List, Tuple]): input array's shape for pad.
            kwargs(dict): config dict

                - **fill_value** (Union[int, float]): fill the padding array with value.
                  Only consulted in PadMode.AUTO.

        Returns:
            memory_buffer(numpy.ndarray), a buffer with target padded shape.
        """
        fill_value = kwargs.get("fill_value", None)
        if self.pad_mode == PadMode.CONST:
            memory_buffer = self.array_pool.pop(self.size)
            # A pool miss is answered with a SharedNDArray (original author's note:
            # a miss here can only happen for the shared pool).
            if memory_buffer is None:
                memory_buffer = shared_numpy.SharedNDArray.from_shape(self.size, dtype=self.dtype)

            if self.reset_with_fill_value:
                if self.pad_direction == PadDirection.ROW:
                    memory_buffer[:, shape[1]:] = self.fill_value
                else:
                    memory_buffer[shape[0]:] = self.fill_value

            # Put Back To Memory Buffer
            self.array_pool.put(self.size, memory_buffer)
            return memory_buffer

        memory_buffer = None
        target_size = None
        if self.pad_direction == PadDirection.ROW:
            bucket_length = math.ceil(math.log2(shape[1]))
            target_size = [shape[0], 1 << bucket_length]
            fill_value = self._auto_fill_value(fill_value, bucket_length)

            memory_buffer = self.array_pool.pop(target_size)
            if memory_buffer is None:
                memory_buffer = shared_numpy.SharedNDArray.from_shape(target_size, self.dtype)

            if self.reset_with_fill_value:
                memory_buffer[:, shape[1]:] = fill_value
        else:
            bucket_length = math.ceil(math.log2(shape[0]))
            target_size = [1 << bucket_length, shape[1]]
            fill_value = self._auto_fill_value(fill_value, bucket_length)

            memory_buffer = self.array_pool.pop(target_size)
            if memory_buffer is None:
                memory_buffer = shared_numpy.SharedNDArray.from_shape(target_size, self.dtype)

            if self.reset_with_fill_value:
                memory_buffer[shape[0]:] = fill_value

        # Put Back To Memory Buffer
        self.array_pool.put(target_size, memory_buffer)
        return memory_buffer
class PadHomoGraph:
    r"""
    Pad MindHomoGraph, We pad graph by adding additional nodes and edges between these nodes. In short,
    :math:`PadHomoGraph(graph1) = BatchHomoGraph(graph1, fake\_graph)`
    node count and edge count in fake_graph is determined by user-specific parameters.

    Args:
        n_node(int, optional): target graph's node count. Default: None.
        n_edge(int, optional): target graph's edge count. Default: None.
        mode(PadMode, optional): Pad mode, if PadMode.CONST, target graph will have n_node nodes and n_edge edges.
            If PadMode.AUTO target graph's node_count and edge_count is calculated according to input graph's size
            by :math:`n\_node = 2^{ceil(log2(input\_graph.node\_count))}` ,
            :math:`n\_edge = 2^{ceil(log2(input\_graph.edge\_count))}` . Default: PadMode.AUTO.
        csr(bool, optional): Is the csr graph. Only consulted when a batched graph is padded
            in PadMode.CONST. Default: False.

    Inputs:
        - **graph** (MindHomoGraph) - input graph.

    Outputs:
        - MindHomoGraph, padded graph.

    Supported Platforms:
        ``Ascend`` ``GPU``

    Examples:
        >>> from mindspore_gl.graph.ops import BatchHomoGraph, PadHomoGraph, PadMode
        >>> import numpy as np
        >>> from mindspore_gl.graph.graph import MindHomoGraph
        >>> graph_list = []
        >>> for _ in range(1):
        ...     graph = MindHomoGraph()
        ...     edges = np.array([[0, 2, 2, 3, 4, 5, 5, 6], [1, 0, 1, 5, 3, 4, 6, 4]])
        ...     graph.set_topo_coo(edges)
        ...     graph.node_count = 7
        ...     graph.edge_count = 8
        ...     graph_list.append(graph)
        >>> batch_fn = BatchHomoGraph()
        >>> batch_graph = batch_fn(graph_list)
        >>> n_node = graph.node_count + 1
        >>> n_edge = graph.edge_count + 30
        >>> pad_graph_op = PadHomoGraph(mode=PadMode.CONST, n_node=n_node, n_edge=n_edge)
        >>> pad_res = pad_graph_op(batch_graph)
        >>> print(pad_res[0].edge_count, pad_res[1].edge_count)
        8 30
        >>> print(pad_res[0].node_count, pad_res[1].node_count)
        7 1
    """

    def __init__(self, n_node=None, mode=PadMode.AUTO, n_edge=None, csr=False):
        if mode == PadMode.CONST:
            assert not (n_edge is None or n_node is None), \
                "n_node and n_edge should be given when padding with CONST Mode"
        self.mode = mode
        self.n_node = n_node
        self.n_edge = n_edge
        self.csr = csr
        # Used to merge an un-batched input with its padding graph.
        self.batch_op = BatchHomoGraph()

    def __call__(self, graph: MindHomoGraph, **kwargs) -> MindHomoGraph:
        """
        Do pad operation.

        Args:
            graph (MindHomoGraph): graph to pad, batched or not.
            kwargs (dict): accepted but not used.

        Returns:
            MindHomoGraph, the padded graph; ``graph`` itself when it already
            has the target edge count.

        Raises:
            AssertionError: in PadMode.CONST when ``graph.edge_count`` is not
                smaller than ``n_edge``.
        """
        # The result container is created before any validation, as before.
        res_graph = MindHomoGraph()
        # Check Input Graph is Valid To Pad
        if self.mode is PadMode.CONST:
            assert graph.edge_count < self.n_edge, \
                "Given graph is too large for the given padding"

        batched = graph.is_batched
        if self.mode == PadMode.CONST:
            return self._pad_const(graph, res_graph, batched)
        return self._pad_auto(graph, res_graph, batched)

    def _pad_const(self, graph, res_graph, batched):
        """Pad ``graph`` up to exactly ``n_node`` nodes and ``n_edge`` edges."""
        # No Need To Pad
        # NOTE(review): the assert in __call__ already rejects edge_count == n_edge,
        # so this return is only reachable when asserts are disabled - confirm intent.
        if graph.edge_count == self.n_edge:
            return graph

        # Determine the padding graph; every fake edge uses node id n_node - 1
        # unless the csr fill pattern is requested for a batched input.
        if batched and self.csr:
            pad_coo = generate_fill_array(graph.adj_coo, (2, self.n_edge), self.n_node - 1)
        else:
            pad_coo = np.full([2, self.n_edge - graph.edge_count], self.n_node - 1, dtype=np.int32)
        pad_graph = MindHomoGraph()
        pad_graph.adj_coo = pad_coo
        pad_graph.node_count = self.n_node - graph.node_count
        pad_graph.edge_count = self.n_edge - graph.edge_count

        if not batched:
            return self.batch_op([graph, pad_graph])

        self._extend_batch(graph, pad_graph, res_graph, self.n_node, self.n_edge)
        res_graph.edge_count = self.n_edge
        res_graph.node_count = self.n_node
        return res_graph

    def _pad_auto(self, graph, res_graph, batched):
        """Pad ``graph`` so node and edge counts reach the next power of two."""
        edge_bucket_length = math.ceil(math.log2(graph.edge_count))
        edge_total = 1 << edge_bucket_length
        # No Need To Pad
        if graph.edge_count == edge_total:
            return graph

        # Determine the padding graph; fake edges use the last padded node id.
        node_total = 1 << math.ceil(math.log2(graph.node_count))
        padded_graph_edge_count = edge_total - graph.edge_count
        padded_graph_node_count = node_total - graph.node_count
        pad_graph = MindHomoGraph()
        pad_graph.adj_coo = np.full([2, padded_graph_edge_count], node_total - 1, dtype=np.int32)
        pad_graph.node_count = padded_graph_node_count
        pad_graph.edge_count = padded_graph_edge_count

        if not batched:
            return self.batch_op([graph, pad_graph])

        self._extend_batch(graph, pad_graph, res_graph, node_total, edge_total)
        res_graph.edge_count = graph.edge_count + pad_graph.edge_count
        res_graph.node_count = graph.node_count + pad_graph.node_count
        return res_graph

    @staticmethod
    def _extend_batch(graph, pad_graph, res_graph, node_total, edge_total):
        """Give ``res_graph`` the edges of ``graph`` plus ``pad_graph`` and a batch meta with one more entry."""
        res_graph.adj_coo = np.concatenate([graph.adj_coo, pad_graph.adj_coo], axis=1)
        meta_nodes = np.concatenate([graph.batch_meta.graph_nodes,
                                     np.array([node_total], dtype=np.int32)])
        meta_edges = np.concatenate([graph.batch_meta.graph_edges,
                                     np.array([edge_total], dtype=np.int32)])
        res_graph.batch_meta = BatchMeta(graph_nodes=meta_nodes, graph_edges=meta_edges)
class UnPadHomoGraph:
    """Empty placeholder operator; calling it does nothing."""

    def __init__(self):
        """Nothing to initialise."""

    def __call__(self, graph: MindHomoGraph, **kwargs) -> MindHomoGraph:
        """Accept ``graph`` and extra keyword arguments without using them; returns None."""
        return None
class PadCsrEdge:
    r"""
    PadCsrEdge, specific pad operator for coo edges. After padding, the shape of the coo edge index to the
    csr indices and indptr becomes unified.

    .. warning::
        PadCsrEdge will reuse memory buffer to speedup pad operation. The array
        returned by a call is also handed back to the internal buffer pool.

    Args:
        pad_nodes(int): nodes numbers of the graph.
        reset_with_fill_value(bool, optional): PadCsrEdge will reuse memory buffer, you can set this value to False
            if you dont care about the padded value. Default: True.
        length(int, optional): User specific length for padding result. Default: None.
        mode(PadMode, optional): Pad mode for array, if PadMode.CONST, this op will pad array to user-specific size.
            If PadMode.AUTO, this will choose padded result length according to input's length. The expected length
            can be calculated as :math:`length=2^{ceil\left ( \log_{2}{input\_length} \right ) }`
            Default: mindspore_gl.graph.PadMode.AUTO.
        use_shared_numpy(bool, optional): If we use SharedNDArray for speeding up inter process communication.
            This is recommended if you do feature collection and feature padding in child process and
            need inter process communication for graph feature. Default: False.

    Inputs:
        - **input_array** (numpy.array) - input numpy array for pad.

    Raises:
        ValueError: pad length should be provided when padding mode is PadMode.CONST.

    Supported Platforms:
        ``Ascend`` ``GPU``

    Examples:
        >>> import numpy as np
        >>> from mindspore_gl.graph import PadCsrEdge, PadMode
        >>> node_pad = 10
        >>> origin_edge_index = np.array([[0, 1, 2, 4],
        ...                               [2, 3, 1, 1]])
        >>> pad_length = 20
        >>> pad_op = PadCsrEdge(node_pad, length=pad_length, mode=PadMode.CONST)
        >>> res = pad_op(origin_edge_index)
        >>> print(res)
        [[0 1 2 4 5 6 7 8 5 6 7 8 5 6 7 8 5 6 7 8]
         [2 3 1 1 5 6 7 8 6 7 8 5 7 8 5 6 8 5 6 7]]
    """

    def __init__(self, pad_nodes, reset_with_fill_value=True, length=None, mode=PadMode.AUTO,
                 use_shared_numpy=False):
        # CONST mode allocates a (2, length) buffer below; fail early with the
        # documented error instead of an obscure one from the allocator.
        if mode == PadMode.CONST and length is None:
            raise ValueError("pad length should be provided when padding mode is PadMode.CONST")
        self.pad_nodes = pad_nodes
        self.pad_mode = mode
        self.homo_batch = BatchHomoGraph()
        self.reset_with_fill_value = reset_with_fill_value
        self.use_shared_numpy = use_shared_numpy
        self.size = (2, length)
        if self.use_shared_numpy:
            self.array_pool = SharedArrayPool()
        else:
            self.array_pool = ArrayPool()
        if mode == PadMode.CONST:
            # The target shape is known up front, so seed the pool with one buffer.
            if self.use_shared_numpy:
                memory_buffer = shared_numpy.SharedNDArray.from_shape(self.size, dtype=np.int32)
            else:
                memory_buffer = np.zeros(self.size, dtype=np.int32)
            self.array_pool.put(self.size, memory_buffer)

    def __call__(self, input_array):
        """
        Pad the COO edge index ``input_array`` and return the padded buffer.

        Args:
            input_array (numpy.array): edge index with two rows.

        Returns:
            The pooled buffer holding ``input_array`` followed by the padding
            edges produced by ``generate_fill_array``.
        """
        if self.pad_mode == PadMode.CONST:
            fill_array = generate_fill_array(input_array, self.size, self.pad_nodes)
            memory_buffer = self.array_pool.pop(self.size)
            # A pool miss is answered with a SharedNDArray (original author's note:
            # a miss here can only happen for the shared pool).
            if memory_buffer is None:
                memory_buffer = shared_numpy.SharedNDArray.from_shape(self.size, dtype=np.int32)
                self.array_pool.put(self.size, memory_buffer)

            memory_buffer[:, :input_array.shape[1]] = input_array
            if self.reset_with_fill_value:
                memory_buffer[:, input_array.shape[1]:] = fill_array

            # Put Back To Memory Buffer
            self.array_pool.put(self.size, memory_buffer)
            return memory_buffer

        # AUTO: round the edge count up to the next power of two.
        bucket_length = math.ceil(math.log2(input_array.shape[1]))
        target_size = [2, 1 << bucket_length]
        fill_value = generate_fill_array(input_array, target_size, self.pad_nodes)

        memory_buffer = self.array_pool.pop(target_size)
        # NOTE(review): a pool miss allocates a SharedNDArray even when
        # use_shared_numpy is False - confirm against ArrayPool.pop.
        if memory_buffer is None:
            memory_buffer = shared_numpy.SharedNDArray.from_shape(target_size, np.int32)
            self.array_pool.put(target_size, memory_buffer)

        memory_buffer[:, :input_array.shape[1]] = input_array
        if self.reset_with_fill_value:
            memory_buffer[:, input_array.shape[1]:] = fill_value

        # Put Back To Memory Buffer
        self.array_pool.put(target_size, memory_buffer)
        return memory_buffer
def generate_fill_array(input_array, size, pad_nodes):
    """
    Generate the padding edges appended after ``input_array``.

    The spare node ids are ``max(input_array) + 1 .. pad_nodes - 2``. The
    first block pairs every spare id with itself; each following block pairs
    the ids with the same ids rotated left by one more position.

    Args:
        input_array (numpy.array): edge index with two rows; must not be empty.
        size (Union[List, Tuple]): target shape; ``size[1]`` is the padded edge count.
        pad_nodes (int): node count after padding.

    Returns:
        numpy.array with two rows and at most ``size[1] - input_array.shape[1]``
        columns. It is shorter when the spare ids do not provide enough
        distinct blocks, and has zero columns when there is no spare id.
    """
    start = np.max(input_array) + 1
    end = pad_nodes - 1
    mini_length = size[1] - input_array.shape[1]

    base = np.arange(start, end)
    span = base.shape[0]
    blocks = [np.array([base, base])]
    filled = span
    shift = 0
    # Bounding the loop by ``shift < span`` also terminates when there is no
    # spare id at all (span == 0); the former ``add_bias == end - start`` test
    # was never true in that case and the loop ran forever.
    while filled < mini_length and shift < span:
        shift += 1
        rotated = np.concatenate((base[shift:], base[:shift]))
        blocks.append(np.array([base, rotated]))
        filled += span

    return np.concatenate(blocks, axis=1)[:, :mini_length]