{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "597c6032",
   "metadata": {},
   "source": [
    "# 数据处理管道支持Python对象\n",
    "\n",
    "[![下载Notebook](https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/website-images/master/resource/_static/logo_notebook.svg)](https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/master/zh_cn/model_train/dataset/mindspore_python_objects.ipynb) [![下载样例代码](https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/website-images/master/resource/_static/logo_download_code.svg)](https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/master/zh_cn/model_train/dataset/mindspore_python_objects.py) [![查看源文件](https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/website-images/master/resource/_static/logo_source.svg)](https://gitee.com/mindspore/docs/blob/master/docs/mindspore/source_zh_cn/model_train/dataset/python_objects.ipynb)\n",
    "\n",
    "数据处理管道中的特定操作(如自定义数据集`GeneratorDataset`、自定义`map`增强操作、自定义`batch(per_batch_map=...)`)支持任意Python类型对象作为输入。\n",
    "\n",
    "为了支持此特性,MindSpore在数据管道中使用了Python(`dict`)字典去管理不同类型的对象。与其他类型相比,Python字典类型在数据管道中不会被转换成C++中的类型,而是以引用的形式保留在数据管道中。\n",
    "\n",
    "注意,虽然目前数据管道只新增了识别字典类型的对象,但并不限制字典中的对象内容。因此可以将其他Python类型封装进字典中,再传入到数据处理管道中,以达到支持任意Python对象的目的。因此本教程主要介绍如何构造字典类型的数据,输入到数据管道,并在迭代器中取得数据。\n",
    "\n",
    "## 构造Python字典到数据处理管道\n",
    "\n",
    "将字典输入到数据处理管道中,可通过以下几个操作实现:\n",
    "\n",
    "1. 自定义数据集`GeneratorDataset`,用户将组织好的字典以返回值的形式输入到数据处理管道中。\n",
    "2. 自定义`map`增强操作,用户可以定义Python可调用对象,在该对象中返回字典数据。\n",
    "3. 自定义`batch(per_batch_map=...)`操作,用户在`batch`操作的`per_batch_map`中处理并返回字典数据。\n",
    "\n",
    "### 自定义数据集`GeneratorDataset`处理字典对象\n",
    "\n",
    "下面这个例子展示了如何通过`GeneratorDataset`将字典对象传送到数据处理管道。\n",
    "\n",
    "示例中的`my_generator`返回了2个元素,分别对应2个数据列,其中字典被视为第一列`col1`。数据处理管道的规则一般会检查返回值是否可以被转换为NumPy类型,但若返回值为字典则会例外,且字典中存储的元素没有限制(包括键/值的数量和类型)。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "4bb33742",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{'col1': {'number': array(0), 'square': array(0)}, 'col2': array(0, dtype=int64)}\n",
      "{'col1': {'number': array(1), 'square': array(1)}, 'col2': array(1, dtype=int64)}\n",
      "{'col1': {'number': array(2), 'square': array(4)}, 'col2': array(2, dtype=int64)}\n",
      "{'col1': {'number': array(3), 'square': array(9)}, 'col2': array(3, dtype=int64)}\n",
      "{'col1': {'number': array(4), 'square': array(16)}, 'col2': array(4, dtype=int64)}"
     ]
    }
   ],
   "source": [
    "import mindspore.dataset as ds\n",
    "\n",
    "def my_generator():\n",
    "    for i in range(5):\n",
    "        col1 = {\"number\": i, \"square\": i ** 2}\n",
    "        col2 = i\n",
    "        yield col1, col2\n",
    "\n",
    "data = ds.GeneratorDataset(source=my_generator, column_names=[\"col1\", \"col2\"])\n",
    "\n",
    "for d in data.create_dict_iterator(num_epochs=1, output_numpy=True):\n",
    "    print(d)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "2d36dd94",
   "metadata": {},
   "source": [
    "### 自定义`map`增强操作处理字典对象\n",
    "\n",
    "与`GeneratorDataset`相同,每个字典对象被看作一个数据列,且其中的元素没有限制。\n",
    "\n",
    "> 除了用户自定义函数以外,现有的数据处理管道变换操作(`mindspore.dataset.transforms`、`mindspore.dataset.vision`等)均不支持字典类型的输入。\n",
    "\n",
    "这个例子说明如何通过`map`操作和自定义Python方法,将字典类型加入到数据处理管道中:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "825e3e53",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{'col1': {'original_col1': array(0), 'square': array(0)}}\n",
      "{'col1': {'original_col1': array(1), 'square': array(1)}}\n",
      "{'col1': {'original_col1': array(2), 'square': array(4)}}\n",
      "{'col1': {'original_col1': array(3), 'square': array(9)}}\n",
      "{'col1': {'original_col1': array(4), 'square': array(16)}}"
     ]
    }
   ],
   "source": [
    "import mindspore.dataset as ds\n",
    "\n",
    "def my_generator():\n",
    "    for i in range(5):\n",
    "        yield i\n",
    "\n",
    "def my_pyfunc(col1):\n",
    "    new_col1 = {\"original_col1\": col1, \"square\": col1 ** 2}\n",
    "    return new_col1\n",
    "\n",
    "data = ds.GeneratorDataset(source=my_generator, column_names=[\"col1\"])\n",
    "data = data.map(operations=my_pyfunc, input_columns=[\"col1\"])\n",
    "\n",
    "for d in data.create_dict_iterator(num_epochs=1, output_numpy=True):\n",
    "    print(d)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d3ed8cea",
   "metadata": {},
   "source": [
    "### `batch`操作处理字典对象\n",
    "\n",
    "当对数据集使用`batch`操作时,如果有一个数据列中包含有字典对象,数据处理管道会将多组样本中的字典的相同键组合在一起。因此对数据进行`batch`操作前,确保所有的字典对象都必须具有相同的键。\n",
    "\n",
    "`batch`操作的结果(对于该列)也将是一个字典,其中所有值都是NumPy数组。如果这种转换产生了`np.object_`类型的数组,由于模型训练侧的限制,将向用户显示一条错误消息并且终止数据处理管道。\n",
    "\n",
    "下面展示了当数据管道中存在Python字典时,`batch`操作是如何把字典中\"power\"键的元素组合起来的。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "01cd1fda",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      ">>> before batch\n",
      "{'col1': {'powers': array([0, 0, 0])}, 'col2': array(0, dtype=int64)}\n",
      "{'col1': {'powers': array([1, 1, 1])}, 'col2': array(1, dtype=int64)}\n",
      "{'col1': {'powers': array([2, 4, 8])}, 'col2': array(2, dtype=int64)}\n",
      "{'col1': {'powers': array([3, 9, 27])}, 'col2': array(3, dtype=int64)}\n",
      "{'col1': {'powers': array([4, 16, 64])}, 'col2': array(4, dtype=int64)}\n",
      ">>> after batch\n",
      "{'col1': {'powers': array([[0,  0,  0],\n",
      "                           [1,  1,  1],\n",
      "                           [2,  4,  8],\n",
      "                           [3,  9, 27],\n",
      "                           [4, 16, 64]])},\n",
      " 'col2': array([0, 1, 2, 3, 4], dtype=int64)}"
     ]
    }
   ],
   "source": [
    "import numpy as np\n",
    "import mindspore.dataset as ds\n",
    "\n",
    "def my_generator():\n",
    "    for i in range(5):\n",
    "        col1 = {\"nested_dict\": {\"powers\": np.power(i, [1, 2, 3])}}\n",
    "        col2 = i\n",
    "        yield (col1, col2)\n",
    "\n",
    "def my_pyfunc(col1):\n",
    "    assert isinstance(col1, dict)\n",
    "    new_col1 = col1[\"nested_dict\"]\n",
    "    return new_col1\n",
    "\n",
    "data = ds.GeneratorDataset(source=my_generator, column_names=[\"col1\", \"col2\"])\n",
    "data = data.map(operations=my_pyfunc, input_columns=[\"col1\"])\n",
    "\n",
    "print(\">>> before batch\")\n",
    "for d in data.create_dict_iterator(num_epochs=1, output_numpy=True):\n",
    "    print(d)\n",
    "\n",
    "data = data.batch(batch_size=5)\n",
    "\n",
    "print(\">>> after batch\")\n",
    "for d in data.create_dict_iterator(num_epochs=1, output_numpy=True):\n",
    "    print(d)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "3d160789",
   "metadata": {},
   "source": [
    "如果用户提供了`per_batch_map`函数,字典中的对应元素将根据键分组到Python列表中。这个例子说明如何通过`batch`操作和`per_batch_map`方法,将字典类型加入到数据处理管道中:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "036c365b",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{'col1': {'original_col1': [array(0), array(1), array(2)], 'index': array([0, 1, 2])}, 'col2': {'copied_col1': [array(0), array(1), array(2)]}}\n",
      "{'col1': {'original_col1': [array(3), array(4), array(5)], 'index': array([0, 1, 2])}, 'col2': {'copied_col1': [array(3), array(4), array(5)]}}\n",
      "{'col1': {'original_col1': [array(6), array(7), array(8)], 'index': array([0, 1, 2])}, 'col2': {'copied_col1': [array(6), array(7), array(8)]}}"
     ]
    }
   ],
   "source": [
    "import numpy as np\n",
    "import mindspore.dataset as ds\n",
    "\n",
    "def my_generator():\n",
    "    for i in range(9):\n",
    "        yield i\n",
    "\n",
    "def my_per_batch_map(col1, batch_info):\n",
    "    new_col1 = {\"original_col1\": col1, \"index\": np.arange(3)}\n",
    "    new_col2 = {\"copied_col1\": col1}\n",
    "    return new_col1, new_col2\n",
    "\n",
    "data = ds.GeneratorDataset(source=my_generator, column_names=[\"col1\"])\n",
    "data = data.batch(batch_size=3, per_batch_map=my_per_batch_map, output_columns=[\"col1\", \"col2\"])\n",
    "\n",
    "for d in data.create_dict_iterator(num_epochs=1, output_numpy=True):\n",
    "    print(d)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "579c6d5d",
   "metadata": {},
   "source": [
    "## 从数据处理管道中获取Python字典\n",
    "\n",
    "直接迭代数据集对象即可获得字典类型的数据。当使用迭代器获取数据时,数据处理管道会尝试将字典对象中的所有值转成Tensor类型(如果`output_numpy`设置为`True`,则将转为NumPy类型)。\n",
    "\n",
    "注意,上述类型转换操作是递归进行的,即应用于嵌套字典内的所有值以及列表和元组内的所有元素。无法被转成NumPy数组/Tensor类型的对象(例如类对象)会被直接传入到模型,若模型无法处理该对象类型将会报错。\n",
    "\n",
    "下面的例子展示了通过迭代器获取字典数据或其他数据。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "9c5b8055",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      ">>> Iter dataset with converting all data to Tensor\n",
      "{'col1': {'my_data': Tensor(shape=[], dtype=Int64, value= 0)}, 'col2': Tensor(shape=[], dtype=Int64, value= 0)}\n",
      "{'col1': {'my_data': Tensor(shape=[], dtype=Int64, value= 1)}, 'col2': Tensor(shape=[], dtype=Int64, value= 1)}\n",
      "{'col1': {'my_data': Tensor(shape=[], dtype=Int64, value= 2)}, 'col2': Tensor(shape=[], dtype=Int64, value= 2)}\n",
      "{'col1': {'my_data': Tensor(shape=[], dtype=Int64, value= 3)}, 'col2': Tensor(shape=[], dtype=Int64, value= 3)}\n",
      "{'col1': {'my_data': Tensor(shape=[], dtype=Int64, value= 4)}, 'col2': Tensor(shape=[], dtype=Int64, value= 4)}\n",
      ">>> Iter dataset with converting all data to Numpy\n",
      "{'col1': {'my_data': array(0)}, 'col2': array(0, dtype=int64)}\n",
      "{'col1': {'my_data': array(1)}, 'col2': array(1, dtype=int64)}\n",
      "{'col1': {'my_data': array(2)}, 'col2': array(2, dtype=int64)}\n",
      "{'col1': {'my_data': array(3)}, 'col2': array(3, dtype=int64)}\n",
      "{'col1': {'my_data': array(4)}, 'col2': array(4, dtype=int64)}"
     ]
    }
   ],
   "source": [
    "import numpy as np\n",
    "import mindspore.dataset as ds\n",
    "\n",
    "def my_generator():\n",
    "    for i in range(5):\n",
    "        col1 = {\"my_data\": np.array(i)}\n",
    "        col2 = i\n",
    "        yield col1, col2\n",
    "\n",
    "data = ds.GeneratorDataset(source=my_generator, column_names=[\"col1\", \"col2\"])\n",
    "\n",
    "print(\">>> Iter dataset with converting all data to Tensor\")\n",
    "for d in data.create_dict_iterator(num_epochs=1):\n",
    "    print(d)\n",
    "\n",
    "print(\">>> Iter dataset with converting all data to Numpy\")\n",
    "for d in data.create_dict_iterator(num_epochs=1, output_numpy=True):\n",
    "    print(d)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "206ba003",
   "metadata": {},
   "source": [
    "在模型训练/推理场景,从数据管道获取字典类型数据时有以下注意事项:\n",
    "\n",
    "- 在[数据下沉](https://www.mindspore.cn/docs/zh-CN/master/model_train/train_process/optimize/sink_mode.html)模式下,由于数据下沉通道当前无法支持字典类型的数据,字典类型的数据发送到下沉通道会造成错误。因此可以考虑关闭数据下沉模式(`dataset_sink_mode=False`),或在最后一个数据处理节点将字典类型的数据展开为列表或元组类型的数据,例如:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "f5cf12af",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      ">>> get data in dict type\n",
      "[{'my_data': Tensor(shape=[], dtype=Int64, value= 0), 'my_data2': Tensor(shape=[], dtype=Int64, value= 1)}]\n",
      "[{'my_data': Tensor(shape=[], dtype=Int64, value= 1), 'my_data2': Tensor(shape=[], dtype=Int64, value= 2)}]\n",
      "[{'my_data': Tensor(shape=[], dtype=Int64, value= 2), 'my_data2': Tensor(shape=[], dtype=Int64, value= 3)}]\n",
      "[{'my_data': Tensor(shape=[], dtype=Int64, value= 3), 'my_data2': Tensor(shape=[], dtype=Int64, value= 4)}]\n",
      "[{'my_data': Tensor(shape=[], dtype=Int64, value= 4), 'my_data2': Tensor(shape=[], dtype=Int64, value= 5)}]\n",
      ">>> get data in sequence type\n",
      "[Tensor(shape=[], dtype=Int64, value= 0), Tensor(shape=[], dtype=Int64, value= 1)]\n",
      "[Tensor(shape=[], dtype=Int64, value= 1), Tensor(shape=[], dtype=Int64, value= 2)]\n",
      "[Tensor(shape=[], dtype=Int64, value= 2), Tensor(shape=[], dtype=Int64, value= 3)]\n",
      "[Tensor(shape=[], dtype=Int64, value= 3), Tensor(shape=[], dtype=Int64, value= 4)]\n",
      "[Tensor(shape=[], dtype=Int64, value= 4), Tensor(shape=[], dtype=Int64, value= 5)]"
     ]
    }
   ],
   "source": [
    "import numpy as np\n",
    "import mindspore.dataset as ds\n",
    "\n",
    "def my_generator():\n",
    "    for i in range(5):\n",
    "        col1 = {'my_data': np.array(i), 'my_data2': np.array(i + 1)}\n",
    "        yield col1\n",
    "\n",
    "data = ds.GeneratorDataset(source=my_generator, column_names=['col1'])\n",
    "\n",
    "print('>>> get data in dict type')\n",
    "for d in data:\n",
    "    print(d)\n",
    "\n",
    "def dict_to_tuple(d):\n",
    "    return tuple([i for i in d.values()])\n",
    "\n",
    "# flatten the dict object bedfore it passed into network\n",
    "data = data.map(dict_to_tuple, input_columns=['col1'], output_columns=['my_data', 'my_data2'])\n",
    "\n",
    "print('>>> get data in sequence type')\n",
    "for d in data:\n",
    "    print(d)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "56b10637",
   "metadata": {},
   "source": [
    "- 在非数据下沉模式下,此特性没有使用限制,只需注意字典中存储的类型是否能够被模型识别和处理。"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "MindSpore",
   "language": "python",
   "name": "mindspore"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}