文档反馈

问题文档片段

问题文档片段包含公式时,显示为空格。

提交类型
issue

有点复杂...

找人问问吧。

PR

小问题,全程线上修改...

一键搞定!

请选择提交类型

问题类型
规范和低错类

- 规范和低错类:

- 错别字或拼写错误,标点符号使用错误、公式错误或显示异常。

- 链接错误、空单元格、格式错误。

- 英文中包含中文字符。

- 界面和描述不一致,但不影响操作。

- 表述不通顺,但不影响理解。

- 版本号不匹配:如软件包名称、界面版本号。

易用性

- 易用性:

- 关键步骤错误或缺失,无法指导用户完成任务。

- 缺少主要功能描述、关键词解释、必要前提条件、注意事项等。

- 描述内容存在歧义指代不明、上下文矛盾。

- 逻辑不清晰,该分类、分项、分步骤的没有给出。

正确性

- 正确性:

- 技术原理、功能、支持平台、参数类型、异常报错等描述和软件实现不一致。

- 原理图、架构图等存在错误。

- 命令、命令参数等错误。

- 代码片段错误。

- 命令无法完成对应功能。

- 界面错误,无法指导操作。

- 代码样例运行报错、运行结果不符。

风险提示

- 风险提示:

- 对重要数据或系统存在风险的操作,缺少安全提示。

内容合规

- 内容合规:

- 违反法律法规,涉及政治、领土主权等敏感词。

- 内容侵权。

请选择问题类型

问题描述

点击输入详细问题描述,以帮助我们快速定位问题。

数据迭代

下载Notebook下载样例代码查看源文件

原始数据集通过数据集加载接口读取到内存,再通过数据增强操作进行数据变换,最后得到的数据集对象有两种常规的数据迭代方法:

  1. 创建iterator迭代器进行数据迭代。

  2. 数据直接传入网络模型的Model接口(如model.trainmodel.eval等)进行迭代训练或推理。

创建迭代器

数据集对象通常可以创建两种不同的迭代器来遍历数据,分别为:

  1. 元组迭代器。创建元组迭代器的接口为create_tuple_iterator,通常用于Model.train内部使用,其迭代出来的数据可以直接用于训练。

  2. 字典迭代器。创建字典迭代器的接口为create_dict_iterator,自定义train训练模式下,用户可以根据字典中的key进行进一步的数据处理操作,再输入到网络中,使用较为灵活。

下面通过示例介绍两种迭代器的使用方式:

[1]:
import mindspore.dataset as ds

# 数据集
np_data = [[[1, 2], [3, 4]], [[5, 6], [7, 8]]]

# 加载数据
dataset = ds.NumpySlicesDataset(np_data, column_names=["data"], shuffle=False)

然后使用create_tuple_iterator或者create_dict_iterator创建数据迭代器。

[2]:
# 创建元组迭代器
print("\n create tuple iterator")
for item in dataset.create_tuple_iterator():
    print("item:\n", item[0])

# 创建字典迭代器
print("\n create dict iterator")
for item in dataset.create_dict_iterator():
    print("item:\n", item["data"])

# 直接遍历数据集对象(等同于创建元组迭代器)
print("\n iterate dataset object directly")
for item in dataset:
    print("item:\n", item[0])

# 使用enumerate方式遍历(等同于创建元组迭代器)
print("\n iterate dataset using enumerate")
for index, item in enumerate(dataset):
    print("index: {}, item:\n {}".format(index, item[0]))
 create tuple iterator
item:
 [[1 2]
 [3 4]]
item:
 [[5 6]
 [7 8]]

 create dict iterator
item:
 [[1 2]
 [3 4]]
item:
 [[5 6]
 [7 8]]

 iterate dataset object directly
item:
 [[1 2]
 [3 4]]
item:
 [[5 6]
 [7 8]]

 iterate dataset using enumerate
index: 0, item:
 [[1 2]
 [3 4]]
