优化数据处理

Ascend GPU CPU 数据准备

在线运行下载Notebook下载样例代码查看源文件

概述

数据是整个深度学习中最重要的一环,因为数据的好坏决定了最终结果的上限,模型的好坏只是去无限逼近这个上限,所以高质量的数据输入,会在整个深度神经网络中起到积极作用,数据在整个数据处理和数据增强的过程像经过pipeline管道的水一样,源源不断地流向训练系统,如图所示:

pipeline

MindSpore为用户提供了数据处理以及数据增强的功能,在数据的整个pipeline过程中,其中的每一步骤,如果都能够进行合理的运用,那么数据的性能会得到很大的优化和提升。本次体验将基于CIFAR-10数据集[1]来为大家展示如何在数据加载、数据处理和数据增强的过程中进行性能的优化。

此外,操作系统的存储、架构和计算资源也会一定程度上影响数据处理的性能。

准备环节

导入模块

dataset模块提供API用来加载和处理数据集。

[1]:
import mindspore.dataset as ds

numpy模块用于生成ndarray数组。

[2]:
import numpy as np

下载所需数据集

运行以下命令来获取数据集:

下载CIFAR-10二进制格式数据集,并将数据集文件解压到./datasets/目录下,数据加载的时候使用该数据集。

[ ]:
import os
import requests
import tarfile
import zipfile
import shutil

requests.packages.urllib3.disable_warnings()

def download_dataset(url, target_path):
    """download and decompress dataset"""
    if not os.path.exists(target_path):
        os.makedirs(target_path)
    download_file = url.split("/")[-1]
    if not os.path.exists(download_file):
        res = requests.get(url, stream=True, verify=False)
        if download_file.split(".")[-1] not in ["tgz", "zip", "tar", "gz"]:
            download_file = os.path.join(target_path, download_file)
        with open(download_file, "wb") as f:
            for chunk in res.iter_content(chunk_size=512):
                if chunk:
                    f.write(chunk)
    if download_file.endswith("zip"):
        z = zipfile.ZipFile(download_file, "r")
        z.extractall(path=target_path)
        z.close()
    if download_file.endswith(".tar.gz") or download_file.endswith(".tar") or download_file.endswith(".tgz"):
        t = tarfile.open(download_file)
        names = t.getnames()
        for name in names:
            t.extract(name, target_path)
        t.close()
    print("The {} file is downloaded and saved in the path {} after processing".format(os.path.basename(url), target_path))

download_dataset("https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/cifar-10-binary.tar.gz", "./datasets")
test_path = "./datasets/cifar-10-batches-bin/test"
train_path = "./datasets/cifar-10-batches-bin/train"
os.makedirs(test_path, exist_ok=True)
os.makedirs(train_path, exist_ok=True)
if not os.path.exists(os.path.join(test_path, "test_batch.bin")):
    shutil.move("./datasets/cifar-10-batches-bin/test_batch.bin", test_path)
[shutil.move("./datasets/cifar-10-batches-bin/"+i, train_path) for i in os.listdir("./datasets/cifar-10-batches-bin/") if os.path.isfile("./datasets/cifar-10-batches-bin/"+i) and not i.endswith(".html") and not os.path.exists(os.path.join(train_path, i))]

解压后的数据集文件的目录结构如下:

./datasets/cifar-10-batches-bin
├── readme.html
├── test
│   └── test_batch.bin
└── train
    ├── batches.meta.txt
    ├── data_batch_1.bin
    ├── data_batch_2.bin
    ├── data_batch_3.bin
    ├── data_batch_4.bin
    └── data_batch_5.bin

下载CIFAR-10 Python文件格式数据集,并将数据集文件解压到./datasets/cifar-10-batches-py目录下,数据转换的时候使用该数据集。

[ ]:
download_dataset("https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/cifar-10-python.tar.gz", "./datasets")

解压后数据集文件的目录结构如下:

