单节点数据缓存

查看源文件

概述

对于需要重复访问远程的数据集或需要重复从磁盘中读取数据集的情况,可以使用单节点缓存算子将数据集缓存于本地内存中,以加速数据集的读取。

缓存算子依赖于在当前节点启动的缓存服务器,缓存服务器作为守护进程独立于用户的训练脚本而存在,主要用于提供缓存数据的管理,支持包括存储、查找、读取以及发生缓存未命中时对于缓存数据的写入等操作。

若用户的内存空间不足以缓存所有数据集,则用户可以配置缓存算子使其将剩余数据缓存至磁盘。

目前,缓存服务只支持单节点缓存,即客户端和服务器均在同一台机器上。该服务支持以下两类使用场景:

  • 缓存加载好的原始数据集

    用户可以在数据集加载算子中使用缓存。这将把加载完成的数据存到缓存服务器中,后续若需相同数据则可直接从中读取,避免从磁盘中重复加载。

    cache on leaf pipeline

  • 缓存经过数据增强处理后的数据

    用户也可在map算子中使用缓存。这将允许直接缓存数据增强(如图像裁剪、缩放等)处理后的数据,避免数据增强操作重复进行,减少了不必要的计算量。

    cache on map pipeline

缓存基础使用

  1. 配置环境。

    使用缓存服务前,需要安装MindSpore,并设置相关环境变量。以Conda环境为例,设置方法如下:

    export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:{path_to_conda}/envs/{your_env_name}/lib/python3.7/site-packages/mindspore:{path_to_conda}/envs/{your_env_name}/lib/python3.7/site-packages/mindspore/lib
    export PATH=$PATH:{path_to_conda}/envs/{your_env_name}/bin
    

    由于使用缓存可能会造成服务器的内存紧张,因此建议用户在使用缓存前增大服务器的交换内存空间至100GB以上,Ubuntu、EulerOS以及CentOS均可参考相关教程了解如何增大交换内存空间。

  2. 启动缓存服务器。

    在使用单节点缓存服务之前,首先需要在命令行输入以下命令,启动缓存服务器:

    cache_admin --start
    

    若输出下列信息,则表示缓存服务器启动成功。

    Cache server startup completed successfully!
    The cache server daemon has been created as process id 10394 and is listening on port 50052
    
    Recommendation:
    Since the server is detached into its own daemon process, monitor the server logs (under /tmp/mindspore/cache/log) for any issues that may happen after startup
    

    cache_admin支持以下命令和参数:

    • --start:启动缓存服务器,支持通过以下参数进行配置:

      • --workers-w:设置缓存服务器的工作线程数量,默认情况下工作线程数量为机器CPU个数的一半。

      • --spilldir-s:设置若缓存数据的大小超过内存空间,则溢出至磁盘的数据文件路径,默认为空(表示不启用数据溢出功能)。

      • --hostname-h:缓存服务器的ip地址,默认为127.0.0.1。

      • --port-p:缓存服务器的端口号,默认为50052。

      • --loglevel-l:设置日志等级,默认为1(WARNING级别)。若设置为0(INFO级别),会输出过多日志,导致性能劣化。

    • --stop:关闭缓存服务器。

    • --generate_session-g:生成一个缓存会话。

    • --destroy_session-d:删除一个缓存会话。

    • --list_sessions:查看当前缓存会话列表和详细信息。

    • --help:查看帮助信息。

    以上命令均可使用-h-p参数来指定服务器,若未指定则默认对ip为127.0.0.1且端口号为50052的服务器执行操作。

    用户可通过ps -ef|grep cache_server命令来检查服务器是否已启动以及查询服务器参数。

    • 设置cache_server初始化参数时,要先确认系统可用内存和待加载数据集大小,cache_server初始化容量或待加载数据集空间占耗超过系统可用内存时,都有可能导致机器宕机/重启、cache_server自动关闭、训练流程执行失败等问题。

    • 若要启用数据溢出功能,则用户在启动缓存服务器时必须使用-s参数对溢出路径进行设置,否则该功能默认关闭。

  3. 创建缓存会话。

    若缓存服务器中不存在缓存会话,则需要创建一个缓存会话,得到缓存会话id:

    $ cache_admin -g
    Session created for server on port 50052: 1456416665
    

    其中1456416665为端口50052的服务器分配的缓存会话id,缓存会话id由服务器分配。

    通过cache_admin --list_sessions命令可以查看当前服务器中现存的所有缓存会话信息。

    $ cache_admin --list_sessions
    Listing sessions for server on port 50052
    
         Session    Cache Id  Mem cached Disk cached  Avg cache size  Numa hit
      1456416665         n/a         n/a         n/a             n/a       n/a
    

    输出参数说明:

    • Session: 缓存会话id。

    • Cache Id: 当前缓存会话中的cache实例id,n/a表示当前尚未创建缓存实例。

    • Mem cached: 缓存在内存中的数据量。

    • Disk cached: 缓存在磁盘中的数据量。

    • Avg cache size:当前缓存的每行数据的平均大小。

    • Numa hit:Numa命中数,该值越高将获得越好的时间性能。

  4. 创建缓存实例。

    在Python训练脚本中使用DatasetCache API来定义一个名为test_cache的缓存实例,并把上一步中创建的缓存会话id传入session_id参数:

    import mindspore.dataset as ds
    
    test_cache = ds.DatasetCache(session_id=1456416665, size=0, spilling=True)
    

    DatasetCache支持以下参数:

    • session_id:缓存会话的id,通过cache_admin -g命令来创建并获取。

    • size:缓存最大内存空间占用,该参数以MB为单位,例如512GB的缓存空间应设置size=524288,默认为0。

    • spilling:当内存空间超出所设置的最大内存空间占用时,是否允许将剩余的数据溢出至磁盘,默认为False。

    • hostname:连接至缓存服务器的ip地址,默认为127.0.0.1。

    • port:连接至缓存服务器的端口号,默认为50052。

    • num_connections:建立的TCP/IP连接数,默认为12。

    • prefetch_size:每次访问获取的行数,默认为20。

    • 在实际使用中,通常应当首先使用cache_admin -g命令从缓存服务器处获得一个缓存会话id并作为session_id的参数,防止发生缓存会话不存在而报错的情况。

    • 设置size=0代表不限制缓存所使用的内存空间,但不超过系统总内存的80%。注意,设置size为0可能会存在机器“out of memory”的风险,因此建议用户根据机器本身的空闲内存大小,给size参数设置一个合理的取值。

    • 若设置spilling=True,则当内存空间不足时,多余数据将写入磁盘中。因此,用户需确保所设置的磁盘路径具有写入权限以及足够的磁盘空间,以存储溢出至磁盘的缓存数据。注意,若启动服务器时未指定溢出路径,则在调用API时设置spilling=True将会导致报错。

    • 若设置spilling=False,则缓存服务器在耗尽所设置的内存空间后将不再写入新的数据。

    • 当使用不支持随机访问的数据集(如TFRecordDataset)进行数据加载并启用缓存服务时,需要保证整个数据集均存放于本地。在该场景下,若本地内存空间不足以存放所有数据,则必须启用溢出,将数据溢出至磁盘。

    • num_connectionsprefetch_size为内部性能调优参数,一般情况下,用户无需设置这两个参数。

  5. 插入缓存实例。

    当前缓存服务既支持对原始数据集的缓存,也可以用于缓存经过数据增强处理后的数据。下例分别展示了两种使用方式。

    需要注意的是,两个例子均需要按照步骤4中的方法分别创建一个缓存实例,并在数据集加载或map算子中将所创建的test_cache作为cache参数分别传入。

    下面两个样例中使用到CIFAR-10数据集。运行样例前,需参照数据集加载中的方法下载并存放CIFAR-10数据集。

    • 缓存原始数据集加载的数据。

      dataset_dir = "cifar-10-batches-bin/"
      
      # apply cache to dataset
      data = ds.Cifar10Dataset(dataset_dir=dataset_dir, num_samples=4, shuffle=False, num_parallel_workers=1, cache=test_cache)
      
      num_iter = 0
      for item in data.create_dict_iterator(num_epochs=1):  # each data is a dictionary
          # in this example, each dictionary has a key "image"
          print("{} image shape: {}".format(num_iter, item["image"].shape))
          num_iter += 1
      

      输出结果:

      0 image shape: (32, 32, 3)
      1 image shape: (32, 32, 3)
      2 image shape: (32, 32, 3)
      3 image shape: (32, 32, 3)
      

      通过cache_admin --list_sessions命令可以查看当前会话有四条数据,说明数据缓存成功。

      $ cache_admin --list_sessions
      Listing sessions for server on port 50052
      
           Session    Cache Id  Mem cached Disk cached  Avg cache size  Numa hit
        1456416665   821590605       4         n/a          3226           4
      
    • 缓存经过数据增强处理后的数据。

      import mindspore.dataset.vision.c_transforms as c_vision
      
      dataset_dir = "cifar-10-batches-bin/"
      
      # apply cache to dataset
      data = ds.Cifar10Dataset(dataset_dir=dataset_dir, num_samples=5, shuffle=False, num_parallel_workers=1)
      
      # apply cache to map
      rescale_op = c_vision.Rescale(1.0 / 255.0, -1.0)
      data = data.map(input_columns=["image"], operations=rescale_op, cache=test_cache)
      
      num_iter = 0
      for item in data.create_dict_iterator(num_epochs=1):  # each data is a dictionary
          # in this example, each dictionary has a keys "image"
          print("{} image shape: {}".format(num_iter, item["image"].shape))
          num_iter += 1
      

      输出结果:

      0 image shape: (32, 32, 3)
      1 image shape: (32, 32, 3)
      2 image shape: (32, 32, 3)
      3 image shape: (32, 32, 3)
      4 image shape: (32, 32, 3)
      

      通过cache_admin --list_sessions命令可以查看当前会话有五条数据,说明数据缓存成功。

      $ cache_admin --list_sessions
      Listing sessions for server on port 50052
      
           Session    Cache Id  Mem cached Disk cached  Avg cache size  Numa hit
        1456416665  3618046178       5         n/a          12442         5
      
  6. 销毁缓存会话。

    在训练结束后,可以选择将当前的缓存销毁并释放内存:

    $ cache_admin --destroy_session 1456416665
    Drop session successfully for server on port 50052
    

    以上命令将销毁端口50052服务器中缓存会话id为1456416665的缓存。

    若选择不销毁缓存,则该缓存会话中的缓存数据将继续存在,用户下次启动训练脚本时可以继续使用该缓存。

  7. 关闭缓存服务器。

    使用完毕后,可以通过以下命令关闭缓存服务器,该操作将销毁当前服务器中存在的所有缓存会话并释放内存。

    $ cache_admin --stop
    Cache server on port 50052 has been stopped successfully.
    

    以上命令将关闭端口50052的服务器。

    若选择不关闭服务器,则服务器中已创建的缓存会话将保留,并供下次使用。下次训练时,用户可以新建缓存会话或重复使用已有缓存。

