mindspore.dataset.GeneratorDataset

class mindspore.dataset.GeneratorDataset(source, column_names=None, column_types=None, schema=None, num_samples=None, num_parallel_workers=1, shuffle=None, sampler=None, num_shards=None, shard_id=None, python_multiprocessing=True, max_rowsize=6)[源代码]

自定义Python数据源,通过迭代该数据源构造数据集。生成的数据集的列名和列类型取决于用户定义的Python数据源。

参数:
  • source (Union[Callable, Iterable, Random Accessible]) - 一个Python的可调用对象,可以是可迭代的Python对象,或支持随机访问的Python对象。

    • 如果 source 是可调用对象,要求 source 对象可以通过 source().next() 的方式返回一个由NumPy数组构成的元组。

    • 如果 source 是可迭代对象,要求 source 对象通过 iter(source).next() 的方式返回一个由NumPy数组构成的元组。

    • 如果 source 是支持随机访问的对象,要求 source 对象通过 source[idx] 的方式返回一个由NumPy数组构成的元组。

  • column_names (Union[str, list[str]],可选) - 指定数据集生成的列名。默认值: None ,不指定。用户可以通过此参数或 schema 参数指定列名。

  • column_types (list[mindspore.dtype],可选) - 指定生成数据集各个数据列的数据类型。默认值: None ,不指定。 如果未指定该参数,则自动推断类型;如果指定了该参数,将在数据输出时做类型匹配检查。

  • schema (Union[str, Schema], 可选) - 数据格式策略,用于指定读取数据列的数据类型、数据维度等信息。 支持传入JSON文件路径或 mindspore.dataset.Schema 构造的对象。默认值: None

  • num_samples (int, 可选) - 指定从数据集中读取的样本数。默认值: None ,读取全部样本。

  • num_parallel_workers (int, 可选) - 指定读取数据的工作进程数/线程数(由参数 python_multiprocessing 决定当前为多进程模式或多线程模式)。默认值: 1

  • shuffle (bool,可选) - 是否混洗数据集。只有输入的 source 参数带有可随机访问属性(__getitem__)时,才可以指定该参数。默认值: None 。下表中会展示不同配置的预期行为。

  • sampler (Union[Sampler, Iterable],可选) - 指定从数据集中选取样本的采样器。只有输入的 source 参数带有可随机访问属性(__getitem__)时,才可以指定该参数。默认值: None 。下表中会展示不同配置的预期行为。

  • num_shards (int, 可选) - 指定分布式训练时将数据集进行划分的分片数。默认值: None 。指定此参数后, num_samples 表示每个分片的最大样本数。

  • shard_id (int, 可选) - 指定分布式训练时使用的分片ID号。默认值: None 。只有当指定了 num_shards 时才能指定此参数。

  • python_multiprocessing (bool,可选) - 启用Python多进程模式加速运算。默认值: True 。当传入 source 的Python对象的计算量很大时,开启此选项可能会有较好效果。

  • max_rowsize (int, 可选) - 指定在多进程之间复制数据时,共享内存分配的基本单位,总占用的共享内存会随着 num_parallel_workersmindspore.dataset.config.set_prefetch_size() 增加而变大,仅当参数 python_multiprocessing 设为 True 时,此参数才会生效。默认值: 6 ,单位为MB。

异常:
  • RuntimeError - Python对象 source 在执行期间引发异常。

  • RuntimeError - column_names 参数指定的列名数量与 source 参数输出的数据数量不匹配。

  • ValueError - num_parallel_workers 参数超过最大线程数。

  • ValueError - 同时指定了 samplershuffle 参数。

  • ValueError - 同时指定了 samplernum_shards 参数或同时指定了 samplershard_id 参数。

  • ValueError - 指定了 num_shards 参数,但是未指定 shard_id 参数。

  • ValueError - 指定了 shard_id 参数,但是未指定 num_shards 参数。

  • ValueError - 如果 shard_id 取值不在[0, num_shards )范围。

样例:

