# Copyright 2024 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Transform distributed safetensors"""
from __future__ import absolute_import
import os
import time
import glob
import re
import math
import json
from collections import defaultdict
import multiprocessing as mp
import numpy as np
import mindspore as ms
from mindspore.parallel._parallel_serialization import _get_device_num_from_strategy, _make_dir, \
_extract_layout_map, _extract_src_dst_layout_map, _parameter_not_in_local_stage, _extract_pipeline_stage_num, \
_insert_opt_shard_reshape, _extract_src_dst_layout_map_by_src
from mindspore.parallel._tensor import _get_tensor_strategy, _construct_from_to_tensor_layout, \
_get_needed_rank_transform_operator_map_by_layouts, \
_generate_transform_operator_stack, _apply_tensor_transform_operators, _construct_tensor_layout_for_opt_shard, \
_extract_layout_item, _load_tensor_shape
from mindspore.parallel._parallel_serialization import _build_searched_strategy, _load_protobuf_strategy, \
_convert_to_list
from safetensors.numpy import save_file, load_file
from safetensors import safe_open
def _load_and_transform(path, name_map, load_func, transform_func):
if load_func is not None:
param_dict = load_func(path)
else:
param_dict = path
transform_dict = {}
for k, v in param_dict.items():
new_name = name_map.get(k, k) if name_map is not None else k
transform_dict[new_name] = transform_func(v, new_name)
return transform_dict
def _transform_tensor_to_numpy(path, name_map=None):
return _load_and_transform(path, name_map, ms.load_checkpoint, lambda v, new_name: v.asnumpy())
def _transform_numpy_to_tensor(path, name_map=None):
return _load_and_transform(path, name_map, load_file, lambda v, new_name: ms.Parameter(v, name=new_name))
def _process_file(file_info):
cur_ckpt_path, name_map, save_path, file = file_info
param_dict_numpy = _transform_tensor_to_numpy(cur_ckpt_path, name_map)
safetensors_filename = file.replace(".ckpt", ".safetensors")
dst_file = os.path.join(save_path, safetensors_filename)
save_file(param_dict_numpy, dst_file)
def _process_file_safetensors(file_info):
cur_safe_path, name_map, save_path, file = file_info
param_dict_tensor = _transform_numpy_to_tensor(cur_safe_path, name_map)
ckpt_filename = file.replace(".safetensors", ".ckpt")
dst_file = os.path.join(save_path, ckpt_filename)
ms.save_checkpoint(param_dict_tensor, dst_file)
def _gather_tasks(file_path, save_path, file_name_regex, name_map):
"""gather transform rank together"""
tasks = []
for root, dirs, _ in os.walk(file_path):
if root != file_path:
continue
rank_dirs = [d for d in dirs if d.startswith('rank')]
if not rank_dirs:
raise ValueError(
f"For 'ckpt_to_safetensors', no directories starting with 'rank' found in {file_path}")
for rank_dir in rank_dirs:
rank_dir_path = os.path.join(root, rank_dir)
dst_root = os.path.join(save_path,
os.path.relpath(rank_dir_path, file_path)) if save_path else rank_dir_path
os.makedirs(dst_root, exist_ok=True)
tasks.extend(
(os.path.join(rank_dir_path, file), name_map, dst_root, file)
for file in os.listdir(rank_dir_path)
if file.endswith(".ckpt") and (file_name_regex is None or re.findall(file_name_regex, file))
)
return tasks
def _progress_bar(iterable, total=None):
"""
Decorate an iterable object, returning an iterator which acts exactly
like the original iterable, but prints a dynamically updating
progressbar every time a value is requested.
"""
if total is None:
total = len(iterable)
start_time = time.time()
def print_progress_bar(iteration):
percent = f"{100 * (iteration / float(total)):.1f}"
bar_length = 40
filled_length = int(bar_length * iteration // total)
bar = '█' * filled_length + '-' * (bar_length - filled_length)
elapsed_time = time.time() - start_time
estimated_total_time = elapsed_time / iteration * total
remaining_time = estimated_total_time - elapsed_time
elapsed_time_str = time.strftime("%H:%M:%S", time.gmtime(elapsed_time))
remaining_time_str = time.strftime("%H:%M:%S", time.gmtime(remaining_time))
print(f'\r{percent}%|{bar}|[{elapsed_time_str}<{remaining_time_str}]', end='')
if iteration == total:
print()
for i, item in enumerate(iterable, start=1):
yield item
print_progress_bar(i)
[docs]def ckpt_to_safetensors(file_path, save_path=None, name_map=None, file_name_regex=None, processes_num=1):
"""
Converts MindSpore checkpoint files into safetensors format and saves them to `save_path`.
Safetensors is a reliable and portable machine learning model storage format introduced by Huggingface,
used for securely storing Tensors with fast speed (zero copy).
Note:
The number of multiprocess settings is related to the size of the host, and it is not recommended to set it
too large, otherwise it may cause freezing.
The safetensors format does not support the enc verification function. If ckpt is enabled to save enc
verification, an error will be generated when performing the conversion.
The safetensors format currently does not support crc verification function. If ckpt contains crc verification
information, the crc verification information will be lost after conversion to safetensors.
Args:
file_path (str): Path to the directory containing checkpoint files or a single checkpoint file (.ckpt).
save_path (str, optional): Directory path where safetensors files will be saved. Defaults: ``None``.
name_map (dict, optional): Dictionary mapping original parameter names to new names. Defaults: ``None``.
file_name_regex (str, optional): Regular expression used to match the file that needs to be converted.
Defaults: ``None``.
processes_num (int, optional): Number of processes to use for parallel processing. Defaults: 1.
Raises:
ValueError: If the input path is invalid or the save_path is not a directory,
or the file_path does not end with '.ckpt'.
Supported Platforms:
``Ascend`` ``GPU`` ``CPU``
Examples:
>>> import mindspore as ms
>>> ms.ckpt_to_safetensors("./ckpt_save_path")
>>> ms.ckpt_to_safetensors("./ckpt_save_path/rank0/checkpoint_0.ckpt")
>>> ms.ckpt_to_safetensors(file_path="./ckpt_save_path/rank0/checkpoint_0.ckpt", save_path="./new_path/")
>>> namemap = {"lin.weight":"new_name"}
>>> ms.ckpt_to_safetensors("./ckpt_save_path/rank0/checkpoint_0.ckpt", "./new_path/", namemap)
"""
is_dir = os.path.isdir(file_path)
is_file = os.path.isfile(file_path)
if not is_dir and not is_file:
raise ValueError(f"For 'ckpt_to_safetensors', the input path must be a valid path or file, but got {file_path}")
if save_path and os.path.splitext(save_path)[1]:
raise ValueError(f"For 'ckpt_to_safetensors', the save_path must be a directory, but got '{save_path}'")
if name_map is not None and not isinstance(name_map, dict):
raise ValueError(
f"For 'ckpt_to_safetensors', the type of 'name_map' must be a directory, but got '{type(name_map)}'")
if is_dir:
tasks = _gather_tasks(file_path, save_path, file_name_regex, name_map)
with mp.Pool(processes=processes_num) as pool:
list(_progress_bar(pool.imap(_process_file, tasks), total=len(tasks)))
elif is_file:
if not file_path.endswith(".ckpt"):
raise ValueError(f"For 'ckpt_to_safetensors', the input file must be a .ckpt file, but got {file_path}")
if file_name_regex is not None and not re.findall(file_name_regex, file_path):
raise ValueError(f"For 'ckpt_to_safetensors', the input file does not match the regular expression.")
if save_path and not os.path.exists(save_path):
os.makedirs(save_path, exist_ok=True)
param_dict_numpy = _transform_tensor_to_numpy(file_path, name_map)
safetensors_filename = os.path.basename(file_path).replace(".ckpt", ".safetensors")
dst_file = os.path.join(save_path if save_path else os.path.dirname(file_path), safetensors_filename)
save_file(param_dict_numpy, dst_file)
def _gather_safetensors_tasks(file_path, save_path, file_name_regex, name_map):
"""gather transform rank together"""
tasks = []
for root, dirs, _ in os.walk(file_path):
if root != file_path:
continue
rank_dirs = [d for d in dirs if d.startswith('rank')]
if not rank_dirs:
raise ValueError(
f"For 'safetensors_to_ckpt', no directories starting with 'rank' found in {file_path}")
for rank_dir in rank_dirs:
rank_dir_path = os.path.join(root, rank_dir)
dst_root = os.path.join(save_path,
os.path.relpath(rank_dir_path, file_path)) if save_path else rank_dir_path
os.makedirs(dst_root, exist_ok=True)
tasks.extend(
(os.path.join(rank_dir_path, file), name_map, dst_root, file)
for file in os.listdir(rank_dir_path)
if file.endswith(".safetensors") and (file_name_regex is None or re.findall(file_name_regex, file))
)
return tasks
[docs]def safetensors_to_ckpt(file_path, save_path=None, name_map=None, file_name_regex=None, processes_num=1):
"""
Converts safetensors files into MindSpore checkpoint format and saves them to `save_path`.
Safetensors is a reliable and portable machine learning model storage format introduced by Huggingface,
used for securely storing Tensors with fast speed (zero copy).
Note:
The number of multiprocess settings is related to the size of the host, and it is not recommended to set it
too large, otherwise it may cause freezing.
Args:
file_path (str): Path to the directory containing safetensors files or a single safetensors file (.safetensors).
save_path (str, optional): Directory path where checkpoint files will be saved. Defaults: ``None``.
name_map (dict, optional): Dictionary mapping original parameter names to new names. Defaults: ``None``.
file_name_regex (str, optional): Regular expression used to match the file that needs to be converted.
Defaults: ``None``.
processes_num (int, optional): Number of processes to use for parallel processing. Defaults: 1.
Raises:
ValueError: If the input path is invalid, the save_path is not a directory,
or the file_path does not end with '.safetensors'.
Supported Platforms:
``Ascend`` ``GPU`` ``CPU``
Examples:
>>> import mindspore as ms
>>> ms.safetensors_to_ckpt("./safetensors_save_path")
>>> ms.safetensors_to_ckpt("./safetensors_save_path/rank0/checkpoint_0.safetensors")
>>> ms.safetensors_to_ckpt("./safetensors_save_path/rank0/checkpoint_0.safetensors", "./new_path/")
>>> namemap = {"lin.weight":"new_name"}
>>> ms.safetensors_to_ckpt("./safetensors_save_path/rank0/checkpoint_0.safetensors", "./new_path/", namemap)
"""
is_dir = os.path.isdir(file_path)
is_file = os.path.isfile(file_path)
if not is_dir and not is_file:
raise ValueError(f"For 'safetensors_to_ckpt', the input path must be a valid path or file, but got {file_path}")
if save_path and os.path.splitext(save_path)[1]:
raise ValueError(f"For 'safetensors_to_ckpt', the save_path must be a directory, but got '{save_path}'")
if name_map is not None and not isinstance(name_map, dict):
raise ValueError(
f"For 'safetensors_to_ckpt', the type of 'name_map' must be a directory, but got '{type(name_map)}'")
if is_dir:
tasks = _gather_safetensors_tasks(file_path, save_path, file_name_regex, name_map)
with mp.Pool(processes=processes_num) as pool:
list(_progress_bar(pool.imap(_process_file_safetensors, tasks), total=len(tasks)))
elif is_file:
if not file_path.endswith(".safetensors"):
raise ValueError(
f"For 'safetensors_to_ckpt', the input file must be a .safetensors file, but got {file_path}")
if file_name_regex is not None and not re.findall(file_name_regex, file_path):
raise ValueError(f"For 'safetensors_to_ckpt', the input file does not match the regular expression.")
if save_path and not os.path.exists(save_path):
os.makedirs(save_path, exist_ok=True)
param_dict_tensor = _transform_numpy_to_tensor(file_path, name_map)
ckpt_filename = os.path.basename(file_path).replace(".safetensors", ".ckpt")
dst_file = os.path.join(save_path if save_path else os.path.dirname(file_path), ckpt_filename)
ms.save_checkpoint(param_dict_tensor, dst_file)
def _check_transform_safetensors(src_safetensors_dir, ckpt_prefix, src_strategy_file, dst_strategy_file):
"""check _transform_safetensors input"""
if not isinstance(ckpt_prefix, str):
raise TypeError("The ckpt_prefix should be a str.")
if src_strategy_file and os.path.dirname(src_strategy_file) and not os.path.exists(
os.path.dirname(src_strategy_file)):
raise ValueError("The director of src_strategy_file: {} is not exists.".
format(os.path.dirname(src_strategy_file)))
if dst_strategy_file and os.path.dirname(dst_strategy_file) and not os.path.exists(
os.path.dirname(dst_strategy_file)):
raise ValueError("The director of dst_strategy_file: {} is not exists.".
format(os.path.dirname(dst_strategy_file)))
def _check_output_format(output_format):
if output_format not in ["safetensors", "ckpt"]:
raise ValueError(f"For 'transform_safetensors', the output_format must be "
f"'safetensors' or 'ckpt', but got {output_format}.")
def _split_protobuf_strategy(merged_strategy_file):
"""split src_strategy_file by pp"""
dst_parallel_strategy_map = _load_protobuf_strategy(merged_strategy_file)
if not dst_parallel_strategy_map.parallel_strategy_item or not dst_parallel_strategy_map.parallel_layout_item:
raise ValueError(f"The merged strategy file {merged_strategy_file} is empty")
src_dict = {}
for layout_item in dst_parallel_strategy_map.parallel_layout_item:
stage, _ = layout_item.param_name.split('-', 1)
stage = int(stage)
if stage not in src_dict:
src_dict[stage] = {}
parameter_name = layout_item.param_name
layout = layout_item.parallel_layouts
src_dict[stage][parameter_name] = layout
return src_dict
def _transform_safetensors(src_safetensors_dir, dst_safetensors_dir, ckpt_prefix, src_strategy_file=None,
dst_strategy_file=None, process_num=1, output_format="safetensors"):
"""Transform distributed safetensors from source sharding strategy to destination sharding strategy for a rank."""
_check_transform_safetensors(src_safetensors_dir, ckpt_prefix, src_strategy_file, dst_strategy_file)
_check_output_format(output_format)
_make_dir(dst_safetensors_dir, "path")
all_safetensor_files_map = _collect_safetensor_files(src_safetensors_dir)
dst_strategy_dict = _build_searched_strategy(dst_strategy_file)
pipeline_stage_num = _extract_pipeline_stage_num(src_strategy_file)
dst_stage_num = _extract_pipeline_stage_num(dst_strategy_file)
if pipeline_stage_num > 1 and dst_stage_num == 1:
stage_dict = _split_protobuf_strategy(src_strategy_file)
processes = []
manager = mp.Manager()
_transform_param_list = manager.list()
for _, src_strategy_dict in stage_dict.items():
p = mp.Process(target=_transform_stage_safetensors,
args=(src_strategy_dict, dst_strategy_dict, ckpt_prefix,
dst_safetensors_dir, output_format, all_safetensor_files_map, process_num,
_transform_param_list))
p.start()
processes.append(p)
for p in processes:
p.join()
_save_final_safetensors(_transform_param_list, output_format)
else:
src_strategy_dict = _build_searched_strategy(src_strategy_file)
_transform_stage_safetensors(src_strategy_dict, dst_strategy_dict, ckpt_prefix,
dst_safetensors_dir, output_format, all_safetensor_files_map, process_num,
_transform_param_list=None)
def _transform_stage_safetensors(src_strategy_dict, dst_strategy_dict, ckpt_prefix,
dst_safetensors_dir, output_format, all_safetensor_files_map, process_num,
_transform_param_list):
"""Transform distributed safetensors by stage"""
src_stage_device_num = _get_device_num_from_strategy(src_strategy_dict)
dst_stage_device_num = _get_device_num_from_strategy(dst_strategy_dict)
origin_src_strategy_list = _extract_layout_map(src_strategy_dict)
origin_dst_strategy_list = _extract_layout_map(dst_strategy_dict)
needed_rank_list_map = _find_needed_ranks(src_strategy_dict, dst_strategy_dict)
for needed_rank_list, rank in needed_rank_list_map.items():
for needed_rank in needed_rank_list.split("-"):
if int(needed_rank) not in all_safetensor_files_map:
raise ValueError("The safetensor file of rank{} is needed for converting rank{}'s safetensor, "
"but it is missing.".format(needed_rank, rank))
if process_num > len(needed_rank_list_map):
ms.log.warning("The value of process_num cannot be greater than that of needed_rank_list_map.")
process_num = len(needed_rank_list_map)
dst_stage_num = _extract_pipeline_stage_num(dst_strategy_dict)
if len(needed_rank_list_map) == 1 and dst_stage_num > 1:
process_num = dst_stage_num
_transform_safetensors_with_parallel(needed_rank_list_map, all_safetensor_files_map, src_stage_device_num,
dst_stage_device_num, src_strategy_dict, dst_strategy_dict,
origin_src_strategy_list, origin_dst_strategy_list, ckpt_prefix,
dst_safetensors_dir, process_num, output_format,
_transform_param_list)
def _distribute_files_by_size(all_safetensor_files_map, needed_rank_list_map, process_num):
"""
Distributes files across multiple processes based on file size to balance the processing load.
"""
if process_num == 1:
return [needed_rank_list_map]
# Calculate the size of each file.
# if src==1, dst pp>1, split for pp number.
if len(needed_rank_list_map) == 1:
src_rank = next(iter(needed_rank_list_map.keys()))
dst_list = next(iter(needed_rank_list_map.values()))
size = len(dst_list) // process_num
split_list = [dst_list[i:i + size] for i in range(0, len(dst_list), size)]
part_list_dict = [dict() for _ in range(process_num)]
for index in range(process_num):
part_list_dict[index][src_rank] = split_list[index]
return part_list_dict
rank_size = dict()
for rank_id, file_name in all_safetensor_files_map.items():
tmp_size = os.path.getsize(file_name) / 1024 / 1024
rank_size[rank_id] = tmp_size
# Obtain the rank and size required by all parts.
part_total = []
for index, (k, v) in enumerate(needed_rank_list_map.items()):
tmp_part = []
key_ele = k.split("-")
tmp_size = 0
for ele in key_ele:
tmp_size += rank_size[int(ele)]
tmp_part.append(index)
tmp_part.append(tmp_size)
part_total.append(tmp_part)
# Sort each part by size.
part_total = sorted(part_total, key=lambda x: x[1], reverse=True)
part_list = [[] for _ in range(process_num)]
part_size = [[] for _ in range(process_num)]
for [index, size] in part_total:
min_sum = float('inf')
min_idx = -1
for ele in range(process_num):
if sum(part_size[ele]) < min_sum:
min_sum = sum(part_size[ele])
min_idx = ele
part_list[min_idx].append(index)
part_size[min_idx].append(size)
part_list_dict = [dict() for _ in range(process_num)]
for index, (k, v) in enumerate(needed_rank_list_map.items()):
for idd, ele in enumerate(part_list):
if index in ele:
part_list_dict[idd][k] = v
break
return part_list_dict
def _transform_safetensors_with_parallel(needed_rank_list_map, all_safetensor_files_map, src_stage_device_num,
dst_stage_device_num, src_strategy_dict, dst_strategy_dict,
origin_src_strategy_list, origin_dst_strategy_list, ckpt_prefix,
dst_safetensors_dir, process_num, output_format,
_transform_param_list):
"""
Transforms safetensors files to a specified format using parallel processing.
"""
part_list_dict = _distribute_files_by_size(all_safetensor_files_map, needed_rank_list_map, process_num)
# cal param name for every pipeline, save in pipe_param_list.
pipe_num = _extract_pipeline_stage_num(dst_strategy_dict)
pipe_param_list = [None for _ in range(max(pipe_num, process_num))]
if len(needed_rank_list_map) == 1 and pipe_num > 1:
pipe_param_list = [[] for _ in range(pipe_num)]
layout_map = _convert_to_list(dst_strategy_dict)
for name, layout in layout_map.items():
pipe_param_list[layout[6][0]].append(name)
processes = []
for i in range(process_num):
p = mp.Process(target=_transform_safetensors_single, args=(
part_list_dict[i], all_safetensor_files_map, src_stage_device_num, dst_stage_device_num,
src_strategy_dict, dst_strategy_dict, origin_src_strategy_list, origin_dst_strategy_list,
ckpt_prefix, dst_safetensors_dir, output_format, _transform_param_list, pipe_param_list[i]))
p.start()
processes.append(p)
for p in processes:
p.join()
def _transform_safetensors_single(needed_rank_list_map, all_safetensor_files_map, src_stage_device_num,
dst_stage_device_num,
src_strategy_dict, dst_strategy_dict, origin_src_strategy_list,
origin_dst_strategy_list,
ckpt_prefix, dst_safetensors_dir, output_format,
_transform_param_list, pipe_param_list=None, file_index=None, unified_flag=False):
"""
Transforms safetensors files to a specified format without using parallel processing.
"""
src_strategy_list_keys = _convert_to_list(src_strategy_dict).keys() if src_strategy_dict else []
dst_strategy_list_keys = _convert_to_list(dst_strategy_dict).keys() if dst_strategy_dict else []
for needed_rank_list_key, transform_rank_list in needed_rank_list_map.items():
param_total_dict = defaultdict(dict)
param_attr_dict = defaultdict(dict)
needed_rank_list = needed_rank_list_key.split("-")
for needed_rank in needed_rank_list:
if pipe_param_list:
saftensor_dict = dict()
with safe_open(all_safetensor_files_map.get(int(needed_rank)), framework="np") as f:
if not unified_flag:
all_param_name_set = set(f.keys())
src_param_name_set = set(src_strategy_list_keys)
dst_param_name_set = set(dst_strategy_list_keys)
hyper_param_set = all_param_name_set - (src_param_name_set & dst_param_name_set)
pipe_param_list.extend(list(hyper_param_set))
for param_name in pipe_param_list:
if param_name not in f.keys():
# param not in ckpt file, check reason
continue
output = f.get_tensor(param_name)
saftensor_dict[param_name] = output
else:
saftensor_dict = load_file(all_safetensor_files_map.get(int(needed_rank)))
for param_name, param in saftensor_dict.items():
src_rank = int(needed_rank) % src_stage_device_num
param_total_dict[param_name][src_rank] = param
param_attr_dict[param_name][src_rank] = (True, False)
for transform_rank in transform_rank_list:
param_total_dict_keys = list(param_total_dict.keys())
src_strategy_list, dst_strategy_list = _extract_src_dst_layout_map(transform_rank, src_strategy_dict,
dst_strategy_dict)
# cut the parameter not in the pipeline stage.
for param in list(param_total_dict.keys()):
if _parameter_not_in_local_stage(param, origin_src_strategy_list, src_strategy_list) \
and _parameter_not_in_local_stage(param, origin_dst_strategy_list, dst_strategy_list):
param_total_dict_keys.remove(param)
local_rank_id = transform_rank % dst_stage_device_num
transform_param_dict = _transform_parallel_safetensor(local_rank_id, param_total_dict,
param_attr_dict, src_strategy_list, dst_strategy_list,
param_total_dict_keys)
if file_index is not None:
save_safetensor_file = f"part{file_index}.{output_format}"
save_safetensor_file_dir = dst_safetensors_dir
else:
save_safetensor_file = f"{ckpt_prefix}{transform_rank}.{output_format}"
save_safetensor_file_dir = os.path.join(dst_safetensors_dir, "rank_{}".format(transform_rank))
if not os.path.exists(save_safetensor_file_dir):
_make_dir(save_safetensor_file_dir, "path")
save_file_name = os.path.join(save_safetensor_file_dir, save_safetensor_file)
if _transform_param_list is not None:
_transform_param_list.append({save_file_name: transform_param_dict})
else:
if output_format == "safetensors":
save_file(transform_param_dict, save_file_name)
else:
transform_param_dict = _load_and_transform(transform_param_dict, None, None,
transform_func=lambda v, name: ms.Parameter(v,
name=name))
ms.save_checkpoint(transform_param_dict, save_file_name)
del param_total_dict_keys
del param_total_dict
def _save_final_safetensors(_transform_param_list, output_format):
"""save file with list"""
new_transform_dict = {}
for transform_dict in _transform_param_list:
for save_file_name, transform_param_dict in transform_dict.items():
if save_file_name not in new_transform_dict:
new_transform_dict[save_file_name] = transform_param_dict
else:
new_transform_dict[save_file_name].update(transform_param_dict)
for save_file_name, transform_param_dict in new_transform_dict.items():
if output_format == "safetensors":
save_file(transform_param_dict, save_file_name)
else:
transform_param_dict = _load_and_transform(transform_param_dict, None, None,
transform_func=lambda v, name: ms.Parameter(v, name=name))
ms.save_checkpoint(transform_param_dict, save_file_name)
def transform_safetensors_by_stage(src_safetensors_dir, dst_safetensors_dir, ckpt_prefix,
src_strategy_file,
dst_strategy_file=None):
"""Transform safetensor for stage in src_strategy_file"""
param_total_dict = defaultdict(dict)
param_attr_dict = defaultdict(dict)
param_type_dict = defaultdict(dict)
src_strategy_list, dst_strategy_list, stage_id = _extract_src_dst_layout_map_by_src(src_strategy_file, \
dst_strategy_file)
src_stage_device_num = np.prod(src_strategy_list.get(list(src_strategy_list.keys())[0])[0]) if src_strategy_list \
is not None else 1
dst_stage_device_num = np.prod(dst_strategy_list.get(list(dst_strategy_list.keys())[0])[0]) if dst_strategy_list \
is not None else 1
origin_dst_strategy_list = _extract_layout_map(dst_strategy_file)
origin_src_strategy_list = _extract_layout_map(src_strategy_file)
safetensor_files_map = {}
src_rank_id_start = stage_id * src_stage_device_num
for local_rank in range(src_stage_device_num):
rank_id = src_rank_id_start + local_rank
safetensor_file_name = os.path.join(src_safetensors_dir, "rank_{}".format(rank_id), "*.safetensors")
rank_ckpts = glob.glob(safetensor_file_name)
rank_ckpts.sort()
for safetensor_file in rank_ckpts:
if not os.path.isfile(safetensor_file):
continue
safetensor_files_map[rank_id] = safetensor_file
for rank, local_file in safetensor_files_map.items():
if not os.path.exists(local_file):
raise ValueError("safetensor file {} in rank {} not exits: ".format(local_file, rank))
for rank, file_name in safetensor_files_map.items():
saftensor_dict = load_file(file_name)
for param_name, param in saftensor_dict.items():
# cut the parameter not in the pipeline stage.
if _parameter_not_in_local_stage(param_name, origin_src_strategy_list, src_strategy_list) \
and _parameter_not_in_local_stage(param_name, origin_dst_strategy_list, dst_strategy_list):
continue
src_rank = rank % src_stage_device_num
param_type_dict[param_name][src_rank] = str(param.data.dtype)
param_total_dict[param_name][src_rank] = param
param_attr_dict[param_name][src_rank] = (True, False)
for local_rank_id in range(dst_stage_device_num):
transform_param_dict = _transform_parallel_safetensor(local_rank_id, param_total_dict,
param_attr_dict, src_strategy_list, dst_strategy_list,
param_type_dict)
save_safetensor_file = "{}{}_part{}.safetensors".format(ckpt_prefix, local_rank_id, stage_id)
save_safetensor_file_dir = os.path.join(dst_safetensors_dir, "rank_{}".format(local_rank_id))
if not os.path.exists(save_safetensor_file_dir):
_make_dir(save_safetensor_file_dir, "path")
save_safetensor_file_name = os.path.join(save_safetensor_file_dir, save_safetensor_file)
save_file(transform_param_dict, save_safetensor_file_name)
def transform_safetensors_by_rank(rank_id, safetensor_files_map, save_safetensor_file_name,
src_strategy_file=None, dst_strategy_file=None):
"""
Transform distributed checkpoint from source sharding strategy to destination sharding strategy by rank.
"""
if not isinstance(safetensor_files_map, dict):
raise TypeError("The safetensor_files_map should be a dict.")
if not isinstance(rank_id, int):
raise TypeError("The rank_id should be a int.")
if not isinstance(save_safetensor_file_name, str):
raise TypeError("The save_safetensor_file_name should be a str.")
if not save_safetensor_file_name.endswith(".safetensors"):
raise ValueError(
"The save_safetensor_file_name {} should end with .safetensors".format(save_safetensor_file_name))
if dst_strategy_file and os.path.dirname(dst_strategy_file) and not os.path.exists(
os.path.dirname(dst_strategy_file)):
raise ValueError("The director of dst_strategy_file: {} is not exists.".
format(os.path.dirname(dst_strategy_file)))
for rank, local_file in safetensor_files_map.items():
if not os.path.exists(local_file):
raise ValueError("safetensor file {} in rank {} not exits: ".format(local_file, rank))
param_total_dict = defaultdict(dict)
param_attr_dict = defaultdict(dict)
param_type_dict = defaultdict(dict)
src_strategy_list, dst_strategy_list = _extract_src_dst_layout_map(rank_id, src_strategy_file, dst_strategy_file)
# src rank => local rank inside pipeline stage
src_stage_device_num = np.prod(src_strategy_list.get(list(src_strategy_list.keys())[0])[0]) if src_strategy_list \
is not None else 1
dst_stage_device_num = np.prod(dst_strategy_list.get(list(dst_strategy_list.keys())[0])[0]) if dst_strategy_list \
is not None else 1
origin_dst_strategy_list = _extract_layout_map(dst_strategy_file)
origin_src_strategy_list = _extract_layout_map(src_strategy_file)
for rank, file_name in safetensor_files_map.items():
saftensor_dict = load_file(file_name)
for param_name, param in saftensor_dict.items():
# cut the parameter not in the pipeline stage.
if _parameter_not_in_local_stage(param_name, origin_src_strategy_list, src_strategy_list) \
and _parameter_not_in_local_stage(param_name, origin_dst_strategy_list, dst_strategy_list):
continue
src_rank = rank % src_stage_device_num
param_type_dict[param_name][src_rank] = str(param.data.dtype)
# if param.data.dtype == mstype.bfloat16:
# param.set_dtype(mstype.float32)
param_total_dict[param_name][src_rank] = param
param_attr_dict[param_name][src_rank] = (True, False)
local_rank_id = rank_id % dst_stage_device_num
transform_param_dict = _transform_parallel_safetensor(local_rank_id, param_total_dict,
param_attr_dict, src_strategy_list, dst_strategy_list,
param_type_dict)
save_file(transform_param_dict, save_safetensor_file_name)
def _collect_safetensor_files(src_safetensors_dir, format='safetensors'):
"""
Collects all safetensors files from the specified directory and its subdirectories.
"""
if os.path.isfile(src_safetensors_dir) and format == 'safetensors' and src_safetensors_dir.endswith('safetensors'):
return {0: src_safetensors_dir}
safetensors_rank_dir_list = os.path.join(src_safetensors_dir, "rank_[0-9]*")
all_safetensor_files_map = {}
for safetensor_dir in glob.glob(safetensors_rank_dir_list):
if not os.path.isdir(safetensor_dir):
ms.log.warning("{} is not a directory.".format(safetensor_dir))
continue
rank_id_str = safetensor_dir.split('rank_')[-1]
if not rank_id_str.isdigit():
ms.log.warning("{} is not a expected directory, the directory should end with rank_0/rank_1.....".
format(safetensor_dir))
continue
rank_id = int(rank_id_str)
safetensor_file_name = os.path.join(safetensor_dir, f"*.{format}")
rank_ckpts = glob.glob(safetensor_file_name)
rank_ckpts.sort()
for safetensor_file in rank_ckpts:
if not os.path.isfile(safetensor_file):
ms.log.warning("{} is not a safetensor file.".format(safetensor_file))
continue
all_safetensor_files_map[rank_id] = safetensor_file
return all_safetensor_files_map
def _find_needed_ranks(src_strategy_dict, dst_strategy_dict):
"""
Identifies the ranks needed for transformation based on source and destination strategies.
"""
needed_rank_list_map = defaultdict(list)
dst_stage_device_num = _get_device_num_from_strategy(dst_strategy_dict)
dst_stage_num = _extract_pipeline_stage_num(dst_strategy_dict)
dst_device_num = dst_stage_device_num * dst_stage_num
for rank in _progress_bar(range(dst_device_num)):
needed_rank_list = ms.rank_list_for_transform(rank, src_strategy_dict, dst_strategy_dict)
needed_rank_list_key = "-".join([str(r) for r in needed_rank_list])
needed_rank_list_map[needed_rank_list_key].append(rank)
return needed_rank_list_map
def load_file_by_param_name(filename, parme_name_list):
result = {}
with safe_open(filename, framework="np") as f:
for k in parme_name_list:
result[k] = f.get_tensor(k)
return result
def _transform_parallel_safetensor(rank_id, param_total_dict, param_attr_dict, src_strategy_list,
dst_strategy_list, param_total_dict_keys=None):
"""
Transform model parallel dimension for distributed safetensor files.
"""
transform_param_dict = {}
device_num = -1
param_total_dict_keys = list(param_total_dict.keys()) if param_total_dict_keys is None else param_total_dict_keys
for param_name in param_total_dict_keys:
tensor_shape = list(param_total_dict[param_name].values())[0].shape
from_dev_matrix = [1]
from_tensor_map = [-1] * len(tensor_shape)
from_opt_shard_step = 0
from_opt_shard_size = 0
if src_strategy_list is not None:
if param_name not in src_strategy_list:
continue
from_dev_matrix, from_tensor_map, from_opt_shard_step, from_opt_shard_size = _extract_layout_item(
src_strategy_list.get(param_name))
to_dev_matrix_origin = [1]
to_tensor_map_origin = [-1] * len(tensor_shape)
to_opt_shard_step = 0
to_opt_shard_size = 0
if dst_strategy_list is not None:
if param_name not in dst_strategy_list:
continue
to_dev_matrix_origin, to_tensor_map_origin, to_opt_shard_step, to_opt_shard_size = _extract_layout_item(
dst_strategy_list.get(param_name))
# Add optimizer sharding dim for tensor layout
device_num = np.prod(from_dev_matrix)
if device_num < 1:
raise ValueError("None of the parameters in safetensor file are in either src strategy or "
"dst strategy. Please check correctness of strategy files. "
"Param name is: {}, rank_id is {}.".format(param_name, rank_id))
param_strategy = _get_tensor_strategy(from_dev_matrix, from_tensor_map)
origin_tensor_shape = ()
for i, item in enumerate(tensor_shape):
if i == 0 and from_opt_shard_size > 0:
origin_tensor_shape += (item * param_strategy[i] * from_opt_shard_size,)
continue
origin_tensor_shape += (item * param_strategy[i],)
from_dev_matrix, from_tensor_map, from_full_tensor_shape = _construct_tensor_layout_for_opt_shard(
from_dev_matrix, from_tensor_map, from_opt_shard_step, from_opt_shard_size, origin_tensor_shape)
to_dev_matrix, to_tensor_map, to_full_tensor_shape = _construct_tensor_layout_for_opt_shard(
to_dev_matrix_origin, to_tensor_map_origin, to_opt_shard_step, to_opt_shard_size, origin_tensor_shape)
# Convert tensor layout to same device num
from_tensor_layout, to_tensor_layout = _construct_from_to_tensor_layout(from_full_tensor_shape, from_dev_matrix,
from_tensor_map, to_full_tensor_shape,
to_dev_matrix, to_tensor_map)
# when the from_layout is less devices, the safetensor_map for map[device_num] should using map[0]
device_list = list(range(0, np.prod(from_tensor_layout[0])))
if rank_id % device_num not in param_attr_dict[param_name]:
raise ValueError("The safetensor of rank {} is missing.".format(rank_id % device_num))
param_rank_map = _get_needed_rank_transform_operator_map_by_layouts(from_tensor_layout, to_tensor_layout,
device_list, rank_id)
from_info_tuple = (from_opt_shard_size, from_dev_matrix, from_tensor_map, from_full_tensor_shape)
to_info_tuple = (to_opt_shard_size, to_dev_matrix_origin, to_tensor_map_origin, origin_tensor_shape)
_insert_opt_shard_reshape(param_rank_map, from_info_tuple, to_info_tuple)
transform_operator_stack = _generate_transform_operator_stack(param_rank_map, rank_id)
param_total_dict_copy = param_total_dict[param_name].copy()
_apply_tensor_transform_operators(transform_operator_stack, param_total_dict_copy, device_num)
transform_param_dict[param_name] = param_total_dict_copy[rank_id % device_num]
# Handle those parameter like learning_rate, global_step which not in strategy_file.
for param_name in param_total_dict_keys:
if param_name not in transform_param_dict:
transform_para = param_total_dict[param_name][rank_id % device_num]
transform_param_dict[param_name] = transform_para
return transform_param_dict
[docs]def unified_safetensors(src_dir, src_strategy_file, dst_dir):
"""
Merge multiple safetensor files into a unified safetensor file.
Args:
src_dir (str): Source weight saving directory.
src_strategy_file (str): Source weight segmentation strategy file.
dst_dir (str): Target save directory.
Raises:
ValueError: If the safetensors file of rank is missing.
Supported Platforms:
``Ascend`` ``GPU`` ``CPU``
Examples:
>>> import mindspore as ms
>>> src_dir = "/usr/safetensors/llama31B/4p_safetensors/"
>>> src_strategy_file = "/usr/safetensors/llama31B/strategy_4p.ckpt"
>>> dst_dir = "/usr/safetensors/llama31B/merge_llama31B_4p/"
>>> ms.unified_safetensors(src_dir, src_strategy_file, dst_dir)
"""
_check_transform_safetensors(src_dir, "", src_strategy_file, None)
_make_dir(dst_dir, "path")
if os.path.isfile(src_dir):
raise ValueError("For 'unified_safetensors', the 'src_dir' can not be a file.")
all_safetensor_files_map = _collect_safetensor_files(src_dir)
all_ckpt_files_map = _collect_safetensor_files(src_dir, format='ckpt')
if all_safetensor_files_map and all_ckpt_files_map:
raise ValueError("For 'unified_safetensors', the 'src_dir' cannot contain "
"both ckpt file and safetensors file simultaneously")
src_strategy_dict = _build_searched_strategy(src_strategy_file)
src_stage_device_num = _get_device_num_from_strategy(src_strategy_dict)
dst_stage_device_num = 1
origin_src_strategy_list = _extract_layout_map(src_strategy_dict)
origin_dst_strategy_list = None
needed_rank_list_map = _find_needed_ranks(src_strategy_dict, dst_strategy_dict=None)
for needed_rank_list, rank in needed_rank_list_map.items():
for needed_rank in needed_rank_list.split("-"):
if int(needed_rank) not in all_safetensor_files_map:
raise ValueError("The safetensor file of rank{} is needed for converting rank{}'s safetensor, "
"but it is missing.".format(needed_rank, rank))
layout_map = _convert_to_list(src_strategy_dict)
total_size = 0
for _, file_name in all_safetensor_files_map.items():
total_size += os.path.getsize(file_name) / 1024 / 1024 / 1024
split_num = math.ceil(total_size / 3)
name_list = list(layout_map.keys())
split_list = _split_list(name_list, split_num)
all_safetensor_files_map = _collect_safetensor_files(src_dir)
with safe_open(all_safetensor_files_map.get(0), framework="np") as f:
all_key = f.keys()
hyper_parameter = set(all_key) - set(name_list)
if hyper_parameter:
hyper_dict = {}
for key in hyper_parameter:
hyper_dict[key] = f.get_tensor(key)
save_file(hyper_dict, os.path.join(dst_dir, "hyper_param.safetensors"))
# save parameter map json
param_name_dict = dict()
for index, part_list in enumerate(split_list):
for name in part_list:
param_name_dict[name] = f"part{index}.safetensors"
json_str = json.dumps(param_name_dict, indent=4)
map_file = os.path.join(dst_dir, "param_name_map.json")
with open(map_file, 'w') as f:
f.write(json_str)
max_process = min(split_num, 100)
res = [i for i in range(split_num)]
res = _split_list(res, max_process)
processes = []
for i in range(max_process):
p = mp.Process(target=_transform_safetensors_single_semaphore, args=(
needed_rank_list_map, all_safetensor_files_map, src_stage_device_num, dst_stage_device_num,
src_strategy_dict, None, origin_src_strategy_list, origin_dst_strategy_list,
"", dst_dir, "safetensors", None, split_list, res[i], True))
p.start()
processes.append(p)
for p in processes:
p.join()
def _transform_safetensors_single_semaphore(needed_rank_list_map, all_safetensor_files_map,
src_stage_device_num,
dst_stage_device_num,
src_strategy_dict, dst_strategy_dict, origin_src_strategy_list,
origin_dst_strategy_list,
ckpt_prefix, dst_safetensors_dir, output_format,
_transform_param_list, pipe_param_list=None, file_index=None,
unified_flag=False):
for i in file_index:
_transform_safetensors_single(needed_rank_list_map, all_safetensor_files_map, src_stage_device_num,
dst_stage_device_num, src_strategy_dict, dst_strategy_dict,
origin_src_strategy_list,
origin_dst_strategy_list, ckpt_prefix, dst_safetensors_dir, output_format,
_transform_param_list, pipe_param_list[i], i, unified_flag)
def _split_list(split_list, split_num):
split_array = np.array_split(split_list, split_num)
return [array.tolist() for array in split_array]
def _load_parallel_checkpoint(total_safetensors_dir, dst_strategy_file, net=None, dst_safetensors_dir=None,
rank_id=None):
"""load parallel safetensors by merged file."""
file_list = os.listdir(total_safetensors_dir)
json_files = [file for file in file_list if file.endswith('.json')]
if len(json_files) != 1:
raise ValueError(f"For 'load_parallel_checkpoint', the number of json files in 'total_safetensors_dir' "
f"must be 1, but got {len(json_files)}.")
param_name_json = os.path.join(total_safetensors_dir, json_files[0])
with open(param_name_json, 'r') as f:
param_name_map = json.load(f)
if dst_strategy_file is not None:
_, dst_strategy_list = _extract_src_dst_layout_map(rank_id, None, dst_strategy_file)
param_list = dst_strategy_list.keys()
else:
dst_strategy_list = None
param_list = param_name_map.keys()
total_param = dict()
for param_name in param_list:
if param_name not in param_name_map:
continue
file_name = os.path.join(total_safetensors_dir, param_name_map[param_name])
with safe_open(file_name, framework="np") as f:
if param_name not in f.keys():
continue
sf_obj = f.get_slice(param_name)
param_dict = dict()
param_dict[param_name] = sf_obj
if dst_strategy_list is not None:
if param_name not in dst_strategy_list:
continue
slice_op, shape = _get_slice(rank_id, sf_obj, param_name, dst_strategy_list)
else:
slice_op, shape = slice(None, None, None), None
slice_param = sf_obj[slice_op]
if shape is not None:
slice_param = slice_param.reshape(shape)
total_param[param_name] = ms.Parameter(slice_param)
if 'hyper_param.safetensors' in file_list:
hyper_parameter_file_name = os.path.join(total_safetensors_dir, "hyper_param.safetensors")
with safe_open(hyper_parameter_file_name, framework="np") as f:
for key in f.keys():
total_param[key] = ms.Parameter(f.get_tensor(key))
if net is not None:
param_not_load, ckpt_not_load = ms.load_param_into_net(net, total_param)
return param_not_load, ckpt_not_load
_make_dir(os.path.join(dst_safetensors_dir, f"rank_{rank_id}"), "path")
ms.save_checkpoint(total_param, os.path.join(dst_safetensors_dir, f"rank_{rank_id}", f"net.safetensors"),
format='safetensors')
return None
def _get_slice(rank_id, sf_obj, param_name, dst_strategy_list):
"""get slice op"""
tensor_shape = sf_obj.get_shape()
to_dev_matrix_origin, to_tensor_map_origin, to_opt_shard_step, to_opt_shard_size = _extract_layout_item(
dst_strategy_list.get(param_name))
# Add optimizer sharding dim for tensor layout
to_dev_matrix, to_tensor_map, _ = _construct_tensor_layout_for_opt_shard(
to_dev_matrix_origin, to_tensor_map_origin, to_opt_shard_step, to_opt_shard_size, tensor_shape)
slice_op = _load_tensor_shape(to_dev_matrix, to_tensor_map, full_shape=tensor_shape, rank_id=rank_id)
shape = None
if to_opt_shard_size > 0:
to_tensor_strategy = _get_tensor_strategy(to_dev_matrix_origin, to_tensor_map_origin)
to_slice_tensor_shape = ()
for i, item in enumerate(tensor_shape):
if i == 0 and to_opt_shard_size > 0:
to_slice_tensor_shape += (item // (to_tensor_strategy[i] * to_opt_shard_size),)
continue
to_slice_tensor_shape += (item // to_tensor_strategy[i],)
shape = list(to_slice_tensor_shape)
return slice_op, shape
__all__ = ["_transform_safetensors", "transform_safetensors_by_stage",
"transform_safetensors_by_rank", "ckpt_to_safetensors", "safetensors_to_ckpt", "unified_safetensors"]