下沉模式

下载Notebook下载样例代码查看源文件

概述

昇腾芯片集成了AICORE和AICPU等计算单元。其中AICORE负责稠密Tensor和Vector运算,AICPU负责复杂控制逻辑的处理。

为充分发挥昇腾芯片的运算、逻辑控制和任务分发能力,MindSpore提供了数据图下沉、图下沉和循环下沉功能,极大地减少Host-Device交互开销,有效地提升训练与推理的性能。MindSpore的计算图包含网络算子以及算子间的依赖关系。

从用户的视角来看,网络训练的流程如下:

user-view

本教程以训练的执行流程为例介绍数据下沉、图下沉和循环下沉的原理和使用方法。

数据下沉

为了提升网络的执行性能,通常使用专用芯片来执行算子,一个芯片对应一个Device,Host与Device的一般交互流程如下:

without-sink

由上图可见,每个训练迭代都需要从Host拷贝数据到Device,可通过数据下沉消除Host和Device间拷贝输入数据的开销。

使能数据下沉后,MindSpore会在Device侧创建专门的数据缓存队列,MindSpore数据处理引擎使用高性能数据通道将数据的预处理结果发送到Device的数据队列上,计算图通过GetNext算子直接从数据队列拷贝输入数据,Host向数据队列发送数据和计算图从数据队列读取数据形成流水并行,执行当前迭代的同时可向数据队列发送下一个迭代的数据,从而隐藏了Host-Device数据拷贝的开销,MindSpore高性能数据处理引擎的原理参考这里

GPU后端和昇腾后端都支持数据下沉,GPU数据下沉的Host-Device交互流程如下:

data-sink

用户可通过train接口的dataset_sink_mode控制是否使能数据下沉。

注意:在数据下沉模式时,如果发生了数据处理的错误,数据处理引擎会停止工作,而GetNext算子会在限定时间内获取不到数据后提示超时错误,此时需留意数据引擎抛出的错误,及时修正并重新启动训练。如果数据引擎没有抛出错误,则可能是数据处理速度太慢,此时可以调节数据处理引擎的并行度或调整参数 op_timeout 以防止出现超时错误,参考这里

图下沉

一般情况下,每个训练迭代都需要下发并触发device上每个算子的执行,Host与Device交互频繁。

为减少Host与Device的交互,在图编译时,将网络中的算子打包并一起下发到device,每次迭代只触发一次计算图的执行即可,从而提升网络的执行效率。

graph-sink

GPU后端暂不支持图下沉;使用昇腾设备时,开启数据下沉会同时启用图下沉。

循环下沉

启用数据下沉和图下沉后,每个迭代的计算结果都会返回Host,并由Host判断是否需要进入下一个迭代,为减少每个迭代的Device-Host交互,可以将进入下一个迭代的循环判断下沉到Device,这样等所有迭代执行完成后再将计算结果返回到Host。循环下沉的Host-Device交互流程如下:

loop-sink

用户通过train接口的dataset_sink_modesink_size参数控制每个epoch的下沉迭代数量,Device侧连续执行sink_size个迭代后才返回到Host。当前MindSpore默认采用单算子执行的方式,这种情况下每迭代都会进行Device-Host的交互,此时相当于设置为sink_size=1。

使用方法

Model.train实现数据下沉

Modeltrain接口参数dataset_sink_mode可以控制数据是否下沉。dataset_sink_mode为True表示数据下沉,否则为非下沉。所谓下沉即数据通过通道直接传送到Device上。

dataset_sink_mode参数可以配合sink_size控制每个epoch下沉的数据量大小。当dataset_sink_mode设置为True,即数据下沉模式时:

  • 如果sink_size为默认值-1,则每一个epoch训练整个数据集,理想状态下下沉数据的速度快于硬件计算的速度,保证处理数据的耗时隐藏于网络计算时间内;

  • 如果sink_size>0,此时原始数据集可以被无限次遍历,下沉数据流程仍与sink_size=-1相同,不同点是每个epoch仅训练sink_size大小的数据量,如果有LossMonitor,那么会训练sink_size大小的数据量就打印一次loss值,下一个epoch继续从上次遍历的结束位置继续遍历。

下沉的总数据量由epochsink_size两个变量共同控制,即总数据量=epoch*sink_size

