Source code for mindarmour.privacy.diff_privacy.monitor.monitor

# Copyright 2019 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Monitor module of differential privacy training. """
import numpy as np
from scipy import special

from mindspore.train.callback import Callback

from mindarmour.utils.logger import LogUtil
from mindarmour.utils._check_param import check_int_positive, \
    check_value_positive, check_param_in_range, check_param_type

# Logger obtained from LogUtil; every error in this module is reported through
# it right before the corresponding exception is raised.
LOGGER = LogUtil.get_instance()
# Tag passed as the first argument of each LOGGER call made by this module.
TAG = 'DP monitor'


class PrivacyMonitorFactory:
    """
    Factory that builds the privacy monitor used during DP training.

    For details, please check `Tutorial
    <https://mindspore.cn/mindarmour/docs/en/r1.9/protect_user_privacy_with_differential_privacy.html>`_.
    """

    def __init__(self):
        pass

    @staticmethod
    def create(policy, *args, **kwargs):
        """
        Build the privacy monitor that matches the requested policy.

        Args:
            policy (str): Monitor policy, 'rdp' and 'zcdp' are supported by
                now. With 'rdp' the monitor computes the privacy budget of DP
                training based on Renyi differential privacy theory; with
                'zcdp' it computes the budget based on zero-concentrated
                differential privacy theory. It's worth noting that 'zcdp' is
                not suitable for subsampling noise mechanism.
            args (Union[int, float, numpy.ndarray, list, str]): Positional
                parameters forwarded to the selected monitor class.
            kwargs (Union[int, float, numpy.ndarray, list, str]): Keyword
                parameters forwarded to the selected monitor class.

        Returns:
            Callback, a privacy monitor.

        Raises:
            ValueError: If `policy` is neither 'rdp' nor 'zcdp'.

        Examples:
            >>> from mindarmour.privacy.diff_privacy import PrivacyMonitorFactory
            >>> rdp = PrivacyMonitorFactory.create(policy='rdp', num_samples=60000, batch_size=32)
        """
        if policy == 'rdp':
            monitor = RDPMonitor(*args, **kwargs)
        elif policy == 'zcdp':
            monitor = ZCDPMonitor(*args, **kwargs)
        else:
            raise ValueError("The policy must be 'rdp' or 'zcdp', but got {}".format(policy))
        return monitor
