# Copyright 2019-2022 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""
This file contains basic classes that help users do flexible dataset loading.
You can define your own dataset loading class, and use GeneratorDataset to help load data.
You can refer to the
`tutorial <https://www.mindspore.cn/tutorials/en/r1.9/beginner/dataset.html#customizing-dataset>`_
to help define your dataset loading.
After declaring the dataset object, you can further apply dataset operations
(e.g. filter, skip, concat, map, batch) on it.
"""
import builtins
import math
import os
import signal
import time
import multiprocessing
from multiprocessing.util import Finalize
import queue
from functools import partial
import subprocess
import threading
import weakref
import platform
import psutil
import numpy as np
import mindspore._c_dataengine as cde
from mindspore.common import Tensor
from mindspore import log as logger
from .datasets import UnionBaseDataset, MappableDataset, Schema, to_list, _PythonMultiprocessing, _check_shm_usage
from . import samplers
from .queue import _SharedQueue
from .validators import check_generatordataset, check_numpyslicesdataset, check_paddeddataset
from ..core.config import get_enable_shared_mem, get_prefetch_size, get_multiprocessing_timeout_interval, \
get_enable_watchdog
from ..core.datatypes import mstypelist_to_detypelist
from ..core.py_util_helpers import ExceptionHandler
def _iter_fn(dataset, num_samples):
"""
Generator function wrapper for iterable dataset.
"""
if num_samples is not None and num_samples != 0:
ds_iter = iter(dataset)
for _ in range(num_samples):
try:
val = next(ds_iter)
except StopIteration:
return
# convert output tensors to ndarrays
yield _convert_row(val)
else:
for val in dataset:
# convert output tensors to ndarrays
yield _convert_row(val)
def _generator_fn(generator, num_samples):
"""
Generator function wrapper for generator function dataset.
"""
if num_samples is not None and num_samples != 0:
gen_iter = generator()
for _ in range(num_samples):
try:
val = next(gen_iter)
except StopIteration:
return
yield _convert_row(val)
else:
gen_iter = generator()
for val in gen_iter:
yield _convert_row(val)
def _cpp_sampler_fn(sample_ids, dataset):
"""
Generator function wrapper for mappable dataset with cpp sampler.
"""
if not isinstance(sample_ids, np.ndarray):
raise RuntimeError("Sample IDs are not in a numpy array.")
if sample_ids.size == 0:
raise RuntimeError("Sampler passed an empty sample IDs list.")
for i in sample_ids:
val = dataset[i]
# convert output tensors to ndarrays
yield _convert_row(val)
def _cpp_sampler_fn_mp(sample_ids, sample_fn):
"""
Multiprocessing generator function wrapper for mappable dataset with cpp sampler.
"""
if not isinstance(sample_ids, np.ndarray):
raise RuntimeError("Sample IDs are not in a numpy array.")
if sample_ids.size == 0:
raise RuntimeError("Sampler passed an empty sample IDs list.")
return sample_fn.process(sample_ids)
def _fill_worker_indices(workers, indices, idx):
"""
Worker index queue filler, fill worker index queue in round robin order.
"""
num_worker = len(workers)
while idx < len(indices):
try:
workers[idx % num_worker].put(indices[idx])
idx += 1
except queue.Full:
break
return idx
def _convert_row(row):
"""
Convert Op return value to numpy
"""
if isinstance(row, dict):
raise TypeError("Input data is expected to be " \
"int, float, str, bytes, numpy.ndarray, Tensor or list/tuple of them, but got dict.")
# convert single item to np.array
prim_type = (int, float, str, bytes, np.ndarray, Tensor)
if isinstance(row, prim_type):
if isinstance(row, Tensor): # mindspore.Tensor
item = row.asnumpy()
else:
item = np.array(row, copy=False)
if item.dtype == 'object':
raise TypeError("Data type of the input or its converted Numpy array is expected to be " \
"int or float or str, but got {}.".format(item.dtype))
return tuple([item])
value = []
# convert each item to np.array
idx = 0
for x in row:
idx += 1
if isinstance(x, Tensor): # mindspore.Tensor
value.append(x.asnumpy())
elif isinstance(x, dict):
raise TypeError("The {}th item of input data is expected to be " \
"int, float, str, bytes, numpy.ndarray, Tensor, but got dict.".format(idx))
else:
item = np.array(x, copy=False)
if item.dtype == 'object':
raise TypeError("Data type of {}th item of the input or its converted Numpy array is expected to be " \
"int or float or str, but got {}.".format(idx, item.dtype))
value.append(item)
return tuple(value)
class SamplerFn:
"""
Multiprocessing or multithread generator function wrapper master process.
"""
def __init__(self, dataset, num_worker, multi_process, max_rowsize):
self.workers = []
self.dataset = dataset
self.num_worker = num_worker
self.multi_process = multi_process
self.max_rowsize = max_rowsize
self.need_join = False
self.ppid = os.getpid()
self.pids = []
self.check_interval = get_multiprocessing_timeout_interval() # the interval of check queue's size
self._final_join = True
# Event for end of epoch
if multi_process is True:
try:
self.eof = multiprocessing.Event()
except Exception:
raise RuntimeError("Init multiprocessing.Event() failed, This might be caused by insufficient shm,"
+ " and the recommended shm size is at least 5 GB.")
else:
self.eof = threading.Event()
# Create workers
# get default queue size and adjust queuesize per worker if there are large # workers
queue_size = get_prefetch_size()
queue_size = min(queue_size, queue_size * 4 // num_worker)
queue_size = max(2, queue_size)
if multi_process and get_enable_shared_mem():
_check_shm_usage(num_worker, queue_size, max_rowsize)
count = multiprocessing.Value('i', 0)
for _ in range(num_worker):
if multi_process is True:
try:
worker = _GeneratorWorkerMp(dataset, self.eof, max_rowsize, queue_size, self.ppid, count)
except Exception:
raise RuntimeError("Init multiprocessing.Queue() failed, This might be caused by insufficient shm, "
"and the recommended shm size is at least 5 GB.")
worker.daemon = True
# When multi processes fork a subprocess, the lock of the main process is copied to the subprocess,
# which may cause deadlock. Therefore, the subprocess startup is performed in the initialization phase.
# In this phase, the main process is not locked.
worker.start()
self.pids.append(worker.pid)
self.need_join = True
else:
worker = _GeneratorWorkerMt(dataset, self.eof)
worker.daemon = True
self.workers.append(worker)
self._launch_cleanup_worker(multi_process=multi_process)
def process(self, indices):
"""
The main process, start the child process or child thread, and fill the index queue.
Get the result and return.
"""
for w in self.workers:
# Check whether the queue of the subprocess is empty.
if not w.queue_empty():
raise Exception("The queue of the subprocess is not empty.")
# Start all workers
if not w.is_alive():
w.start()
# Fill initial index queues
idx_cursor = 0
idx_cursor = _fill_worker_indices(self.workers, indices, idx_cursor)
# Fetch results
for i in range(len(indices)):
if self.eof.is_set():
self._stop_subprocess()
return
if self.multi_process is True and not psutil.pid_exists(self.workers[i % self.num_worker].pid):
self._stop_subprocess()
return
# Fetch result and put index
try:
# To avoid get timeout from queue, check the res_queue size.
start_time = int(time.time())
wait_count = 1
while self.workers[i % self.num_worker].res_queue.empty():
time.sleep(0.1)
cost_time = int(time.time()) - start_time
if cost_time / self.check_interval >= wait_count:
wait_count += 1
logger.warning("It has been waiting for " + str(cost_time) + "s because the multi "
"thread/process of the generator generates data had been hung by gil lock. "
"Check whether the source of generator has an infinite loop operation or the "
"output data is too large. You can also set the timeout interval by "
"ds.config.set_multiprocessing_interval to adjust the output frequency of this "
"log.")
pid = self.workers[i % self.num_worker].pid
logger.warning("Generator subprocess ID {} is stuck.".format(pid))
install_status, _ = subprocess.getstatusoutput("py-spy --version")
if install_status == 0:
stack = subprocess.getoutput("py-spy dump -p {} -l -s".format(pid))
logger.warning("Generator subprocess stack:\n{}".format(stack))
else:
logger.warning("Please `pip install py-spy` to get the stacks of the stuck process.")
result = self.workers[i % self.num_worker].get()
if isinstance(result, ExceptionHandler):
result.reraise()
except queue.Empty:
self._stop_subprocess()
raise Exception("Generator worker process timeout.")
except KeyboardInterrupt:
self._stop_subprocess()
raise Exception("Generator worker receives KeyboardInterrupt.")
if self.eof.is_set():
self._stop_subprocess()
return
if idx_cursor < len(indices):
idx_cursor = _fill_worker_indices(self.workers, indices, idx_cursor)
yield _convert_row(result)
def _launch_cleanup_worker(self, multi_process):
"""
We need a extra thread and process if main process or subprocess was killed.
Args:
multi_process: Whether use multiprocess.
"""
if multi_process is True and platform.system().lower() != 'windows':
_clean_worker_func = _PythonMultiprocessing._clean_process # pylint: disable=W0212
self.cleaning_process = multiprocessing.Process(target=_clean_worker_func, args=(self.ppid, self.workers))
self.cleaning_process.daemon = True
self.cleaning_process.start()
if get_enable_watchdog():
self.eot = threading.Event()
self.watch_dog = threading.Thread(target=_PythonMultiprocessing._watch_dog, # pylint: disable=W0212
args=(self.eot, self.workers + [self.cleaning_process]))
self.watch_dog.daemon = True
self.watch_dog.start()
if self._final_join is True:
self._jointhread = Finalize(
self.watch_dog, self._finalize_join,
args=(weakref.ref(self.watch_dog), self.eot),
exitpriority=-5
)
def _stop_subprocess(self):
"""Only the main process can call join."""
if self.need_join is True and self.ppid == os.getpid():
if hasattr(self, 'eof') and self.eof is not None and not self.eof.is_set():
self.eof.set()
self.need_join = False
for w in self.workers:
if self.multi_process is True and hasattr(w, '_closed') and w._closed is False: # pylint: disable=W0212
try:
w.join()
except Exception: # pylint: disable=W0703
# Block all errors when join
continue
self._abort_watchdog()
def _abort_watchdog(self):
if hasattr(self, 'eot') and self.eot is not None and not self.eot.is_set():
self.eot.set()
if hasattr(self, 'cleaning_process') and self.cleaning_process is not None:
_PythonMultiprocessing._terminate_processes([self.cleaning_process]) # pylint: disable=W0212
@classmethod
def _finalize_join(cls, twr, eot):
thread = twr()
if thread is not None:
if eot is not None and not eot.is_set():
eot.set()
thread.join()
def __del__(self):
try:
self._stop_subprocess()
except TypeError:
pass
def __deepcopy__(self, memodict, exclude=()):
self.__init__(self.dataset, self.num_worker, self.multi_process, self.max_rowsize)
def _subprocess_handle(eof, signum, frame):
threading.Thread(target=eof.set()).start()
def _ignore_sigint(is_multiprocessing):
"""
We need to ignore sigint signal here so subprocesses can exit normally and clear.
"""
if is_multiprocessing:
signal.signal(signal.SIGINT, signal.SIG_IGN)
def _main_process_already_exit(eof, is_multiprocessing, idx_queue, result_queue, ppid):
"""
Judge whether main process already exit.
"""
if eof.is_set() or (is_multiprocessing and platform.system().lower() != 'windows' and
not _PythonMultiprocessing.is_process_alive(ppid)):
if is_multiprocessing:
idx_queue.cancel_join_thread()
result_queue.cancel_join_thread()
return True
return False
def _generator_worker_loop(dataset, idx_queue, result_queue, eof, is_multiprocessing, ppid=-1):
"""
Multithread or multiprocess generator worker process loop.
"""
if is_multiprocessing:
signal.signal(signal.SIGTERM, partial(_subprocess_handle, eof))
while True:
_ignore_sigint(is_multiprocessing=is_multiprocessing)
# Fetch index, block
try:
idx = idx_queue.get(timeout=1)
except queue.Empty:
if _main_process_already_exit(eof, is_multiprocessing, idx_queue, result_queue, ppid) is True:
return
# If end-of-file (eof) is not set, continue to get data from idx_queue
continue
if idx is None:
# When the queue is out of scope from master process, a None item can be fetched from the queue.
# Upon receiving None, worker process should check if eof is set.
if not eof.is_set():
raise Exception("")
return
if eof.is_set():
if is_multiprocessing:
idx_queue.cancel_join_thread()
result_queue.cancel_join_thread()
return
# Fetch data, any exception from __getitem__ will terminate worker and timeout master process
try:
result = dataset[idx]
except Exception: # pylint: disable=broad-except
result = ExceptionHandler(where="in GeneratorDataset worker process")
# Send data, block
while True:
try:
result_queue.put(result, timeout=5)
except queue.Full:
if _main_process_already_exit(eof, is_multiprocessing, idx_queue, result_queue, ppid) is True:
return
# If eof is not set, continue to put data to result_queue
continue
break
del result, idx
class _GeneratorWorkerMt(threading.Thread):
"""
Worker process for multi-thread Generator.
"""
def __init__(self, dataset, eof):
self.idx_queue = queue.Queue(16)
self.res_queue = queue.Queue(16)
super().__init__(target=_generator_worker_loop, args=(dataset, self.idx_queue, self.res_queue, eof, False))
def put(self, item):
"""
Put function for worker index queue. Never block. Raise queue.Full on failure.
"""
self.idx_queue.put_nowait(item)
def get(self):
"""
Get function for worker result queue. Block with timeout.
"""
return self.res_queue.get(timeout=30)
def queue_empty(self):
if not self.idx_queue.empty():
logger.warning("idx_queue is not empty")
return False
if not self.res_queue.empty():
logger.warning("res_queue is not empty")
return False
return True
class _GeneratorWorkerMp(multiprocessing.Process):
"""
Worker process for multiprocess Generator.
"""
def __init__(self, dataset, eof, max_rowsize, queue_size, ppid, count):
self.idx_queue = multiprocessing.Queue(queue_size)
if get_enable_shared_mem():
self.res_queue = _SharedQueue(queue_size, count, max_rowsize=max_rowsize)
else:
self.res_queue = multiprocessing.Queue(queue_size)
self.idx_queue._joincancelled = True # pylint: disable=W0212
self.res_queue._joincancelled = True # pylint: disable=W0212
super().__init__(target=_generator_worker_loop, args=(dataset, self.idx_queue, self.res_queue, eof, True, ppid))
def put(self, item):
"""
Put function for worker index queue. Never block. Raise queue.Full on failure.
"""
self.idx_queue.put_nowait(item)
def get(self):
"""
Get function for worker result queue. Block with timeout.
"""
# Relax 10s to 30s, since it sometimes will cause "Generator worker process timeout"
# when we run too many iterators with infinite epoch(num_epoch=-1)
return self.res_queue.get(timeout=30)
def queue_empty(self):
if not self.idx_queue.empty():
logger.warning("idx_queue is not empty.")
return False
if not self.res_queue.empty():
logger.warning("res_queue is not empty.")
return False
return True
def __del__(self):
# del all the Queue & SharedQueue when the iter had been deleted from ITERATORS_LIST
del self.idx_queue
del self.res_queue
[docs]class GeneratorDataset(MappableDataset, UnionBaseDataset):
"""
A source dataset that generates data from Python by invoking Python data source each epoch.
The column names and column types of generated dataset depend on Python data defined by users.
Args:
source (Union[Callable, Iterable, Random Accessible]):
A generator callable object, an iterable Python object or a random accessible Python object.
Callable source is required to return a tuple of NumPy arrays as a row of the dataset on source().next().
Iterable source is required to return a tuple of NumPy arrays as a row of the dataset on
iter(source).next().
Random accessible source is required to return a tuple of NumPy arrays as a row of the dataset on
source[idx].
column_names (Union[str, list[str]], optional): List of column names of the dataset (default=None). Users are
required to provide either column_names or schema.
column_types (list[mindspore.dtype], optional): List of column data types of the dataset (default=None).
If provided, sanity check will be performed on generator output.
schema (Union[Schema, str], optional): Path to the JSON schema file or schema object (default=None). Users are
required to provide either column_names or schema. If both are provided, schema will be used.
num_samples (int, optional): The number of samples to be included in the dataset
(default=None, all images).
num_parallel_workers (int, optional): Number of subprocesses used to fetch the dataset in parallel (default=1).
shuffle (bool, optional): Whether or not to perform shuffle on the dataset. Random accessible input is required.
(default=None, expected order behavior shown in the table).
sampler (Union[Sampler, Iterable], optional): Object used to choose samples from the dataset. Random accessible
input is required (default=None, expected order behavior shown in the table).
num_shards (int, optional): Number of shards that the dataset will be divided into (default=None).
Random accessible input is required. When this argument is specified, `num_samples` reflects the maximum
sample number of per shard.
shard_id (int, optional): The shard ID within `num_shards` (default=None). This argument must be specified only
when num_shards is also specified. Random accessible input is required.
python_multiprocessing (bool, optional): Parallelize Python operations with multiple worker process. This
option could be beneficial if the Python operation is computational heavy (default=True).
max_rowsize(int, optional): Maximum size of row in MB that is used for shared memory allocation to copy
data between processes. This is only used if python_multiprocessing is set to True (default 6 MB).
Raises:
RuntimeError: If source raises an exception during execution.
RuntimeError: If len of column_names does not match output len of source.
ValueError: If `num_parallel_workers` exceeds the max thread numbers.
ValueError: If sampler and shuffle are specified at the same time.
ValueError: If sampler and sharding are specified at the same time.
ValueError: If num_shards is specified but shard_id is None.
ValueError: If shard_id is specified but `num_shards` is None.
ValueError: If `shard_id` is invalid (< 0 or >= `num_shards`).
Note:
- Input `source` accepts user-defined Python functions (PyFuncs), Do not add network computing operators from
mindspore.nn and mindspore.ops or others into this `source`.
- This dataset can take in a `sampler`. `sampler` and `shuffle` are mutually exclusive.
The table below shows what input arguments are allowed and their expected behavior.
.. list-table:: Expected Order Behavior of Using `sampler` and `shuffle`
:widths: 25 25 50
:header-rows: 1
* - Parameter `sampler`
- Parameter `shuffle`
- Expected Order Behavior
* - None
- None
- random order
* - None
- True
- random order
* - None
- False
- sequential order
* - Sampler object
- None
- order defined by sampler
* - Sampler object
- True
- not allowed
* - Sampler object
- False
- not allowed
Examples:
>>> import numpy as np
>>>
>>> # 1) Multidimensional generator function as callable input.
>>> def generator_multidimensional():
... for i in range(64):
... yield (np.array([[i, i + 1], [i + 2, i + 3]]),)
>>>
>>> dataset = ds.GeneratorDataset(source=generator_multidimensional, column_names=["multi_dimensional_data"])
>>>
>>> # 2) Multi-column generator function as callable input.
>>> def generator_multi_column():
... for i in range(64):
... yield np.array([i]), np.array([[i, i + 1], [i + 2, i + 3]])
>>>
>>> dataset = ds.GeneratorDataset(source=generator_multi_column, column_names=["col1", "col2"])
>>>
>>> # 3) Iterable dataset as iterable input.
>>> class MyIterable:
... def __init__(self):
... self._index = 0
... self._data = np.random.sample((5, 2))
... self._label = np.random.sample((5, 1))
...
... def __next__(self):
... if self._index >= len(self._data):
... raise StopIteration
... else:
... item = (self._data[self._index], self._label[self._index])
... self._index += 1
... return item
...
... def __iter__(self):
... self._index = 0
... return self
...
... def __len__(self):
... return len(self._data)
>>>
>>> dataset = ds.GeneratorDataset(source=MyIterable(), column_names=["data", "label"])
>>>
>>> # 4) Random accessible dataset as random accessible input.
>>> class MyAccessible:
... def __init__(self):
... self._data = np.random.sample((5, 2))
... self._label = np.random.sample((5, 1))
...
... def __getitem__(self, index):
... return self._data[index], self._label[index]
...
... def __len__(self):
... return len(self._data)
>>>
>>> dataset = ds.GeneratorDataset(source=MyAccessible(), column_names=["data", "label"])
>>>
>>> # list, dict, tuple of Python is also random accessible
>>> dataset = ds.GeneratorDataset(source=[(np.array(0),), (np.array(1),), (np.array(2),)], column_names=["col"])
"""
@check_generatordataset
def __init__(self, source, column_names=None, column_types=None, schema=None, num_samples=None,
num_parallel_workers=1, shuffle=None, sampler=None, num_shards=None, shard_id=None,
python_multiprocessing=True, max_rowsize=6):
super().__init__(num_parallel_workers=num_parallel_workers, sampler=sampler, num_samples=num_samples,
shuffle=shuffle, num_shards=num_shards, shard_id=shard_id)
if isinstance(source, builtins.zip):
# Although zip is iteratable, it does not have the feature of repeated iteration, so pass it to the array.
self.source = [item for item in source]
else:
self.source = source
self.prepared_source = None # source to be sent to C++
if hasattr(self, 'operator_mixed') and getattr(self, 'operator_mixed') is True:
self.num_parallel_workers = 1
logger.warning(
"Input 'source' of 'GeneratorDataset' includes network computing operators like in mindspore.nn, "
"mindspore.ops, mindspore.numpy module and etc, which do not support multi-thread compiling, recommend"
" to replace it with python implemented operator like numpy etc. Here decrease 'num_parallel_workers' "
"into 1.")
if platform.system().lower() == 'windows' and num_parallel_workers > 1 and python_multiprocessing:
logger.warning("Python multiprocessing is not supported on Windows platform.")
self.python_multiprocessing = python_multiprocessing if platform.system().lower() != 'windows' else False
self.column_names = to_list(column_names)
if column_types is not None:
self.column_types = mstypelist_to_detypelist(column_types)
else:
self.column_types = []
self.schema = schema
if schema is not None:
self.schema = schema
if not isinstance(schema, Schema):
self.schema = Schema(schema)
# Move get dataset_size by len from parse to here, because self.source will
# lose attribution of '__len__' after deepcopy.
self.source_len = -1 # unknown
if hasattr(self.source, "__len__"):
self.source_len = len(self.source)
self.max_rowsize = max_rowsize
self.sample_fn = None
def __deepcopy__(self, memodict):
if id(self) in memodict:
return memodict[id(self)]
new_op = self.__safe_deepcopy__(memodict, exclude=("source", "__transfer_dataset__"))
sample_fn = None
if new_op.sampler is not None and hasattr(self.source, "__getitem__"):
# The reason why there is a try catch here is because when the new op is being constructed with shared
# memory enabled, there will be an exception thrown if there is not enough shared memory available
if self.source_len == -1:
raise RuntimeError("Attempt to construct a random access dataset, '__len__' method is required!")
try:
if new_op.num_parallel_workers > 1:
self.__validate_memory_usage()
sample_fn = SamplerFn(self.source, new_op.num_parallel_workers, self.python_multiprocessing,
self.max_rowsize)
new_op.prepared_source = (lambda sample_ids: _cpp_sampler_fn_mp(sample_ids, sample_fn))
else:
new_op.prepared_source = (lambda sample_ids: _cpp_sampler_fn(sample_ids, self.source))
new_op.sample_fn = sample_fn
except RuntimeError as e:
raise Exception(str(e))
else:
try:
new_op.sampler = None
new_op.sample_fn = sample_fn
new_op.source_len = min(new_op.source_len,
new_op.num_samples) if new_op.num_samples != 0 else new_op.source_len
iter(self.source)
except TypeError:
# Use generator function if input callable
new_op.prepared_source = (lambda: _generator_fn(self.source, new_op.num_samples))
else:
# Use iterator function if input is iterable
# Random accessible input is also iterable
new_op.prepared_source = (lambda: _iter_fn(self.source, new_op.num_samples))
return new_op
def is_shuffled(self):
if self.sampler:
return self.sampler.is_shuffled()
return False
def is_sharded(self):
if self.sampler:
return self.sampler.is_sharded()
return False
def parse(self, children=None):
if self.schema is None:
return cde.GeneratorNode(self.prepared_source, self.column_names, self.column_types, self.source_len,
self.sampler, self.num_parallel_workers)
schema = self.schema
if isinstance(schema, Schema):
schema = self.schema.cpp_schema
return cde.GeneratorNode(self.prepared_source, schema, self.source_len, self.sampler,
self.num_parallel_workers)
def __validate_memory_usage(self):
"""
Check memory usage when mulit-processing mode, when 85% prompt warning and 100% raise error.
"""
if self.python_multiprocessing:
# if use num_parallel_workers is to large when python_multiprocessing=True which would cause
# OOM error get the num_shards
valid_num_shards = 1
if isinstance(self.sampler, samplers.DistributedSampler):
valid_num_shards = self.sampler.num_shards
elif self.num_shards is not None:
valid_num_shards = self.num_shards
# get process memory usage
process = psutil.Process(os.getpid())
process_memory = process.memory_info().rss
sys_memory_free = psutil.virtual_memory().free
total_memory_maybe_used = process_memory * self.num_parallel_workers * valid_num_shards
if total_memory_maybe_used / sys_memory_free > 0.85:
valid_num_worker = math.floor(sys_memory_free * 0.85 / valid_num_shards / process_memory)
valid_num_worker = 1 if valid_num_worker <= 0 else valid_num_worker
info = "GeneratorDataset's num_parallel_workers: {} is too large which may cause a lot of memory " \
"occupation (>85%) or out of memory(OOM) during multiprocessing. Therefore, it is recommended " \
"to reduce num_parallel_workers to {} or smaller.".format(self.num_parallel_workers,
valid_num_worker)
logger.warning(info)
class _NumpySlicesDataset:
"""
Mainly for dealing with several kinds of formats of Python data, and return one row each time.
"""
def __init__(self, data, column_list=None):
self.column_list = None
# Convert dict data into tuple
if isinstance(data, dict):
data = self.process_dict(data)
if isinstance(data, tuple):
self.data = ()
data_len = len(data)
for i in range(data_len):
self.data = self.data + (np.array(data[i]),)
else:
self.data = (np.array(data),)
# check whether the data length in each column is equal
data_len = [len(data_item) for data_item in self.data]
if data_len[1:] != data_len[:-1]:
raise ValueError("Data length in each column is not equal.")
# Init column_name
if column_list is not None:
self.column_list = column_list
elif self.column_list is None:
self.column_list = []
column_num = len(self.data)
for i in range(column_num):
self.column_list.append("column_" + str(i))
def __getitem__(self, index):
data_row = [d[index, ...] for d in self.data]
data_res = tuple(data_row)
return data_res
def __len__(self):
return len(self.data[0])
def process_dict(self, input_data):
"""
Convert the dict like data into tuple format, when input is a tuple of dicts then compose it into a dict first.
"""
# Convert pandas like dict(has "values" column) into General dict
data_keys = list(input_data.keys())
data_col = input_data[data_keys[0]]
if hasattr(data_col, "values"):
new_dict = {}
for key in data_keys:
item1 = input_data.pop(key)
new_dict[key] = item1.values
input_data = new_dict
# Convert the data in dict into tuple
data = ()
keys = list(input_data.keys())
self.column_list = keys
for key in keys:
value = input_data[key]
data = data + (list(value),)
return data
[docs]class NumpySlicesDataset(GeneratorDataset):
"""
Creates a dataset with given data slices, mainly for loading Python data into dataset.
The column names and column types of generated dataset depend on Python data defined by users.
Args:
data (Union[list, tuple, dict]) Input of given data. Supported data types include: list, tuple, dict and other
NumPy formats. Input data will be sliced along the first dimension and generate additional rows, if input is
list, there will be one column in each row, otherwise there tends to be multi columns. Large data is not
recommended to be loaded in this way as data is loading into memory.
column_names (list[str], optional): List of column names of the dataset (default=None). If column_names is not
provided, the output column names will be named as the keys of dict when the input data is a dict,
otherwise they will be named like column_0, column_1 ...
num_samples (int, optional): The number of samples to be included in the dataset (default=None, all samples).
num_parallel_workers (int, optional): Number of subprocesses used to fetch the dataset in parallel (default=1).
shuffle (bool, optional): Whether or not to perform shuffle on the dataset. Random accessible input is required.
(default=None, expected order behavior shown in the table).
sampler (Union[Sampler, Iterable], optional): Object used to choose samples from the dataset. Random accessible
input is required (default=None, expected order behavior shown in the table).
num_shards (int, optional): Number of shards that the dataset will be divided into (default=None).
Random accessible input is required. When this argument is specified, `num_samples` reflects the max
sample number of per shard.
shard_id (int, optional): The shard ID within `num_shards` (default=None). This argument must be specified only
when num_shards is also specified. Random accessible input is required.
Note:
- This dataset can take in a `sampler`. `sampler` and `shuffle` are mutually exclusive.
The table below shows what input arguments are allowed and their expected behavior.
.. list-table:: Expected Order Behavior of Using `sampler` and `shuffle`
:widths: 25 25 50
:header-rows: 1
* - Parameter `sampler`
- Parameter `shuffle`
- Expected Order Behavior
* - None
- None
- random order
* - None
- True
- random order
* - None
- False
- sequential order
* - Sampler object
- None
- order defined by sampler
* - Sampler object
- True
- not allowed
* - Sampler object
- False
- not allowed
Raises:
RuntimeError: If len of column_names does not match output len of data.
ValueError: If `num_parallel_workers` exceeds the max thread numbers.
ValueError: If sampler and shuffle are specified at the same time.
ValueError: If sampler and sharding are specified at the same time.
ValueError: If num_shards is specified but shard_id is None.
ValueError: If shard_id is specified but `num_shards` is None.
ValueError: If `shard_id` is invalid (< 0 or >= `num_shards`).
Examples:
>>> # 1) Input data can be a list
>>> data = [1, 2, 3]
>>> dataset = ds.NumpySlicesDataset(data=data, column_names=["column_1"])
>>>
>>> # 2) Input data can be a dictionary, and column_names will be its keys
>>> data = {"a": [1, 2], "b": [3, 4]}
>>> dataset = ds.NumpySlicesDataset(data=data)
>>>
>>> # 3) Input data can be a tuple of lists (or NumPy arrays), each tuple element refers to data in each column
>>> data = ([1, 2], [3, 4], [5, 6])
>>> dataset = ds.NumpySlicesDataset(data=data, column_names=["column_1", "column_2", "column_3"])
>>>
>>> # 4) Load data from CSV file
>>> import pandas as pd
>>> df = pd.read_csv(filepath_or_buffer=csv_dataset_dir[0])
>>> dataset = ds.NumpySlicesDataset(data=dict(df), shuffle=False)
"""
@check_numpyslicesdataset
def __init__(self, data, column_names=None, num_samples=None, num_parallel_workers=1, shuffle=None, sampler=None,
num_shards=None, shard_id=None):
dataset = _NumpySlicesDataset(data, column_names)
super().__init__(dataset, column_names=dataset.column_list, num_samples=num_samples,
num_parallel_workers=num_parallel_workers, shuffle=shuffle, sampler=sampler,
num_shards=num_shards, shard_id=shard_id)
class _PaddedDataset:
"""
Mainly for combining false samples provided by users into a dataset.
Args:
padded_samples (list(dict)): Data provided by user to be added to the initial Dataset.
"""
def __init__(self, padded_samples):
self.column_names = list(padded_samples[0].keys())
self.padded_samples = padded_samples
def __getitem__(self, item):
return (self.padded_samples[item][key] for key in self.column_names)
def __len__(self):
return len(self.padded_samples)
[docs]class PaddedDataset(GeneratorDataset):
"""
Creates a dataset with filler data provided by user.
Mainly used to add to the original dataset and assign it to the corresponding shard.
Args:
padded_samples (list(dict)): Samples provided by user.
Raises:
TypeError: If padded_samples is not an instance of list.
TypeError: If the element of padded_samples is not an instance of dict.
ValueError: If the padded_samples is empty.
Examples:
>>> import numpy as np
>>> data = [{'image': np.zeros(1, np.uint8)}, {'image': np.zeros(2, np.uint8)}]
>>> dataset = ds.PaddedDataset(padded_samples=data)
"""
@check_paddeddataset
def __init__(self, padded_samples):
dataset = _PaddedDataset(padded_samples)
super().__init__(dataset, column_names=dataset.column_names, num_shards=None, shard_id=None, shuffle=False)
self._dataset_size = len(dataset.padded_samples)
self.padded_samples = padded_samples