mindflow.data
- class mindflow.data.BoundaryBC(geometry)[源代码]
边界条件采样数据。
- 参数:
geometry (Geometry) - 指定边界条件的几何体信息。几何体来源于geometry基类,可以是圆盘或者立方体等。
- 异常:
ValueError - 如果几何体的sampling_config.bc为None。
- 支持平台:
Ascend
GPU
样例:
>>> from mindflow.geometry import generate_sampling_config, Geometry >>> from mindflow.data import BoundaryBC >>> geometry_config = dict({'BC' : dict({'random_sampling' : True, 'size' : 100, 'sampler' : 'uniform',})}) >>> sampling_config = generate_sampling_config(geometry_config) >>> geom = Geometry("geom", 1, 0.0, 1.0, sampling_config=sampling_config) >>> boundary_bc = BoundaryBC(geometry=geom)
- class mindflow.data.BoundaryIC(geometry)[源代码]
初始条件的采样数据。
- 参数:
geometry (Geometry) - 指定初始条件的几何体信息。
- 异常:
ValueError - 如果几何体的sampling_config.ic为None。
- 支持平台:
Ascend
GPU
样例:
>>> from mindflow.geometry import generate_sampling_config, Geometry >>> from mindflow.data import BoundaryIC >>> geometry_config = dict({'IC' : dict({'random_sampling' : True, 'size' : 100, 'sampler' : 'uniform',})}) >>> sampling_config = generate_sampling_config(geometry_config) >>> geom = Geometry("geom", 1, 0.0, 1.0, sampling_config=sampling_config) >>> boundary_ic = BoundaryIC(geometry=geom)
- class mindflow.data.Dataset(geometry_dict=None, existed_data_list=None, dataset_list=None)[源代码]
将数据集合并在一起。
- 参数:
geometry_dict (dict, 可选) - 指定要合并的几何数据集。键为几何实例,值为几何体类型的列表。例如,geometry_dict = {geom : [“domain”, “BC”, “IC”]}。默认值:None。
existed_data_list (Union[list, tuple, ExistedDataConfig], 可选) - 指定要合并的现有数据集。例如,existed_data_list = [ExistedDataConfig_Instance1, ExistedDataConfig_Instance2]。默认值:None。
dataset_list (Union[list, tuple, Data], 可选) - 指定要合并的数据实例。例如,dataset_list=[BoundaryIC_Instance, Equation_Instance, BoundaryBC_Instance and ExistedData_Instance]。默认值:None。
- 异常:
ValueError - 如果 geometry_dict 、 existed_data_list 和 dataset_list 都为None。
TypeError - 如果 geometry_dict 的类型不是dict。
TypeError - 如果 geometry_dict 的键类型不是Geometry的实例。
TypeError - 如果 existed_data_list 的类型不是列表、元组或ExistedDataConfig的实例。
TypeError - 如果 existed_data_list 的元素不是ExistedDataConfig的实例。
TypeError - 如果 dataset_list 的元素不是Data的实例。
- 支持平台:
Ascend
GPU
样例:
>>> from mindflow.geometry import Rectangle, generate_sampling_config >>> from mindflow.data import Dataset >>> rectangle_mesh = dict({'domain': dict({'random_sampling': False, 'size': [50, 25]})}) >>> rect_space = Rectangle("rectangle", coord_min=[0, 0], coord_max=[5, 5], ... sampling_config=generate_sampling_config(rectangle_mesh)) >>> geom_dict = {rect_space: ["domain"]} >>> dataset = Dataset(geometry_dict=geom_dict)
- create_dataset(batch_size=1, preprocess_fn=None, input_output_columns_map=None, shuffle=True, drop_remainder=True, prebatched_data=False, num_parallel_workers=1, num_shards=None, shard_id=None, python_multiprocessing=False)[源代码]
创建最终的MindSpore类型数据集以合并所有子数据集。
- 参数:
batch_size (int, 可选) - 每个批处理创建的行数,int值。默认值:1。
preprocess_fn (Union[list[TensorOp], list[functions]], 可选) - 要应用于数据集的进行操作的列表。按它们在此列表中的顺序遍历操作。默认值:None。
input_output_columns_map (dict, 可选) - 指定要替换的列和需要替换成的内容。键是要被替换的列名,值是要替换成的内容。如果映射后所有列都未更改,则无需设置此参数。默认值:None。
shuffle (bool, 可选) - 是否对数据集执行shuffle。需要随机可访问的输入。默认值:True,表中显示的预期顺序。
drop_remainder (bool, 可选) - 确定是否删除最后一个block,这个block的数据行数小于批处理大小。如果为True,且有更小的 batch_size ,可用于创建最后一个batch,那么这些行将被丢弃,而不传播到子节点。默认值:True。
prebatched_data (bool, 可选) - 在创建MindSpore数据集之前生成预批处理数据。如果为True,当按索引获取每个子数据集数据时,将返回预批处理数据。否则,批处理操作将由MindSpore数据集接口:dataset.batch完成。当 batch_size 非常大时,建议将此选项设置为True,以提高主机上的性能。默认值:False。
num_parallel_workers (int, 可选) - 并行处理数据集的工作线程(线程)数。默认值:1。
num_shards (int, 可选) - 数据集将被划成的分片数。需要随机可访问的输入。指定此参数时,num_samples 反映每个分片的最大样本数。默认值:None。
shard_id (int, 可选) - num_shards 内的shard ID。需要随机可访问的输入。仅当同时指定了 num_shards 时必须指定此参数。默认值:None。
python_multiprocessing (bool, 可选) - 并行使用多处理Python函数per_batch_map和multi-processing。 如果函数计算量很大,此选项可能会很有用。默认值:False。
- 返回:
BatchDataset,批处理之后的数据集。
样例:
>>> data = dataset.create_dataset()
- class mindflow.data.Equation(geometry)[源代码]
方程域的采样数据。
- 参数:
geometry (Geometry) - 指定方程域的几何信息。
- 异常:
TypeError - 如果 geometry 不是Geometry的实例。
ValueError - 如果 geometry 的sampling_config为None。
KeyError - 如果 geometry 的sampling_config.domain为None。
- 支持平台:
Ascend
GPU
样例:
>>> from mindflow.geometry import generate_sampling_config, Geometry >>> from mindflow.data import Equation >>> geometry_config = dict({'domain' : dict({'random_sampling' : True, 'size' : 100, 'sampler' : 'uniform',})}) >>> sampling_config = generate_sampling_config(geometry_config) >>> geom = Geometry("geom", 1, 0.0, 1.0, sampling_config=sampling_config) >>> boundary = Equation(geometry=geom)
- class mindflow.data.ExistedDataConfig(name, data_dir, columns_list, data_format='npy', constraint_type='Label', random_merge=True)[源代码]
设置ExistedDataset的参数。
- 参数:
name (str) - 指定数据集的名称。
data_dir (Union[str, list, tuple]) - 已存在数据文件的路径。
columns_list (Union[str, list, tuple]) - 数据集的列名列表。
data_format (str, 可选) - 现有数据文件的格式,默认值:“npy”。目前支持“npy”的格式。
constraint_type (str, 可选) - 指定创建的数据集的约束类型,默认值:“Label”。
random_merge (bool, 可选) - 指定是否随机合并给定数据集,默认值:True。
- 支持平台:
Ascend
GPU
- class mindflow.data.ExistedDataset(name=None, data_dir=None, columns_list=None, data_format='npy', constraint_type='Label', random_merge=True, data_config=None)[源代码]
使用给定的数据路径创建数据集。
说明
目前支持 npy 数据格式。
- 参数:
name (str, 可选) - 指定数据集的名称,默认值:None。如果 data_config 为None,则 name 应不是None。
data_dir (Union[str, list, tuple], 可选) - 已存在数据文件的路径,默认值:None。如果 data_config 为None, data_dir 不应为None。
columns_list (Union[str, list, tuple], 可选) - 数据集的列名列表,默认值:None。如果 data_config 为None, columns_list 不应为None。
data_format (str, 可选) - 现有数据文件的格式,默认值:“npy”。
constraint_type (str, 可选) - 指定创建的数据集的约束类型,默认值:“Label”。
random_merge (bool, 可选) - 指定是否随机合并给定的数据集,默认值:True。
data_config (ExistedDataConfig, 可选) - ExistedDataConfig实例,收集上述的信息,默认值:None。如果非None,则将通过使用它来简化创建数据集类。如果为None,则(name, data_dir, columns_list, data_format, constraint_type, random_merge)的信息用于替换。
- 异常:
ValueError - 当 data_config 为None时,参数 name / data_dir / columns_list 为None。
TypeError - 如果 data_config 不是ExistedDataConfig的实例。
ValueError - 如果 data_format 不是”npy”。
- 支持平台:
Ascend
GPU
样例:
>>> from mindflow.data import ExistedDataConfig, ExistedDataset >>> data_config = ExistedDataConfig(name='exist', ... data_dir=['./data.npy'], ... columns_list=['input_data'], data_format="npy", constraint_type="Equation") >>> dataset = ExistedDataset(data_config=data_config)
- class mindflow.data.MindDataset(dataset_files, dataset_name='dataset', constraint_type='Label', shuffle=True, num_shards=None, shard_id=None, sampler=None, num_samples=None, num_parallel_workers=None)[源代码]
从MindRecord类型的数据创建数据集。
- 参数:
dataset_files (Union[str, list[str]]) - 如果数据集文件是str,则它代表思维记录源的一个组件的文件名,其他具有相同源的文件在同一路径中,将自动找到并加载。如果dataset_file是列表,它表示要直接读取的数据集文件列表。
dataset_name (str, 可选) - 数据集名称,默认值:“dataset_name”。
constraint_type (str, 可选) - 指定数据集的约束类型,以获取其相应的损失函数。默认值:“Label”。
shuffle (Union[bool, Shuffle level], 可选) - 每个epoch对数据执行shuffle。如果shuffle为False,则不执行shuffle。如果shuffle为True,则执行全局shuffle。默认值:True。 而且,有两种shuffle level:
Shuffle.GLOBAL:对文件和样例进行shuffle。
Shuffle.FILES:仅对文件shuffle。
num_shards (int, 可选) - 数据集将划分为的分片数指定此参数时, num_samples 反映每个分片的最大样本数。默认值:None。
shard_id (int, 可选) - num_shards 中的分片ID。只有当同时指定 num_shards 时,才能指定该参数。默认值:None。
sampler (Sampler, 可选) - 用于从数据集。支持列表:SubsetRandomSampler、PkSampler、RandomSampler、SequentialSampler、DistributedSampler。默认值:None,采样器是独占的使用shuffle和block_reader。
num_samples (int, 可选) - 要包括在数据集中的样本数。默认值:None,所有样本。
num_parallel_workers (int, 可选) - 读取器的数。默认值:None。
- 异常:
ValueError - 如果dataset_files无效或不存在。
TypeError - 如果数据集名称不是string。
ValueError - 如果constraint_type.lower()不在[“equation”, “bc”, “ic”, “label”, “function”, “custom”]中。
RuntimeError - 如果指定了 num_shards ,但 shard_id 为None。
RuntimeError - 如果指定了 shard_id ,但 num_shards 为None。
ValueError - 如果 shard_id 无效(<0或>= num_shards)。
- 支持平台:
Ascend
GPU
样例:
>>> from mindflow.data import MindDataset >>> dataset_files = ["./data_dir"] # contains 1 or multiple MindRecord files >>> dataset = MindDataset(dataset_files=dataset_files)
- create_dataset(batch_size=1, preprocess_fn=None, updated_columns_list=None, drop_remainder=True, prebatched_data=False, num_parallel_workers=1, python_multiprocessing=False)[源代码]
创建最终的MindSpore类型数据集。
- 参数:
batch_size (int, 可选) - 每个批处理创建的行数为int。默认值:1。
preprocess_fn (Union[list[TensorOp], list[functions]], 可选) - 要进行的操作列表应用于数据集。操作按它们在此列表中的显示顺序应用。默认值:None。
updated_columns_list (list, 可选) - 对数据集的列进行的操作。默认值:None。
drop_remainder (bool, 可选) - 确定是否删除最后一个块,其数据行号小于批处理大小。如果为True,如果有更少的比批处理大小行可用于创建最后一个批处理,那么这些行将被丢弃,而不传播到子节点。默认值:True。
prebatched_data (bool, 可选) - 在数据预处理前生成预批处理数据。默认值:False。
num_parallel_workers (int, 可选) - 并行处理数据集的工作线程(线程)数量。默认值:1。
python_multiprocessing (bool, 可选) - 使用多处理并行Python函数per_batch_map。如果函数计算量很大,此选项可能会很有用。默认值:False。
- 返回:
BatchDataset,批处理的数据集。
样例:
>>> data = dataset.create_dataset()
- get_columns_list()[源代码]
获取数据集中的列。
- 返回:
list[str]。最终统一数据集的列名列表。
样例:
>>> columns_list = dataset.get_columns_list()
- set_constraint_type(constraint_type='Equation')[源代码]
设置数据集的约束类型。
- 参数:
constraint_type (Union[str, dict]) - 指定数据集的约束类型。如果是字符串,则约束所有子数据集的类型将设置为相同的类型。如果是dict,则子数据集及其约束类型由对(key, value)指定。默认值:“Equation”。
样例:
>>> dataset.set_constraint_type("Equation")
- split_dataset(dataset_dict, constraint_dict=None)[源代码]
拆分原始数据集以设置差异损失函数。
- 参数:
dataset_dict (dict) - 每个子数据集的字典,key是标记的名称,而value 指子数据集中包含的指定列。
constraint_dict (Union[None, str, dict]) - 指定数据集的约束类型。如果是None,则“Label”将为所有人设置。如果是字符串,则所有将设置为相同的字符串。如果是dict,子数据集及其约束类型由对(key, value)指定。默认值:None。
样例:
>>> dataset.split_dataset({"Equation" : "inner_points", "BC" : "bc_points"})