格式转换

在线运行下载Notebook下载样例代码查看源文件

MindSpore中可以把用于训练网络模型的数据集,转换为MindSpore特定的格式数据(MindSpore Record格式),从而更加方便地保存和加载数据。其目标是归一化用户的数据集,并进一步通过MindDataset接口实现数据的读取,并用于训练过程。

conversion

此外,MindSpore还针对部分数据场景进行了性能优化,使用MindSpore Record数据格式可以减少磁盘IO、网络IO开销,从而获得更好的使用体验。

MindSpore Record数据格式具备的特征如下:

  1. 实现数据统一存储、访问,使得训练时数据读取更加简便。

  2. 数据聚合存储、高效读取,使得训练时数据方便管理和移动。

  3. 高效的数据编解码操作,使得用户可以对数据操作无感知。

  4. 可以灵活控制数据切分的分区大小,实现分布式数据处理。

Record文件结构

如下图所示,MindSpore Record文件由数据文件和索引文件组成。

MindSpore Record

其中数据文件包含文件头、标量数据页、块数据页,用于存储用户归一化后的训练数据,且单个MindSpore Record文件建议小于20G,用户可将大数据集进行分片存储为多个MindSpore Record文件。

而索引文件则包含基于标量数据(如图像Label、图像文件名等)生成的索引信息,用于方便的检索、统计数据集信息。

数据文件中的文件头、标量数据页、块数据页的具体用途如下所示:

  • 文件头:是MindSpore Record文件的元信息,主要用来存储文件头大小、标量数据页大小、块数据页大小、Schema信息、索引字段、统计信息、文件分区信息、标量数据与块数据对应关系等。

  • 标量数据页:主要用来存储整型、字符串、浮点型数据,如图像的Label、图像的文件名、图像的长宽等信息,即适合用标量来存储的信息会保存在这里。

  • 块数据页:主要用来存储二进制串、NumPy数组等数据,如二进制图像文件本身、文本转换成的字典等。

值得注意的是,数据文件和索引文件均暂不支持重命名操作。

转换成Record格式

下面主要介绍如何将CV类数据和NLP类数据转换为MindSpore Record文件格式,并通过MindDataset接口,实现MindSpore Record文件的读取。

转换CV类数据集

本示例主要以包含100条记录的CV数据集并将其转换为MindSpore Record格式为例子,介绍如何将CV类数据集转换成MindSpore Record文件格式,并使用MindDataset接口读取。

首先,需要创建100张图片的数据集并对齐进行保存,其样本包含file_name(字符串)、label(整型)、 data(二进制)三个字段,然后使用MindDataset接口读取该MindSpore Record文件。

  1. 生成100张图像,并转换成MindSpore Record文件格式。

[1]:
import os
from PIL import Image
from io import BytesIO

import mindspore.mindrecord as record


# 输出的MindSpore Record文件完整路径
MINDRECORD_FILE = "test.mindrecord"

if os.path.exists(MINDRECORD_FILE):
    os.remove(MINDRECORD_FILE)
    os.remove(MINDRECORD_FILE + ".db")

# 定义包含的字段
cv_schema = {"file_name": {"type": "string"},
             "label": {"type": "int32"},
             "data": {"type": "bytes"}}

# 声明MindSpore Record文件格式
writer = record.FileWriter(file_name=MINDRECORD_FILE, shard_num=1)
writer.add_schema(cv_schema, "it is a cv dataset")
writer.add_index(["file_name", "label"])

# 创建数据集
data = []
for i in range(100):
    i += 1
    sample = {}
    white_io = BytesIO()
    Image.new('RGB', (i*10, i*10), (255, 255, 255)).save(white_io, 'JPEG')
    image_bytes = white_io.getvalue()
    sample['file_name'] = str(i) + ".jpg"
    sample['label'] = i
    sample['data'] = white_io.getvalue()

    data.append(sample)
    if i % 10 == 0:
        writer.write_raw_data(data)
        data = []

if data:
    writer.write_raw_data(data)

writer.commit()
[1]:
MSRStatus.SUCCESS

从上面的打印结果MSRStatus.SUCCESS可以看出,数据集转换成功。在本篇后续的例子中如果数据集转换成功均可看到此打印结果。

  1. 通过MindDataset接口读取MindSpore Record文件格式。

[2]:
import mindspore.dataset as ds
import mindspore.dataset.vision.c_transforms as vision

