mindspore.dataset.InMemoryGraphDataset

class mindspore.dataset.InMemoryGraphDataset(data_dir, save_dir='./processed', column_names='graph', num_samples=None, num_parallel_workers=1, shuffle=None, num_shards=None, shard_id=None, python_multiprocessing=True, max_rowsize=6)[源代码]

用于将图数据加载到内存中的Dataset基类。

建议通过继承这个基类来实现自定义Dataset,并重写相应的方法,如 processsaveload ,可参考 ArgoverseDataset 源码。自定义Dataset的初始化过程如下,首先检查在给定的 data_dir 中是否已经有处理好的数据。如果是则调用 load 方法直接加载它,否则将调用 process 方法创建图,并调用 save 方法将图保存到 save_dir

可以访问所创建dataset中的图并使用,例如 graphs = my_dataset.graphs,也可以迭代dataset对象如 my_dataset.create_tuple_iterator() 来获取数据(这时需要实现 __getitem____len__)方法,具体请参考以下示例。注意:内部逻辑指定了 __new__ 阶段会重新初始化 __init__ ,如果自定义图实现了 __new__ 方法,该方法将失效。

参数:
  • data_dir (str) - 加载数据集的目录,这里包含原始格式的数据,并将在 process 方法中被加载。

  • save_dir (str) - 保存处理后得到的数据集的相对目录,该目录位于 data_dir 下面。默认值:”./processed”。

  • column_names (Union[str, list[str]],可选) - dataset包含的单个列名或多个列名组成的列表。默认值:’graph’。当实现类似 __getitem__ 等方法时,列名的数量应该等于该方法中返回数据的条数。

  • num_samples (int, 可选) - 指定从数据集中读取的样本数。默认值:None,读取全部样本。

  • num_parallel_workers (int, 可选) - 指定读取数据的工作进程数/线程数(由参数 python_multiprocessing 决定当前为多进程模式或多线程模式)。默认值:1。

  • shuffle (bool,可选) - 是否混洗数据集。当实现的Dataset带有可随机访问属性( __getitem__ )时,才可以指定该参数。默认值:None。

  • num_shards (int, 可选) - 指定分布式训练时将数据集进行划分的分片数。默认值:None。指定此参数后, num_samples 表示每个分片的最大样本数。

  • shard_id (int, 可选) - 指定分布式训练时使用的分片ID号。默认值:None。只有当指定了 num_shards 时才能指定此参数。

  • python_multiprocessing (bool,可选) - 启用Python多进程模式加速运算。默认值:True。当传入 source 的Python对象的计算量很大时,开启此选项可能会有较好效果。

  • max_rowsize (int, 可选) - 指定在多进程之间复制数据时,共享内存分配的最大空间。默认值:6,单位为MB。仅当参数 python_multiprocessing 设为True时,此参数才会生效。

异常:
  • TypeError - 如果 data_dir 不是str类型。

  • TypeError - 如果 save_dir 不是str类型。

  • TypeError - 如果 num_parallel_workers 不是int类型。

  • TypeError - 如果 shuffle 不是bool类型。

  • TypeError - 如果 python_multiprocessing 不是bool类型。

  • TypeError - 如果 perf_mode 不是bool类型。

  • RuntimeError - 如果 data_dir 无效或不存在。

  • RuntimeError - 指定了 num_shards 参数,但是未指定 shard_id 参数。

  • RuntimeError - 指定了 shard_id 参数,但是未指定 num_shards 参数。

  • ValueError - num_parallel_workers 参数超过系统最大线程数。

样例:

>>> from mindspore.dataset import InMemoryGraphDataset, Graph
>>>
>>> class MyDataset(InMemoryGraphDataset):
...     def __init__(self, data_dir):
...         super().__init__(data_dir)
...
...     def process(self):
...         # create graph with loading data in given data_dir
...         # here create graph with numpy array directly instead
...         edges = np.array([[0, 1], [1, 2]])
...         graph = Graph(edges=edges)
...         self.graphs.append(graph)
...
...     def __getitem__(self, index):
...         # this method and '__len__' method are required when iterating created dataset
...         graph = self.graphs[index]
...         return graph.get_all_edges('0')
...
...     def __len__(self):
...         return len(self.graphs)
load()[源代码]