index: 1, item:
 [[5 6]
 [7 8]]

如果需要产生多个epoch的数据,可以相应地调整入参num_epochs的取值。相比于多次调用迭代器接口,直接设置epoch数可以提高数据迭代的性能。

[3]:
epoch = 2  # 创建元组迭代器产生2个epoch的数据

iterator = dataset.create_tuple_iterator(num_epochs=epoch)

for i in range(epoch):
    print("epoch: ", i)
    for item in iterator:
        print("item:\n", item[0])
epoch:  0
item:
 [[1 2]
 [3 4]]
item:
 [[5 6]
 [7 8]]
epoch:  1
item:
 [[1 2]
 [3 4]]
item:
 [[5 6]
 [7 8]]

迭代器默认输出的数据类型为mindspore.Tensor,如果希望得到numpy.ndarray类型的数据,可以设置入参output_numpy=True

[4]:
# 默认输出类型为mindspore.Tensor
for item in dataset.create_tuple_iterator():
    print("dtype: ", type(item[0]), "\nitem:\n", item[0])

# 设置输出类型为numpy.ndarray
for item in dataset.create_tuple_iterator(output_numpy=True):
    print("dtype: ", type(item[0]), "\nitem:\n", item[0])
dtype:  <class 'mindspore.common.tensor.Tensor'>
item:
 [[1 2]
 [3 4]]
dtype:  <class 'mindspore.common.tensor.Tensor'>
item:
 [[5 6]
 [7 8]]
dtype:  <class 'numpy.ndarray'>
item:
 [[1 2]
 [3 4]]
dtype:  <class 'numpy.ndarray'>
item:
 [[5 6]
 [7 8]]

在训练网络时使用迭代器

下面我们通过一个拟合线性函数的场景,介绍在训练网络时如何使用数据迭代器,线性函数表达式为:

output=x0×1+x1×2+x2×3+···+x7×8

其函数定义如下:

[5]:
def func(x):
    """定义线性函数表达式"""
    result = []
    for sample in x:
        total = 0
        for i, e in enumerate(sample):
            total += (i+1) * e
        result.append(total)
    return result

使用上面的线性函数构造自定义训练数据集和验证数据集。在构造自定义训练数据集时需要注意,上述线性函数表达式有8个未知数,把训练数据集的数据带入上述线性函数得出8个线性无关方程,通过解方程即可得出未知数的值。

[6]:
import numpy as np

class MyTrainData:
    """自定义训练用数据集类"""
    def __init__(self):
        """初始化操作"""
        self.__data = np.array([[[1, 1, 1, 1, 1, 1, 1, 1]],
                                [[1, 1, 1, 1, 1, 1, 1, 0]],
                                [[1, 1, 1, 1, 1, 1, 0, 0]],
                                [[1, 1, 1, 1, 1, 0, 0, 0]],
                                [[1, 1, 1, 1, 0, 0, 0, 0]],
                                [[1, 1, 1, 0, 0, 0, 0, 0]],
                                [[1, 1, 0, 0, 0, 0, 0, 0]],
                                [[1, 0, 0, 0, 0, 0, 0, 0]]]).astype(np.float32)
        self.__label = np.array([func(x) for x in self.__data]).astype(np.float32)

    def __getitem__(self, index):
        """定义随机访问函数"""
        return self.__data[index], self.__label[index]

    def __len__(self):
        """定义获取数据集大小函数"""
        return len(self.__data)

class MyEvalData:
    """自定义验证用数据集类"""
    def __init__(self):
        """初始化操作"""
        self.__data = np.array([[[1, 2, 3, 4, 5, 6, 7, 8]],
                                [[1, 1, 1, 1, 1, 1, 1, 1]],
                                [[8, 7, 6, 5, 4, 3, 2, 1]]]).astype(np.float32)
        self.__label = np.array([func(x) for x in self.__data]).astype(np.float32)

    def __getitem__(self, index):
        """定义随机访问函数"""
        return self.__data[index], self.__label[index]

    def __len__(self):
        """定义获取数据集大小函数"""
        return len(self.__data)