# 读取MindSpore Record文件格式
data_set = ds.MindDataset(dataset_files=MINDRECORD_FILE)
decode_op = vision.Decode()
data_set = data_set.map(operations=decode_op, input_columns=["data"], num_parallel_workers=2)

# 样本计数
print("Got {} samples".format(data_set.get_dataset_size()))
Got 100 samples

转换NLP类数据集

本示例首先创建一个包含100条记录的MindSpore Record文件格式,其样本包含八个字段,均为整型数组,然后使用MindDataset接口读取该MindSpore Record文件。

为了方便展示,此处略去了将文本转换成字典序的预处理过程。

  1. 生成100条文本数据,并转换成MindSpore Record文件格式。

[3]:
import os
import numpy as np
import mindspore.mindrecord as record

# 输出的MindSpore Record文件完整路径
MINDRECORD_FILE = "test.mindrecord"

if os.path.exists(MINDRECORD_FILE):
    os.remove(MINDRECORD_FILE)
    os.remove(MINDRECORD_FILE + ".db")

# 定义样本数据包含的字段
nlp_schema = {"source_sos_ids": {"type": "int64", "shape": [-1]},
              "source_sos_mask": {"type": "int64", "shape": [-1]},
              "source_eos_ids": {"type": "int64", "shape": [-1]},
              "source_eos_mask": {"type": "int64", "shape": [-1]},
              "target_sos_ids": {"type": "int64", "shape": [-1]},
              "target_sos_mask": {"type": "int64", "shape": [-1]},
              "target_eos_ids": {"type": "int64", "shape": [-1]},
              "target_eos_mask": {"type": "int64", "shape": [-1]}}

# 声明MindSpore Record文件格式
writer = record.FileWriter(file_name=MINDRECORD_FILE, shard_num=1)
writer.add_schema(nlp_schema, "Preprocessed nlp dataset.")

# 创建虚拟数据集
data = []
for i in range(100):
    i += 1
    sample = {"source_sos_ids": np.array([i, i + 1, i + 2, i + 3, i + 4], dtype=np.int64),
              "source_sos_mask": np.array([i * 1, i * 2, i * 3, i * 4, i * 5, i * 6, i * 7], dtype=np.int64),
              "source_eos_ids": np.array([i + 5, i + 6, i + 7, i + 8, i + 9, i + 10], dtype=np.int64),
              "source_eos_mask": np.array([19, 20, 21, 22, 23, 24, 25, 26, 27], dtype=np.int64),
              "target_sos_ids": np.array([28, 29, 30, 31, 32], dtype=np.int64),
              "target_sos_mask": np.array([33, 34, 35, 36, 37, 38], dtype=np.int64),
              "target_eos_ids": np.array([39, 40, 41, 42, 43, 44, 45, 46, 47], dtype=np.int64),
              "target_eos_mask": np.array([48, 49, 50, 51], dtype=np.int64)}
    data.append(sample)

    if i % 10 == 0:
        writer.write_raw_data(data)
        data = []

if data:
    writer.write_raw_data(data)

writer.commit()
[3]:
MSRStatus.SUCCESS
  1. 通过MindDataset接口读取MindSpore Record格式文件。

[4]:
import mindspore.dataset as ds

# 读取MindSpore Record文件格式
data_set = ds.MindDataset(dataset_files=MINDRECORD_FILE, shuffle=False)

# 样本计数
print("Got {} samples".format(data_set.get_dataset_size()))

# 打印部分数据
count = 0
for item in data_set.create_dict_iterator():
    print("source_sos_ids:", item["source_sos_ids"])
    count += 1
    if count == 10:
        break
Got 100 samples
source_sos_ids: [1 2 3 4 5]
source_sos_ids: [2 3 4 5 6]
source_sos_ids: [3 4 5 6 7]
source_sos_ids: [4 5 6 7 8]
source_sos_ids: [5 6 7 8 9]
source_sos_ids: [ 6  7  8  9 10]
source_sos_ids: [ 7  8  9 10 11]
source_sos_ids: [ 8  9 10 11 12]
source_sos_ids: [ 9 10 11 12 13]
source_sos_ids: [10 11 12 13 14]

其他数据集转换

MindSpore提供转换常用数据集的工具类,能够将常用的数据集转换为MindSpore Record文件格式。

更多数据集转换的详细说明参考API文档

转换CIFAR-10数据集