./datasets/cifar-10-batches-py
├── batches.meta
├── data_batch_1
├── data_batch_2
├── data_batch_3
├── data_batch_4
├── data_batch_5
├── readme.html
└── test_batch

数据加载性能优化

MindSpore支持加载计算机视觉、自然语言处理等领域的常用数据集、特定格式的数据集以及用户自定义的数据集,详情内容请参考数据集加载。不同数据集加载接口的底层实现方式不同,性能也存在着差异,如下所示:

常用数据集

用户自定义

MindRecord

底层实现

C++

Python

C++

性能

性能优化方案

data-loading-performance-scheme

数据加载性能优化建议如下:

  • 对于已经提供加载接口的常用数据集,优先使用MindSpore提供的数据集加载接口进行加载,可以获得较好的加载性能,具体内容请参考内置加载算子,如果性能仍无法满足需求,则可采取多线程并发方案,请参考本文多线程优化方案

  • 不支持的数据集格式,推荐先将数据集转换为MindRecord数据格式后再使用MindDataset类进行加载(详细使用方法参考API),具体内容请参考将数据集转换为MindSpore数据格式,如果性能仍无法满足需求,则可采取多线程并发方案,请参考本文多线程优化方案

  • 不支持的数据集格式,算法快速验证场景,优选用户自定义GeneratorDataset类实现(详细使用方法参考API),如果性能仍无法满足需求,则可采取多进程并发方案,请参考本文多进程优化方案

代码示例

基于以上的数据加载性能优化建议,本次体验分别使用内置加载算子Cifar10Dataset类(详细使用方法参考API)、数据转换后使用MindDataset类、使用GeneratorDataset类进行数据加载,代码演示如下:

  1. 使用内置算子Cifar10Dataset类加载CIFAR-10数据集,这里使用的是CIFAR-10二进制格式的数据集,加载数据时采取多线程优化方案,开启了4个线程并发完成任务,最后对数据创建了字典迭代器,并通过迭代器读取了一条数据记录。

[5]:
cifar10_path = "./datasets/cifar-10-batches-bin/train"

# create Cifar10Dataset for reading data
cifar10_dataset = ds.Cifar10Dataset(cifar10_path, num_parallel_workers=4)
# create a dictionary iterator and read a data record through the iterator
print(next(cifar10_dataset.create_dict_iterator()))
{'image': Tensor(shape=[32, 32, 3], dtype=UInt8, value=
[[[181, 185, 194],
  [184, 187, 196],
  [189, 192, 201],
  ...
  [178, 181, 191],
  [171, 174, 183],
  [166, 170, 179]],
 [[182, 185, 194],
  [184, 187, 196],
  [189, 192, 201],
  ...
  [180, 183, 192],
  [173, 176, 185],
  [167, 170, 179]],
 [[185, 188, 197],
  [187, 190, 199],
  [193, 196, 205],
  ...
  [182, 185, 194],
  [176, 179, 188],
  [170, 173, 182]],
 ...
 [[176, 174, 185],
  [172, 171, 181],
  [174, 172, 183],
  ...
  [168, 171, 180],
  [164, 167, 176],
  [160, 163, 172]],
 [[172, 170, 181],
  [171, 169, 180],
  [173, 171, 182],
  ...
  [164, 167, 176],
  [160, 163, 172],
  [156, 159, 168]],
 [[171, 169, 180],
  [173, 171, 182],
  [177, 175, 186],
  ...
  [162, 165, 174],
  [158, 161, 170],
  [152, 155, 164]]]), 'label': Tensor(shape=[], dtype=UInt32, value= 6)}
  1. 使用Cifar10ToMR这个类将CIFAR-10数据集转换为MindSpore数据格式,这里使用的是CIFAR-10 python文件格式的数据集,然后使用MindDataset类加载MindSpore数据格式数据集,加载数据采取多线程优化方案,开启了4个线程并发完成任务,最后对数据创建了字典迭代器,并通过迭代器读取了一条数据记录。

[6]:
import os
from mindspore.mindrecord import Cifar10ToMR

