Source code for mindspore.parallel.checkpoint_transform

# Copyright 2022 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Transform distributed checkpoint"""
from __future__ import absolute_import

import os
import glob
import copy
from collections import defaultdict
import numpy as np
import mindspore as ms
from mindspore.parallel._parallel_serialization import _rank_list_for_transform_parallel_checkpoint, \
    _transform_parallel_checkpoint, _get_device_num_from_strategy, _make_dir, _load_strategy_file, \
    _extract_layout_map, _extract_src_dst_layout_map, _parameter_not_in_local_stage, _extract_pipeline_stage_num


__all__ = ["merge_pipeline_strategys", "rank_list_for_transform", "transform_checkpoint_by_rank",
           "transform_checkpoints"]


[docs]def merge_pipeline_strategys(src_strategy_dirs, dst_strategy_file): """ Merge parallel strategy between all pipeline stages in pipeline parallel mode. Note: Strategy file of each pipeline stage should be included in src_strategy_dirs. Args: src_strategy_dirs (str): The directory of strategy files including all pipeline stage which is saved by 'mindspore.set_auto_parallel_context(strategy_ckpt_save_file)' dst_strategy_file (str): The file merged strategy to save. Raises: NotADirectoryError: `src_strategy_dirs` is not a directory. Examples: >>> # src_strategy_dir/stra0.ckpt, src_strategy_dir/stra1.ckpt ... src_strategy_dir/stra127.ckpt >>> merge_pipeline_strategys("./src_strategy_dir", "./dst_strategy.ckpt") """ dst_strategy_dir, _ = os.path.split(dst_strategy_file) if not os.path.exists(dst_strategy_dir): _make_dir(dst_strategy_dir, "path") if not os.path.isdir(src_strategy_dirs): raise NotADirectoryError("src_strategy_dirs {} is not a directory.".format(src_strategy_dirs)) src_strategy_files = os.path.join(src_strategy_dirs, "*.ckpt") dst_parallel_strategy_map = ms.train.node_strategy_pb2.ParallelStrategyMap() merged_stage = [] for src_strategy_file in glob.glob(src_strategy_files): src_parallel_strategy_map = _load_strategy_file(src_strategy_file) strategy_items = src_parallel_strategy_map.parallel_strategy_item layout_items = src_parallel_strategy_map.parallel_layout_item if not strategy_items or not layout_items: raise ValueError("The strategy file {} is empty".format(src_strategy_file)) pipeline_stage = strategy_items[0].parallel_strategys.stage if pipeline_stage in merged_stage: continue for layout_item in layout_items: layout_item.param_name = "-".join([str(pipeline_stage), layout_item.param_name]) dst_parallel_strategy_map.parallel_strategy_item.extend(strategy_items) dst_parallel_strategy_map.parallel_layout_item.extend(layout_items) merged_stage.append(pipeline_stage) dst_parallel_strategy_map.current_stage = 1 with open(dst_strategy_file, "wb") as f: f.write(dst_parallel_strategy_map.SerializeToString())
[docs]def rank_list_for_transform(rank_id, src_strategy_file=None, dst_strategy_file=None): """ List of original distributed checkpoint rank index for obtaining the target checkpoint of a rank_id during the distributed checkpoint conversion. Args: rank_id (int): The rank of which distributed checkpoint needs to be obtained after conversion. src_strategy_file (str): Name of source sharding strategy file which saved by 'mindspore.set_auto_parallel_context(strategy_ckpt_save_file)'. when the 'src_strategy_file' is None, it means that the source sharding strategy is without any sharing for each parameter. Default:None. dst_strategy_file (str): Name of destination sharding strategy file which saved by 'mindspore.set_auto_parallel_context(strategy_ckpt_save_file)'. when the 'dst_strategy_file' is None, it means that the destination sharding strategy is without any sharing for each parameter. Default:None. Returns: List, the rank list required for converting the distributed checkpoint of rank_id. Raises: ValueError: `src_strategy_file` or dst_strategy_file is incorrect. TypeError: `src_strategy_file` or dst_strategy_file is not a string. TypeError: `rank_id` is not a int. Examples: >>> rank_id = 0 >>> rank_list = rank_list_for_transform(rank_id, "./src_strategy.ckpt", "./dst_strategy.ckpt") >>> checkpoint_files_map = {} >>> for rank in rank_list: >>> checkpoint_files_map[rank] = "./pangu{}-100_2.ckpt".format(rank) """ if not isinstance(rank_id, int): raise TypeError("The rank_id should be a int.") if src_strategy_file is None: return [0] src_strategy_list, dst_strategy_list = _extract_src_dst_layout_map(rank_id, src_strategy_file, dst_strategy_file) src_stage_device_num = np.prod(src_strategy_list.get(list(src_strategy_list.keys())[0])[0]) if src_strategy_list \ is not None else 1 dst_stage_device_num = np.prod(dst_strategy_list.get(list(dst_strategy_list.keys())[0])[0]) if dst_strategy_list \ is not None else 1 if not src_strategy_list: raise ValueError("The src_strategy_file is empty.") local_rank_id = rank_id % dst_stage_device_num if dst_stage_device_num > 1 else rank_id needed_rank_list_in_local_stage = _rank_list_for_transform_parallel_checkpoint(local_rank_id, src_strategy_list, dst_strategy_list) result_set = set() handled_pipeline_stage = [] for _, layout in src_strategy_list.items(): for src_pipeline_stage_id in layout[6]: if src_pipeline_stage_id in handled_pipeline_stage: continue src_rank_id_start = src_pipeline_stage_id * src_stage_device_num result_set.update([src_rank_id_start + rank for rank in needed_rank_list_in_local_stage]) handled_pipeline_stage.append(src_pipeline_stage_id) return list(result_set)
[docs]def transform_checkpoint_by_rank(rank_id, checkpoint_files_map, save_checkpoint_file_name, src_strategy_file=None, dst_strategy_file=None): """ Transform distributed checkpoint from source sharding strategy to destination sharding strategy by rank for a network. Args: rank_id (int): The rank of which distributed checkpoint needs to be obtained after conversion. checkpoint_files_map (dict): The checkpoint files map whose key is the rank id and the value is the checkpoint file name. save_checkpoint_file_name (str): The file name to save the converted checkpoint. src_strategy_file (str): Name of source sharding strategy file which saved by 'mindspore.set_auto_parallel_context(strategy_ckpt_save_file)'. when the 'src_strategy_file' is None, it means that the source sharding strategy is without any sharing for each parameter. Default:None. dst_strategy_file (str): Name of destination sharding strategy file which saved by 'mindspore.set_auto_parallel_context(strategy_ckpt_save_file)'. when the 'dst_strategy_file' is None, it means that the destination sharding strategy is without any sharing for each parameter. Default:None. Raises: ValueError: `src_strategy_file` or `dst_strategy_file` is incorrect. ValueError: item in `checkpoint_files_map` is incorrect. ValueError: `save_checkpoint_file_name` is not end with ".ckpt". TypeError: `checkpoint_files_map` is not a dict. TypeError: `src_strategy_file` or `dst_strategy_file` is not a string. TypeError: `rank_id` is not a int. TypeError: `save_checkpoint_file_name` is not a string. Examples: >>> dst_device_num = 8 >>> for rank_id in range(dst_device_num) >>> rank_list = rank_list_for_transform(rank_id, "./src_strategy.ckpt", "./dst_strategy.ckpt") >>> checkpoint_files_map = {} >>> for rank in rank_list: >>> checkpoint_files_map[rank] = "./origin_checkpoint_rank{}/pangu{}-100_2.ckpt".format(rank) >>> save_checkpoint_file_name = "./new_checkpoint_rank{}/pangu{}-100_2.ckpt".format(rank_id) >>> transform_checkpoint_by_rank(rank_id, checkpoint_files_map, save_checkpoint_file_name, ... "./src_strategy.ckpt", "./dst_strategy.ckpt") """ if not isinstance(checkpoint_files_map, dict): raise TypeError("The checkpoint_files_map should be a dict.") if not isinstance(rank_id, int): raise TypeError("The rank_id should be a int.") if not isinstance(save_checkpoint_file_name, str): raise TypeError("The save_checkpoint_file_name should be a str.") if save_checkpoint_file_name[-5:] != ".ckpt": raise ValueError("The save_checkpoint_file_name {} should end with .ckpt".format(save_checkpoint_file_name)) if dst_strategy_file and os.path.dirname(dst_strategy_file) and not os.path.exists( os.path.dirname(dst_strategy_file)): raise ValueError("The director of dst_strategy_file: {} is not exists.". format(os.path.dirname(dst_strategy_file))) for rank, local_file in checkpoint_files_map.items(): if not os.path.exists(local_file): raise ValueError("Checkpoint file {} in rank {} not exits: ".format(local_file, rank)) param_total_dict = defaultdict(dict) param_attr_dict = defaultdict(dict) src_strategy_list, dst_strategy_list = _extract_src_dst_layout_map(rank_id, src_strategy_file, dst_strategy_file) # src rank => local rank inside pipeline stage src_stage_device_num = np.prod(src_strategy_list.get(list(src_strategy_list.keys())[0])[0]) if src_strategy_list \ is not None else 1 dst_stage_device_num = np.prod(dst_strategy_list.get(list(dst_strategy_list.keys())[0])[0]) if dst_strategy_list \ is not None else 1 origin_dst_strategy_list = _extract_layout_map(dst_strategy_file) origin_src_strategy_list = _extract_layout_map(src_strategy_file) for rank, file_name in checkpoint_files_map.items(): ckpt_dict = ms.load_checkpoint(file_name) for param_name, param in ckpt_dict.items(): # cut the parameter not in the pipeline stage. if _parameter_not_in_local_stage(param_name, origin_src_strategy_list, src_strategy_list) \ and _parameter_not_in_local_stage(param_name, origin_dst_strategy_list, dst_strategy_list): continue src_rank = rank % src_stage_device_num param_total_dict[param_name][src_rank] = param.data.asnumpy() param_attr_dict[param_name][src_rank] = (param.requires_grad, param.layerwise_parallel) local_rank_id = rank_id % dst_stage_device_num transform_param_list = _transform_parallel_checkpoint(local_rank_id, param_total_dict, param_attr_dict, src_strategy_list, dst_strategy_list) ms.save_checkpoint(transform_param_list, save_checkpoint_file_name)
[docs]def transform_checkpoints(src_checkpoints_dir, dst_checkpoints_dir, ckpt_prefix, src_strategy_file=None, dst_strategy_file=None): """ Transform distributed checkpoint from source sharding strategy to destination sharding strategy for a rank. Note: The src_checkpoints_dir directory structure should be organized like "src_checkpoints_dir/rank_0/a.ckpt", the rank number should be set to a subdirectory and the checkpoint file is stored in this subdirectory. If multiple files exist in a rank directory, the last file in the lexicgraphic order would be selected. Args: src_checkpoints_dir (str): The source checkpoints directory. dst_checkpoints_dir (str): The destination checkpoints directory to save the converted checkpoints. ckpt_prefix (str): The destination checkpoint name prefix. src_strategy_file (str): Name of source sharding strategy file which saved by 'mindspore.set_auto_parallel_context(strategy_ckpt_save_file)'. when the 'src_strategy_file' is None, it means that the source sharding strategy is without any sharing for each parameter. Default:None. dst_strategy_file (str): Name of destination sharding strategy file which saved by 'mindspore.set_auto_parallel_context(strategy_ckpt_save_file)'. when the 'dst_strategy_file' is None, it means that the destination sharding strategy is without any sharing for each parameter. Default:None. Raises: ValueError: `src_strategy_file` or `dst_strategy_file` is incorrect. NotADirectoryError: `src_checkpoints_dir` or `dst_checkpoints_dir` is not a directory. ValueError: The checkpoint file is missing in `src_checkpoints_dir`. TypeError: `src_strategy_file` or `dst_strategy_file` is not a string. Examples: >>> transform_checkpoints(src_checkpoints_dir, dst_checkpoints_dir, "dst_checkpoint", ... "./src_strategy.ckpt", "./dst_strategy.ckpt") """ if not os.path.isdir(src_checkpoints_dir): raise NotADirectoryError("src_checkpoints_dir {} is not a directory.".format(src_checkpoints_dir)) _make_dir(dst_checkpoints_dir, "path") if not isinstance(ckpt_prefix, str): raise TypeError("The ckpt_prefix should be a str.") checkpoints_rank_dir_list = os.path.join(src_checkpoints_dir, "rank_[0-9]*") all_checkpoint_files_map = {} for checkpoint_dir in glob.glob(checkpoints_rank_dir_list): if not os.path.isdir(checkpoint_dir): ms.log.warning("{} is not a directory.".format(checkpoint_dir)) continue rank_id_str = checkpoint_dir.split('rank_')[-1] if not rank_id_str.isdigit(): ms.log.warning("{} is not a expected directory, the directory should end with rank_0/rank_1.....". format(checkpoint_dir)) continue rank_id = int(rank_id_str) checkpoint_file_name = os.path.join(checkpoint_dir, "*.ckpt") rank_ckpts = glob.glob(checkpoint_file_name) rank_ckpts.sort() for checkpoint_file in rank_ckpts: if not os.path.isfile(checkpoint_file): ms.log.warning("{} is not a checkpoint file.".format(checkpoint_file)) continue all_checkpoint_files_map[rank_id] = checkpoint_file needed_rank_list_map = defaultdict(list) dst_stage_device_num = _get_device_num_from_strategy(dst_strategy_file) src_stage_device_num = _get_device_num_from_strategy(src_strategy_file) dst_stage_num = _extract_pipeline_stage_num(dst_strategy_file) dst_device_num = dst_stage_device_num * dst_stage_num origin_src_strategy_list = _extract_layout_map(src_strategy_file) origin_dst_strategy_list = _extract_layout_map(dst_strategy_file) for rank in range(dst_device_num): needed_rank_list = rank_list_for_transform(rank, src_strategy_file, dst_strategy_file) for needed_rank in needed_rank_list: if needed_rank not in all_checkpoint_files_map: raise ValueError("The checkpoint file of rank{} is needed for converting rank{}'s checkpoint, " "but it is missing.".format(needed_rank, rank)) needed_rank_list_key = "-".join([str(r) for r in needed_rank_list]) needed_rank_list_map[needed_rank_list_key].append(rank) for needed_rank_list_key, transform_rank_list in needed_rank_list_map.items(): param_total_dict = defaultdict(dict) param_attr_dict = defaultdict(dict) needed_rank_list = needed_rank_list_key.split("-") for needed_rank in needed_rank_list: ckpt_dict = ms.load_checkpoint(all_checkpoint_files_map.get(int(needed_rank))) for param_name, param in ckpt_dict.items(): src_rank = int(needed_rank) % src_stage_device_num param_total_dict[param_name][src_rank] = param.data.asnumpy() param_attr_dict[param_name][src_rank] = (param.requires_grad, param.layerwise_parallel) for transform_rank in transform_rank_list: param_total_dict_copy = copy.deepcopy(param_total_dict) src_strategy_list, dst_strategy_list = _extract_src_dst_layout_map(transform_rank, src_strategy_file, dst_strategy_file) # cut the parameter not in the pipeline stage. for param in list(param_total_dict_copy.keys()): if _parameter_not_in_local_stage(param, origin_src_strategy_list, src_strategy_list) \ and _parameter_not_in_local_stage(param, origin_dst_strategy_list, dst_strategy_list): param_total_dict_copy.pop(param) local_rank_id = transform_rank % dst_stage_device_num transform_param_list = _transform_parallel_checkpoint(local_rank_id, param_total_dict_copy, param_attr_dict, src_strategy_list, dst_strategy_list) save_checkpoint_file = "{}{}.ckpt".format(ckpt_prefix, transform_rank) save_checkpoint_file_dir = os.path.join(dst_checkpoints_dir, "rank_{}".format(transform_rank)) if not os.path.exists(save_checkpoint_file_dir): _make_dir(save_checkpoint_file_dir, "path") save_checkpoint_file_name = os.path.join(save_checkpoint_file_dir, save_checkpoint_file) ms.save_checkpoint(transform_param_list, save_checkpoint_file_name) del param_total_dict_copy del param_total_dict