>>> import mindspore.dataset as ds
>>> import numpy as np
>>>
>>> # 1) Multidimensional generator function as callable input.
>>> def generator_multidimensional():
...     for i in range(64):
...         yield (np.array([[i, i + 1], [i + 2, i + 3]]),)
>>>
>>> dataset = ds.GeneratorDataset(source=generator_multidimensional, column_names=["multi_dimensional_data"])
>>>
>>> # 2) Multi-column generator function as callable input.
>>> def generator_multi_column():
...     for i in range(64):
...         yield np.array([i]), np.array([[i, i + 1], [i + 2, i + 3]])
>>>
>>> dataset = ds.GeneratorDataset(source=generator_multi_column, column_names=["col1", "col2"])
>>>
>>> # 3) Iterable dataset as iterable input.
>>> class MyIterable:
...     def __init__(self):
...         self._index = 0
...         self._data = np.random.sample((5, 2))
...         self._label = np.random.sample((5, 1))
...
...     def __next__(self):
...         if self._index >= len(self._data):
...             raise StopIteration
...         else:
...             item = (self._data[self._index], self._label[self._index])
...             self._index += 1
...             return item
...
...     def __iter__(self):
...         self._index = 0
...         return self
...
...     def __len__(self):
...         return len(self._data)
>>>
>>> dataset = ds.GeneratorDataset(source=MyIterable(), column_names=["data", "label"])
>>>
>>> # 4) Random accessible dataset as random accessible input.
>>> class MyAccessible:
...     def __init__(self):
...         self._data = np.random.sample((5, 2))
...         self._label = np.random.sample((5, 1))
...
...     def __getitem__(self, index):
...         return self._data[index], self._label[index]
...
...     def __len__(self):
...         return len(self._data)
>>>
>>> dataset = ds.GeneratorDataset(source=MyAccessible(), column_names=["data", "label"])
>>>
>>> # list, dict, tuple of Python is also random accessible
>>> dataset = ds.GeneratorDataset(source=[(np.array(0),), (np.array(1),), (np.array(2),)], column_names=["col"])
教程样例:

说明

  • 如果配置 python_multiprocessing=True (默认值: True ) 和 num_parallel_workers>1 (默认值:1) 表示启动了多进程方式进行数据load加速, 此时随着数据集迭代,子进程的内存占用会逐渐增加,主要是因为自定义数据集的子进程以 Copy-On-Write 的方式获取主进程中的成员变量。 举例:如果自定义数据集 __init__ 函数中包含大量成员变量数据(例如:在数据集构建时加载了一个非常大的文件名列表)并且使用了多进程方式, 那这可能会导致产生OOM的问题(总内存的预估使用量是:(子进程数量 + 1) * 父进程的内存大小)。最简单的解决方法是成员变量用非引用数据类型 (如:Pandas、Numpy或PyArrow对象)替换Python对象(如:list / dict / int / float / string等),或者加载更少的元数据以减小成员变量, 或者配置 python_multiprocessing=False 使用多线程方式。

    下面有几个类/函数可以帮助你减少成员变量的大小,你可以选择来使用:

    1. mindspore.dataset.utils.LineReader :在 __init__ 函数中,使用该类初始化你的文本文件对象,然后在 __getitem__ 函数中通过该对象按行号读取文件内容。

  • source 参数接收用户自定义的Python函数(PyFuncs),不要将 mindspore.nnmindspore.ops 目录下或其他的网络计算算子添加 到 source 中。

  • 入参 num_samplesshufflenum_shardsshard_id 可用于控制数据集所使用的采样器,其与入参 sampler 搭配使用的效果如下。

参数 samplernum_samplesshufflenum_shardsshard_id 的不同组合得到的采样器

参数 sampler

参数 num_shards / shard_id

参数 shuffle

参数 num_samples

使用的采样器

mindspore.dataset.Sampler 类型

None

None

None

sampler

numpy.ndarray,list,tuple,int 类型

/

/

num_samples

SubsetSampler(indices = sampler , num_samples = num_samples )

iterable 类型

/

/

num_samples

IterSampler(sampler = sampler , num_samples = num_samples )

None

num_shards / shard_id

None / True

num_samples

DistributedSampler(num_shards = num_shards , shard_id = shard_id , shuffle = True , num_samples = num_samples )

None

num_shards / shard_id

