# Source code for mindspore.train.data_sink

# Copyright 2022 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Data sink help for minddata dataset"""
from functools import wraps
import mindspore.ops as ops
from mindspore import context
from mindspore.common.dtype import pytype_to_dtype
from mindspore.common.api import jit
from mindspore.train._utils import _exec_datagraph, _get_types_and_shapes
from mindspore.train.dataset_helper import _has_dynamic_shape, _check_inputs
import mindspore.dataset as ds
from mindspore._c_expression import _set_dataset_mode_config
from mindspore.parallel._utils import _get_device_num, _need_to_full, _to_full_shapes, _get_pipeline_stages
from mindspore import _checkparam as Validator


def _init_sink_dataset(dataset, sink_size, input_signature, create_info):
    """
    Initialize data sinking for `dataset` and build the op that reads from its device queue.

    Args:
        dataset: Dataset to sink. It must not already carry a `__transfer_dataset__` attribute.
        sink_size (int): Sink size forwarded to `_exec_datagraph`.
        input_signature: Optional signature checked against the dataset types and shapes with
            `_check_inputs`. ``None`` skips the check.
        create_info (bool): Whether the caller allows a data-info queue to be created.

    Returns:
        tuple, `(next_op, is_info_queue)`. `next_op` is the `ops.GetNext` instance bound to the
        queue name of the transfer dataset. `is_info_queue` is the value passed to
        `_exec_datagraph` as `create_data_info_queue`.

    Raises:
        ValueError: If `dataset` already has a `__transfer_dataset__` attribute.
    """
    if hasattr(dataset, '__transfer_dataset__'):
        raise ValueError("The dataset has been used with network.")

    dataset_size = dataset.get_dataset_size()
    dataset_types, dataset_shapes = _get_types_and_shapes(dataset)
    dynamic_shape = _has_dynamic_shape(dataset_shapes) or ds.config.get_dynamic_shape()

    # Validate the signature (when given) before any transfer dataset is created, so that an
    # error raised by `_check_inputs` does not leave `dataset` tagged as already used.
    if input_signature is not None:
        _check_inputs(input_signature, dataset_shapes, dataset_types)

    # The data-info queue is only requested when every one of these conditions holds.
    is_info_queue = (create_info and sink_size == 1 and dataset_size != 1 and
                     input_signature is None and not dynamic_shape and
                     context.get_context('device_target') == 'Ascend')

    # Create the transfer dataset, remember it on the dataset and start sending data.
    transfer_dataset = _exec_datagraph(dataset, sink_size, create_data_info_queue=is_info_queue)
    dataset.__transfer_dataset__ = transfer_dataset
    transfer_dataset.send(-1)

    # Create the GetNext op; in graph mode the shapes are expanded to full shapes when required.
    queue_name = transfer_dataset.queue_name
    if _need_to_full() and context.get_context('mode') == context.GRAPH_MODE:
        device_num = _get_device_num() // _get_pipeline_stages()
        dataset_shapes = _to_full_shapes(dataset_shapes, device_num)
    next_op = ops.GetNext(dataset_types, dataset_shapes, len(dataset_types), queue_name)

    _set_dataset_mode_config('sink')

    return next_op, is_info_queue


class _DataSinkAux:
    """
    Plain attribute holder used for data-sink bookkeeping on a dataset.

    Deep-copying an instance yields ``None`` instead of a duplicate of the holder.
    """

    @staticmethod
    def __deepcopy__(memodict):
        """Return ``None`` so that `copy.deepcopy` does not duplicate this object."""
        return None


def _get_next_op(dataset, ori_next_op, is_info_queue):
    """
    Get the GetNext operation to use for the current sink step.

    Args:
        dataset: Dataset carrying the `__transfer_dataset__` attribute.
        ori_next_op: The op created when sinking was initialized.
        is_info_queue (bool): Whether the transfer dataset exposes a data-info queue.

    Returns:
        tuple, `(next_op, key_info)`. Without a data-info queue this is `(ori_next_op, '')`.
        Otherwise `key_info` is `(key, dataset_shapes, dataset_types)`, where `key` is built
        from the types and shapes reported by `get_data_info()` and `dataset_shapes` are the
        shapes the op is built with.
    """
    if not is_info_queue:
        return ori_next_op, ''

    # Lazily attach the per-dataset caches the first time the info queue is used.
    if not hasattr(dataset, '__sink_aux__'):
        dataset.__sink_aux__ = _DataSinkAux()
        dataset.__sink_aux__.next_ops = {}
        dataset.__sink_aux__.sink_funcs = {}

    queue_name = dataset.__transfer_dataset__.queue_name
    dataset_types, dataset_shapes = dataset.__transfer_dataset__.get_data_info()
    dataset_types = [pytype_to_dtype(x) for x in dataset_types]
    # The key uses the shapes as reported by the queue, before any expansion to full shapes.
    key = str(dataset_types) + str(dataset_shapes)

    # Expand the shapes before the cache lookup so the returned `key_info` is the same
    # whether or not the op comes from the cache.
    if _need_to_full() and context.get_context('mode') == context.GRAPH_MODE:
        device_num = _get_device_num() // _get_pipeline_stages()
        dataset_shapes = _to_full_shapes(dataset_shapes, device_num)

    next_ops = dataset.__sink_aux__.next_ops
    if key in next_ops:
        next_op = next_ops[key]
    else:
        next_op = ops.GetNext(dataset_types, dataset_shapes, len(dataset_types), queue_name)
        # Store the op so the cache lookup above can actually hit on later calls.
        next_ops[key] = next_op

    return next_op, (key, dataset_shapes, dataset_types)