用户可以通过Cifar10ToMR类,将CIFAR-10原始数据转换为MindSpore Record,并使用MindDataset接口读取。

  1. 下载CIFAR-10数据集并解压到指定目录,以下示例代码将数据集下载并解压到指定位置。

[5]:
from mindvision import dataset

# 声明数据集下载地址和数据集存储路径
dl_path = "./datasets"
dl_url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/cifar-10-python.tar.gz"

# 下载和解压数据集
dl = dataset.DownLoad()
dl.download_and_extract_archive(url=dl_url, download_path=dl_path)

解压后数据集文件的目录结构如下所示:

./datasets/cifar-10-batches-py
├── batches.meta
├── data_batch_1
├── data_batch_2
├── data_batch_3
├── data_batch_4
├── data_batch_5
├── readme.html
└── test_batch
  1. 创建Cifar10ToMR对象,调用transform接口,将CIFAR-10数据集转换为MindSpore Record文件格式。

[6]:
import os
from mindspore.mindrecord import Cifar10ToMR

ds_target_path = "./datasets/mindspore_dataset_conversion/"

os.system("rm -f {}*".format(ds_target_path))
os.system("mkdir -p {}".format(ds_target_path))

# CIFAR-10数据集路径
CIFAR10_DIR = "./datasets/cifar-10-batches-py"
# 输出的MindSpore Record文件路径
MINDRECORD_FILE = "./datasets/mindspore_dataset_conversion/cifar10.mindrecord"

cifar10_transformer = Cifar10ToMR(CIFAR10_DIR, MINDRECORD_FILE)
cifar10_transformer.transform(['label'])
[6]:
MSRStatus.SUCCESS
  1. 通过MindDataset接口读取MindSpore Record文件格式。

[7]:
import mindspore.dataset as ds
import mindspore.dataset.vision.c_transforms as vision

# 读取MindSpore Record文件格式
data_set = ds.MindDataset(dataset_files=MINDRECORD_FILE)
decode_op = vision.Decode()
data_set = data_set.map(operations=decode_op, input_columns=["data"], num_parallel_workers=2)

# 样本计数
print("Got {} samples".format(data_set.get_dataset_size()))
Got 50000 samples

转换CSV数据集

本示例首先创建一个包含5条记录的CSV文件,然后通过CsvToMR工具类将CSV文件转换为MindSpore Record文件格式,并最终通过MindDataset接口将其读取出来。

本示例依赖第三方支持包pandas,可使用命令pip install pandas安装。如本文档以Notebook运行时,完成安装后需要重启kernel才能执行后续代码。

  1. 生成CSV文件,并转换成MindSpore Record。

[8]:
import csv
import os
from mindspore import mindrecord as record

# CSV文件的路径
CSV_FILE = "test.csv"
# 输出的MindSpore Record文件路径
MINDRECORD_FILE = "test.mindrecord"

if os.path.exists(MINDRECORD_FILE):
    os.remove(MINDRECORD_FILE)
    os.remove(MINDRECORD_FILE + ".db")