从给定(处理好的)路径加载数据,也可以在自己实现的Dataset类中实现这个方法。

process()[源代码]

与原始数据集相关的处理方法,建议在自定义的Dataset中重写此方法。

save()[源代码]

将经过 process 函数处理后的数据以 numpy.npz 格式保存到磁盘中,也可以在自己实现的Dataset类中自己实现这个方法。

预处理操作

mindspore.dataset.Dataset.apply

对数据集对象执行给定操作函数。

mindspore.dataset.Dataset.concat

对传入的多个数据集对象进行拼接操作。

mindspore.dataset.Dataset.filter

通过自定义判断条件对数据集对象中的数据进行过滤。

mindspore.dataset.Dataset.flat_map

对数据集对象中每一条数据执行给定的数据处理,并将结果展平。

mindspore.dataset.Dataset.map

给定一组数据增强列表,按顺序将数据增强作用在数据集对象上。

mindspore.dataset.Dataset.project

从数据集对象中选择需要的列,并按给定的列名的顺序进行排序。

mindspore.dataset.Dataset.rename

对数据集对象按指定的列名进行重命名。

mindspore.dataset.Dataset.repeat

重复此数据集 count 次。

mindspore.dataset.Dataset.reset

重置下一个epoch的数据集对象。

mindspore.dataset.Dataset.save

将数据处理管道中正处理的数据保存为通用的数据集格式。

mindspore.dataset.Dataset.shuffle

通过创建 buffer_size 大小的缓存来混洗该数据集。

mindspore.dataset.Dataset.skip

跳过此数据集对象的前 count 条数据。

mindspore.dataset.Dataset.split

将数据集拆分为多个不重叠的子数据集。

mindspore.dataset.Dataset.take

从数据集中获取最多 count 的元素。

mindspore.dataset.Dataset.zip

将多个dataset对象按列进行合并压缩,多个dataset对象不能有相同的列名。

Batch(批操作)

mindspore.dataset.Dataset.batch

将数据集中连续 batch_size 条数据组合为一个批数据,并可通过可选参数 per_batch_map 指定组合前要进行的预处理操作。

mindspore.dataset.Dataset.bucket_batch_by_length

根据数据的长度进行分桶。

mindspore.dataset.Dataset.padded_batch

将数据集中连续 batch_size 条数据组合为一个批数据,并可通过可选参数 pad_info 预先将样本补齐。

迭代器

mindspore.dataset.Dataset.create_dict_iterator

基于数据集对象创建迭代器。

mindspore.dataset.Dataset.create_tuple_iterator

基于数据集对象创建迭代器。

数据集属性

mindspore.dataset.Dataset.get_batch_size

获得数据集对象定义的批处理大小,即一个批处理数据中包含的数据条数。

mindspore.dataset.Dataset.get_class_indexing

返回类别索引。

mindspore.dataset.Dataset.get_col_names

返回数据集对象中包含的列名。

mindspore.dataset.Dataset.get_dataset_size

返回一个epoch中的batch数。

mindspore.dataset.Dataset.get_repeat_count

获取 RepeatDataset 中定义的repeat操作的次数。

mindspore.dataset.Dataset.input_indexs

获取/设置数据列索引,它表示使用下沉模式时数据列映射至网络中的对应关系。

mindspore.dataset.Dataset.num_classes

获取数据集对象中所有样本的类别数目。

mindspore.dataset.Dataset.output_shapes

获取数据集对象中每列数据的shape。

mindspore.dataset.Dataset.output_types

获取数据集对象中每列数据的数据类型。

应用采样方法

mindspore.dataset.MappableDataset.add_sampler

为当前数据集添加子采样器。

mindspore.dataset.MappableDataset.use_sampler

替换当前数据集的最末子采样器,保持父采样器不变。

其他方法

mindspore.dataset.Dataset.device_que

将数据异步传输到Ascend/GPU设备上。

mindspore.dataset.Dataset.sync_update

释放阻塞条件并使用给定数据触发回调函数。

mindspore.dataset.Dataset.sync_wait

为同步操作在数据集对象上添加阻塞条件。

mindspore.dataset.Dataset.to_json

将数据处理管道序列化为JSON字符串,如果提供了文件名,则转储到文件中。