数据迭代

下载Notebook下载样例代码查看源文件

原始数据集通过数据集加载接口读取到内存,再通过数据增强操作进行数据变换,最后得到的数据集对象有两种常规的数据迭代方法:

  1. 创建iterator迭代器进行数据迭代。

  2. 数据直接传入网络模型的Model接口(如model.trainmodel.eval等)进行迭代训练或推理。

创建迭代器

数据集对象通常可以创建两种不同的迭代器来遍历数据,分别为:

  1. 元组迭代器。创建元组迭代器的接口为create_tuple_iterator,通常用于Model.train内部使用,其迭代出来的数据可以直接用于训练。

  2. 字典迭代器。创建字典迭代器的接口为create_dict_iterator,自定义train训练模式下,用户可以根据字典中的key进行进一步的数据处理操作,再输入到网络中,使用较为灵活。

下面通过示例介绍两种迭代器的使用方式:

[1]:
import mindspore.dataset as ds

# 数据集
np_data = [[[1, 2], [3, 4]], [[5, 6], [7, 8]]]

# 加载数据
dataset = ds.NumpySlicesDataset(np_data, column_names=["data"], shuffle=False)

然后使用create_tuple_iterator或者create_dict_iterator创建数据迭代器。

[2]:
# 创建元组迭代器
print("\n create tuple iterator")
for item in dataset.create_tuple_iterator():
    print("item:\n", item[0])

# 创建字典迭代器
print("\n create dict iterator")
for item in dataset.create_dict_iterator():
    print("item:\n", item["data"])

# 直接遍历数据集对象(等同于创建元组迭代器)
print("\n iterate dataset object directly")
for item in dataset:
    print("item:\n", item[0])

# 使用enumerate方式遍历(等同于创建元组迭代器)
print("\n iterate dataset using enumerate")
for index, item in enumerate(dataset):
    print("index: {}, item:\n {}".format(index, item[0]))

 create tuple iterator
item:
 [[1 2]
 [3 4]]
item:
 [[5 6]
 [7 8]]

 create dict iterator
item:
 [[1 2]
 [3 4]]
item:
 [[5 6]
 [7 8]]

 iterate dataset object directly
item:
 [[1 2]
 [3 4]]
item:
 [[5 6]
 [7 8]]

 iterate dataset using enumerate
index: 0, item:
 [[1 2]
 [3 4]]
index: 1, item:
 [[5 6]
 [7 8]]

如果需要产生多个epoch的数据,可以相应地调整入参num_epochs的取值。相比于多次调用迭代器接口,直接设置epoch数可以提高数据迭代的性能。

[3]:
epoch = 2  # 创建元组迭代器产生2个epoch的数据

iterator = dataset.create_tuple_iterator(num_epochs=epoch)

for i in range(epoch):
    print("epoch: ", i)
    for item in iterator:
        print("item:\n", item[0])
epoch:  0
item:
 [[1 2]
 [3 4]]
item:
 [[5 6]
 [7 8]]
epoch:  1
item:
 [[1 2]
 [3 4]]
item:
 [[5 6]
 [7 8]]

迭代器默认输出的数据类型为mindspore.Tensor,如果希望得到numpy.ndarray类型的数据,可以设置入参output_numpy=True

[4]:
# 默认输出类型为mindspore.Tensor
for item in dataset.create_tuple_iterator():
    print("dtype: ", type(item[0]), "\nitem:\n", item[0])

# 设置输出类型为numpy.ndarray
for item in dataset.create_tuple_iterator(output_numpy=True):
    print("dtype: ", type(item[0]), "\nitem:\n", item[0])
dtype:  <class 'mindspore.common.tensor.Tensor'>
item:
 [[1 2]
 [3 4]]
dtype:  <class 'mindspore.common.tensor.Tensor'>
item:
 [[5 6]
 [7 8]]
dtype:  <class 'numpy.ndarray'>
item:
 [[1 2]
 [3 4]]
dtype:  <class 'numpy.ndarray'>
item:
 [[5 6]
 [7 8]]

在训练网络时使用迭代器

下面我们通过一个拟合线性函数的场景,介绍在训练网络时如何使用数据迭代器,线性函数表达式为:

\[output = {x_0}\times1 + {x_1}\times2 + {x_2}\times3 + ··· + {x_7}\times8\]

其函数定义如下:

[5]:
def func(x):
    """定义线性函数表达式"""
    result = []
    for sample in x:
        total = 0
        for i, e in enumerate(sample):
            total += (i+1) * e
        result.append(total)
    return result

使用上面的线性函数构造自定义训练数据集和验证数据集。在构造自定义训练数据集时需要注意,上述线性函数表达式有8个未知数,把训练数据集的数据带入上述线性函数得出8个线性无关方程,通过解方程即可得出未知数的值。

[6]:
import numpy as np

