数据处理概述

下载Notebook下载样例代码查看源文件

MindSpore Dataset 提供两种数据处理能力:数据处理Pipeline模式和数据处理轻量化模式。

  1. 数据处理Pipeline模式:提供基于C++ Runtime的并发数据处理流水线(Pipeline)能力。用户通过定义数据集加载、数据变换、数据Batch等流程,即可以实现数据集的高效加载、高效处理、高效Batch,且并发度可调、缓存可调等能力,实现为NPU卡训练提供零Bottle Neck的训练数据。

  2. 数据处理轻量化模式:支持用户使用数据变换操作(如:Resize、Crop、HWC2CHW等)进行单个样本的数据处理。

本章节后续重点讲述两种数据处理模式。

数据处理Pipeline模式

用户通过API定义的Dataset流水线,运行训练进程后Dataset会从数据集中循环加载数据 -> 处理 -> Batch -> 迭代器,最终用于训练。

MindSpore Dataset Pipeline

如上图所示,MindSpore Dataset模块使得用户很简便地定义数据预处理Pipeline,并以最高效(多进程/多线程)的方式处理数据集中样本,具体的步骤参考如下:

  • 加载数据集(Dataset):用户可以方便地使用 Dataset类 (标准格式数据集vision数据集nlp数据集audio数据集) 来加载已支持的数据集,或者通过 UDF Loader + GeneratorDataset 自定义数据集 实现Python层自定义数据集的加载,同时加载类方法可以使用多种Sampler、数据分片、数据shuffle等功能;

  • 数据集操作(filter/ skip):用户通过数据集对象方法 .shuffle / .filter / .skip / .split / .take / … 来实现数据集的进一步混洗、过滤、跳过、最多获取条数等操作;

  • 数据集样本变换操作(map):用户可以将数据变换操作 (vision数据变换nlp数据变换audio数据变换 ) 添加到map操作中执行,数据预处理过程中可以定义多个map操作,用于执行不同变换操作,数据变换操作也可以是 用户自定义变换的 PyFunc ;

  • 批(batch):用户在样本完成变换后,使用 .batch 操作将多个样本组织成batch,也可以通过batch的参数 per_batch_map 来自定义batch逻辑;

  • 迭代器(create_dict_iterator):最后用户通过数据集对象方法 .create_dict_iterator / .create_tuple_iterator 来创建迭代器将预处理完成的数据循环输出。

数据集加载

下面主要介绍单个数据集加载、数据集组合、数据集切分、数据集保存等常用数据集加载方式。

单个数据集加载

数据集加载类用于实现本地磁盘、OBS数据集、共享存储上的训练数据集加载,主要作用是将存储上的数据集Load至内存中。数据集加载接口如下:

数据集接口分类

API列表

说明

标准格式数据集

MindDatasetTFRecordDatasetCSVDataset

其中 MindDataset 依赖 MindSpore 数据格式, 详见: 格式转换

自定义数据集

GeneratorDatasetRandomDataset

其中 GeneratorDataset 负责加载 用户自定义DataLoader, 详见: 自定义数据集

常用数据集

ImageFolderDatasetCifar10DatasetIWSLT2017DatasetLJSpeechDataset

用于常用的开源数据集

以上数据集加载(示例)可以配置不同的参数以实现不同的加载效果,常用参数举例如下:

  1. 从数据集中过滤指定的列,参数名:columns_list,该参数仅针对部分数据集接口,默认值:None,加载所有数据列。

  2. 可以配置数据集的读取并发数,参数名:num_parallel_workers,默认值:8。

  3. 可以通过参数配置数据集的采样逻辑:

    1. 开启混洗,参数名:shuffle,默认值:True。

    2. 对数据集进行分片,参数名:num_shards & shard_id,默认值:None,不分片。

    3. 其他更多的采样逻辑可以参考:数据采样

数据集组合

数据集组合可以将多个数据集以串联/并朕的方式组合起来,形成一个全新的dataset对象。

  • 将多个数据集串联起来

[27]:
import mindspore.dataset as ds

ds.config.set_seed(1234)

data = [1, 2, 3]
dataset1 = ds.NumpySlicesDataset(data=data, column_names=["column_1"])

data = [4, 5, 6]
dataset2 = ds.NumpySlicesDataset(data=data, column_names=["column_1"])

dataset = dataset1.concat(dataset2)
for item in dataset.create_dict_iterator():
    print(item)
{'column_1': Tensor(shape=[], dtype=Int32, value= 3)}
{'column_1': Tensor(shape=[], dtype=Int32, value= 2)}
{'column_1': Tensor(shape=[], dtype=Int32, value= 1)}
{'column_1': Tensor(shape=[], dtype=Int32, value= 6)}
{'column_1': Tensor(shape=[], dtype=Int32, value= 5)}
{'column_1': Tensor(shape=[], dtype=Int32, value= 4)}
  • 将多个数据集并联起来

[28]:
import mindspore.dataset as ds

ds.config.set_seed(1234)

data = [1, 2, 3]
dataset1 = ds.NumpySlicesDataset(data=data, column_names=["column_1"])

data = [4, 5, 6]
dataset2 = ds.NumpySlicesDataset(data=data, column_names=["column_2"])

dataset = dataset1.zip(dataset2)
for item in dataset.create_dict_iterator():
    print(item)
{'column_1': Tensor(shape=[], dtype=Int32, value= 3), 'column_2': Tensor(shape=[], dtype=Int32, value= 6)}
{'column_1': Tensor(shape=[], dtype=Int32, value= 2), 'column_2': Tensor(shape=[], dtype=Int32, value= 5)}
{'column_1': Tensor(shape=[], dtype=Int32, value= 1), 'column_2': Tensor(shape=[], dtype=Int32, value= 4)}

