# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Checkpoint related classes and functions."""
import os
import stat
import time
import threading
import mindspore.context as context
from mindspore import log as logger
from mindspore._checkparam import check_bool, check_int_non_negative
from mindspore.train._utils import _make_directory
from mindspore.train.serialization import _exec_save_checkpoint, _save_graph
from ._callback import Callback, set_cur_net
_cur_dir = os.getcwd()
_save_dir = _cur_dir
def _check_file_name_prefix(file_name_prefix):
"""
Check file name valid or not.
File name can't include '/'. This file name naming convention only apply to Linux.
"""
if not isinstance(file_name_prefix, str) or file_name_prefix.find('/') >= 0:
return False
return True
def _chg_ckpt_file_name_if_same_exist(directory, prefix):
"""Check if there is a file with the same name."""
files = os.listdir(directory)
suffix_num = 0
pre_len = len(prefix)
for filename in files:
name_ext = os.path.splitext(filename)
if name_ext[-1] != ".ckpt":
continue
# find same prefix file
if filename.find(prefix) == 0 and not filename[pre_len].isalpha():
# add the max suffix + 1
index = filename[pre_len:].find("-")
if index == 0:
suffix_num = max(suffix_num, 1)
elif index != -1:
num = filename[pre_len+1:pre_len+index]
if num.isdigit():
suffix_num = max(suffix_num, int(num)+1)
if suffix_num != 0:
prefix = prefix + "_" + str(suffix_num)
return prefix
[docs]class CheckpointConfig:
"""
The configuration of model checkpoint.
Note:
During the training process, if dataset is transmitted through the data channel,
It is suggested to set 'save_checkpoint_steps' to an integer multiple of loop_size.
Otherwise, the time to save the checkpoint may be biased.
Args:
save_checkpoint_steps (int): Steps to save checkpoint. Default: 1.
save_checkpoint_seconds (int): Seconds to save checkpoint. Default: 0.
Can't be used with save_checkpoint_steps at the same time.
keep_checkpoint_max (int): Maximum number of checkpoint files can be saved. Default: 5.
keep_checkpoint_per_n_minutes (int): Keep one checkpoint every n minutes. Default: 0.
Can't be used with keep_checkpoint_max at the same time.
integrated_save (bool): Whether to perform integrated save function in automatic model parallel scene.
Default: True. Integrated save function is only supported in automatic parallel scene, not supported
in manual parallel.
async_save (bool): Whether asynchronous execution saves the checkpoint to a file. Default: False
Raises:
ValueError: If the input_param is None or 0.
Examples:
>>> config = CheckpointConfig()
>>> ckpoint_cb = ModelCheckpoint(prefix="ck_prefix", directory='./', config=config)
>>> model.train(10, dataset, callbacks=ckpoint_cb)
"""
def __init__(self,
save_checkpoint_steps=1,
save_checkpoint_seconds=0,
keep_checkpoint_max=5,
keep_checkpoint_per_n_minutes=0,
integrated_save=True,
async_save=False):
if save_checkpoint_steps is not None:
save_checkpoint_steps = check_int_non_negative(save_checkpoint_steps)
if save_checkpoint_seconds is not None:
save_checkpoint_seconds = check_int_non_negative(save_checkpoint_seconds)
if keep_checkpoint_max is not None:
keep_checkpoint_max = check_int_non_negative(keep_checkpoint_max)
if keep_checkpoint_per_n_minutes is not None:
keep_checkpoint_per_n_minutes = check_int_non_negative(keep_checkpoint_per_n_minutes)
if not save_checkpoint_steps and not save_checkpoint_seconds and \
not keep_checkpoint_max and not keep_checkpoint_per_n_minutes:
raise ValueError("The input_param can't be all None or 0")
self._save_checkpoint_steps = save_checkpoint_steps
self._save_checkpoint_seconds = save_checkpoint_seconds
if self._save_checkpoint_steps and self._save_checkpoint_steps > 0:
self._save_checkpoint_seconds = None
self._keep_checkpoint_max = keep_checkpoint_max
self._keep_checkpoint_per_n_minutes = keep_checkpoint_per_n_minutes
if self._keep_checkpoint_max and self._keep_checkpoint_max > 0:
self._keep_checkpoint_per_n_minutes = None
else:
if not self._keep_checkpoint_per_n_minutes or self._keep_checkpoint_per_n_minutes == 0:
self._keep_checkpoint_max = 1
self._integrated_save = check_bool(integrated_save)
self._async_save = check_bool(async_save)
@property
def save_checkpoint_steps(self):
"""Get the value of _save_checkpoint_steps."""
return self._save_checkpoint_steps
@property
def save_checkpoint_seconds(self):
"""Get the value of _save_checkpoint_seconds."""
return self._save_checkpoint_seconds
@property
def keep_checkpoint_max(self):
"""Get the value of _keep_checkpoint_max."""
return self._keep_checkpoint_max
@property
def keep_checkpoint_per_n_minutes(self):
"""Get the value of _keep_checkpoint_per_n_minutes."""
return self._keep_checkpoint_per_n_minutes
@property
def integrated_save(self):
"""Get the value of _integrated_save."""
return self._integrated_save
@property
def async_save(self):
"""Get the value of _async_save."""
return self._async_save
[docs] def get_checkpoint_policy(self):
"""Get the policy of checkpoint."""
checkpoint_policy = {'save_checkpoint_steps': self._save_checkpoint_steps,
'save_checkpoint_seconds': self._save_checkpoint_seconds,
'keep_checkpoint_max': self._keep_checkpoint_max,
'keep_checkpoint_per_n_minutes': self._keep_checkpoint_per_n_minutes}
return checkpoint_policy
[docs]class ModelCheckpoint(Callback):
"""
The checkpoint callback class.
It is called to combine with train process and save the model and network parameters after traning.
Args:
prefix (str): The prefix name of checkpoint files. Default: "CKP".
directory (str): The path of the folder which will be saved in the checkpoint file. Default: None.
config (CheckpointConfig): Checkpoint strategy configuration. Default: None.
Raises:
ValueError: If the prefix is invalid.
TypeError: If the config is not CheckpointConfig type.
"""
def __init__(self, prefix='CKP', directory=None, config=None):
super(ModelCheckpoint, self).__init__()
self._latest_ckpt_file_name = ""
self._init_time = time.time()
self._last_time = time.time()
self._last_time_for_keep = time.time()
self._last_triggered_step = 0
if _check_file_name_prefix(prefix):
self._prefix = prefix
else:
raise ValueError("Prefix {} for checkpoint file name invalid, "
"please check and correct it and then continue.".format(prefix))
if directory is not None:
self._directory = _make_directory(directory)
else:
self._directory = _cur_dir
if config is None:
self._config = CheckpointConfig()
else:
if not isinstance(config, CheckpointConfig):
raise TypeError("config should be CheckpointConfig type.")
self._config = config
# get existing checkpoint files
self._manager = CheckpointManager()
self._prefix = _chg_ckpt_file_name_if_same_exist(self._directory, self._prefix)
self._graph_saved = False
[docs] def step_end(self, run_context):
"""
Save the checkpoint at the end of step.
Args:
run_context (RunContext): Context of the train running.
"""
cb_params = run_context.original_args()
# save graph (only once)
if not self._graph_saved:
graph_file_name = os.path.join(self._directory, self._prefix + '-graph.meta')
_save_graph(cb_params.train_network, graph_file_name)
self._graph_saved = True
self._save_ckpt(cb_params)
[docs] def end(self, run_context):
"""
Save the last checkpoint after training finished.
Args:
run_context (RunContext): Context of the train running.
"""
cb_params = run_context.original_args()
_to_save_last_ckpt = True
self._save_ckpt(cb_params, _to_save_last_ckpt)
thread_list = threading.enumerate()
if len(thread_list) > 1:
for thread in thread_list:
if thread.getName() == "asyn_save_ckpt":
thread.join()
from mindspore.parallel._cell_wrapper import destroy_allgather_cell
destroy_allgather_cell()
def _check_save_ckpt(self, cb_params, force_to_save):
"""Check whether save checkpoint files or not."""
if self._config.save_checkpoint_steps and self._config.save_checkpoint_steps > 0:
if cb_params.cur_step_num >= self._last_triggered_step + self._config.save_checkpoint_steps \
or force_to_save is True:
return True
elif self._config.save_checkpoint_seconds and self._config.save_checkpoint_seconds > 0:
self._cur_time = time.time()
if (self._cur_time - self._last_time) > self._config.save_checkpoint_seconds or force_to_save is True:
self._last_time = self._cur_time
return True
return False
def _save_ckpt(self, cb_params, force_to_save=False):
"""Save checkpoint files."""
if cb_params.cur_step_num == self._last_triggered_step:
return
save_ckpt = self._check_save_ckpt(cb_params, force_to_save)
step_num_in_epoch = (cb_params.cur_step_num - 1) % cb_params.batch_num + 1
if save_ckpt:
cur_ckpoint_file = self._prefix + "-" + str(cb_params.cur_epoch_num) + "_" \
+ str(step_num_in_epoch) + ".ckpt"
if os.getenv("MS_ROLE") == "MS_PSERVER":
from mindspore.parallel._ps_utils import _get_ps_mode_rank
cur_ckpoint_file = "PServer_" + str(_get_ps_mode_rank()) + "_" + cur_ckpoint_file
# update checkpoint file list.
self._manager.update_ckpoint_filelist(self._directory, self._prefix)
# keep checkpoint files number equal max number.
if self._config.keep_checkpoint_max and 0 < self._config.keep_checkpoint_max <= self._manager.ckpoint_num:
self._manager.remove_oldest_ckpoint_file()
elif self._config.keep_checkpoint_per_n_minutes and self._config.keep_checkpoint_per_n_minutes > 0:
self._cur_time_for_keep = time.time()
if (self._cur_time_for_keep - self._last_time_for_keep) \
< self._config.keep_checkpoint_per_n_minutes * 60:
self._manager.keep_one_ckpoint_per_minutes(self._config.keep_checkpoint_per_n_minutes,
self._cur_time_for_keep)
# generate the new checkpoint file and rename it.
global _save_dir
_save_dir = self._directory
cur_file = os.path.join(self._directory, cur_ckpoint_file)
self._last_time_for_keep = time.time()
self._last_triggered_step = cb_params.cur_step_num
if context.get_context("enable_ge"):
set_cur_net(cb_params.train_network)
cb_params.train_network.exec_checkpoint_graph()
_exec_save_checkpoint(cb_params.train_network, cur_file, self._config.integrated_save,
self._config.async_save)
self._latest_ckpt_file_name = cur_file
@property
def latest_ckpt_file_name(self):
"""Return the latest checkpoint path and file name."""
return self._latest_ckpt_file_name
class CheckpointManager:
"""Manage checkpoint files according to train_config of checkpoint."""
def __init__(self):
self._ckpoint_filelist = []
@property
def ckpoint_filelist(self):
"""Get all the related checkpoint files managed here."""
return self._ckpoint_filelist
@property
def ckpoint_num(self):
"""Get the number of the related checkpoint files managed here."""
return len(self._ckpoint_filelist)
def update_ckpoint_filelist(self, directory, prefix):
"""Update the checkpoint file list."""
self._ckpoint_filelist = []
files = os.listdir(directory)
for filename in files:
if os.path.splitext(filename)[-1] == ".ckpt" and filename.startswith(prefix):
mid_name = filename[len(prefix):-5]
flag = True
for char in mid_name:
if char.isalpha():
flag = False
if flag:
self._ckpoint_filelist.append(directory + '/' + filename)
def remove_ckpoint_file(self, file_name):
"""Remove the specified checkpoint file from this checkpoint manager and also from the directory."""
try:
os.chmod(file_name, stat.S_IWRITE)
os.remove(file_name)
self._ckpoint_filelist.remove(file_name)
except OSError:
logger.warning("OSError, failed to remove the older ckpt file %s.", file_name)
except ValueError:
logger.warning("ValueError, failed to remove the older ckpt file %s.", file_name)
def remove_oldest_ckpoint_file(self):
"""Remove the oldest checkpoint file from this checkpoint manager and also from the directory."""
ckpoint_files = sorted(self._ckpoint_filelist, key=os.path.getmtime)
self.remove_ckpoint_file(ckpoint_files[0])
def keep_one_ckpoint_per_minutes(self, minutes, cur_time):
"""Only keep the latest one ckpt file per minutes, remove other files generated in [last_time, cur_time]."""
movs = []
oldest_file = ''
oldest_time = cur_time
for ck_file in self._ckpoint_filelist:
modify_time = os.path.getmtime(ck_file)
if cur_time - modify_time < 60 * minutes:
movs.append(ck_file)
if modify_time < oldest_time:
oldest_time = modify_time
oldest_file = ck_file
for mv_file in movs:
if mv_file == oldest_file:
continue
self.remove_ckpoint_file(mv_file)