def generate_csv():
    """生成csv格式文件数据"""
    headers = ["id", "name", "math", "english"]
    rows = [(1, "Lily", 78.5, 90),
            (2, "Lucy", 99, 85.2),
            (3, "Mike", 65, 71),
            (4, "Tom", 95, 99),
            (5, "Jeff", 85, 78.5)]
    with open(CSV_FILE, 'w', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(headers)
        writer.writerows(rows)

# 生成csv格式文件数据
generate_csv()

# 转换csv格式文件
csv_transformer = record.CsvToMR(CSV_FILE, MINDRECORD_FILE, partition_number=1)
csv_transformer.transform()

assert os.path.exists(MINDRECORD_FILE)
assert os.path.exists(MINDRECORD_FILE + ".db")
  1. 通过MindDataset接口读取MindSpore Record。

[9]:
import mindspore.dataset as ds

data_set = ds.MindDataset(dataset_files=MINDRECORD_FILE)

# 样本计数
print("Got {} samples".format(data_set.get_dataset_size()))
Got 5 samples

转换TFRecord数据集

本示例需提前安装TensorFlow,目前只支持TensorFlow 1.13.0-rc1及以上版本。如本文档以Notebook运行时,完成安装后需要重启kernel才能执行后续代码。

本示例首先通过TensorFlow创建一个TFRecord文件,然后通过TFRecordToMR工具类将TFRecord文件转换为MindSpore Record格式文件,最后通过MindDataset接口将其读取出来,并使用Decode函数对image_bytes字段进行解码。

  1. 导入相关模块。

    import collections
    from io import BytesIO
    import os
    import mindspore.dataset as ds
    import mindspore.mindrecord as record
    import mindspore.dataset.vision.c_transforms as vision
    from PIL import Image
    import tensorflow as tf
    
    os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
    
  1. 生成TFRecord文件。

    # TFRecord文件的路径
    TFRECORD_FILE = "test.tfrecord"
    # 输出的MindSpore Record文件路径
    MINDRECORD_FILE = "test.mindrecord"
    
    def generate_tfrecord():
        def create_int_feature(values):
            if isinstance(values, list):
                feature = tf.train.Feature(int64_list=tf.train.Int64List(value=list(values)))
            else:
                feature = tf.train.Feature(int64_list=tf.train.Int64List(value=[values]))
            return feature
    
        def create_float_feature(values):
            if isinstance(values, list):
                feature = tf.train.Feature(float_list=tf.train.FloatList(value=list(values)))
            else:
                feature = tf.train.Feature(float_list=tf.train.FloatList(value=[values]))
            return feature
    
        def create_bytes_feature(values):
            if isinstance(values, bytes):
                white_io = BytesIO()
                Image.new('RGB', (10, 10), (255, 255, 255)).save(white_io, 'JPEG')
                image_bytes = white_io.getvalue()
                feature = tf.train.Feature(bytes_list=tf.train.BytesList(value=[image_bytes]))
            else:
                feature = tf.train.Feature(bytes_list=tf.train.BytesList(value=[bytes(values, encoding='utf-8')]))
            return feature
    
        writer = tf.io.TFRecordWriter(TFRECORD_FILE)
    
        example_count = 0
        for i in range(10):
            # 随机创建Tensorflow样本数据
            file_name = "000" + str(i) + ".jpg"
            image_bytes = bytes(str("aaaabbbbcccc" + str(i)), encoding="utf-8")
            int64_scalar = i
            float_scalar = float(i)
            int64_list = [i, i+1, i+2, i+3, i+4, i+1234567890]
            float_list = [float(i), float(i+1), float(i+2.8), float(i+3.2),
                          float(i+4.4), float(i+123456.9), float(i+98765432.1)]
    
            # 把数据存入TFRecord文件格式中
            features = collections.OrderedDict()
            features["file_name"] = create_bytes_feature(file_name)
            features["image_bytes"] = create_bytes_feature(image_bytes)
            features["int64_scalar"] = create_int_feature(int64_scalar)
            features["float_scalar"] = create_float_feature(float_scalar)
            features["int64_list"] = create_int_feature(int64_list)
            features["float_list"] = create_float_feature(float_list)
    
            tf_example = tf.train.Example(features=tf.train.Features(feature=features))
            writer.write(tf_example.SerializeToString())
            example_count += 1
    
        writer.close()
        print("Write {} rows in tfrecord.".format(example_count))
    
    generate_tfrecord()
    
    Write 10 rows in tfrecord.
    
  1. 将TFRecord转换成MindSpore Record。

    feature_dict = {"file_name": tf.io.FixedLenFeature([], tf.string),
                    "image_bytes": tf.io.FixedLenFeature([], tf.string),
                    "int64_scalar": tf.io.FixedLenFeature([], tf.int64),
                    "float_scalar": tf.io.FixedLenFeature([], tf.float32),
                    "int64_list": tf.io.FixedLenFeature([6], tf.int64),
                    "float_list": tf.io.FixedLenFeature([7], tf.float32),
                    }
    
    if os.path.exists(MINDRECORD_FILE):
        os.remove(MINDRECORD_FILE)
        os.remove(MINDRECORD_FILE + ".db")
    
    tfrecord_transformer = record.TFRecordToMR(TFRECORD_FILE, MINDRECORD_FILE, feature_dict, ["image_bytes"])
    tfrecord_transformer.transform()
    
    assert os.path.exists(MINDRECORD_FILE)
    assert os.path.exists(MINDRECORD_FILE + ".db")
    
  1. 通过MindDataset接口读取MindSpore Record。

    data_set = ds.MindDataset(dataset_files=MINDRECORD_FILE)
    decode_op = vision.Decode()
    data_set = data_set.map(operations=decode_op, input_columns=["image_bytes"], num_parallel_workers=2)
    
    # 样本计数
    print("Got {} samples".format(data_set.get_dataset_size()))
    
    Got 10 samples