当使用LossMonitorTimeMonitor或其他Callback接口时,如果dataset_sink_mode设置为False,Host侧和Device侧之间每个step交互一次,所以会每个step返回一个结果,如果dataset_sink_mode为True,因为数据在Device上通过通道传输,Host侧和Device侧之间每个epoch进行一次数据交互,所以每个epoch只返回一次结果。

  • 当前CPU不支持数据下沉。

  • 当设置为GRAPH模式时,每个batch数据的shape必须相同;当设置为PYNATIVE模式时,要求每个batch的size相同。

  • 由于数据下沉对数据集的遍历是连续,当前不支持非连续遍历。

  • 如果在使用数据下沉模式时,出现fault kernel_name=GetNextGetNext... task error或者outputs = self.get_next()等类似的错误,那么有可能是数据处理过程中某些样本处理太耗时,导致网络计算侧长时间拿不到数据报错,此时可以将dataset_sink_mode设置为False再次验证,或者对数据集使用create_dict_iterator()接口单独循环数据集,并参考数据处理性能优化调优数据处理,保证数据处理高性能。

代码样例如下:

[2]:
import os
import requests
import mindspore.dataset as ds
import mindspore as ms
import mindspore.dataset.transforms as transforms
import mindspore.dataset.vision as vision
import mindspore.nn as nn
from mindspore import train
from mindspore.common.initializer import TruncatedNormal
from mindspore.dataset.vision import Inter
import mindspore.ops as ops

requests.packages.urllib3.disable_warnings()

def create_dataset(data_path, batch_size=32, repeat_size=1,
                   num_parallel_workers=1):
    """
    create dataset for train or test
    """
    # define dataset
    mnist_ds = ds.MnistDataset(data_path)

    resize_height, resize_width = 32, 32
    rescale = 1.0 / 255.0
    shift = 0.0
    rescale_nml = 1 / 0.3081
    shift_nml = -1 * 0.1307 / 0.3081

    # define map operations
    resize_op = vision.Resize((resize_height, resize_width), interpolation=Inter.LINEAR)  # Bilinear mode
    rescale_nml_op = vision.Rescale(rescale_nml, shift_nml)
    rescale_op = vision.Rescale(rescale, shift)
    hwc2chw_op = vision.HWC2CHW()
    type_cast_op = transforms.TypeCast(ms.int32)

    # apply map operations on images
    mnist_ds = mnist_ds.map(input_columns="label", operations=type_cast_op, num_parallel_workers=num_parallel_workers)
    mnist_ds = mnist_ds.map(input_columns="image", operations=resize_op, num_parallel_workers=num_parallel_workers)
    mnist_ds = mnist_ds.map(input_columns="image", operations=rescale_op, num_parallel_workers=num_parallel_workers)
    mnist_ds = mnist_ds.map(input_columns="image", operations=rescale_nml_op, num_parallel_workers=num_parallel_workers)
    mnist_ds = mnist_ds.map(input_columns="image", operations=hwc2chw_op, num_parallel_workers=num_parallel_workers)

    # apply DatasetOps
    buffer_size = 10000
    mnist_ds = mnist_ds.shuffle(buffer_size=buffer_size)  # 10000 as in LeNet train script
    mnist_ds = mnist_ds.batch(batch_size, drop_remainder=True)
    mnist_ds = mnist_ds.repeat(repeat_size)

    return mnist_ds


def conv(in_channels, out_channels, kernel_size, stride=1, padding=0):
    """weight initial for conv layer"""
    weight = weight_variable()
    return nn.Conv2d(in_channels, out_channels,
                     kernel_size=kernel_size, stride=stride, padding=padding,
                     weight_init=weight, has_bias=False, pad_mode="valid")


def fc_with_initialize(input_channels, out_channels):
    """weight initial for fc layer"""
    weight = weight_variable()
    bias = weight_variable()
    return nn.Dense(input_channels, out_channels, weight, bias)


def weight_variable():
    """weight initial"""
    return TruncatedNormal(0.02)


class LeNet5(nn.Cell):
    """
    Lenet network
    Args:
        num_class (int): Num classes. Default: 10.

    Returns:
        Tensor, output tensor

    Examples:
        >>> LeNet(num_class=10)
    """

    def __init__(self, num_class=10):
        super(LeNet5, self).__init__()
        self.num_class = num_class
        self.batch_size = 32
        self.conv1 = conv(1, 6, 5)
        self.conv2 = conv(6, 16, 5)
        self.fc1 = fc_with_initialize(16 * 5 * 5, 120)
        self.fc2 = fc_with_initialize(120, 84)
        self.fc3 = fc_with_initialize(84, self.num_class)
        self.relu = nn.ReLU()
        self.max_pool2d = nn.MaxPool2d(kernel_size=2, stride=2)

    def construct(self, x):
        x = self.conv1(x)
        x = self.relu(x)
        x = self.max_pool2d(x)
        x = self.conv2(x)
        x = self.relu(x)
        x = self.max_pool2d(x)
        x = ops.reshape(x, (self.batch_size, -1))
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.fc3(x)
        return x