class RDPMonitor(Callback):
    r"""
    Compute the privacy budget of DP training based on Renyi differential
    privacy (RDP) theory.

    According to the reference below, if a randomized mechanism is said to
    have ε'-Renyi differential privacy of order α, it also satisfies
    conventional differential privacy (ε, δ) as below:

    .. math::
        (ε'+\frac{log(1/δ)}{α-1}, δ)

    For details, please check `Tutorial
    <https://mindspore.cn/mindarmour/docs/en/r1.9/protect_user_privacy_with_differential_privacy.html>`_.

    Reference: `Rényi Differential Privacy of the Sampled Gaussian Mechanism
    <https://arxiv.org/abs/1908.10530>`_

    Args:
        num_samples (int): The total number of samples in training data sets.
        batch_size (int): The number of samples in a batch while training.
            Must be less than `num_samples`.
        initial_noise_multiplier (Union[float, int]): Ratio of the standard
            deviation of Gaussian noise divided by the norm_bound, which will
            be used to calculate privacy spent. Default: 1.5.
        max_eps (Union[float, int, None]): The maximum acceptable epsilon
            budget for DP training, which is used for estimating the max
            training epochs. 'None' means there is no limit to epsilon budget.
            Default: 10.0.
        target_delta (Union[float, int, None]): Target delta budget for DP
            training. If target_delta is set to be δ, then the privacy budget
            δ would be fixed during the whole training process.
            Default: 1e-3.
        max_delta (Union[float, int, None]): The maximum acceptable delta
            budget for DP training, which is used for estimating the max
            training epochs. Max_delta must be less than 1 and suggested to be
            less than 1e-3, otherwise overflow would be encountered. 'None'
            means there is no limit to delta budget. Default: None.
        target_eps (Union[float, int, None]): Target epsilon budget for DP
            training. If target_eps is set to be ε, then the privacy budget ε
            would be fixed during the whole training process. Default: None.
        orders (Union[None, list[int, float]]): Finite orders used for
            computing rdp, which must be greater than 1. The computation
            result of privacy budget would be different for various orders.
            In order to obtain a tighter (smaller) privacy budget estimation,
            a list of orders could be tried. Default: None.
        noise_decay_mode (Union[None, str]): Decay mode of adding noise while
            training, which can be None, 'Time', 'Step' or 'Exp'.
            Default: 'Time'.
        noise_decay_rate (float): Decay rate of noise while training.
            Default: 6e-4.
        per_print_times (int): The interval steps of computing and printing
            the privacy budget. Default: 50.
        dataset_sink_mode (bool): If True, all training data would be passed
            to device(Ascend) one-time. If False, training data would be
            passed to device after each step training. Default: False.

    Raises:
        ValueError: If `batch_size` is not less than `num_samples`, if
            `max_delta` is not less than 1, if an order is not greater than 1,
            if `noise_decay_mode` is unsupported, or if `target_eps` and
            `target_delta` are both None or both set.

    Examples:
        >>> from mindarmour.privacy.diff_privacy import PrivacyMonitorFactory
        >>> rdp = PrivacyMonitorFactory.create(policy='rdp', num_samples=100, batch_size=32)
    """

    def __init__(self, num_samples, batch_size, initial_noise_multiplier=1.5,
                 max_eps=10.0, target_delta=1e-3, max_delta=None,
                 target_eps=None, orders=None, noise_decay_mode='Time',
                 noise_decay_rate=6e-4, per_print_times=50,
                 dataset_sink_mode=False):
        super(RDPMonitor, self).__init__()
        check_int_positive('num_samples', num_samples)
        check_int_positive('batch_size', batch_size)
        if batch_size >= num_samples:
            msg = 'Batch_size must be less than num_samples.'
            LOGGER.error(TAG, msg)
            raise ValueError(msg)
        check_value_positive('initial_noise_multiplier',
                             initial_noise_multiplier)
        if max_eps is not None:
            check_value_positive('max_eps', max_eps)
        if target_delta is not None:
            check_value_positive('target_delta', target_delta)
        if max_delta is not None:
            check_value_positive('max_delta', max_delta)
            if max_delta >= 1:
                msg = 'max_delta must be less than 1.'
                LOGGER.error(TAG, msg)
                raise ValueError(msg)
        if target_eps is not None:
            check_value_positive('target_eps', target_eps)
        if orders is not None:
            for item in orders:
                check_value_positive('order', item)
                if item <= 1:
                    msg = 'orders must be greater than 1'
                    LOGGER.error(TAG, msg)
                    raise ValueError(msg)
        if noise_decay_mode is not None:
            if noise_decay_mode not in ('Step', 'Time', 'Exp'):
                msg = "Noise decay mode must be in ('Step', 'Time', 'Exp')"
                LOGGER.error(TAG, msg)
                raise ValueError(msg)
            # The decay rate only matters when a decay mode is active.
            noise_decay_rate = check_param_type('noise_decay_rate', noise_decay_rate, float)
            check_param_in_range('noise_decay_rate', noise_decay_rate, 0.0, 1.0)
        check_int_positive('per_print_times', per_print_times)
        check_param_type('dataset_sink_mode', dataset_sink_mode, bool)

        self._num_samples = num_samples
        self._batch_size = batch_size
        self._initial_noise_multiplier = initial_noise_multiplier
        self._max_eps = max_eps
        self._target_delta = target_delta
        self._max_delta = max_delta
        self._target_eps = target_eps
        self._orders = orders
        self._noise_decay_mode = noise_decay_mode
        self._noise_decay_rate = noise_decay_rate
        # Accumulated RDP; becomes one value per order once steps are counted.
        self._rdp = 0
        self._per_print_times = per_print_times

        # Exactly one of (eps, delta) is fixed; the other one is computed.
        if self._target_eps is None and self._target_delta is None:
            msg = 'target eps and target delta cannot both be None'
            LOGGER.error(TAG, msg)
            raise ValueError(msg)
        if self._target_eps is not None and self._target_delta is not None:
            msg = 'One of target eps and target delta must be None'
            LOGGER.error(TAG, msg)
            raise ValueError(msg)

        if dataset_sink_mode:
            # In sink mode the budget is evaluated once per epoch worth of steps.
            self._per_print_times = self._num_samples // self._batch_size

    def max_epoch_suggest(self):
        """
        Estimate the maximum training epochs to satisfy the predefined
        privacy budget.

        Epochs are simulated one by one, accumulating their cost into the
        internal RDP counter, until a configured limit (`max_eps` and/or
        `max_delta`) is exceeded or the counter reaches 10000. The internal
        RDP counter is reset to 0 afterwards.

        Returns:
            int, the recommended maximum training epochs, i.e. the value of
            the epoch counter when the simulation stopped.

        Raises:
            ValueError: If `target_delta` is set while `max_eps` is None, or
                `target_eps` is set while `max_delta` is None.
        """
        if self._target_delta is not None and self._max_eps is None:
            msg = 'max_eps should be consistent with target_delta, but got None.'
            LOGGER.error(TAG, msg)
            raise ValueError(msg)
        if self._target_eps is not None and self._max_delta is None:
            msg = 'max_delta should be consistent with target_eps, but got None.'
            LOGGER.error(TAG, msg)
            raise ValueError(msg)

        steps_per_epoch = self._num_samples // self._batch_size
        epoch = 1
        while epoch < 10000:
            # NOTE(review): the window holds steps_per_epoch + 1 indices, so
            # the boundary step is accounted for in two consecutive epochs.
            # Confirm whether this conservative overlap is intentional.
            eps, delta = self._compute_privacy_steps(
                list(np.arange((epoch - 1)*steps_per_epoch,
                               epoch*steps_per_epoch + 1)))
            # Evaluate every configured limit first and advance the counter
            # at most once per simulated epoch. Incrementing separately for
            # eps and delta would skip epochs when both limits are set.
            eps_ok = self._max_eps is None or eps <= self._max_eps
            delta_ok = self._max_delta is None or delta <= self._max_delta
            if not (eps_ok and delta_ok):
                break
            epoch += 1
        # reset the rdp for model training
        self._rdp = 0
        return epoch

    def step_end(self, run_context):
        """
        Compute privacy budget after each training step.

        The budget is only evaluated and printed when the current step number
        is a multiple of the configured print interval.

        Args:
            run_context (RunContext): Include some information of the model.

        Raises:
            ValueError: If the computed eps or delta is NaN or infinite.
        """
        cb_params = run_context.original_args()
        cur_step = cb_params.cur_step_num
        cur_step_in_epoch = (cb_params.cur_step_num - 1) % cb_params.batch_num + 1

        if cb_params.cur_step_num % self._per_print_times == 0:
            # NOTE(review): this window has per_print_times + 1 indices, so
            # its last step is also the first step of the next window.
            steps = np.arange(cur_step - self._per_print_times, cur_step + 1)
            eps, delta = self._compute_privacy_steps(list(steps))
            if np.isnan(eps) or np.isinf(eps):
                msg = 'epoch: {} step: {}, invalid eps, terminating ' \
                      'training.'.format(cb_params.cur_epoch_num, cur_step_in_epoch)
                LOGGER.error(TAG, msg)
                raise ValueError(msg)
            if np.isnan(delta) or np.isinf(delta):
                msg = 'epoch: {} step: {}, invalid delta, terminating ' \
                      'training.'.format(cb_params.cur_epoch_num, cur_step_in_epoch)
                LOGGER.error(TAG, msg)
                raise ValueError(msg)
            print("epoch: %s step: %s, delta is %s, eps is %s" % (
                cb_params.cur_epoch_num, cur_step_in_epoch, delta, eps))

    def _compute_privacy_steps(self, steps):
        """
        Accumulate the RDP cost of the given steps and convert the running
        total into a privacy budget.

        Args:
            steps (list): Training step indices; with a decay mode each index
                determines the noise multiplier used for that step.

        Returns:
            tuple, the (eps, delta) privacy budget after these steps.
        """
        if self._orders is None:
            # Default grid of Renyi orders used when the caller gave none.
            self._orders = (
                [1.005, 1.01, 1.02, 1.08, 1.2, 2, 5, 10, 20, 40, 80])
        sampling_rate = self._batch_size / self._num_samples
        noise_stddev_step = self._initial_noise_multiplier

        if self._noise_decay_mode is None:
            # Constant noise: every step costs the same.
            self._rdp += self._compute_rdp(sampling_rate, noise_stddev_step)*len(steps)
        else:
            if self._noise_decay_mode == 'Time':
                noise_stddev_step = [self._initial_noise_multiplier / (
                    1 + self._noise_decay_rate*step) for step in steps]
            elif self._noise_decay_mode == 'Step':
                noise_stddev_step = [self._initial_noise_multiplier*(
                    1 - self._noise_decay_rate)**step for step in steps]
            elif self._noise_decay_mode == 'Exp':
                noise_stddev_step = [self._initial_noise_multiplier*np.exp(
                    -step*self._noise_decay_rate) for step in steps]
            self._rdp += sum(
                [self._compute_rdp(sampling_rate, noise) for noise in noise_stddev_step])

        eps, delta = self._compute_privacy_budget(self._rdp)
        return eps, delta

    def _compute_rdp(self, sample_rate, noise_stddev):
        """
        Compute rdp according to sampling rate, added noise and Renyi
        divergence orders.

        Args:
            sample_rate (float): Sampling rate of each batch of samples.
            noise_stddev (float): Noise multiplier.

        Returns:
            numpy.ndarray, one rdp value per configured order.
        """
        rdp = np.array(
            [_compute_rdp_with_order(sample_rate, noise_stddev, order)
             for order in self._orders])
        return rdp

    def _compute_privacy_budget(self, rdp):
        """
        Compute delta or eps for given rdp.

        When `target_eps` is set, eps is fixed and delta is derived;
        otherwise delta is fixed to `target_delta` and eps is derived.

        Args:
            rdp (Union[float, numpy.ndarray]): Renyi differential privacy.

        Returns:
            tuple, the (eps, delta) privacy budget.
        """
        if self._target_eps is not None:
            delta = self._compute_delta(rdp)
            return self._target_eps, delta
        eps = self._compute_eps(rdp)
        return eps, self._target_delta

    def _compute_delta(self, rdp):
        """
        Compute delta for given rdp and eps.

        Args:
            rdp (Union[float, numpy.ndarray]): Renyi differential privacy.

        Returns:
            float, delta budget: the smallest value over all orders, capped
            at 1.
        """
        orders = np.atleast_1d(self._orders)
        rdps = np.atleast_1d(rdp)
        deltas = np.exp((rdps - self._target_eps)*(orders - 1))
        min_delta = np.min(deltas)
        return np.min([min_delta, 1.])

    def _compute_eps(self, rdp):
        """
        Compute eps for given rdp and delta.

        Args:
            rdp (Union[float, numpy.ndarray]): Renyi differential privacy.

        Returns:
            float, eps budget: the smallest value over all orders.
        """
        orders = np.atleast_1d(self._orders)
        rdps = np.atleast_1d(rdp)
        eps = rdps - np.log(self._target_delta) / (orders - 1)
        return np.min(eps)