class MyTrainData:
    """自定义训练用数据集类"""
    def __init__(self):
        """初始化操作"""
        self.__data = np.array([[[1, 1, 1, 1, 1, 1, 1, 1]],
                                [[1, 1, 1, 1, 1, 1, 1, 0]],
                                [[1, 1, 1, 1, 1, 1, 0, 0]],
                                [[1, 1, 1, 1, 1, 0, 0, 0]],
                                [[1, 1, 1, 1, 0, 0, 0, 0]],
                                [[1, 1, 1, 0, 0, 0, 0, 0]],
                                [[1, 1, 0, 0, 0, 0, 0, 0]],
                                [[1, 0, 0, 0, 0, 0, 0, 0]]]).astype(np.float32)
        self.__label = np.array([func(x) for x in self.__data]).astype(np.float32)

    def __getitem__(self, index):
        """定义随机访问函数"""
        return self.__data[index], self.__label[index]

    def __len__(self):
        """定义获取数据集大小函数"""
        return len(self.__data)

class MyEvalData:
    """自定义验证用数据集类"""
    def __init__(self):
        """初始化操作"""
        self.__data = np.array([[[1, 2, 3, 4, 5, 6, 7, 8]],
                                [[1, 1, 1, 1, 1, 1, 1, 1]],
                                [[8, 7, 6, 5, 4, 3, 2, 1]]]).astype(np.float32)
        self.__label = np.array([func(x) for x in self.__data]).astype(np.float32)

    def __getitem__(self, index):
        """定义随机访问函数"""
        return self.__data[index], self.__label[index]

    def __len__(self):
        """定义获取数据集大小函数"""
        return len(self.__data)

下面我们使用mindspore.nn.Dense创建自定义网络,网络的输入为8×1的矩阵。

[7]:
import mindspore.nn as nn
from mindspore.common.initializer import Normal

class MyNet(nn.Cell):
    """自定义网络"""
    def __init__(self):
        super(MyNet, self).__init__()
        self.fc = nn.Dense(8, 1, weight_init=Normal(0.02))

    def construct(self, x):
        x = self.fc(x)
        return x

自定义网络训练,代码如下:

[8]:
import mindspore.dataset as ds
import mindspore as ms
from mindspore import amp

def train(dataset, net, optimizer, loss, epoch):
    """自定义训练过程"""
    print("--------- Train ---------")

    train_network = amp.build_train_network(net, optimizer, loss)
    for i in range(epoch):
        # 使用数据迭代器获取数据
        for item in dataset.create_dict_iterator():
            data = item["data"]
            label = item["label"]
            loss = train_network(data, label)

        # 每5个epoch打印一次
        if i % 5 == 0:
            print("epoch:{}, loss: {}".format(i, loss))

dataset = ds.GeneratorDataset(MyTrainData(), ["data", "label"], shuffle=True)  # 定义数据集

epoch = 40                                                  # 定义训练轮次
net = MyNet()                                               # 定义网络
loss = nn.MSELoss(reduction="mean")                         # 定义损失函数
optimizer = nn.Momentum(net.trainable_params(), 0.01, 0.9)  # 定义优化器

# 开始训练
train(dataset, net, optimizer, loss, epoch)
--------- Train ---------
epoch:0, loss: 117.58063
epoch:5, loss: 0.28427964
epoch:10, loss: 0.02881975
epoch:15, loss: 0.050988887
epoch:20, loss: 0.0087212445
epoch:25, loss: 0.040158965
epoch:30, loss: 0.010140566
epoch:35, loss: 0.00040051914

从上面的打印结果可以看出,随着训练次数逐渐增多,损失值趋于收敛。接下来我们使用上面训练好的网络进行推理,并打印预测值与目标值。

[9]:
def eval(net, data):
    """自定义推理过程"""
    print("--------- Eval ---------")

    for item in data:
        predict = net(ms.Tensor(item[0]))[0]
        print("predict: {:7.3f}, label: {:7.3f}".format(predict.asnumpy()[0], item[1][0]))

# 开始推理
eval(net, MyEvalData())
--------- Eval ---------
predict: 203.996, label: 204.000
predict:  36.012, label:  36.000
predict: 116.539, label: 120.000

从上面的打印结果可以看出,推理结果较为准确。

更多关于数据迭代器的使用说明,请参考create_tuple_iteratorcreate_dict_iterator的API文档。

数据迭代训练

数据集对象创建后,可通过传入Model接口,由接口内部进行数据迭代,并送入网络执行训练或推理。实例代码如下:

[ ]:
import numpy as np
import mindspore as ms
from mindspore import ms_function
from mindspore import nn
import mindspore.dataset as ds
import mindspore.ops as ops

def create_dataset():
    """创建自定义数据集"""
    np_data = [[[1, 2], [3, 4]], [[5, 6], [7, 8]]]
    np_data = np.array(np_data, dtype=np.float16)
    dataset = ds.NumpySlicesDataset(np_data, column_names=["data"], shuffle=False)
    return dataset

class Net(nn.Cell):
    """创建一个神经网络"""
    def __init__(self):
        super(Net, self).__init__()
        self.relu = ops.ReLU()
        self.print = ops.Print()

    @ms_function
    def construct(self, x):
        self.print(x)
        return self.relu(x)

dataset = create_dataset()

network = Net()
model = ms.Model(network)

# 数据集传入model中,train接口进行数据迭代处理
model.train(epoch=1, train_dataset=dataset)