Source code for mindspore.boost.grad_freeze

# Copyright 2021 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""grad freeze"""

import numpy as np

from mindspore.nn.cell import Cell
from mindspore.nn.optim import Optimizer
from mindspore.common import Tensor
from mindspore.common import dtype as mstype
from mindspore.nn.optim import LARS
from mindspore.nn.wrap.grad_reducer import DistributedGradReducer
from mindspore.ops import functional as F

from .base import ParameterProcess
from .grad_accumulation import GradientAccumulation

__all__ = ['GradientFreeze', 'FreezeOpt', 'freeze_cell']


CONTINUOUS_STRATEGY = 0
INTERVAL_STRATEGY = 1


[文档]class FreezeOpt(Cell): """ Optimizer that supports gradients freezing training. Args: opt (Cell): non-freezing optimizer instance, such as 'Momentum', 'SGD'. train_parameter_groups (Union[tuple, list]): Groups of parameters for gradients freezing training. train_strategy (Union[tuple(int), list(int), Tensor]): Strategy for gradients freezing training. Supported Platforms: ``Ascend`` """ def __init__(self, opt, train_parameter_groups=None, train_strategy=None): super(FreezeOpt, self).__init__() if not isinstance(opt, Optimizer): raise TypeError( f"The first arg 'opt' must be an Optimizer instance, but got {type(opt)}") if train_strategy is not None and train_parameter_groups is None: raise ValueError("When the 'train_strategy' is specified, the value of 'train_parameter_groups' " "must also be specified") if isinstance(opt, LARS): self.is_lars = True self.opt_class = type(opt.opt) self.opt_init_args = opt.opt.init_args self.lars_init_args = opt.init_args self.single_opt = opt.opt self.parameters = opt.opt.parameters self.learning_rate = opt.opt.init_learning_rate self.dynamic_lr = opt.opt.dynamic_lr else: self.is_lars = False self.opt_class = type(opt) self.opt_init_args = opt.init_args self.single_opt = opt self.parameters = opt.parameters self.learning_rate = opt.init_learning_rate self.dynamic_lr = opt.dynamic_lr self.opts = [] if train_parameter_groups is None: self.groups_num = 10 step = 6 parameters = opt.parameters train_parameter_groups = (tuple(parameters[(i * step):]) for i in range(self.groups_num)) else: if not isinstance(train_parameter_groups, (tuple, list)): raise TypeError( "The specified 'train_parameter_groups' should be tuple or list") self.groups_num = len(train_parameter_groups) self._init_train_strategy(train_strategy) self._create_new_group_learning_rate() self.opt_index = 0 for params in train_parameter_groups: if not isinstance(params, (tuple, list)): raise TypeError("The each element of 'train_parameter_groups' should be tuple or list " "to store the Parameter") # generate one-to-one opt corresponding to the parameter group self.opts.append(self._generate_new_optimizer(params)) self.opt_index += 1 def _init_train_strategy(self, train_strategy): """Init train strategy for gradient freeze.""" if isinstance(train_strategy, (tuple, list)): for ele in train_strategy: if not isinstance(ele, int): raise ValueError( "The element in train_strategy should be int number") self.train_strategy = Tensor(train_strategy, mstype.int32) elif isinstance(train_strategy, Tensor): if train_strategy.ndim != 1 or train_strategy.dtype != mstype.int32: raise ValueError("When train_strategy is a Tensor, the dimension should be 1 and " "the dtype should be int32") self.train_strategy = train_strategy elif train_strategy is None: self.train_strategy = None else: raise TypeError( "The specified 'train_strategy' should be None, tuple, list or Tensor") def _create_new_group_learning_rate(self): """Create new learning rate for different global step.""" self.dynamic_learning_rate = [[] for _ in range(self.groups_num)] if self.learning_rate is None: self.learning_rate = self.single_opt.learning_rate return if self.dynamic_lr and isinstance(self.learning_rate, list) and isinstance(self.train_strategy, Tensor): train_strategy = list(self.train_strategy.asnumpy()) if len(self.learning_rate) <= len(train_strategy): for i, lr in enumerate(self.learning_rate): self.dynamic_learning_rate[train_strategy[i]].append(lr) def _generate_new_optimizer(self, params): """Generate new optimizer.""" if self.dynamic_learning_rate[self.opt_index]: lr = self.dynamic_learning_rate[self.opt_index] else: lr = self.learning_rate if not self.is_lars: opt = self.opt_class(params=params, learning_rate=lr, **self.opt_init_args) opt._update_local_parameters_name("boost_{}".format(self.opt_index)) # pylint: disable=W0212 else: opt = LARS(self.opt_class(params=params, learning_rate=lr, **self.opt_init_args), **self.lars_init_args) opt.opt._update_local_parameters_name("boost_{}".format(self.opt_index)) # pylint: disable=W0212 opt._update_local_parameters_name("boost_{}".format(self.opt_index)) # pylint: disable=W0212 return opt
class _TrainFreezeCell(Cell): r""" Gradient freezing training network. Args: net (Cell): The training network. sens (numbers.Number): The scaling number to be filled as the input of backpropagation. Default value is 1.0. grad (tuple(Tensor)): The gradients of network parameters and inputs. grad_reducer (Cell): Constructs a gradient reducer Cell, which applies communication and average operations on single-process gradient values. use_grad_accumulation (bool): Whether use grad accumulation. optimizer (Union[Cell]): Optimizer for updating the weights. max_accumulation_step (numbers.Number): Max grad accumulation steps. Default: 1.0 Supported Platforms: ``Ascend`` """ def __init__(self, net, sens, grad, grad_reducer, use_grad_accumulation, optimizer, max_accumulation_step=1): super(_TrainFreezeCell, self).__init__(auto_prefix=False) self.net = net self.grad = grad self.grad_reducer = grad_reducer self.opt = optimizer self.parameters = optimizer.parameters self.sens = sens self.use_grad_accumulation = use_grad_accumulation self.max_accumulation_step = max_accumulation_step if use_grad_accumulation: self.grad_accumulation = GradientAccumulation( self.max_accumulation_step, self.optimizer) def construct(self, *inputs): loss = self.net(*inputs) sens = F.fill(loss.dtype, loss.shape, self.sens) grads = self.grad(self.net, self.parameters)(*inputs, sens) grads = self.grad_reducer(grads) if self.use_grad_accumulation: loss = self.grad_accumulation(loss, grads) else: loss = F.depend(loss, self.opt(grads)) return loss
[文档]class GradientFreeze: r""" Freezing the gradients of some layers randomly. The number and probability of frozen layers can be configured by users Args: param_groups (Union[tuple, list]): Groups of parameters for gradients freezing training. freeze_type (int): Strategy of gradients freezing training. freeze_p (float): probability of gradients freezing training. total_steps (numbers.Number): Steps of the whole training. Examples: >>> gradient_freeze_class = boost.GradientFreeze(10, 1, 0.5, 2000) >>> network, optimizer = gradient_freeze_class.freeze_generate(network, optimizer) """ def __init__(self, param_groups, freeze_type, freeze_p, total_steps): self._param_groups = param_groups self._freeze_type = freeze_type self._freeze_p = freeze_p self._total_steps = total_steps self.grad_reducer = F.identity self._param_processer = ParameterProcess()
[文档] def split_parameters_groups(self, net, freeze_para_groups_number): r""" Split parameter groups for gradients freezing training. Args: net (Cell): The training network. freeze_para_groups_number (int): The number of gradient freeze groups. """ grouped_params = [] tmp = [] for para in net.trainable_params(): name = para.name # ensure 'bn' after 'conv' is not split if 'bn' in name or 'bias' in name: tmp.append(para) elif len(tmp) >= 3: grouped_params.append(tmp) tmp = [para] else: tmp.append(para) if tmp: grouped_params.append(tmp) stride = len(grouped_params) // freeze_para_groups_number freeze_grouped_params = [sum(grouped_params[i * stride:], []) for i in range(freeze_para_groups_number)] return freeze_grouped_params
[文档] def generate_freeze_index_sequence(self, parameter_groups_number, freeze_strategy, freeze_p, total_steps): r""" Generate index sequence for gradient freezing training. Args: parameter_groups_number (int): The number of parameter groups. freeze_strategy (int): Gradient freeze grouping strategy, select from [0, 1]. freeze_p (float): Gradient freezing probability. total_steps (int): Total training steps. """ total_step = int(total_steps * 1.01) if parameter_groups_number <= 1: return [0 for _ in range(total_step)] # local continuous freezing training strategy, as '00001234' if freeze_strategy == CONTINUOUS_STRATEGY: zero_cnt = int( freeze_p * (parameter_groups_number - 1) / (1 - freeze_p) + 0.5) sub_idx = [0] * zero_cnt + list(range(1, parameter_groups_number)) freeze_idxes = [] while len(freeze_idxes) < total_step: freeze_idxes += sub_idx return freeze_idxes # interval freezing training strategy, as '01020304' if freeze_strategy == INTERVAL_STRATEGY: index_all = list(range(1, parameter_groups_number)) prob = [x / sum(index_all) for x in index_all] freeze_idxes = [0] zero_cnt = 1 freeze_cnt = 0 while len(freeze_idxes) < total_step: freeze_p_cur = 1.0 * freeze_cnt / (zero_cnt + freeze_cnt) if freeze_p_cur < 1 - freeze_p: freeze_idxes.append( int(np.random.choice(index_all[::-1], p=prob))) freeze_cnt += 1 else: freeze_idxes.append(0) zero_cnt += 1 return freeze_idxes raise ValueError( f"Unsupported freezing training strategy '{freeze_strategy}'")
[文档] def freeze_generate(self, network, optimizer): r""" Generate freeze network and optimizer. Args: network (Cell): The training network. optimizer (Cell): Optimizer for updating the weights. """ train_para_groups = self.split_parameters_groups( network, self._param_groups) for i in range(self._param_groups): train_para_groups[i] = self._param_processer.generate_group_params(train_para_groups[i], optimizer.init_params['params']) train_strategy = self.generate_freeze_index_sequence( self._param_groups, self._freeze_type, self._freeze_p, self._total_steps) optimizer = FreezeOpt(optimizer, train_para_groups, train_strategy) return network, optimizer
[文档]def freeze_cell(reducer_flag, network, optimizer, sens, grad, use_grad_accumulation, mean=None, degree=None, max_accumulation_step=1): r""" Generate freeze network and optimizer. Args: reducer_flag (bool): Reducer flag. network (Cell): The training network. optimizer (Cell): Optimizer for updating the weights. sens (numbers.Number): The scaling number. grad (tuple(Tensor)): Tuple of gradient tensors. use_grad_accumulation (bool): Use gradient accumulation flag. mean (bool): Gradients mean flag. default: None. degree (int): Device number. default: None. max_accumulation_step (int): Max accumulation steps. default: 1. Examples: >>> import numpy as np >>> from mindspore import Tensor, Parameter, nn >>> import mindspore.ops as ops >>> from mindspore.boost.grad_freeze import freeze_cell >>> >>> class Net(nn.Cell): ... def __init__(self, in_features, out_features): ... super(Net, self).__init__() ... self.weight = Parameter(Tensor(np.ones([in_features, out_features]).astype(np.float32)), ... name='weight') ... self.matmul = ops.MatMul() ... ... def construct(self, x): ... output = self.matmul(x, self.weight) ... return output ... >>> in_features, out_features = 16, 10 >>> network = Net(in_features, out_features) >>> optimizer = nn.Momentum(net.trainable_params(), learning_rate=0.1, momentum=0.9) >>> grad = ops.GradOperation(get_by_list=True, sens_param=True) >>> freeze_nets = freeze_cell(False, network, optimizer, 1.0, grad, False, None, None, 1) """ if reducer_flag: param_processer = ParameterProcess() grad_reducers = (DistributedGradReducer(param_processer.assign_parameter_group(opt.parameters), mean, degree) for opt in optimizer.opts) freeze_nets = tuple(_TrainFreezeCell(network, sens, grad, reducer, use_grad_accumulation, opt, max_accumulation_step) for reducer, opt in zip(grad_reducers, optimizer.opts)) else: freeze_nets = tuple(_TrainFreezeCell(network, sens, grad, F.identity, use_grad_accumulation, opt, max_accumulation_step) for opt in optimizer.opts) return freeze_nets