下面我们使用mindspore.nn.Dense创建自定义网络,网络的输入为8×1的矩阵。

[7]:
import mindspore.nn as nn
from mindspore.common.initializer import Normal

class MyNet(nn.Cell):
    """自定义网络"""
    def __init__(self):
        super(MyNet, self).__init__()
        self.fc = nn.Dense(8, 1, weight_init=Normal(0.02))

    def construct(self, x):
        x = self.fc(x)
        return x

自定义网络训练,代码如下:

[8]:
import mindspore.dataset as ds
import mindspore as ms
from mindspore import amp

def train(dataset, net, optimizer, loss, epoch):
    """自定义训练过程"""
    print("--------- Train ---------")

    train_network = amp.build_train_network(net, optimizer, loss)
    for i in range(epoch):
        # 使用数据迭代器获取数据
        for item in dataset.create_dict_iterator():
            data = item["data"]
            label = item["label"]
            loss = train_network(data, label)

        # 每5个epoch打印一次
        if i % 5 == 0:
            print("epoch:{}, loss: {}".format(i, loss))

dataset = ds.GeneratorDataset(MyTrainData(), ["data", "label"], shuffle=True)  # 定义数据集

epoch = 40                                                  # 定义训练轮次
net = MyNet()                                               # 定义网络
loss = nn.MSELoss(reduction="mean")                         # 定义损失函数
optimizer = nn.Momentum(net.trainable_params(), 0.01, 0.9)  # 定义优化器

# 开始训练
train(dataset, net, optimizer, loss, epoch)
--------- Train ---------
epoch:0, loss: 117.58063
epoch:5, loss: 0.28427964
epoch:10, loss: 0.02881975
epoch:15, loss: 0.050988887
epoch:20, loss: 0.0087212445
epoch:25, loss: 0.040158965
epoch:30, loss: 0.010140566
epoch:35, loss: 0.00040051914

从上面的打印结果可以看出,随着训练次数逐渐增多,损失值趋于收敛。接下来我们使用上面训练好的网络进行推理,并打印预测值与目标值。

[9]:
def eval(net, data):
    """自定义推理过程"""
    print("--------- Eval ---------")

    for item in data:
        predict = net(ms.Tensor(item[0]))[0]
        print("predict: {:7.3f}, label: {:7.3f}".format(predict.asnumpy()[0], item[1][0]))

# 开始推理
eval(net, MyEvalData())
--------- Eval ---------
predict: 203.996, label: 204.000
predict:  36.012, label:  36.000
predict: 116.539, label: 120.000

从上面的打印结果可以看出,推理结果较为准确。

更多关于数据迭代器的使用说明,请参考create_tuple_iteratorcreate_dict_iterator的API文档。

数据迭代训练

数据集对象创建后,可通过传入Model接口,由接口内部进行数据迭代,并送入网络执行训练或推理。实例代码如下:

[ ]:
import numpy as np
import mindspore as ms
from mindspore import ms_function
from mindspore import nn
import mindspore.dataset as ds
import mindspore.ops as ops

def create_dataset():
    """创建自定义数据集"""
    np_data = [[[1, 2], [3, 4]], [[5, 6], [7, 8]]]
    np_data = np.array(np_data, dtype=np.float16)
    dataset = ds.NumpySlicesDataset(np_data, column_names=["data"], shuffle=False)
    return dataset

class Net(nn.Cell):
    """创建一个神经网络"""
    def __init__(self):
        super(Net, self).__init__()
        self.relu = ops.ReLU()
        self.print = ops.Print()

    @ms_function
    def construct(self, x):
        self.print(x)
        return self.relu(x)

dataset = create_dataset()

network = Net()
model = ms.Model(network)

# 数据集传入model中,train接口进行数据迭代处理
model.train(epoch=1, train_dataset=dataset)