def download_dataset(dataset_url, path):
    filename = dataset_url.split("/")[-1]
    save_path = os.path.join(path, filename)
    if os.path.exists(save_path):
        return
    if not os.path.exists(path):
        os.makedirs(path)
    res = requests.get(dataset_url, stream=True, verify=False)
    with open(save_path, "wb") as f:
        for chunk in res.iter_content(chunk_size=512):
            if chunk:
                f.write(chunk)
    print("The {} file is downloaded and saved in the path {} after processing".format(os.path.basename(dataset_url), path))


if __name__ == "__main__":
    ms.set_context(mode=ms.GRAPH_MODE, device_target="GPU")
    ds_train_path = "./datasets/MNIST_Data/train/"
    download_dataset("https://mindspore-website.obs.myhuaweicloud.com/notebook/datasets/mnist/train-labels-idx1-ubyte", ds_train_path)
    download_dataset("https://mindspore-website.obs.myhuaweicloud.com/notebook/datasets/mnist/train-images-idx3-ubyte", ds_train_path)
    ds_train = create_dataset(ds_train_path, 32)

    network = LeNet5(10)
    net_loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction="mean")
    net_opt = nn.Momentum(network.trainable_params(), 0.01, 0.9)
    model = train.Model(network, net_loss, net_opt)

    print("============== Starting Training ==============")
    model.train(epoch=10, train_dataset=ds_train, callbacks=[train.LossMonitor()], dataset_sink_mode=True, sink_size=1000)
The train-labels-idx1-ubyte file is downloaded and saved in the path ./datasets/MNIST_Data/train/ after processing
The train-images-idx3-ubyte file is downloaded and saved in the path ./datasets/MNIST_Data/train/ after processing
============== Starting Training ==============
TotalTime = 1.57468, [16]
[parse]: 0.00476957
[symbol_resolve]: 0.379241, [1]
    [Cycle 1]: 0.378255, [1]
        [resolve]: 0.378242
[combine_like_graphs]: 0.00174131
[meta_unpack_prepare]: 0.000686213
[abstract_specialize]: 0.0927178
[auto_monad]: 0.00169479
[inline]: 4.96954e-06
[pipeline_split]: 8.78051e-06
[optimize]: 1.05932, [16]
...
epoch: 1 step: 1000, loss is 0.2323482483625412
epoch: 2 step: 1000, loss is 0.1581915020942688
epoch: 3 step: 1000, loss is 0.0452561192214489
epoch: 4 step: 1000, loss is 0.0008174572139978409
epoch: 5 step: 1000, loss is 0.026678290218114853
epoch: 6 step: 1000, loss is 0.24375736713409424
epoch: 7 step: 1000, loss is 0.004280050750821829
epoch: 8 step: 1000, loss is 0.08765432983636856
epoch: 9 step: 1000, loss is 0.06880836188793182
epoch: 10 step: 1000, loss is 0.05223526805639267

batch_size为32的情况下,数据集的大小为1875,当sink_size设置为1000时,表示每个epoch下沉1000个batch的数据,下沉次数为epoch=10,下沉的总数据量为:epoch*sink_size=10000。

dataset_sink_mode为True,所以每个epoch返回一次结果。训练过程中使用DatasetHelper进行数据集的迭代及数据信息的管理。如果为下沉模式,使用 mindspore.connect_network_with_dataset 函数连接当前的训练网络或评估网络 network 和 DatasetHelper,此函数使用 mindspore.ops.GetNext 包装输入网络,以实现在前向计算时,在设备(Device)侧从对应名称为 queue_name 的数据通道中获取数据,并将数据传递到输入网络。如果为非下沉模式,则在主机(Host)直接遍历数据集获取数据。

dataset_sink_mode为False时,sink_size参数设置无效。

data_sink实现数据下沉