trans_path = "./transform/"

if not os.path.exists(trans_path):
    os.mkdir(trans_path)

os.system("rm -f {}cifar10*".format(trans_path))

cifar10_path = './datasets/cifar-10-batches-py'
cifar10_mindrecord_path = './transform/cifar10.record'

cifar10_transformer = Cifar10ToMR(cifar10_path, cifar10_mindrecord_path)
# execute transformation from CIFAR-10 to MindRecord
cifar10_transformer.transform(['label'])

# create MindDataset for reading data
cifar10_mind_dataset = ds.MindDataset(dataset_files=cifar10_mindrecord_path, num_parallel_workers=4)
# create a dictionary iterator and read a data record through the iterator
print(next(cifar10_mind_dataset.create_dict_iterator()))
{'data': Tensor(shape=[1289], dtype=UInt8, value= [255, 216, 255, 224,   0,  16,  74,  70,  73,  70,   0,   1,   1,   0,   0,   1,   0,   1,   0,   0, 255, 219,   0,  67,
   0,   2,   1,   1,   1,   1,   1,   2,   1,   1,   1,   2,   2,   2,   2,   2,   4,   3,   2,   2,   2,   2,   5,   4,
   4,   3,   4,   6,   5,   6,   6,   6,   5,   6,   6,   6,   7,   9,   8,   6,   7,   9,   7,   6,   6,   8,  11,   8,
   9,  10,  10,  10,  10,  10,   6,   8,  11,  12,  11,  10,  12,   9,  10,  10,  10, 255, 219,   0,  67,   1,   2,   2,
   ...
   ...
   ...
  39, 227, 206, 143, 241,  91, 196, 154, 230, 189, 125, 165, 105, 218,  94, 163, 124, 146,  11, 187,  29,  34, 217, 210,
  23, 186,  56,  14, 192,  19, 181,   1,  57,  36,  14,  51, 211, 173, 105,   9, 191, 100, 212, 174, 122,  25, 110,  39,
  11, 133, 193, 226, 169,  73,  36, 234,  69,  90, 222,  93,  31, 223, 115, 255, 217]), 'id': Tensor(shape=[], dtype=Int64, value= 46084), 'label': Tensor(shape=[], dtype=Int64, value= 5)}
  1. 使用GeneratorDataset类加载自定义数据集,并且采取多进程优化方案,开启了4个进程并发完成任务,最后对数据创建了字典迭代器,并通过迭代器读取了一条数据记录。

[7]:
def generator_func(num):
    for i in range(num):
        yield (np.array([i]),)

# create GeneratorDataset for reading data
dataset = ds.GeneratorDataset(source=generator_func(5), column_names=["data"], num_parallel_workers=4)
# create a dictionary iterator and read a data record through the iterator
print(next(dataset.create_dict_iterator()))
{'data': Tensor(shape=[1], dtype=Int64, value= [0])}

shuffle性能优化

shuffle操作主要是对有序的数据集或者进行过repeat的数据集进行混洗,MindSpore专门为用户提供了shuffle函数,其中设定的buffer_size参数越大,混洗程度越大,但时间、计算资源消耗也会大。该接口支持用户在整个pipeline的任何时候都可以对数据进行混洗,具体内容请参考shuffle处理。但是因为底层的实现方式不同,该方式的性能不如直接在内置加载算子中设置shuffle参数直接对数据进行混洗。

性能优化方案

shuffle-performance-scheme

shuffle性能优化建议如下:

  • 直接使用内置加载算子的shuffle参数进行数据的混洗。

  • 如果使用的是shuffle函数,当性能仍无法满足需求,可通过调大buffer_size参数的值来优化提升性能。

代码示例

基于以上的shuffle性能优化建议,本次体验分别使用内置加载算子Cifar10Dataset类的shuffle参数和Shuffle函数进行数据的混洗,代码演示如下:

  1. 使用内置算子Cifar10Dataset类加载CIFAR-10数据集,这里使用的是CIFAR-10二进制格式的数据集,并且设置shuffle参数为True来进行数据混洗,最后对数据创建了字典迭代器,并通过迭代器读取了一条数据记录。