False

num_samples

DistributedSampler(num_shards = num_shards , shard_id = shard_id , shuffle = False , num_samples = num_samples )

None

None

None / True

None

RandomSampler(num_samples = num_samples )

None

None

None / True

num_samples

RandomSampler(replacement = True , num_samples = num_samples )

None

None

False

num_samples

SequentialSampler(num_samples = num_samples )

预处理操作

mindspore.dataset.Dataset.apply

对数据集对象执行给定操作函数。

mindspore.dataset.Dataset.concat

对传入的多个数据集对象进行拼接操作。

mindspore.dataset.Dataset.filter

通过自定义判断条件对数据集对象中的数据进行过滤。

mindspore.dataset.Dataset.flat_map

对数据集对象中每一条数据执行给定的数据处理,并将结果展平。

mindspore.dataset.Dataset.map

给定一组数据增强列表,按顺序将数据增强作用在数据集对象上。

mindspore.dataset.Dataset.project

从数据集对象中选择需要的列,并按给定的列名的顺序进行排序。

mindspore.dataset.Dataset.rename

对数据集对象按指定的列名进行重命名。

mindspore.dataset.Dataset.repeat

重复此数据集 count 次。

mindspore.dataset.Dataset.reset

重置下一个epoch的数据集对象。

mindspore.dataset.Dataset.save

将数据处理管道中正处理的数据保存为通用的数据集格式。

mindspore.dataset.Dataset.shuffle

通过创建 buffer_size 大小的缓存来混洗该数据集。

mindspore.dataset.Dataset.skip

跳过此数据集对象的前 count 条数据。

mindspore.dataset.Dataset.split

将数据集拆分为多个不重叠的子数据集。

mindspore.dataset.Dataset.take

截取数据集的前指定条数据。

mindspore.dataset.Dataset.zip

将多个dataset对象按列进行合并压缩,多个dataset对象不能有相同的列名。

Batch(批操作)

mindspore.dataset.Dataset.batch

将数据集中连续 batch_size 条数据组合为一个批数据,并可通过可选参数 per_batch_map 指定组合前要进行的预处理操作。

mindspore.dataset.Dataset.bucket_batch_by_length

根据数据的长度进行分桶。

mindspore.dataset.Dataset.padded_batch

将数据集中连续 batch_size 条数据组合为一个批数据,并可通过可选参数 pad_info 预先将样本补齐。

迭代器

mindspore.dataset.Dataset.create_dict_iterator

基于数据集对象创建迭代器。

mindspore.dataset.Dataset.create_tuple_iterator

基于数据集对象创建迭代器。

数据集属性

mindspore.dataset.Dataset.get_batch_size

获得数据集对象定义的批处理大小,即一个批处理数据中包含的数据条数。

mindspore.dataset.Dataset.get_class_indexing

获取类别名称到类别索引的映射字典。

mindspore.dataset.Dataset.get_col_names

返回数据集对象中包含的列名。

mindspore.dataset.Dataset.get_dataset_size

返回一个epoch中的batch数。

mindspore.dataset.Dataset.get_repeat_count

获取 RepeatDataset 中定义的repeat操作的次数。

mindspore.dataset.Dataset.input_indexs

获取/设置数据列索引,它表示使用下沉模式时数据列映射至网络中的对应关系。

mindspore.dataset.Dataset.num_classes

获取数据集对象中所有样本的类别数目。

mindspore.dataset.Dataset.output_shapes

获取数据集对象中每列数据的shape。

mindspore.dataset.Dataset.output_types

获取数据集对象中每列数据的数据类型。

应用采样方法

mindspore.dataset.MappableDataset.add_sampler

为当前数据集添加子采样器。

mindspore.dataset.MappableDataset.use_sampler

替换当前数据集的最末子采样器,保持父采样器不变。

其他方法

mindspore.dataset.Dataset.sync_update

释放阻塞条件并使用给定数据触发回调函数。

mindspore.dataset.Dataset.sync_wait

为同步操作在数据集对象上添加阻塞条件。

mindspore.dataset.Dataset.to_json

将数据处理管道序列化为JSON字符串,如果提供了文件名,则转储到文件中。