数据集切分

将数据集切分成 训练数据集 和 验证数据集,分别用于训练过程和验证过程。

[29]:
import mindspore.dataset as ds

data = [1, 2, 3, 4, 5, 6]
dataset = ds.NumpySlicesDataset(data=data, column_names=["column_1"], shuffle=False)

train_dataset, eval_dataset = dataset.split([4, 2])

print(">>>> train dataset >>>>")
for item in train_dataset.create_dict_iterator():
    print(item)
>>>> train dataset >>>>
{'column_1': Tensor(shape=[], dtype=Int32, value= 5)}
{'column_1': Tensor(shape=[], dtype=Int32, value= 2)}
{'column_1': Tensor(shape=[], dtype=Int32, value= 6)}
{'column_1': Tensor(shape=[], dtype=Int32, value= 1)}
[30]:
print(">>>> eval dataset >>>>")
for item in eval_dataset.create_dict_iterator():
    print(item)
>>>> eval dataset >>>>
{'column_1': Tensor(shape=[], dtype=Int32, value= 3)}
{'column_1': Tensor(shape=[], dtype=Int32, value= 4)}

数据集保存

将数据集重新保存到MindRecord数据格式。

[31]:
import os
import mindspore.dataset as ds

ds.config.set_seed(1234)

data = [1, 2, 3, 4, 5, 6]
dataset = ds.NumpySlicesDataset(data=data, column_names=["column_1"])
if os.path.exists("./train_dataset.mindrecord"):
    os.remove("./train_dataset.mindrecord")
if os.path.exists("./train_dataset.mindrecord.db"):
    os.remove("./train_dataset.mindrecord.db")
dataset.save("./train_dataset.mindrecord")

数据变换

普通数据变换

用户可以使用 .map(...) 操作对样本进行变换操作,使用 .filter(...) 操作对样本进行过滤操作,使用 .project(...) 操作对多列进行排序和过滤,使用 .rename(...) 操作对指定列重命名,使用 .shuffle(...) 操作对数据进行缓存区大小的混洗,使用 .skip(...) 操作跳过数据集的前 n 条,使用 .take(...) 操作只读数据集的前 n 条样本,如下重点说明 .map(...) 的使用方法:

  • .map(...) 中使用 Dataset 提供的数据变换操作

    Dataset提供了丰富的数据变换操作(列表),这些数据变换操作可以直接放在 .map(...) 中使用。具体使用方法参考 map变换操作

  • .map(...) 中使用 自定义 数据变换操作

    Dataset也支持用户自定义的数据变换操作,仅需将用户自定义函数传递给 .map(...) 退可。具体使用方法参考:自定义map变换操作

  • .map(...) 中返回 Dict 数据结构 数据

    Dataset也支持用户自定义的数据变换操作中返回 Dict 数据结构,使得 定义的数据变换 更加灵活。具体使用方法参考:自定义map变换操作处理字典对象

自动数据增强

除了以上的普通数据变换,Dataset 还提供了一种自动数据变换方式,可以基于特定策略自动对图像进行数据变换处理。详细说明见:自动数据增强

数据batch

Dataset提供 .batch(...) 操作,可以很方便的将数据变换操作后的样本组织成batch。

  1. 默认 .batch(...) 操作,将batch_size个样本组织成shape为 (batch_size, …)的数据,详细用法请参考 batch操作

  2. 自定义 .batch(..., per_batch_map, ...) 操作,支持用户将 [np.ndarray, nd.ndarray, …] 多条数据按照自定义逻辑组织batch,详细用法请参考 自定义batch操作

数据集迭代器

用户在定义完成 数据集加载(xxDataset)-> 数据处理(.map)-> 数据batch(.batch) Dataset流水线后,可以通过 迭代器方法 .create_dict_iterator(...) / .create_tuple_iterator(...) 循环将数据输出。具体的使用方法参考:数据集迭代器

性能优化

数据处理性能优化

针对数据处理Pipeline性能不足的场景,可以参考 数据处理性能优化 来进一步优化性能,以满足训练端到端性能要求。

单节点数据缓存

另外,对于推理场景,为了追求极致的性能,可以使用 单节点数据缓存 将数据集缓存于本地内存中,以加速数据集的读取和预处理。

数据处理轻量化模式

用户可以直接使用数据变换操作处理一条数据,返回值即是数据变换的结果。

数据变换操作(vision数据变换nlp数据变换audio数据变换 )可以像调用普通函数一样直接来使用,一般用法是先初始化数据变换对象,然后通过 括号方法 传入需要处理的数据 并得到处理的结果。

[32]:
from download import download
from PIL import Image
import mindspore.dataset.vision as vision

url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/banana.jpg"
download(url, './banana.jpg', replace=True)
Downloading data from https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/banana.jpg (17 kB)

file_sizes: 100%|██████████████████████████| 17.1k/17.1k [00:00<00:00, 8.55MB/s]
Successfully downloaded file to ./banana.jpg
[32]:
'./banana.jpg'
[33]:

img_ori = Image.open("banana.jpg").convert("RGB") print("Image.type: {}, Image.shape: {}".format(type(img_ori), img_ori.size))
Image.type: <class 'PIL.Image.Image'>, Image.shape: (356, 200)
[34]:
# Apply Resize to input immediately
resize_op = vision.Resize(size=(320))
img = resize_op(img_ori)
print("Image.type: {}, Image.shape: {}".format(type(img), img.size))
Image.type: <class 'PIL.Image.Image'>, Image.shape: (569, 320)

更多的示例请参考:轻量化数据处理