# Copyright 2019 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Monitor module of differential privacy training. """
import numpy as np
from scipy import special
from mindspore.train.callback import Callback
from mindarmour.utils.logger import LogUtil
from mindarmour.utils._check_param import check_int_positive, \
check_value_positive, check_param_in_range, check_param_type
LOGGER = LogUtil.get_instance()
TAG = 'DP monitor'
class PrivacyMonitorFactory:
"""
Factory class of DP training's privacy monitor.
For details, please check `Tutorial <https://mindspore.cn/mindarmour/docs/en/r1.9/\
protect_user_privacy_with_differential_privacy.html>`_.
"""
def __init__(self):
pass
@staticmethod
def create(policy, *args, **kwargs):
"""
Create a privacy monitor class.
Args:
policy (str): Monitor policy; only 'rdp' and 'zcdp' are supported
for now. If policy is 'rdp', the monitor will compute the
privacy budget of DP training based on Renyi differential
privacy (RDP) theory; if policy is 'zcdp', the monitor will compute
the privacy budget of DP training based on zero-concentrated
differential privacy (zCDP) theory. Note that 'zcdp' is not
suitable for subsampling noise mechanisms.
args (Union[int, float, numpy.ndarray, list, str]): Parameters
used for creating a privacy monitor.
kwargs (Union[int, float, numpy.ndarray, list, str]): Keyword
parameters used for creating a privacy monitor.
Returns:
Callback, a privacy monitor.
Examples:
>>> from mindarmour.privacy.diff_privacy import PrivacyMonitorFactory
>>> rdp = PrivacyMonitorFactory.create(policy='rdp', num_samples=60000, batch_size=32)
"""
if policy == 'rdp':
return RDPMonitor(*args, **kwargs)
if policy == 'zcdp':
return ZCDPMonitor(*args, **kwargs)
raise ValueError("The policy must be 'rdp' or 'zcdp', but got {}".format(policy))
class RDPMonitor(Callback):
r"""
Compute the privacy budget of DP training based on Renyi differential
privacy (RDP) theory. According to the reference below, if a randomized
mechanism satisfies ε'-Renyi differential privacy of order α, it
also satisfies conventional (ε, δ)-differential privacy as below:
.. math::
\left(\epsilon' + \frac{\log(1/\delta)}{\alpha - 1},\ \delta\right)
For details, please check `Tutorial <https://mindspore.cn/mindarmour/docs/en/r1.9/\
protect_user_privacy_with_differential_privacy.html>`_.
Reference: `Rényi Differential Privacy of the Sampled Gaussian Mechanism
<https://arxiv.org/abs/1908.10530>`_
Args:
num_samples (int): The total number of samples in training data sets.
batch_size (int): The number of samples in a batch while training.
initial_noise_multiplier (Union[float, int]): Ratio of the standard
deviation of the Gaussian noise to the norm_bound, which is
used to calculate the privacy budget spent. Default: 1.5.
max_eps (Union[float, int, None]): The maximum acceptable epsilon
budget for DP training, which is used for estimating the max
training epochs. 'None' means there is no limit to epsilon budget.
Default: 10.0.
target_delta (Union[float, int, None]): Target delta budget for DP
training. If target_delta is set to be δ, then the privacy budget
δ would be fixed during the whole training process. Default: 1e-3.
max_delta (Union[float, int, None]): The maximum acceptable delta
budget for DP training, which is used for estimating the max
training epochs. max_delta must be less than 1 and is suggested
to be less than 1e-3, otherwise overflow may be encountered.
'None' means there is no limit to the delta budget. Default: None.
target_eps (Union[float, int, None]): Target epsilon budget for DP
training. If target_eps is set to be ε, then the privacy budget
ε would be fixed during the whole training process. Default: None.
orders (Union[None, list[int, float]]): Finite orders used for
computing rdp, which must be greater than 1. The computation result
of privacy budget would be different for various orders. In order
to obtain a tighter (smaller) privacy budget estimation, a list
of orders could be tried. Default: None.
noise_decay_mode (Union[None, str]): Decay mode of adding noise while
training, which can be None, 'Time', 'Step' or 'Exp'. Default: 'Time'.
noise_decay_rate (float): Decay rate of noise while training. Default: 6e-4.
per_print_times (int): The interval steps of computing and printing
the privacy budget. Default: 50.
dataset_sink_mode (bool): If True, all training data would be passed
to the device (Ascend) at once. If False, training data would be passed
to the device after each training step. Default: False.
Examples:
>>> from mindarmour.privacy.diff_privacy import PrivacyMonitorFactory
>>> rdp = PrivacyMonitorFactory.create(policy='rdp', num_samples=100, batch_size=32)
"""
def __init__(self, num_samples, batch_size, initial_noise_multiplier=1.5,
max_eps=10.0, target_delta=1e-3, max_delta=None,
target_eps=None, orders=None, noise_decay_mode='Time',
noise_decay_rate=6e-4, per_print_times=50, dataset_sink_mode=False):
super(RDPMonitor, self).__init__()
check_int_positive('num_samples', num_samples)
check_int_positive('batch_size', batch_size)
if batch_size >= num_samples:
msg = 'Batch_size must be less than num_samples.'
LOGGER.error(TAG, msg)
raise ValueError(msg)
check_value_positive('initial_noise_multiplier',
initial_noise_multiplier)
if max_eps is not None:
check_value_positive('max_eps', max_eps)
if target_delta is not None:
check_value_positive('target_delta', target_delta)
if max_delta is not None:
check_value_positive('max_delta', max_delta)
if max_delta >= 1:
msg = 'max_delta must be less than 1.'
LOGGER.error(TAG, msg)
raise ValueError(msg)
if target_eps is not None:
check_value_positive('target_eps', target_eps)
if orders is not None:
for item in orders:
check_value_positive('order', item)
if item <= 1:
msg = 'orders must be greater than 1'
LOGGER.error(TAG, msg)
raise ValueError(msg)
if noise_decay_mode is not None:
if noise_decay_mode not in ('Step', 'Time', 'Exp'):
msg = "Noise decay mode must be in ('Step', 'Time', 'Exp')"
LOGGER.error(TAG, msg)
raise ValueError(msg)
noise_decay_rate = check_param_type('noise_decay_rate', noise_decay_rate, float)
check_param_in_range('noise_decay_rate', noise_decay_rate, 0.0, 1.0)
check_int_positive('per_print_times', per_print_times)
check_param_type('dataset_sink_mode', dataset_sink_mode, bool)
self._num_samples = num_samples
self._batch_size = batch_size
self._initial_noise_multiplier = initial_noise_multiplier
self._max_eps = max_eps
self._target_delta = target_delta
self._max_delta = max_delta
self._target_eps = target_eps
self._orders = orders
self._noise_decay_mode = noise_decay_mode
self._noise_decay_rate = noise_decay_rate
self._rdp = 0
self._per_print_times = per_print_times
if self._target_eps is None and self._target_delta is None:
msg = 'target eps and target delta cannot both be None'
LOGGER.error(TAG, msg)
raise ValueError(msg)
if self._target_eps is not None and self._target_delta is not None:
msg = 'One of target eps and target delta must be None'
LOGGER.error(TAG, msg)
raise ValueError(msg)
if dataset_sink_mode:
self._per_print_times = int(self._num_samples / self._batch_size)
def max_epoch_suggest(self):
"""
Estimate the maximum training epochs to satisfy the predefined
privacy budget.
Returns:
int, the recommended maximum training epochs.
"""
if self._target_delta is not None and self._max_eps is None:
msg = 'max_eps must be set when target_delta is given, but got None.'
LOGGER.error(TAG, msg)
raise ValueError(msg)
if self._target_eps is not None and self._max_delta is None:
msg = 'max_delta must be set when target_eps is given, but got None.'
LOGGER.error(TAG, msg)
raise ValueError(msg)
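# Simulate training epoch by epoch: accumulate the per-step privacy cost
# and stop once the resulting eps (or delta) would exceed the configured
# maximum budget, or once 10000 epochs have been reached.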
epoch = 1
while epoch < 10000:
steps = self._num_samples // self._batch_size
eps, delta = self._compute_privacy_steps(
list(np.arange((epoch - 1)*steps, epoch*steps + 1)))
if self._max_eps is not None:
if eps <= self._max_eps:
epoch += 1
else:
break
if self._max_delta is not None:
if delta <= self._max_delta:
epoch += 1
else:
break
# reset the rdp for model training
self._rdp = 0
return epoch
def step_end(self, run_context):
"""
Compute privacy budget after each training step.
Args:
run_context (RunContext): Includes some information of the model.
"""
cb_params = run_context.original_args()
cur_step = cb_params.cur_step_num
cur_step_in_epoch = (cb_params.cur_step_num - 1) % \
cb_params.batch_num + 1
if cb_params.cur_step_num % self._per_print_times == 0:
steps = np.arange(cur_step - self._per_print_times, cur_step + 1)
eps, delta = self._compute_privacy_steps(list(steps))
if np.isnan(eps) or np.isinf(eps):
msg = 'epoch: {} step: {}, invalid eps, terminating ' \
'training.'.format(
cb_params.cur_epoch_num, cur_step_in_epoch)
LOGGER.error(TAG, msg)
raise ValueError(msg)
if np.isnan(delta) or np.isinf(delta):
msg = 'epoch: {} step: {}, invalid delta, terminating ' \
'training.'.format(
cb_params.cur_epoch_num, cur_step_in_epoch)
LOGGER.error(TAG, msg)
raise ValueError(msg)
print("epoch: %s step: %s, delta is %s, eps is %s" % (
cb_params.cur_epoch_num, cur_step_in_epoch, delta, eps))
def _compute_privacy_steps(self, steps):
"""
Compute privacy budget corresponding to steps.
Args:
steps (list): Training steps.
Returns:
tuple, the eps budget and delta budget.
"""
if self._orders is None:
self._orders = (
[1.005, 1.01, 1.02, 1.08, 1.2, 2, 5, 10, 20, 40, 80])
sampling_rate = self._batch_size / self._num_samples
noise_stddev_step = self._initial_noise_multiplier
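# Per-step noise multiplier sigma_t under the configured decay schedule,
# where sigma_0 is initial_noise_multiplier, k is noise_decay_rate and
# t is the global step index:
#   None:   sigma_t = sigma_0 (constant)
#   'Time': sigma_t = sigma_0 / (1 + k*t)
#   'Step': sigma_t = sigma_0 * (1 - k)**t
#   'Exp':  sigma_t = sigma_0 * exp(-k*t)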
if self._noise_decay_mode is None:
self._rdp += self._compute_rdp(sampling_rate, noise_stddev_step)*len(
steps)
else:
if self._noise_decay_mode == 'Time':
noise_stddev_step = [self._initial_noise_multiplier / (
1 + self._noise_decay_rate*step) for step in steps]
elif self._noise_decay_mode == 'Step':
noise_stddev_step = [self._initial_noise_multiplier*(
1 - self._noise_decay_rate)**step for step in steps]
elif self._noise_decay_mode == 'Exp':
noise_stddev_step = [self._initial_noise_multiplier*np.exp(
-step*self._noise_decay_rate) for step in steps]
self._rdp += sum(
[self._compute_rdp(sampling_rate, noise) for noise in
noise_stddev_step])
eps, delta = self._compute_privacy_budget(self._rdp)
return eps, delta
def _compute_rdp(self, sample_rate, noise_stddev):
"""
Compute rdp according to sampling rate, added noise and Renyi
divergence orders.
Args:
sample_rate (float): Sampling rate of each batch of samples.
noise_stddev (float): Noise multiplier.
Returns:
float or numpy.ndarray, rdp values.
"""
rdp = np.array(
[_compute_rdp_with_order(sample_rate, noise_stddev, order) for order in self._orders])
return rdp
def _compute_privacy_budget(self, rdp):
"""
Compute delta or eps for given rdp.
Args:
rdp (Union[float, numpy.ndarray]): Renyi differential privacy.
Returns:
tuple, the eps budget and delta budget.
"""
if self._target_eps is not None:
delta = self._compute_delta(rdp)
return self._target_eps, delta
eps = self._compute_eps(rdp)
return eps, self._target_delta
def _compute_delta(self, rdp):
"""
Compute delta for given rdp and eps.
Args:
rdp (Union[float, numpy.ndarray]): Renyi differential privacy.
Returns:
float, delta budget.
"""
orders = np.atleast_1d(self._orders)
rdps = np.atleast_1d(rdp)
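# RDP-to-DP conversion for a fixed target eps: for each order alpha,
# delta(alpha) = exp((alpha - 1) * (rdp(alpha) - eps)); the tightest
# (smallest) delta over all orders is reported, capped at 1.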
deltas = np.exp((rdps - self._target_eps)*(orders - 1))
min_delta = np.min(deltas)
return np.min([min_delta, 1.])
def _compute_eps(self, rdp):
"""
Compute eps for given rdp and delta.
Args:
rdp (Union[float, numpy.ndarray]): Renyi differential privacy.
Returns:
float, eps budget.
"""
orders = np.atleast_1d(self._orders)
rdps = np.atleast_1d(rdp)
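# RDP-to-DP conversion for a fixed target delta: for each order alpha,
# eps(alpha) = rdp(alpha) + log(1/delta) / (alpha - 1); the tightest
# (smallest) eps over all orders is reported.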
eps = rdps - np.log(self._target_delta) / (orders - 1)
return np.min(eps)
class ZCDPMonitor(Callback):
r"""
Compute the privacy budget of DP training based on zero-concentrated
differential privacy theory (zcdp). According to the reference below,
if a randomized mechanism satisfies ρ-zCDP, it also satisfies
conventional (ε, δ)-differential privacy as below:
.. math::
\left(\rho + 2\sqrt{\rho \log(1/\delta)},\ \delta\right)
It should be noted that ZCDPMonitor is not suitable for subsampling
noise mechanisms (such as NoiseAdaGaussianRandom and NoiseGaussianRandom).
A matching noise mechanism for zCDP will be developed in the future.
For details, please check `Tutorial <https://mindspore.cn/mindarmour/docs/en/r1.9/\
protect_user_privacy_with_differential_privacy.html>`_.
Reference: `Concentrated Differentially Private Gradient Descent with
Adaptive per-Iteration Privacy Budget <https://arxiv.org/abs/1808.09501>`_
Args:
num_samples (int): The total number of samples in training data sets.
batch_size (int): The number of samples in a batch while training.
initial_noise_multiplier (Union[float, int]): Ratio of the standard
deviation of the Gaussian noise to the norm_bound, which is
used to calculate the privacy budget spent. Default: 1.5.
max_eps (Union[float, int]): The maximum acceptable epsilon budget for
DP training, which is used for estimating the max training epochs.
Default: 10.0.
target_delta (Union[float, int]): Target delta budget for DP training.
If target_delta is set to be δ, then the privacy budget δ would be
fixed during the whole training process. Default: 1e-3.
noise_decay_mode (Union[None, str]): Decay mode of adding noise while
training, which can be None, 'Time', 'Step' or 'Exp'. Default: 'Time'.
noise_decay_rate (float): Decay rate of noise while training. Default: 6e-4.
per_print_times (int): The interval steps of computing and printing
the privacy budget. Default: 50.
dataset_sink_mode (bool): If True, all training data would be passed
to the device (Ascend) at once. If False, training data would be passed
to the device after each training step. Default: False.
Examples:
>>> from mindarmour.privacy.diff_privacy import PrivacyMonitorFactory
>>> zcdp = PrivacyMonitorFactory.create(policy='zcdp',
... num_samples=100,
... batch_size=32,
... initial_noise_multiplier=1.5)
"""
def __init__(self, num_samples, batch_size, initial_noise_multiplier=1.5,
max_eps=10.0, target_delta=1e-3, noise_decay_mode='Time',
noise_decay_rate=6e-4, per_print_times=50, dataset_sink_mode=False):
super(ZCDPMonitor, self).__init__()
check_int_positive('num_samples', num_samples)
check_int_positive('batch_size', batch_size)
if batch_size >= num_samples:
msg = 'Batch_size must be less than num_samples.'
LOGGER.error(TAG, msg)
raise ValueError(msg)
check_value_positive('initial_noise_multiplier',
initial_noise_multiplier)
if noise_decay_mode is not None:
if noise_decay_mode not in ('Step', 'Time', 'Exp'):
msg = "Noise decay mode must be in ('Step', 'Time', 'Exp'), but got {}.".\
format(noise_decay_mode)
LOGGER.error(TAG, msg)
raise ValueError(msg)
noise_decay_rate = check_param_type('noise_decay_rate', noise_decay_rate, float)
check_param_in_range('noise_decay_rate', noise_decay_rate, 0.0, 1.0)
check_int_positive('per_print_times', per_print_times)
check_param_type('dataset_sink_mode', dataset_sink_mode, bool)
self._num_samples = num_samples
self._batch_size = batch_size
self._initial_noise_multiplier = initial_noise_multiplier
self._max_eps = check_value_positive('max_eps', max_eps)
self._target_delta = check_param_in_range('target_delta', target_delta, 0.0, 1.0)
self._noise_decay_mode = noise_decay_mode
self._noise_decay_rate = noise_decay_rate
# initialize zcdp
self._zcdp = 0
self._per_print_times = per_print_times
if dataset_sink_mode:
self._per_print_times = int(self._num_samples / self._batch_size)
def max_epoch_suggest(self):
"""
Estimate the maximum training epochs to satisfy the predefined
privacy budget.
Returns:
int, the recommended maximum training epochs.
"""
epoch = 1
while epoch < 10000:
steps = self._num_samples // self._batch_size
eps, _ = self._compute_privacy_steps(
list(np.arange((epoch - 1)*steps, epoch*steps + 1)))
if eps <= self._max_eps:
epoch += 1
else:
break
# initialize the zcdp for model training
self._zcdp = 0
return epoch
def step_end(self, run_context):
"""
Compute privacy budget after each training step.
Args:
run_context (RunContext): Includes some information of the model.
"""
cb_params = run_context.original_args()
cur_step = cb_params.cur_step_num
cur_step_in_epoch = (cb_params.cur_step_num - 1) % \
cb_params.batch_num + 1
if cb_params.cur_step_num % self._per_print_times == 0:
steps = np.arange(cur_step - self._per_print_times, cur_step + 1)
eps, delta = self._compute_privacy_steps(list(steps))
if np.isnan(eps) or np.isinf(eps) or np.isnan(delta) or np.isinf(
delta):
msg = 'epoch: {} step: {}, invalid eps or delta, terminating ' \
'training.'.format(
cb_params.cur_epoch_num, cur_step_in_epoch)
LOGGER.error(TAG, msg)
raise ValueError(msg)
print("epoch: %s step: %s, delta is %s, eps is %s" % (
cb_params.cur_epoch_num, cur_step_in_epoch, delta, eps))
def _compute_privacy_steps(self, steps):
"""
Compute privacy budget corresponding to steps.
Args:
steps (list): Training steps.
Returns:
tuple, the eps budget and delta budget.
"""
noise_stddev_step = self._initial_noise_multiplier
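# The per-step noise multiplier follows the same decay schedules as in
# RDPMonitor._compute_privacy_steps (None, 'Time', 'Step', 'Exp').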
if self._noise_decay_mode is None:
self._zcdp += self._compute_zcdp(noise_stddev_step)*len(
steps)
else:
if self._noise_decay_mode == 'Time':
noise_stddev_step = [self._initial_noise_multiplier / (
1 + self._noise_decay_rate*step) for step in steps]
elif self._noise_decay_mode == 'Step':
noise_stddev_step = [self._initial_noise_multiplier*(
1 - self._noise_decay_rate)**step for step in steps]
elif self._noise_decay_mode == 'Exp':
noise_stddev_step = [self._initial_noise_multiplier*np.exp(
-step*self._noise_decay_rate) for step in steps]
self._zcdp += sum(
[self._compute_zcdp(noise) for noise in noise_stddev_step])
eps = self._compute_eps(self._zcdp)
return eps, self._target_delta
def _compute_zcdp(self, noise_stddev):
"""
Compute zcdp according to added noise.
Args:
noise_stddev (float): Noise multiplier.
Returns:
float or numpy.ndarray, zcdp values.
"""
zcdp = 1 / (2*noise_stddev**2)
return zcdp
def _compute_eps(self, zcdp):
"""
Compute eps for given zcdp and delta.
Args:
zcdp (Union[float, numpy.ndarray]): zero-concentrated
differential privacy.
Returns:
float, eps budget.
"""
eps = zcdp + 2*np.sqrt(zcdp*np.log(1 / self._target_delta))
return eps
def _compute_rdp_with_order(sample_rate, noise_stddev, order):
"""
Compute rdp for each order.
Args:
sample_rate (float): Sampling probability.
noise_stddev (float): Noise multiplier.
order (Union[int, float]): The order used for computing rdp.
Returns:
float, rdp value.
"""
if float(order).is_integer():
log_integrate = -np.inf
for k in range(int(order) + 1):
term_k = (np.log(
special.binom(order, k)) + k*np.log(sample_rate) + (
order - k)*np.log(
1 - sample_rate)) + (k*k - k) / (2*(noise_stddev**2))
log_integrate = _log_add(log_integrate, term_k)
return float(log_integrate) / (order - 1)
log_part_0, log_part_1 = -np.inf, -np.inf
k = 0
z0 = noise_stddev**2*np.log(1 / sample_rate - 1) + 1 / 2
while True:
bi_coef = special.binom(order, k)
log_coef = np.log(abs(bi_coef))
j = order - k
term_k_part_0 = log_coef + k*np.log(sample_rate) + j*np.log(1 - sample_rate) + (
k*k - k) / (2*(noise_stddev**2)) + special.log_ndtr(
(z0 - k) / noise_stddev)
term_k_part_1 = log_coef + j*np.log(sample_rate) + k*np.log(1 - sample_rate) + (
j*j - j) / (2*(noise_stddev**2)) + special.log_ndtr(
(j - z0) / noise_stddev)
if bi_coef > 0:
log_part_0 = _log_add(log_part_0, term_k_part_0)
log_part_1 = _log_add(log_part_1, term_k_part_1)
else:
log_part_0 = _log_subtract(log_part_0, term_k_part_0)
log_part_1 = _log_subtract(log_part_1, term_k_part_1)
k += 1
if np.max([term_k_part_0, term_k_part_1]) < -30:
break
return _log_add(log_part_0, log_part_1) / (order - 1)
def _log_add(x, y):
"""
Add x and y in log space.
"""
if x == -np.inf:
return y
if y == -np.inf:
return x
return np.max([x, y]) + np.log1p(np.exp(-abs(x - y)))
def _log_subtract(x, y):
"""
Subtract y from x in log space, x must be greater than y.
"""
if x <= y:
msg = 'x must be greater than y for subtraction in log space.'
LOGGER.error(TAG, msg)
raise ValueError(msg)
if y == -np.inf:
return x
return np.log1p(-np.exp(y - x)) + x
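
if __name__ == '__main__':
    # A minimal, illustrative sketch (not part of the library API): create a
    # monitor of each policy via the factory and estimate how many training
    # epochs stay within the predefined privacy budget. All numbers below are
    # assumptions chosen only for demonstration; noise_decay_mode=None keeps
    # the noise multiplier constant so the estimate is cheap to compute.
    rdp_monitor = PrivacyMonitorFactory.create(
        policy='rdp', num_samples=60000, batch_size=256,
        initial_noise_multiplier=1.5, max_eps=10.0, target_delta=1e-5,
        noise_decay_mode=None)
    print('Suggested max epochs (rdp):', rdp_monitor.max_epoch_suggest())
    zcdp_monitor = PrivacyMonitorFactory.create(
        policy='zcdp', num_samples=60000, batch_size=256,
        initial_noise_multiplier=1.5, max_eps=10.0, target_delta=1e-5,
        noise_decay_mode=None)
    print('Suggested max epochs (zcdp):', zcdp_monitor.max_epoch_suggest())
    # In actual DP training the monitor would be passed to Model.train as a
    # callback, e.g. model.train(epochs, ds_train, callbacks=[rdp_monitor]).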