在MindSpore的函数式编程范式中,还可以使用data_sink接口将模型的执行函数和数据集绑定,实现数据下沉。参数含义如下:

  • fn:下沉模型的执行函数;

  • dataset:数据集,可以由mindspore.dataset生成;

  • sink_size:用来调整每次下沉执行的数据量,可以指定为任意正数,默认值为1,即每次下沉只执行一个step的数据。如需单次下沉执行整个epoch的数据,可以使用datasetget_datasize_size()方法来指定其值。也可以单次下沉多个epoch,设置其值为epoch * get_datasize_size()。(多次data_sink的调用对数据集是连续遍历的,下一次调用是从上一次调用结束位置后继续遍历)

  • jit_config:编译时所使用的JitConfig配置项,详细可参考mindspore.JitConfig。默认值:None,表示以PyNative模式运行。

  • input_signature:用于表示输入参数的Tensor。Tensor的shape和dtype将作为函数的输入shape和dtype。默认值:None。

  • 当前CPU不支持数据下沉。

  • 当设置为GRAPH模式时,每个batch数据的shape必须相同;当设置为PYNATIVE模式时,要求每个batch的size相同。

  • 由于数据下沉对数据集的遍历是连续,当前不支持非连续遍历。

  • 如果在使用数据下沉模式时,出现fault kernel_name=GetNextGetNext... task error或者outputs = self.get_next()等类似的错误,那么有可能是数据处理过程中某些样本处理太耗时,导致网络计算侧长时间拿不到数据报错,此时可以将dataset_sink_mode设置为False再次验证,或者对数据集使用create_dict_iterator()接口单独循环数据集,并参考数据处理性能优化调优数据处理,保证数据处理高性能。

代码示例如下:

[ ]:
import os
import requests
import mindspore.dataset as ds
import mindspore as ms
import mindspore.dataset.transforms as transforms
import mindspore.dataset.vision as vision
import mindspore.nn as nn
from mindspore.common.initializer import TruncatedNormal
from mindspore.dataset.vision import Inter
import mindspore.ops as ops

requests.packages.urllib3.disable_warnings()

def create_dataset(data_path, batch_size=32, repeat_size=1,
                   num_parallel_workers=1):
    """
    create dataset for train or test
    """
    # define dataset
    mnist_ds = ds.MnistDataset(data_path)

    resize_height, resize_width = 32, 32
    rescale = 1.0 / 255.0
    shift = 0.0
    rescale_nml = 1 / 0.3081
    shift_nml = -1 * 0.1307 / 0.3081

    # define map operations
    resize_op = vision.Resize((resize_height, resize_width), interpolation=Inter.LINEAR)  # Bilinear mode
    rescale_nml_op = vision.Rescale(rescale_nml, shift_nml)
    rescale_op = vision.Rescale(rescale, shift)
    hwc2chw_op = vision.HWC2CHW()
    type_cast_op = transforms.TypeCast(ms.int32)

    # apply map operations on images
    mnist_ds = mnist_ds.map(input_columns="label", operations=type_cast_op, num_parallel_workers=num_parallel_workers)
    mnist_ds = mnist_ds.map(input_columns="image", operations=resize_op, num_parallel_workers=num_parallel_workers)
    mnist_ds = mnist_ds.map(input_columns="image", operations=rescale_op, num_parallel_workers=num_parallel_workers)
    mnist_ds = mnist_ds.map(input_columns="image", operations=rescale_nml_op, num_parallel_workers=num_parallel_workers)
    mnist_ds = mnist_ds.map(input_columns="image", operations=hwc2chw_op, num_parallel_workers=num_parallel_workers)

    # apply DatasetOps
    buffer_size = 10000
    mnist_ds = mnist_ds.shuffle(buffer_size=buffer_size)  # 10000 as in LeNet train script
    mnist_ds = mnist_ds.batch(batch_size, drop_remainder=True)
    mnist_ds = mnist_ds.repeat(repeat_size)

    return mnist_ds


def conv(in_channels, out_channels, kernel_size, stride=1, padding=0):
    """weight initial for conv layer"""
    weight = weight_variable()
    return nn.Conv2d(in_channels, out_channels,
                     kernel_size=kernel_size, stride=stride, padding=padding,
                     weight_init=weight, has_bias=False, pad_mode="valid")


def fc_with_initialize(input_channels, out_channels):
    """weight initial for fc layer"""
    weight = weight_variable()
    bias = weight_variable()
    return nn.Dense(input_channels, out_channels, weight, bias)


def weight_variable():
    """weight initial"""
    return TruncatedNormal(0.02)