[8]:
cifar10_path = "./datasets/cifar-10-batches-bin/train"

# create Cifar10Dataset for reading data
cifar10_dataset = ds.Cifar10Dataset(cifar10_path, shuffle=True)
# create a dictionary iterator and read a data record through the iterator
print(next(cifar10_dataset.create_dict_iterator()))
{'image': Tensor(shape=[32, 32, 3], dtype=UInt8, value=
[[[213, 205, 194],
  [215, 207, 196],
  [219, 210, 200],
  ...
  [253, 254, 249],
  [253, 254, 249],
  [253, 254, 249]],
 [[218, 208, 198],
  [220, 210, 200],
  [222, 212, 202],
  ...
  [253, 254, 249],
  [253, 254, 249],
  [253, 254, 249]],
 [[219, 209, 198],
  [222, 211, 200],
  [224, 214, 202],
  ...
  [254, 253, 248],
  [254, 253, 248],
  [254, 253, 248]],
 ...
 [[135, 141, 139],
  [135, 141, 139],
  [146, 152, 150],
  ...
  [172, 174, 172],
  [181, 182, 182],
  [168, 168, 167]],
 [[113, 119, 117],
  [109, 115, 113],
  [117, 123, 121],
  ...
  [155, 159, 156],
  [150, 155, 155],
  [135, 140, 140]],
 [[121, 127, 125],
  [117, 123, 121],
  [121, 127, 125],
  ...
  [180, 184, 180],
  [141, 146, 144],
  [125, 130, 129]]]), 'label': Tensor(shape=[], dtype=UInt32, value= 8)}
  1. 使用shuffle函数进行数据混洗,参数buffer_size设置为3,数据采用GeneratorDataset类自定义生成。

[9]:
def generator_func():
    for i in range(5):
        yield (np.array([i, i+1, i+2, i+3, i+4]),)

ds1 = ds.GeneratorDataset(source=generator_func, column_names=["data"])
print("before shuffle:")
for data in ds1.create_dict_iterator():
    print(data["data"])

ds2 = ds1.shuffle(buffer_size=3)
print("after shuffle:")
for data in ds2.create_dict_iterator():
    print(data["data"])
before shuffle:
[0 1 2 3 4]
[1 2 3 4 5]
[2 3 4 5 6]
[3 4 5 6 7]
[4 5 6 7 8]
after shuffle:
[2 3 4 5 6]
[3 4 5 6 7]
[1 2 3 4 5]
[0 1 2 3 4]
[4 5 6 7 8]

数据增强性能优化

在图片分类的训练中,尤其是当数据集比较小的时候,用户可以使用数据增强的方式来预处理图片,从而丰富数据集。MindSpore为用户提供了多种数据增强的方式,其中包括:

  • 使用内置C算子(c_transforms模块)进行数据增强。

  • 使用内置Python算子(py_transforms模块)进行数据增强。

  • 用户可根据自己的需求,自定义Python函数进行数据增强。

具体的内容请参考数据增强。因为底层的实现方式不同,所以性能还是有一定的差异,如下所示:

模块

底层接口

说明

c_transforms

C++(基于OpenCV)

性能高

py_transforms

Python(基于PIL)

该模块提供了多种图像增强功能,并提供了PIL Image和Numpy数组之间的传输方法

性能优化方案

data-enhancement-performance-scheme

数据增强性能优化建议如下:

MindSpore也支持用户同时使用c_transforms和py_transforms模块中的数据增强方法,但由于两者底层实现不同,过度地混用将增加资源开销,降低处理性能。推荐用户可以单独使用c_transforms或py_transforms中的算子;或者先统一使用其中一种,再统一使用另一种;请不要在两种不同实现模块的数据增强接口中频繁地进行切换。

代码示例

