 "cells": [
   "cell_type": "markdown",
   "id": "597c6032",
   "metadata": {},
   "source": [
    "# 数据处理管道支持Python对象\n",
    "[![下载Notebook](https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/website-images/master/resource/_static/logo_notebook.svg)](https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/master/zh_cn/model_train/dataset/mindspore_python_objects.ipynb) [![下载样例代码](https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/website-images/master/resource/_static/logo_download_code.svg)](https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/master/zh_cn/model_train/dataset/mindspore_python_objects.py) [![查看源文件](https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/website-images/master/resource/_static/logo_source.svg)](https://gitee.com/mindspore/docs/blob/master/docs/mindspore/source_zh_cn/model_train/dataset/python_objects.ipynb)\n",
    "## 构造Python字典到数据处理管道\n",
    "1. 自定义数据集`GeneratorDataset`,用户将组织好的字典以返回值的形式输入到数据处理管道中。\n",
    "2. 自定义`map`增强操作,用户可以定义Python可调用对象,在该对象中返回字典数据。\n",
    "3. 自定义`batch(per_batch_map=...)`操作,用户在`batch`操作的`per_batch_map`中处理并返回字典数据。\n",
    "### 自定义数据集`GeneratorDataset`处理字典对象\n",
   "cell_type": "code",
   "execution_count": 1,
   "id": "4bb33742",
   "metadata": {},
   "outputs": [
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{'col1': {'number': array(0), 'square': array(0)}, 'col2': array(0, dtype=int64)}\n",
      "{'col1': {'number': array(1), 'square': array(1)}, 'col2': array(1, dtype=int64)}\n",
      "{'col1': {'number': array(2), 'square': array(4)}, 'col2': array(2, dtype=int64)}\n",
      "{'col1': {'number': array(3), 'square': array(9)}, 'col2': array(3, dtype=int64)}\n",
      "{'col1': {'number': array(4), 'square': array(16)}, 'col2': array(4, dtype=int64)}"
   "source": [
    "import mindspore.dataset as ds\n",
    "def my_generator():\n",
    "    for i in range(5):\n",
    "        col1 = {\"number\": i, \"square\": i ** 2}\n",
    "        col2 = i\n",
    "        yield col1, col2\n",
    "data = ds.GeneratorDataset(source=my_generator, column_names=[\"col1\", \"col2\"])\n",
    "for d in data.create_dict_iterator(num_epochs=1, output_numpy=True):\n",
    "    print(d)"
   "cell_type": "markdown",
   "id": "2d36dd94",
   "metadata": {},
   "source": [
    "### 自定义`map`增强操作处理字典对象\n",
    "> 除了用户自定义函数以外,现有的数据处理管道变换操作(`mindspore.dataset.transforms`、`mindspore.dataset.vision`等)均不支持字典类型的输入。\n",
   "cell_type": "code",
   "execution_count": 2,
   "id": "825e3e53",
   "metadata": {},
   "outputs": [
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{'col1': {'original_col1': array(0), 'square': array(0)}}\n",
      "{'col1': {'original_col1': array(1), 'square': array(1)}}\n",
      "{'col1': {'original_col1': array(2), 'square': array(4)}}\n",
      "{'col1': {'original_col1': array(3), 'square': array(9)}}\n",
      "{'col1': {'original_col1': array(4), 'square': array(16)}}"
   "source": [
    "import mindspore.dataset as ds\n",
    "def my_generator():\n",
    "    for i in range(5):\n",
    "        yield i\n",
    "def my_pyfunc(col1):\n",
    "    new_col1 = {\"original_col1\": col1, \"square\": col1 ** 2}\n",
    "    return new_col1\n",
    "data = ds.GeneratorDataset(source=my_generator, column_names=[\"col1\"])\n",
    "data = data.map(operations=my_pyfunc, input_columns=[\"col1\"])\n",
    "for d in data.create_dict_iterator(num_epochs=1, output_numpy=True):\n",
    "    print(d)"
   "cell_type": "markdown",
   "id": "d3ed8cea",
   "metadata": {},
   "source": [
    "### `batch`操作处理字典对象\n",
   "cell_type": "code",
   "execution_count": 3,
   "id": "01cd1fda",
   "metadata": {},
   "outputs": [
     "name": "stdout",
     "output_type": "stream",
     "text": [
      ">>> before batch\n",
      "{'col1': {'powers': array([0, 0, 0])}, 'col2': array(0, dtype=int64)}\n",
      "{'col1': {'powers': array([1, 1, 1])}, 'col2': array(1, dtype=int64)}\n",
      "{'col1': {'powers': array([2, 4, 8])}, 'col2': array(2, dtype=int64)}\n",
      "{'col1': {'powers': array([3, 9, 27])}, 'col2': array(3, dtype=int64)}\n",
      "{'col1': {'powers': array([4, 16, 64])}, 'col2': array(4, dtype=int64)}\n",
      ">>> after batch\n",
      "{'col1': {'powers': array([[0,  0,  0],\n",
      "                           [1,  1,  1],\n",
      "                           [2,  4,  8],\n",
      "                           [3,  9, 27],\n",
      "                           [4, 16, 64]])},\n",
      " 'col2': array([0, 1, 2, 3, 4], dtype=int64)}"
   "source": [
    "import numpy as np\n",
    "import mindspore.dataset as ds\n",
    "def my_generator():\n",
    "    for i in range(5):\n",
    "        col1 = {\"nested_dict\": {\"powers\": np.power(i, [1, 2, 3])}}\n",
    "        col2 = i\n",
    "        yield (col1, col2)\n",
    "def my_pyfunc(col1):\n",
    "    assert isinstance(col1, dict)\n",
    "    new_col1 = col1[\"nested_dict\"]\n",
    "    return new_col1\n",
    "data = ds.GeneratorDataset(source=my_generator, column_names=[\"col1\", \"col2\"])\n",
    "data = data.map(operations=my_pyfunc, input_columns=[\"col1\"])\n",
    "print(\">>> before batch\")\n",
    "for d in data.create_dict_iterator(num_epochs=1, output_numpy=True):\n",
    "    print(d)\n",
    "data = data.batch(batch_size=5)\n",
    "print(\">>> after batch\")\n",
    "for d in data.create_dict_iterator(num_epochs=1, output_numpy=True):\n",
    "    print(d)"
   "cell_type": "markdown",
   "id": "3d160789",
   "metadata": {},
   "source": [
   "cell_type": "code",
   "execution_count": 4,
   "id": "036c365b",
   "metadata": {},
   "outputs": [
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{'col1': {'original_col1': [array(0), array(1), array(2)], 'index': array([0, 1, 2])}, 'col2': {'copied_col1': [array(0), array(1), array(2)]}}\n",
      "{'col1': {'original_col1': [array(3), array(4), array(5)], 'index': array([0, 1, 2])}, 'col2': {'copied_col1': [array(3), array(4), array(5)]}}\n",
      "{'col1': {'original_col1': [array(6), array(7), array(8)], 'index': array([0, 1, 2])}, 'col2': {'copied_col1': [array(6), array(7), array(8)]}}"
   "source": [
    "import numpy as np\n",
    "import mindspore.dataset as ds\n",
    "def my_generator():\n",
    "    for i in range(9):\n",
    "        yield i\n",
    "def my_per_batch_map(col1, batch_info):\n",
    "    new_col1 = {\"original_col1\": col1, \"index\": np.arange(3)}\n",
    "    new_col2 = {\"copied_col1\": col1}\n",
    "    return new_col1, new_col2\n",
    "data = ds.GeneratorDataset(source=my_generator, column_names=[\"col1\"])\n",
    "data = data.batch(batch_size=3, per_batch_map=my_per_batch_map, output_columns=[\"col1\", \"col2\"])\n",
    "for d in data.create_dict_iterator(num_epochs=1, output_numpy=True):\n",
    "    print(d)"
   "cell_type": "markdown",
   "id": "579c6d5d",
   "metadata": {},
   "source": [
    "## 从数据处理管道中获取Python字典\n",
   "cell_type": "code",
   "execution_count": 5,
   "id": "9c5b8055",
   "metadata": {},
   "outputs": [
     "name": "stdout",
     "output_type": "stream",
     "text": [
      ">>> Iter dataset with converting all data to Tensor\n",
      "{'col1': {'my_data': Tensor(shape=[], dtype=Int64, value= 0)}, 'col2': Tensor(shape=[], dtype=Int64, value= 0)}\n",
      "{'col1': {'my_data': Tensor(shape=[], dtype=Int64, value= 1)}, 'col2': Tensor(shape=[], dtype=Int64, value= 1)}\n",
      "{'col1': {'my_data': Tensor(shape=[], dtype=Int64, value= 2)}, 'col2': Tensor(shape=[], dtype=Int64, value= 2)}\n",
      "{'col1': {'my_data': Tensor(shape=[], dtype=Int64, value= 3)}, 'col2': Tensor(shape=[], dtype=Int64, value= 3)}\n",
      "{'col1': {'my_data': Tensor(shape=[], dtype=Int64, value= 4)}, 'col2': Tensor(shape=[], dtype=Int64, value= 4)}\n",
      ">>> Iter dataset with converting all data to Numpy\n",
      "{'col1': {'my_data': array(0)}, 'col2': array(0, dtype=int64)}\n",
      "{'col1': {'my_data': array(1)}, 'col2': array(1, dtype=int64)}\n",
      "{'col1': {'my_data': array(2)}, 'col2': array(2, dtype=int64)}\n",
      "{'col1': {'my_data': array(3)}, 'col2': array(3, dtype=int64)}\n",
      "{'col1': {'my_data': array(4)}, 'col2': array(4, dtype=int64)}"
   "source": [
    "import numpy as np\n",
    "import mindspore.dataset as ds\n",
    "def my_generator():\n",
    "    for i in range(5):\n",
    "        col1 = {\"my_data\": np.array(i)}\n",
    "        col2 = i\n",
    "        yield col1, col2\n",
    "data = ds.GeneratorDataset(source=my_generator, column_names=[\"col1\", \"col2\"])\n",
    "print(\">>> Iter dataset with converting all data to Tensor\")\n",
    "for d in data.create_dict_iterator(num_epochs=1):\n",
    "    print(d)\n",
    "print(\">>> Iter dataset with converting all data to Numpy\")\n",
    "for d in data.create_dict_iterator(num_epochs=1, output_numpy=True):\n",
    "    print(d)"
   "cell_type": "markdown",
   "id": "206ba003",
   "metadata": {},
   "source": [
    "- 在[数据下沉](https://www.mindspore.cn/docs/zh-CN/master/model_train/train_process/optimize/sink_mode.html)模式下,由于数据下沉通道当前无法支持字典类型的数据,字典类型的数据发送到下沉通道会造成错误。因此可以考虑关闭数据下沉模式(`dataset_sink_mode=False`),或在最后一个数据处理节点将字典类型的数据展开为列表或元组类型的数据,例如:"
   "cell_type": "code",
   "execution_count": 6,
   "id": "f5cf12af",
   "metadata": {},
   "outputs": [
     "name": "stdout",
     "output_type": "stream",
     "text": [
      ">>> get data in dict type\n",
      "[{'my_data': Tensor(shape=[], dtype=Int64, value= 0), 'my_data2': Tensor(shape=[], dtype=Int64, value= 1)}]\n",
      "[{'my_data': Tensor(shape=[], dtype=Int64, value= 1), 'my_data2': Tensor(shape=[], dtype=Int64, value= 2)}]\n",
      "[{'my_data': Tensor(shape=[], dtype=Int64, value= 2), 'my_data2': Tensor(shape=[], dtype=Int64, value= 3)}]\n",
      "[{'my_data': Tensor(shape=[], dtype=Int64, value= 3), 'my_data2': Tensor(shape=[], dtype=Int64, value= 4)}]\n",
      "[{'my_data': Tensor(shape=[], dtype=Int64, value= 4), 'my_data2': Tensor(shape=[], dtype=Int64, value= 5)}]\n",
      ">>> get data in sequence type\n",
      "[Tensor(shape=[], dtype=Int64, value= 0), Tensor(shape=[], dtype=Int64, value= 1)]\n",
      "[Tensor(shape=[], dtype=Int64, value= 1), Tensor(shape=[], dtype=Int64, value= 2)]\n",
      "[Tensor(shape=[], dtype=Int64, value= 2), Tensor(shape=[], dtype=Int64, value= 3)]\n",
      "[Tensor(shape=[], dtype=Int64, value= 3), Tensor(shape=[], dtype=Int64, value= 4)]\n",
      "[Tensor(shape=[], dtype=Int64, value= 4), Tensor(shape=[], dtype=Int64, value= 5)]"
   "source": [
    "import numpy as np\n",
    "import mindspore.dataset as ds\n",
    "def my_generator():\n",
    "    for i in range(5):\n",
    "        col1 = {'my_data': np.array(i), 'my_data2': np.array(i + 1)}\n",
    "        yield col1\n",
    "data = ds.GeneratorDataset(source=my_generator, column_names=['col1'])\n",
    "print('>>> get data in dict type')\n",
    "for d in data:\n",
    "    print(d)\n",
    "def dict_to_tuple(d):\n",
    "    return tuple([i for i in d.values()])\n",
    "# flatten the dict object bedfore it passed into network\n",
    "data = data.map(dict_to_tuple, input_columns=['col1'], output_columns=['my_data', 'my_data2'])\n",
    "print('>>> get data in sequence type')\n",
    "for d in data:\n",
    "    print(d)"
   "cell_type": "markdown",
   "id": "56b10637",
   "metadata": {},
   "source": [
    "- 在非数据下沉模式下,此特性没有使用限制,只需注意字典中存储的类型是否能够被模型识别和处理。"
 "metadata": {
  "kernelspec": {
   "display_name": "MindSpore",
   "language": "python",
   "name": "mindspore"
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.5"
 "nbformat": 4,
 "nbformat_minor": 5