class LeNet5(nn.Cell):
    """
    Lenet network
    Args:
        num_class (int): Num classes. Default: 10.

    Returns:
        Tensor, output tensor

    Examples:
        >>> LeNet(num_class=10)
    """

    def __init__(self, num_class=10):
        super(LeNet5, self).__init__()
        self.num_class = num_class
        self.batch_size = 32
        self.conv1 = conv(1, 6, 5)
        self.conv2 = conv(6, 16, 5)
        self.fc1 = fc_with_initialize(16 * 5 * 5, 120)
        self.fc2 = fc_with_initialize(120, 84)
        self.fc3 = fc_with_initialize(84, self.num_class)
        self.relu = nn.ReLU()
        self.max_pool2d = nn.MaxPool2d(kernel_size=2, stride=2)

    def construct(self, x):
        x = self.conv1(x)
        x = self.relu(x)
        x = self.max_pool2d(x)
        x = self.conv2(x)
        x = self.relu(x)
        x = self.max_pool2d(x)
        x = ops.reshape(x, (self.batch_size, -1))
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.fc3(x)
        return x

def download_dataset(dataset_url, path):
    filename = dataset_url.split("/")[-1]
    save_path = os.path.join(path, filename)
    if os.path.exists(save_path):
        return
    if not os.path.exists(path):
        os.makedirs(path)
    res = requests.get(dataset_url, stream=True, verify=False)
    with open(save_path, "wb") as f:
        for chunk in res.iter_content(chunk_size=512):
            if chunk:
                f.write(chunk)
    print("The {} file is downloaded and saved in the path {} after processing".format(os.path.basename(dataset_url), path))

if __name__ == "__main__":
    ms.set_context(mode=ms.GRAPH_MODE, device_target="GPU")
    ds_train_path = "./datasets/MNIST_Data/train/"
    download_dataset("https://mindspore-website.obs.myhuaweicloud.com/notebook/datasets/mnist/train-labels-idx1-ubyte", ds_train_path)
    download_dataset("https://mindspore-website.obs.myhuaweicloud.com/notebook/datasets/mnist/train-images-idx3-ubyte", ds_train_path)

    network = LeNet5(10)
    network.set_train()
    weights = network.trainable_params()
    net_loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction="mean")
    net_opt = nn.Momentum(network.trainable_params(), 0.01, 0.9)

    def forward_fn(data, label):
        loss = net_loss(network(data), label)
        return loss

    grad_fn = ms.value_and_grad(forward_fn, None, weights)

    def train_step(data, label):
        loss, grads = grad_fn(data, label)
        net_opt(grads)
        return loss

    print("============== Different calling methods train 10 epochs ==============")
    jit = ms.JitConfig()
    print("1. Default, execute one step data each per sink")
    ds_train = create_dataset(ds_train_path, 32)
    data_size = ds_train.get_dataset_size()
    epochs = 10
    sink_process = ms.data_sink(train_step, ds_train, jit_config=jit)
    for _ in range(data_size * epochs):
        loss = sink_process()
        print(f"step {_ + 1}, loss is {loss}")

    print("2. Execute one epoch data per sink")
    ds_train = create_dataset(ds_train_path, 32)
    data_size = ds_train.get_dataset_size()
    epochs = 10
    sink_process = ms.data_sink(train_step, ds_train, sink_size=data_size, jit_config=jit)
    for _ in range(epochs):
        loss = sink_process()
        print(f"epoch {_ + 1}, loss is {loss}")

    print("3. Execute multiple epoch data per sink")
    ds_train = create_dataset(ds_train_path, 32)
    data_size = ds_train.get_dataset_size()
    epochs = 10
    sink_process = ms.data_sink(train_step, ds_train, sink_size=epochs*data_size, jit_config=jit)
    loss = sink_process()
    print(f"loss is {loss}")

代码中分别使用3种调用方式训练10个epoch。

  1. 默认行为,每次下沉1个step的数据,每个step结束后返回loss,训练10个epoch需要在Host侧循环调用ds_train.get_dataset_size() * 10次;

  2. 每次下沉1个epoch的数据,每个epoch执行结束后返回loss,训练10个epoch需要在Host侧循环调用10次;

  3. 单次下沉10个epoch的数据,10个epoch执行结束后返回loss,无需在Host侧进行循环。

上述方法中,方法1在每个step结束后与Device进行一次交互,效率较低;方法3在训练中不需要与Device进行交互,执行效率最高,但只能返回最后的一个step的loss。