基于以上的数据增强性能优化建议,本次体验分别使用c_transforms模块和自定义Python函数进行了数据增强,演示代码如下所示:

  1. 使用c_transforms模块进行数据增强,数据增强时采用多线程优化方案,开启了4个线程并发完成任务,并且采用了算子融合优化方案,使用RandomResizedCrop融合类替代RandomResize类和RandomCrop类。

[10]:
import mindspore.dataset.vision.c_transforms as C
import matplotlib.pyplot as plt

cifar10_path = "./datasets/cifar-10-batches-bin/train"

# create Cifar10Dataset for reading data
cifar10_dataset = ds.Cifar10Dataset(cifar10_path, num_parallel_workers=4)
transforms = C.RandomResizedCrop((800, 800))
# apply the transform to the dataset through dataset.map()
cifar10_dataset = cifar10_dataset.map(operations=transforms, input_columns="image", num_parallel_workers=4)

data = next(cifar10_dataset.create_dict_iterator())
plt.imshow(data["image"].asnumpy())
plt.show()
_images/optimize_data_processing_46_0.png
  1. 使用自定义Python函数进行数据增强,数据增强时采用多进程优化方案,开启了4个进程并发完成任务。

[11]:
def generator_func():
    for i in range(5):
        yield (np.array([i, i+1, i+2, i+3, i+4]),)

ds3 = ds.GeneratorDataset(source=generator_func, column_names=["data"])
print("before map:")
for data in ds3.create_dict_iterator():
    print(data["data"])

func = lambda x: x**2
ds4 = ds3.map(operations=func, input_columns="data", python_multiprocessing=True, num_parallel_workers=4)
print("after map:")
for data in ds4.create_dict_iterator():
    print(data["data"])
before map:
[0 1 2 3 4]
[1 2 3 4 5]
[2 3 4 5 6]
[3 4 5 6 7]
[4 5 6 7 8]
after map:
[ 0  1  4  9 16]
[ 1  4  9 16 25]
[ 4  9 16 25 36]
[ 9 16 25 36 49]
[16 25 36 49 64]

操作系统性能优化

由于MindSpore的数据处理主要在Host端进行,运行环境的配置也会对处理性能产生影响,主要体现在存储设备、NUMA架构和CPU计算资源等方面。

  1. 存储设备

    数据的加载过程涉及频繁的磁盘操作,磁盘读写的性能直接影响了数据加载的速度。当数据集较大时,推荐使用固态硬盘进行数据存储,固态硬盘的读写速度普遍较普通磁盘高,能够减少I/O操作对数据处理性能的影响。

    一般地,加载后的数据将会被缓存到操作系统的页面缓存中,在一定程度上降低了后续读取的开销,加速了后续Epoch的数据加载速度。用户也可以通过MindSpore提供的单节点缓存技术,手动缓存加载增强后的数据,避免了重复的数据加载和数据增强。

  2. NUMA架构

    NUMA的全称为Non-Uniform Memory Access,即非一致性内存访问,是为了解决传统的对称多处理器(SMP)架构中的可扩展性问题而诞生的一种内存架构。在传统架构中,多个处理器共用一条内存总线,容易产生带宽不足、内存冲突等问题。而在NUMA架构中,处理器和内存被划分为多个组,每个组称为一个节点(Node),各个节点拥有独立的集成内存控制器(IMC)总线,用于节点内通信,不同节点间则通过快速路径互连(QPI)进行通信。对于某一节点来说,处在同节点内的内存被称为本地内存,处在其他节点的内存被称为外部内存,访问本地内存的延迟会小于访问外部内存的延迟。在数据处理过程中,可以通过将进程与节点绑定,来减小内存访问的延迟。一般我们可以使用以下命令进行进程与node节点的绑定:

    numactl --cpubind=0 --membind=0 python train.py
    
  3. CPU(计算资源)

    尽管可以通过多线程并行技术加快数据处理的速度,但是实际运行时并不能保证CPU计算资源完全被利用起来。如果能够人为地事先完成计算资源配置的设定,将能在一定程度上提高CPU计算资源的利用率。

    • 计算资源的分配

      在分布式训练中,同一设备上可能开启多个训练进程。默认情况下,各个进程的资源分配与抢占将会遵循操作系统本身的策略进行,当进程较多时,频繁的资源竞争可能会导致数据处理性能的下降。如果能够事先设定各个进程的计算资源分配,就能避免这种资源竞争带来的开销。

      numactl --cpubind=0 python train.py
      

      taskset -c 0-15 python train.py
      

      numactl的方式较为粗粒度,直接指定numa node id,而taskset的方式是细粒度的,它能够直接指定numa node上的cpu core,其中0-15表示的core id从0到15。

    • CPU频率设置

      出于节约能效的考虑,操作系统会根据需要适时调整CPU的运行频率,但更低的功耗意味着计算性能的下降,会减慢数据处理的速度。要想充分发挥CPU的最大算力,需要手动设置CPU的运行频率。如果发现操作系统的CPU运行模式为平衡模式或者节能模式,可以通过将其调整为性能模式,提升数据处理的性能。

      cpupower frequency-set -g performance
      