缓存共享

对于单机多卡的分布式训练的场景,缓存算子还允许多个相同的训练脚本共享同一个缓存,共同从缓存中读写数据。

  1. 启动缓存服务器。

    $ cache_admin --start
    Cache server startup completed successfully!
    The cache server daemon has been created as process id 39337 and listening on port 50052
    
    Recommendation:
    Since the server is detached into its own daemon process, monitor the server logs (under /tmp/mindspore/cache/log) for any issues that may happen after startup
    
  2. 创建缓存会话。

    创建启动Python训练的Shell脚本cache.sh,通过以下命令生成一个缓存会话id:

    #!/bin/bash
    # This shell script will launch parallel pipelines
    
    # get path to dataset directory
    if [ $# != 1 ]
    then
            echo "Usage: sh cache.sh DATASET_PATH"
    exit 1
    fi
    dataset_path=$1
    
    # generate a session id that these parallel pipelines can share
    result=$(cache_admin -g 2>&1)
    rc=$?
    if [ $rc -ne 0 ]; then
        echo "some error"
        exit 1
    fi
    
    # grab the session id from the result string
    session_id=$(echo $result | awk '{print $NF}')
    
  3. 将缓存会话id传入训练脚本。

    继续编写Shell脚本,添加以下命令在启动Python训练时将session_id以及其他参数传入:

    # make the session_id available to the python scripts
    num_devices=4
    
    for p in $(seq 0 $((${num_devices}-1))); do
        python my_training_script.py --num_devices "$num_devices" --device "$p" --session_id $session_id --dataset_path $dataset_path
    done
    

    直接获取完整样例代码:cache.sh

  4. 创建并应用缓存实例。

    下面样例中使用到CIFAR-10数据集。运行样例前,需参照数据集加载中的方法下载并存放CIFAR-10数据集。目录结构如下:

    ├─cache.sh
    ├─my_training_script.py
    └─cifar-10-batches-bin
        ├── batches.meta.txt
        ├── data_batch_1.bin
        ├── data_batch_2.bin
        ├── data_batch_3.bin
        ├── data_batch_4.bin
        ├── data_batch_5.bin
        ├── readme.html
        └── test_batch.bin
    

    创建并编写Python脚本my_training_script.py,通过以下代码接收传入的session_id,并在定义缓存实例时将其作为参数传入。

    import argparse
    import mindspore.dataset as ds
    
    parser = argparse.ArgumentParser(description='Cache Example')
    parser.add_argument('--num_devices', type=int, default=1, help='Device num.')
    parser.add_argument('--device', type=int, default=0, help='Device id.')
    parser.add_argument('--session_id', type=int, default=1, help='Session id.')
    parser.add_argument('--dataset_path', type=str, default=None, help='Dataset path')
    args_opt = parser.parse_args()
    
    # apply cache to dataset
    test_cache = ds.DatasetCache(session_id=args_opt.session_id, size=0, spilling=False)
    dataset = ds.Cifar10Dataset(dataset_dir=args_opt.dataset_path, num_samples=4, shuffle=False, num_parallel_workers=1,
                                num_shards=args_opt.num_devices, shard_id=args_opt.device, cache=test_cache)
    num_iter = 0
    for _ in dataset.create_dict_iterator():
        num_iter += 1
    print("Got {} samples on device {}".format(num_iter, args_opt.device))
    

    直接获取完整样例代码:my_training_script.py

  5. 运行训练脚本。

    运行Shell脚本cache.sh开启分布式训练:

    $ sh cache.sh cifar-10-batches-bin/
    Got 4 samples on device 0
    Got 4 samples on device 1
    Got 4 samples on device 2
    Got 4 samples on device 3
    

    通过cache_admin --list_sessions命令可以查看当前会话中只有一组数据,说明缓存共享成功。

    $ cache_admin --list_sessions
    Listing sessions for server on port 50052
    
         Session    Cache Id  Mem cached Disk cached  Avg cache size  Numa hit
      3392558708   821590605          16         n/a            3227        16
    
  6. 销毁缓存会话。

    在训练结束后,可以选择将当前的缓存销毁并释放内存:

    $ cache_admin --destroy_session 3392558708
    Drop session successfully for server on port 50052
    
  7. 关闭缓存服务器。

    使用完毕后,可以选择关闭缓存服务器:

    $ cache_admin --stop
    Cache server on port 50052 has been stopped successfully.
    

当前限制

  • 当前MindDatasetGraphDatasetGeneratorDatasetPaddedDatasetNumpySlicesDataset等数据集类不支持缓存。其中,GeneratorDatasetPaddedDatasetNumpySlicesDataset属于GeneratorOp,在不支持的报错信息中会呈现“There is currently no support for GeneratorOp under cache”。

  • 经过batchconcatfilterrepeatskipsplittakezip处理后的数据不支持缓存。

  • 经过随机数据增强操作(如RandomCrop)后的数据不支持缓存。

  • 不支持在同个数据管道的不同位置嵌套使用同一个缓存实例。

缓存性能调优

使用缓存服务能够在一些场景下获得显著的性能提升,例如:

  • 缓存经过数据增强处理后的数据。在该场景下,用户不需要在每个epoch重复执行数据增强操作,可节省较多时间。

  • 在简单网络的训练和推理过程中使用缓存服务。相比于复杂网络,简单网络的训练耗时占比更小,因此在该场景下应用缓存,能获得更显著的时间性能提升。

然而,在以下场景中使用缓存的收益可能较差,例如:

  • 系统内存不足、缓存未命中等因素将导致缓存服务在时间性能上提升不明显。因此,可在使用缓存前检查可用系统内存是否充足,选择一个适当的缓存大小。

  • 过多缓存溢出会导致时间性能变差。因此,在使用可随机访问的数据集(如ImageFolderDataset)进行数据加载的场景,尽量不要允许缓存溢出至磁盘。