class ZCDPMonitor(Callback):
    r"""
    Track the privacy budget of DP training with zero-concentrated
    differential privacy (zCDP) accounting.

    According to the reference below, if a randomized mechanism is said to
    have ρ-zCDP, it also satisfies conventional differential privacy (ε, δ)
    as below:

    .. math::
        (ρ+2\sqrt{ρ*log(1/δ)}, δ)

    It should be noted that ZCDPMonitor is not suitable for subsampling
    noise mechanisms (such as NoiseAdaGaussianRandom and
    NoiseGaussianRandom). The matching noise mechanism of ZCDP will be
    developed in the future.

    For details, please check `Tutorial
    <https://mindspore.cn/mindarmour/docs/en/r1.9/protect_user_privacy_with_differential_privacy.html>`_.

    Reference: `Concentrated Differentially Private Gradient Descent with
    Adaptive per-Iteration Privacy Budget <https://arxiv.org/abs/1808.09501>`_

    Args:
        num_samples (int): The total number of samples in training data sets.
        batch_size (int): The number of samples in a batch while training.
            Must be less than `num_samples`.
        initial_noise_multiplier (Union[float, int]): Ratio of the standard
            deviation of Gaussian noise divided by the norm_bound, which will
            be used to calculate privacy spent. Default: 1.5.
        max_eps (Union[float, int]): The maximum acceptable epsilon budget
            for DP training, which is used for estimating the max training
            epochs. Default: 10.0.
        target_delta (Union[float, int]): Target delta budget for DP
            training. If target_delta is set to be δ, then the privacy budget
            δ would be fixed during the whole training process.
            Default: 1e-3.
        noise_decay_mode (Union[None, str]): Decay mode of adding noise while
            training, which can be None, 'Time', 'Step' or 'Exp'.
            Default: 'Time'.
        noise_decay_rate (float): Decay rate of noise while training.
            Default: 6e-4.
        per_print_times (int): The interval steps of computing and printing
            the privacy budget. Default: 50.
        dataset_sink_mode (bool): If True, all training data would be passed
            to device(Ascend) one-time. If False, training data would be
            passed to device after each step training. Default: False.

    Examples:
        >>> from mindarmour.privacy.diff_privacy import PrivacyMonitorFactory
        >>> zcdp = PrivacyMonitorFactory.create(policy='zcdp',
        ...                                     num_samples=100,
        ...                                     batch_size=32,
        ...                                     initial_noise_multiplier=1.5)
    """

    def __init__(self, num_samples, batch_size, initial_noise_multiplier=1.5,
                 max_eps=10.0, target_delta=1e-3, noise_decay_mode='Time',
                 noise_decay_rate=6e-4, per_print_times=50,
                 dataset_sink_mode=False):
        super().__init__()

        # Validation runs in a fixed order so the first offending argument
        # decides which error is reported.
        check_int_positive('num_samples', num_samples)
        check_int_positive('batch_size', batch_size)
        if batch_size >= num_samples:
            self._reject('Batch_size must be less than num_samples.')
        check_value_positive('initial_noise_multiplier',
                             initial_noise_multiplier)

        decay_enabled = noise_decay_mode is not None
        if decay_enabled and noise_decay_mode not in ('Step', 'Time', 'Exp'):
            self._reject(
                "Noise decay mode must be in ('Step', 'Time', 'Exp'), but got {}.".format(
                    noise_decay_mode))
        if decay_enabled:
            noise_decay_rate = check_param_type('noise_decay_rate', noise_decay_rate, float)
            check_param_in_range('noise_decay_rate', noise_decay_rate, 0.0, 1.0)

        check_int_positive('per_print_times', per_print_times)
        check_param_type('dataset_sink_mode', dataset_sink_mode, bool)
        checked_max_eps = check_value_positive('max_eps', max_eps)
        checked_delta = check_param_in_range('target_delta', target_delta, 0.0, 1.0)

        self._num_samples = num_samples
        self._batch_size = batch_size
        self._initial_noise_multiplier = initial_noise_multiplier
        self._max_eps = checked_max_eps
        self._target_delta = checked_delta
        self._noise_decay_mode = noise_decay_mode
        self._noise_decay_rate = noise_decay_rate
        # Running zCDP total, starts empty.
        self._zcdp = 0
        # In sink mode the interval is the number of batches per epoch.
        self._per_print_times = (
            int(self._num_samples / self._batch_size) if dataset_sink_mode
            else per_print_times)

    @staticmethod
    def _reject(msg):
        """ Log the message under the module tag, then raise ValueError. """
        LOGGER.error(TAG, msg)
        raise ValueError(msg)

    def max_epoch_suggest(self):
        """
        Estimate the maximum training epochs to satisfy the predefined
        privacy budget.

        Epochs are simulated one at a time until eps is no longer within
        `max_eps` or the counter reaches 10000; the zCDP accumulator is reset
        to 0 before returning.

        Returns:
            int, the recommended maximum training epochs.
        """
        steps_per_epoch = self._num_samples // self._batch_size
        epoch = 1
        while epoch < 10000:
            first_step = (epoch - 1)*steps_per_epoch
            window = list(np.arange(first_step, first_step + steps_per_epoch + 1))
            eps, _ = self._compute_privacy_steps(window)
            within_budget = eps <= self._max_eps
            if not within_budget:
                break
            epoch += 1
        # initialize the zcdp for model training
        self._zcdp = 0
        return epoch

    def step_end(self, run_context):
        """
        Compute privacy budget after each training step.

        Nothing is computed unless the current step number is a multiple of
        the configured print interval.

        Args:
            run_context (RunContext): Include some information of the model.

        Raises:
            ValueError: If the computed eps or delta is NaN or infinite.
        """
        cb_params = run_context.original_args()
        cur_step = cb_params.cur_step_num
        cur_step_in_epoch = (cb_params.cur_step_num - 1) % cb_params.batch_num + 1

        if cb_params.cur_step_num % self._per_print_times != 0:
            return

        window = list(np.arange(cur_step - self._per_print_times, cur_step + 1))
        eps, delta = self._compute_privacy_steps(window)
        if any(np.isnan(value) or np.isinf(value) for value in (eps, delta)):
            self._reject(
                'epoch: {} step: {}, invalid eps, terminating training.'.format(
                    cb_params.cur_epoch_num, cur_step_in_epoch))
        print("epoch: %s step: %s, delta is %s, eps is %s" % (
            cb_params.cur_epoch_num, cur_step_in_epoch, delta, eps))

    def _compute_privacy_steps(self, steps):
        """
        Add the zCDP cost of the given steps to the running total and turn
        the total into a privacy budget.

        Args:
            steps (list): Training step indices.

        Returns:
            tuple, the (eps, target_delta) privacy budget.
        """
        mode = self._noise_decay_mode
        base = self._initial_noise_multiplier
        rate = self._noise_decay_rate

        if mode is None:
            # Constant noise: one per-step cost times the number of steps.
            self._zcdp += self._compute_zcdp(base)*len(steps)
        else:
            schedule = base
            if mode == 'Time':
                schedule = [base / (1 + rate*step) for step in steps]
            elif mode == 'Step':
                schedule = [base*(1 - rate)**step for step in steps]
            elif mode == 'Exp':
                schedule = [base*np.exp(-step*rate) for step in steps]
            self._zcdp += sum([self._compute_zcdp(noise) for noise in schedule])

        return self._compute_eps(self._zcdp), self._target_delta

    def _compute_zcdp(self, noise_stddev):
        """
        Compute zcdp according to added noise.

        Args:
            noise_stddev (float): Noise multiplier.

        Returns:
            float or numpy.ndarray, zcdp values.
        """
        variance = noise_stddev**2
        return 1 / (2*variance)

    def _compute_eps(self, zcdp):
        """
        Compute eps for given zcdp and delta.

        Args:
            zcdp (Union[float, numpy.ndarray]): zero-concentrated
                differential privacy.

        Returns:
            float, eps budget.
        """
        log_inv_delta = np.log(1 / self._target_delta)
        return zcdp + 2*np.sqrt(zcdp*log_inv_delta)