def _get_sink_fun(sink_fun, key_info, is_info_queue, dataset, jit_config):
    """
    Get the function to execute for one sink step, reusing a cached one when available.

    Args:
        sink_fun (Function): Candidate function for this step.
        key_info: Sequence whose first element is the cache key. Only read when
            `is_info_queue` is true.
        is_info_queue (bool): Selects the per-key cache on `dataset.__sink_aux__.sink_funcs`
            instead of the single `dataset.__sink_fun__` slot.
        dataset: Object on which the cached function(s) are stored.
        jit_config: When not ``None``, a newly cached function is wrapped with `jit` using
            this config.

    Returns:
        Function, the cached function for this dataset (and key).
    """

    def _prepare():
        # Only wrap with jit when a config was supplied.
        return sink_fun if jit_config is None else jit(sink_fun, jit_config=jit_config)

    if is_info_queue:
        cache_key = key_info[0]
        cached_funcs = dataset.__sink_aux__.sink_funcs
        if cache_key not in cached_funcs:
            cached_funcs[cache_key] = _prepare()
        return cached_funcs[cache_key]

    # Single-slot cache: the first function seen for this dataset wins.
    if not hasattr(dataset, '__sink_fun__'):
        dataset.__sink_fun__ = _prepare()
    return dataset.__sink_fun__


def data_sink(fn, dataset, sink_size=1, jit_config=None, input_signature=None):
    """
    A wrapper function to generate a function for the input function.

    Note:
        When using data sinking, the dataset will be automatically sent in a loop, and only the step size of
        sinking `sink_size` needs to be considered. The default value of `sink_size` is ``1``, which means that
        all data will be sunk every epoch. If `sink_size` is greater than 1, the amount of data sunk per epoch
        will be the dataset with a size of `sink_size`.

    Args:
        fn (Function): The Python function that will be run with dataset.
        dataset (Dataset): The dataset iterator. The dataset can be generated by dataset generator API in
            :class:`mindspore.dataset`, such as :class:`mindspore.dataset.ImageFolderDataset`.
        sink_size (int): Control the amount of data in each sink. `sink_size` must be positive integer.
            Default: ``1`` .
        jit_config (JitConfig): Controls the execution mode (Graph mode/PyNative mode) of the generated
            function, and Jit config for compile. Default: ``None`` , means running in PyNative mode.
        input_signature (Union[Tensor, List or Tuple of Tensors]): The Tensor which describes the input
            arguments. The shape and dtype of the Tensor will be supplied to this function. If
            input_signature is specified, each input to `fn` must be a `Tensor`. And the input parameters of
            `fn` cannot accept `**kwargs`. The shape and dtype of actual inputs should keep the same as
            input_signature. Otherwise, TypeError will be raised. Default: ``None`` .

    Returns:
        Function, the generated function will be executed in data sinking mode.

    Raises:
        ValueError: If `sink_size` is not positive integer.
        ValueError: If the device target is neither ``Ascend`` nor ``GPU``.

    Supported Platforms:
        ``Ascend`` ``GPU``

    Examples:
        >>> import numpy as np
        >>> import mindspore as ms
        >>> from mindspore import dataset as ds
        >>>
        >>> data = {"x": np.ones((1,), dtype=np.int32), "y": np.ones((1,), dtype=np.int32)}
        >>> dataset = ds.NumpySlicesDataset(data=data)
        >>>
        >>> def func_net(x, y):
        ...     out = x + y
        ...     return out
        >>>
        >>> sink_process = ms.data_sink(func_net, dataset, sink_size=1)
        >>> for _ in range(2):
        ...     out = sink_process()
        ...     print(out)
        2
        2
    """
    Validator.check_value_type("sink_size", sink_size, int, "Data sink")
    if sink_size <= 0:
        raise ValueError(
            f"The 'sink_size' must be positive, but got sink_size {sink_size}.")

    if context.get_context('device_target') not in ('Ascend', 'GPU'):
        raise ValueError(
            f"Data sinking supports ascend or gpu device target, "
            f"but device target is {context.get_context('device_target')}.")

    # Without a jit config the transfer dataset is created with a sink size of 1 and the
    # wrapper below repeats the step `sink_size` times instead.
    loop = sink_size
    create_info = True
    if jit_config is None:
        create_info = (loop == 1)
        loop = 1
    ori_next_op, is_info_queue = _init_sink_dataset(dataset, loop, input_signature, create_info)

    @wraps(fn)
    def sink_process(*args, **kwargs):
        # Arguments are accepted but not used: the inputs of `fn` come from the dataset queue.
        next_op, key_info = _get_next_op(dataset, ori_next_op, is_info_queue)

        def sink_fun():
            data = next_op()
            out = fn(*data)
            return out

        real_sink_fun = _get_sink_fun(sink_fun, key_info, is_info_queue, dataset, jit_config)

        # With a jit config in graph mode a single call is made per invocation.
        loop = sink_size
        if jit_config is not None and context.get_context('mode') == context.GRAPH_MODE:
            loop = 1

        out = None
        for _ in range(loop):
            out = real_sink_fun()

        return out

    return sink_process