mindspore.boost.grad_freeze 源代码

# Copyright 2021 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""grad freeze"""
from __future__ import absolute_import
from __future__ import division

import numpy as np
from mindspore.nn.cell import Cell
from mindspore.nn.optim import Optimizer
from mindspore.common import Tensor
from mindspore.common import dtype as mstype
from mindspore.nn.optim import LARS
from mindspore.nn.wrap.grad_reducer import DistributedGradReducer
from mindspore.ops import functional as F

from mindspore.boost.base import ParameterProcess
from mindspore.boost.grad_accumulation import GradientAccumulation

__all__ = ['GradientFreeze', 'FreezeOpt', 'freeze_cell']


CONTINUOUS_STRATEGY = 0
INTERVAL_STRATEGY = 1


[文档]class FreezeOpt(Cell): """ Optimizer that supports gradients freezing training. Args: opt (Cell): non-freezing optimizer instance, such as 'Momentum', 'SGD'. train_parameter_groups (Union[tuple, list]): Groups of parameters for gradients freezing training. train_strategy (Union[tuple(int), list(int), Tensor]): Strategy for gradients freezing training. Supported Platforms: ``Ascend`` """ def __init__(self, opt, train_parameter_groups=None, train_strategy=None): super(FreezeOpt, self).__init__() if not isinstance(opt, Optimizer): raise TypeError( f"The first arg 'opt' must be an Optimizer instance, but got {type(opt)}") if train_strategy is not None and train_parameter_groups is None: raise ValueError("When the 'train_strategy' is specified, the value of 'train_parameter_groups' " "must also be specified") if isinstance(opt, LARS): self.is_lars = True self.opt_class = type(opt.opt) self.opt_init_args = opt.opt.init_args self.lars_init_args = opt.init_args self.single_opt = opt.opt self.parameters = opt.opt.parameters self.learning_rate = opt.opt.init_learning_rate self.dynamic_lr = opt.opt.dynamic_lr else: self.is_lars = False self.opt_class = type(opt) self.opt_init_args = opt.init_args self.single_opt = opt self.parameters = opt.parameters self.learning_rate = opt.init_learning_rate self.dynamic_lr = opt.dynamic_lr self.opts = [] if train_parameter_groups is None: self.groups_num = 1 step = 1 parameters = opt.parameters train_parameter_groups = (tuple(parameters[(i * step):]) for i in range(self.groups_num)) else: if not isinstance(train_parameter_groups, (tuple, list)): raise TypeError( "The specified 'train_parameter_groups' should be tuple or list") self.groups_num = len(train_parameter_groups) self._init_train_strategy(train_strategy) self._create_new_group_learning_rate() self.opt_index = 0 for params in train_parameter_groups: if not isinstance(params, (tuple, list)): raise TypeError("The each element of 'train_parameter_groups' should be tuple or list " "to store the Parameter") # generate one-to-one opt corresponding to the parameter group self.opts.append(self._generate_new_optimizer(params)) self.opt_index += 1 def _init_train_strategy(self, train_strategy): """Init train strategy for gradient freeze.""" if isinstance(train_strategy, (tuple, list)): for ele in train_strategy: if not isinstance(ele, int): raise ValueError( "The element in train_strategy should be int number") self.train_strategy = Tensor(train_strategy, mstype.int32) elif isinstance(train_strategy, Tensor): if train_strategy.ndim != 1 or train_strategy.dtype != mstype.int32: raise ValueError("When train_strategy is a Tensor, the dimension should be 1 and " "the dtype should be int32") self.train_strategy = train_strategy elif train_strategy is None: self.train_strategy = None else: raise TypeError( "The specified 'train_strategy' should be None, tuple, list or Tensor") def _create_new_group_learning_rate(self): """Create new learning rate for different global step.""" self.dynamic_learning_rate = [[] for _ in range(self.groups_num)] if self.learning_rate is None: self.learning_rate = self.single_opt.learning_rate return if self.dynamic_lr and isinstance(self.learning_rate, list) and isinstance(self.train_strategy, Tensor): train_strategy = list(self.train_strategy.asnumpy()) if len(self.learning_rate) <= len(train_strategy): for i, lr in enumerate(self.learning_rate): self.dynamic_learning_rate[train_strategy[i]].append(lr) def _generate_new_optimizer(self, params): """Generate new optimizer.""" if self.dynamic_learning_rate[self.opt_index]: lr = self.dynamic_learning_rate[self.opt_index] else: lr = self.learning_rate if not self.is_lars: opt = self.opt_class(params=params, learning_rate=lr, **self.opt_init_args) opt._update_local_parameters_name("boost_{}".format(self.opt_index)) # pylint: disable=W0212 else: opt = LARS(self.opt_class(params=params, learning_rate=lr, **self.opt_init_args), **self.lars_init_args) opt.opt._update_local_parameters_name("boost_{}".format(self.opt_index)) # pylint: disable=W0212 opt._update_local_parameters_name("boost_{}".format(self.opt_index)) # pylint: disable=W0212 return opt
class _TrainFreezeCell(Cell): r""" Gradient freezing training network. Args: net (Cell): The training network. sens (numbers.Number): The scaling number to be filled as the input of backpropagation. Default value is 1.0. grad (tuple(Tensor)): The gradients of network parameters and inputs. grad_reducer (Cell): Constructs a gradient reducer Cell, which applies communication and average operations on single-process gradient values. use_grad_accumulation (bool): Whether use grad accumulation. optimizer (Union[Cell]): Optimizer for updating the weights. max_accumulation_step (numbers.Number): Max grad accumulation steps. Default: 1.0 Supported Platforms: ``Ascend`` """ def __init__(self, net, sens, grad, grad_reducer, use_grad_accumulation, optimizer, max_accumulation_step=1): super(_TrainFreezeCell, self).__init__(auto_prefix=False) self.net = net self.grad = grad self.grad_reducer = grad_reducer self.opt = optimizer self.parameters = optimizer.parameters self.sens = sens self.use_grad_accumulation = use_grad_accumulation self.max_accumulation_step = max_accumulation_step if use_grad_accumulation: self.grad_accumulation = GradientAccumulation( self.max_accumulation_step, self.optimizer) def construct(self, *inputs): loss = self.net(*inputs) sens = F.fill(loss.dtype, loss.shape, self.sens) grads = self.grad(self.net, self.parameters)(*inputs, sens) grads = self.grad_reducer(grads) if self.use_grad_accumulation: loss = self.grad_accumulation(loss, grads) else: loss = F.depend(loss, self.opt(grads)) return loss
[文档]class GradientFreeze: r""" Gradients freezing algorithm, freezing the gradients of some layers randomly, to improve network training performance. The number and probability of frozen layers can be configured by users. Args: param_groups (Union[tuple, list]): Groups of parameters for gradients freezing training. freeze_type (int): Strategy of gradients freezing training. freeze_p (float): probability of gradients freezing training. total_steps (int): Steps of the whole training. Examples: >>> gradient_freeze_class = boost.GradientFreeze(10, 1, 0.5, 2000) >>> network, optimizer = gradient_freeze_class.freeze_generate(network, optimizer) """ def __init__(self, param_groups, freeze_type, freeze_p, total_steps): self._param_groups = param_groups self._freeze_type = freeze_type self._freeze_p = freeze_p self._total_steps = total_steps self.grad_reducer = F.identity
[文档] def split_parameters_groups(self, net, freeze_para_groups_number): r""" Split parameter groups for gradients freezing training. Args: net (Cell): The training network. freeze_para_groups_number (int): The number of gradient freeze groups. """ grouped_params = [] tmp = [] for para in net.trainable_params(): name = para.name # ensure 'bn' after 'conv' is not split if 'bn' in name or 'bias' in name: tmp.append(para) elif len(tmp) >= 3: grouped_params.append(tmp) tmp = [para] else: tmp.append(para) if tmp: grouped_params.append(tmp) stride = len(grouped_params) // freeze_para_groups_number freeze_grouped_params = [sum(grouped_params[i * stride:], []) for i in range(freeze_para_groups_number)] return freeze_grouped_params
[文档] def generate_freeze_index_sequence(self, parameter_groups_number, freeze_strategy, freeze_p, total_steps): r""" Generate index sequence for gradient freezing training. Args: parameter_groups_number (int): The number of parameter groups. freeze_strategy (int): Gradient freeze grouping strategy, select from [0, 1]. freeze_p (float): Gradient freezing probability. total_steps (int): Total training steps. """ total_step = int(total_steps * 1.01) if parameter_groups_number <= 1: return [0 for _ in range(total_step)] # local continuous freezing training strategy, as '00001234' if freeze_strategy == CONTINUOUS_STRATEGY: zero_cnt = int( freeze_p * (parameter_groups_number - 1) // (1 - freeze_p) + 0.5) sub_idx = [0] * zero_cnt + list(range(1, parameter_groups_number)) freeze_idxes = [] while len(freeze_idxes) < total_step: freeze_idxes += sub_idx return freeze_idxes # interval freezing training strategy, as '01020304' if freeze_strategy == INTERVAL_STRATEGY: index_all = list(range(1, parameter_groups_number)) prob = [x / sum(index_all) for x in index_all] freeze_idxes = [0] zero_cnt = 1 freeze_cnt = 0 while len(freeze_idxes) < total_step: freeze_p_cur = 1.0 * freeze_cnt / (zero_cnt + freeze_cnt) if freeze_p_cur < 1 - freeze_p: freeze_idxes.append( int(np.random.choice(index_all[::-1], p=prob))) freeze_cnt += 1 else: freeze_idxes.append(0) zero_cnt += 1 return freeze_idxes raise ValueError( f"Unsupported freezing training strategy '{freeze_strategy}'")
[文档] def freeze_generate(self, network, optimizer): r""" Generate freeze network and optimizer. Args: network (Cell): The training network. optimizer (Cell): Optimizer for updating the weights. """ train_para_groups = self.split_parameters_groups( network, self._param_groups) for i in range(self._param_groups): train_para_groups[i] = ParameterProcess.generate_group_params(train_para_groups[i], \ optimizer.init_params['params']) train_strategy = self.generate_freeze_index_sequence( self._param_groups, self._freeze_type, self._freeze_p, self._total_steps) optimizer = FreezeOpt(optimizer, train_para_groups, train_strategy) return network, optimizer
[文档]def freeze_cell(reducer_flag, network, optimizer, sens, grad, use_grad_accumulation, mean=None, degree=None, max_accumulation_step=1): r""" Generate freeze network and optimizer. Args: reducer_flag (bool): Reducer flag. network (Cell): The training network. optimizer (Cell): Optimizer for updating the weights. sens (numbers.Number): The scaling number. grad (tuple(Tensor)): Tuple of gradient tensors. use_grad_accumulation (bool): Use gradient accumulation flag. mean (bool): Gradients mean flag. default: None. degree (int): Device number. default: None. max_accumulation_step (int): Max accumulation steps. default: 1. Examples: >>> import numpy as np >>> from mindspore import Tensor, Parameter, nn >>> import mindspore.ops as ops >>> from mindspore.boost.grad_freeze import freeze_cell, FreezeOpt >>> >>> class Net(nn.Cell): ... def __init__(self, in_features, out_features): ... super(Net, self).__init__() ... self.weight = Parameter(Tensor(np.ones([in_features, out_features]).astype(np.float32)), ... name='weight') ... self.matmul = ops.MatMul() ... ... def construct(self, x): ... output = self.matmul(x, self.weight) ... return output ... >>> in_features, out_features = 16, 10 >>> network = Net(in_features, out_features) >>> optimizer = nn.Momentum(network.trainable_params(), learning_rate=0.1, momentum=0.9) >>> optimizer = FreezeOpt(optimizer) >>> grad = ops.GradOperation(get_by_list=True, sens_param=True) >>> freeze_nets = freeze_cell(False, network, optimizer, 1.0, grad, False, None, None, 1) """ if reducer_flag: param_processer = ParameterProcess() grad_reducers = (DistributedGradReducer(param_processer.assign_parameter_group(opt.parameters), mean, degree) for opt in optimizer.opts) freeze_nets = tuple(_TrainFreezeCell(network, sens, grad, reducer, use_grad_accumulation, opt, max_accumulation_step) for reducer, opt in zip(grad_reducers, optimizer.opts)) else: freeze_nets = tuple(_TrainFreezeCell(network, sens, grad, F.identity, use_grad_accumulation, opt, max_accumulation_step) for opt in optimizer.opts) return freeze_nets