性能优化方案总结

多线程优化方案

在数据加载和增强中,可以通过设置接口的num_parallel_workers参数调整数据处理时的并发执行线程数目,利用CPU的多核多线程特点提升数据处理的性能。如果用户没有手动指定num_parallel_workers参数,各个数据处理操作将默认使用8个子线程来进行并发处理。例如:

  • 在数据加载的过程中,内置数据加载类有num_parallel_workers参数用来设置线程数。

  • 在数据增强的过程中,map函数有num_parallel_workers参数用来设置线程数。

  • 在Batch的过程中,batch函数有num_parallel_workers参数用来设置线程数。

具体内容请参考内置加载算子。 在使用MindSpore进行单卡或多卡训练时,num_parallel_workers参数的设置应遵循以下原则:

  • 各数据加载和处理操作所设置的num_parallel_workers参数之和应不大于CPU所支持的最大线程数,否则将造成各个操作间的资源竞争。

  • 在设置num_parallel_workers参数之前,建议先使用MindSpore的Profiler(性能分析)工具分析训练中各个操作的性能情况,将更多的资源分配给性能较差的操作,即设置更大的num_parallel_workers,使得各个操作之间的吞吐达到平衡,避免不必要的等待。

  • 在单卡训练场景中,提高num_parallel_workers参数往往能直接提高处理性能,但在多卡场景下,由于CPU竞争加剧,一味地提高num_parallel_workers可能会导致性能劣化,需要在实际训练中尝试使用折中数值。

多进程优化方案

数据处理中Python实现的算子均支持多进程的模式,例如:

  • GeneratorDataset这个类默认是多进程模式,它的num_parallel_workers参数表示的是开启的进程数,默认为1,具体内容请参考GeneratorDataset

  • 如果使用Python自定义函数或者py_transforms模块进行数据增强的时候,当map函数的参数python_multiprocessing设置为True时,此时参数num_parallel_workers表示的是进程数,参数python_multiprocessing默认为False,此时参数num_parallel_workers表示的是线程数,具体的内容请参考内置加载算子

Compose优化方案

Map算子可以接收Tensor算子列表,并将按照顺序应用所有的这些算子,与为每个Tensor算子使用的Map算子相比,此类“胖Map算子”可以获得更好的性能,如图所示:

compose

算子融合优化方案

提供某些融合算子,这些算子将两个或多个算子的功能聚合到一个算子中。具体内容请参考数据增强算子,与它们各自组件的流水线相比,这种融合算子提供了更好的性能。如图所示:

operator-fusion

操作系统优化方案

  • 使用固态硬盘进行数据存储。

  • 将进程与node节点绑定。

  • 人工分配更多的计算资源。

  • 提高CPU运算频率。

参考文献

[1] Alex Krizhevsky. Learning Multiple Layers of Features from Tiny Images.