# Copyright 2019 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Region-Based detector
"""
import time
import numpy as np
from mindspore import Model
from mindspore import Tensor
from mindarmour.utils.logger import LogUtil
from mindarmour.utils._check_param import check_numpy_param, check_param_type, \
check_pair_numpy_param, check_model, check_int_positive, \
check_value_positive, check_value_non_negative, check_param_in_range, \
check_equal_shape
from .detector import Detector
LOGGER = LogUtil.get_instance()
TAG = 'RegionBasedDetector'
[文档]class RegionBasedDetector(Detector):
"""
The region-based detector uses the fact that adversarial examples are close
to the classification boundary, and ensembles information around the given example
to predict whether it is an adversarial example or not.
Reference: `Mitigating evasion attacks to deep neural networks via
region-based classification <https://arxiv.org/abs/1709.05583>`_
Args:
model (Model): Target model.
number_points (int): The number of samples generate from the
hyper cube of original sample. Default: ``10``.
initial_radius (float): Initial radius of hyper cube. Default: ``0.0``.
max_radius (float): Maximum radius of hyper cube. Default: ``1.0``.
search_step (float): Incremental during search of radius. Default: ``0.01``.
degrade_limit (float): Acceptable decrease of classification accuracy.
Default: ``0.0``.
sparse (bool): If ``True``, input labels are sparse-encoded. If ``False``,
input labels are one-hot-encoded. Default: ``False``.
Examples:
>>> from mindspore.ops.operations import Add
>>> from mindspore import Model
>>> from mindarmour.adv_robustness.detectors import RegionBasedDetector
>>> class Net(nn.Cell):
... def __init__(self):
... super(Net, self).__init__()
... self.add = Add()
... def construct(self, inputs):
... return self.add(inputs, inputs)
>>> np.random.seed(5)
>>> ori = np.random.rand(4, 4).astype(np.float32)
>>> labels = np.array([[1, 0, 0, 0], [0, 0, 1, 0], [0, 0, 1, 0],
... [0, 1, 0, 0]]).astype(np.int32)
>>> np.random.seed(6)
>>> adv = np.random.rand(4, 4).astype(np.float32)
>>> model = Model(Net())
>>> detector = RegionBasedDetector(model)
>>> radius = detector.fit(ori, labels)
>>> detector.set_radius(radius)
>>> adv_ids = detector.detect(adv)
"""
def __init__(self, model, number_points=10, initial_radius=0.0,
max_radius=1.0, search_step=0.01, degrade_limit=0.0,
sparse=False):
super(RegionBasedDetector, self).__init__()
self._model = check_model('targeted model', model, Model)
self._number_points = check_int_positive('number_points', number_points)
self._initial_radius = check_value_non_negative('initial_radius',
initial_radius)
self._max_radius = check_value_positive('max_radius', max_radius)
self._search_step = check_value_positive('search_step', search_step)
self._degrade_limit = check_value_non_negative('degrade_limit',
degrade_limit)
self._sparse = check_param_type('sparse', sparse, bool)
self._radius = None
[文档] def set_radius(self, radius):
"""
Set radius.
Args:
radius (float): Radius of region.
"""
self._radius = check_param_in_range('radius', radius,
self._initial_radius,
self._max_radius)
[文档] def fit(self, inputs, labels=None):
"""
Train detector to decide the best radius.
Args:
inputs (numpy.ndarray): Benign samples.
labels (numpy.ndarray): Ground truth labels of the input samples.
Default:None.
Returns:
float, the best radius.
"""
inputs, labels = check_pair_numpy_param('inputs', inputs,
'labels', labels)
LOGGER.debug(TAG, 'enter fit() function.')
time_start = time.time()
search_iters = (self._max_radius
- self._initial_radius) / self._search_step
search_iters = np.round(search_iters).astype(int)
radius = self._initial_radius
pred = self._model.predict(Tensor(inputs))
raw_preds = np.argmax(pred.asnumpy(), axis=1)
if not self._sparse:
labels = np.argmax(labels, axis=1)
raw_preds, labels = check_equal_shape('raw_preds', raw_preds, 'labels',
labels)
raw_acc = np.sum(raw_preds == labels) / inputs.shape[0]
for _ in range(search_iters):
rc_preds = self._rc_forward(inputs, radius)
rc_preds, labels = check_equal_shape('rc_preds', rc_preds, 'labels',
labels)
def_acc = np.sum(rc_preds == labels) / inputs.shape[0]
if def_acc >= raw_acc - self._degrade_limit:
radius += self._search_step
continue
break
self._radius = radius - self._search_step
LOGGER.debug(TAG, 'best radius is: %s', self._radius)
LOGGER.debug(TAG,
'time used to train detector of %d samples is: %s seconds',
inputs.shape[0],
time.time() - time_start)
return self._radius
def _generate_hyper_cube(self, inputs, radius):
"""
Generate random samples in the hyper cubes around input samples.
Args:
inputs (numpy.ndarray): Input samples.
radius (float): The scope to generate hyper cubes around input samples.
Returns:
numpy.ndarray, randomly chosen samples in the hyper cubes.
"""
LOGGER.debug(TAG, 'enter _generate_hyper_cube().')
res = []
for _ in range(self._number_points):
res.append(np.clip((inputs + np.random.uniform(
-radius, radius, len(inputs))), 0.0, 1.0).astype(inputs.dtype))
return np.asarray(res)
def _rc_forward(self, inputs, radius):
"""
Generate region-based predictions for input samples.
Args:
inputs (numpy.ndarray): Input samples.
radius (float): The scope to generate hyper cubes around input samples.
Returns:
numpy.ndarray, classification result for input samples.
"""
LOGGER.debug(TAG, 'enter _rc_forward().')
res = []
for _, elem in enumerate(inputs):
hyper_cube_x = self._generate_hyper_cube(elem, radius)
hyper_cube_preds = []
for ite_hyper_cube_x in hyper_cube_x:
model_inputs = Tensor(np.expand_dims(ite_hyper_cube_x, axis=0))
ite_preds = self._model.predict(model_inputs).asnumpy()[0]
hyper_cube_preds.append(ite_preds)
pred_labels = np.argmax(hyper_cube_preds, axis=1)
bin_count = np.bincount(pred_labels)
# count the number of different class and choose the max one
# as final class
hyper_cube_tag = np.argmax(bin_count, axis=0)
res.append(hyper_cube_tag)
return np.asarray(res)
[文档] def detect(self, inputs):
"""
Tell whether input samples are adversarial or not.
Args:
inputs (numpy.ndarray): Suspicious samples to be judged.
Returns:
list[int], whether a sample is adversarial. if res[i]=1, then the
input sample with index i is adversarial.
"""
LOGGER.debug(TAG, 'enter detect().')
self._radius = check_param_type('radius', self._radius, float)
inputs = check_numpy_param('inputs', inputs)
time_start = time.time()
res = [1]*inputs.shape[0]
raw_preds = np.argmax(self._model.predict(Tensor(inputs)).asnumpy(),
axis=1)
rc_preds = self._rc_forward(inputs, self._radius)
for i in range(inputs.shape[0]):
if raw_preds[i] == rc_preds[i]:
res[i] = 0
LOGGER.debug(TAG,
'time used to detect %d samples is : %s seconds',
inputs.shape[0],
time.time() - time_start)
return res
[文档] def detect_diff(self, inputs):
"""
Return raw prediction results and region-based prediction results.
Args:
inputs (numpy.ndarray): Input samples.
Returns:
numpy.ndarray, raw prediction results and region-based prediction results of input samples.
"""
LOGGER.debug(TAG, 'enter detect_diff().')
inputs = check_numpy_param('inputs', inputs)
raw_preds = self._model.predict(Tensor(inputs))
rc_preds = self._rc_forward(inputs, self._radius)
return raw_preds.asnumpy(), rc_preds