def _compute_rdp_with_order(sample_rate, noise_stddev, order):
    """
    Compute rdp for each order.

    Integer-valued orders use a finite binomial sum; fractional orders use
    a series split into two parts, truncated once both current terms fall
    below -30 in log space.

    Args:
        sample_rate (float): Sampling probability.
        noise_stddev (float): Noise multiplier.
        order (Union[int, float]): The order used for computing rdp, must be
            greater than 1. Integer-valued floats such as 2.0 are accepted.

    Returns:
        float, rdp value.
    """
    if float(order).is_integer():
        # range() rejects floats, so normalise integer-valued floats first.
        order = int(order)
        log_integrate = -np.inf
        for k in range(order + 1):
            term_k = (np.log(special.binom(order, k))
                      + k*np.log(sample_rate)
                      + (order - k)*np.log(1 - sample_rate)) \
                     + (k*k - k) / (2*(noise_stddev**2))
            log_integrate = _log_add(log_integrate, term_k)
        return float(log_integrate) / (order - 1)

    log_part_0, log_part_1 = -np.inf, -np.inf
    k = 0
    z0 = noise_stddev**2*np.log(1 / sample_rate - 1) + 1 / 2
    while True:
        bi_coef = special.binom(order, k)
        log_coef = np.log(abs(bi_coef))
        j = order - k

        term_k_part_0 = log_coef + k*np.log(sample_rate) + j*np.log(1 - sample_rate) + (
            k*k - k) / (2*(noise_stddev**2)) + special.log_ndtr(
                (z0 - k) / noise_stddev)
        term_k_part_1 = log_coef + j*np.log(sample_rate) + k*np.log(1 - sample_rate) + (
            j*j - j) / (2*(noise_stddev**2)) + special.log_ndtr(
                (j - z0) / noise_stddev)

        # The generalized binomial coefficient changes sign for fractional
        # orders; negative coefficients are subtracted in log space.
        if bi_coef > 0:
            log_part_0 = _log_add(log_part_0, term_k_part_0)
            log_part_1 = _log_add(log_part_1, term_k_part_1)
        else:
            log_part_0 = _log_subtract(log_part_0, term_k_part_0)
            log_part_1 = _log_subtract(log_part_1, term_k_part_1)

        k += 1
        if np.max([term_k_part_0, term_k_part_1]) < -30:
            break

    return _log_add(log_part_0, log_part_1) / (order - 1)


def _log_add(x, y):
    """
    Add x and y in log space, i.e. return log(exp(x) + exp(y)).

    Args:
        x (float): Logarithm of the first operand; -inf represents zero.
        y (float): Logarithm of the second operand; -inf represents zero.

    Returns:
        float, logarithm of the sum.
    """
    if x == -np.inf:
        return y
    if y == -np.inf:
        return x
    return np.max([x, y]) + np.log1p(np.exp(-abs(x - y)))


def _log_subtract(x, y):
    """
    Subtract y from x in log space, i.e. return log(exp(x) - exp(y)).

    Args:
        x (float): Logarithm of the minuend.
        y (float): Logarithm of the subtrahend; -inf represents zero.

    Returns:
        float, logarithm of the difference.

    Raises:
        ValueError: If x is not greater than y.
    """
    if x <= y:
        msg = 'The antilog of log functions must be positive'
        LOGGER.error(TAG, msg)
        raise ValueError(msg)
    if y == -np.inf:
        return x
    # log(e^x - e^y) = x + log(1 - e^(y - x)). The minus sign is essential:
    # log1p(+exp(y - x)) would compute the sum instead of the difference.
    return np.log1p(-np.exp(y - x)) + x