# Copyright 2019 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Region-Based detector
"""
import time
import numpy as np
from mindspore import Model
from mindspore import Tensor
from mindarmour.detectors.detector import Detector
from mindarmour.utils.logger import LogUtil
from mindarmour.utils._check_param import check_numpy_param, check_param_type, \
check_pair_numpy_param, check_model, check_int_positive, \
check_value_positive, check_value_non_negative, check_param_in_range, \
check_equal_shape
LOGGER = LogUtil.get_instance()
TAG = 'RegionBasedDetector'
class RegionBasedDetector(Detector):
    """
    This class implements a region-based detector.

    A sample is classified by majority vote over random points drawn from a
    hyper cube centred on it. A sample is reported as adversarial when this
    region-based label differs from the model's raw prediction.

    Reference: `Mitigating evasion attacks to deep neural networks via
    region-based classification <https://arxiv.org/abs/1709.05583>`_

    Args:
        model (Model): Target model.
        number_points (int): The number of samples generated from the
            hyper cube of original sample. Default: 10.
        initial_radius (float): Initial radius of hyper cube. Default: 0.0.
        max_radius (float): Maximum radius of hyper cube. Default: 1.0.
        search_step (float): Incremental during search of radius. Default: 0.01.
        degrade_limit (float): Acceptable decrease of classification accuracy.
            Default: 0.0.
        sparse (bool): If True, input labels are sparse-encoded. If False,
            input labels are one-hot-encoded. Default: False.

    Examples:
        >>> detector = RegionBasedDetector(model)
        >>> detector.fit(ori, labels)
        >>> adv_ids = detector.detect(adv)
    """

    def __init__(self, model, number_points=10, initial_radius=0.0,
                 max_radius=1.0, search_step=0.01, degrade_limit=0.0,
                 sparse=False):
        super(RegionBasedDetector, self).__init__()
        self._model = check_model('targeted model', model, Model)
        self._number_points = check_int_positive('number_points', number_points)
        self._initial_radius = check_value_non_negative('initial_radius',
                                                        initial_radius)
        self._max_radius = check_value_positive('max_radius', max_radius)
        self._search_step = check_value_positive('search_step', search_step)
        self._degrade_limit = check_value_non_negative('degrade_limit',
                                                       degrade_limit)
        self._sparse = check_param_type('sparse', sparse, bool)
        # Unset until fit() or set_radius() is called.
        self._radius = None

    def set_radius(self, radius):
        """
        Set radius.

        Args:
            radius (float): Radius of the hyper cube. It is validated with
                check_param_in_range against initial_radius and max_radius.
        """
        self._radius = check_param_in_range('radius', radius,
                                            self._initial_radius,
                                            self._max_radius)

    def fit(self, inputs, labels=None):
        """
        Train detector to decide the best radius.

        The radius starts at initial_radius and grows by search_step while the
        region-based accuracy stays within degrade_limit of the raw accuracy.
        The last radius that passed is stored and returned. If no tested
        radius passes, initial_radius is kept rather than a value below it.

        Args:
            inputs (numpy.ndarray): Benign samples.
            labels (numpy.ndarray): Ground truth labels of the input samples.
                Although the default is None, the labels are used to compute
                accuracy and therefore have to be supplied. Default: None.

        Returns:
            float, the best radius.
        """
        inputs, labels = check_pair_numpy_param('inputs', inputs,
                                                'labels', labels)
        LOGGER.debug(TAG, 'enter fit() function.')
        time_start = time.time()
        search_iters = (self._max_radius
                        - self._initial_radius) / self._search_step
        search_iters = np.round(search_iters).astype(int)
        radius = self._initial_radius
        pred = self._model.predict(Tensor(inputs))
        raw_preds = np.argmax(pred.asnumpy(), axis=1)
        if not self._sparse:
            # One-hot labels are reduced to class indices.
            labels = np.argmax(labels, axis=1)
        raw_preds, labels = check_equal_shape('raw_preds', raw_preds, 'labels',
                                              labels)
        raw_acc = np.sum(raw_preds == labels) / inputs.shape[0]

        for _ in range(search_iters):
            rc_preds = self._rc_forward(inputs, radius)
            rc_preds, labels = check_equal_shape('rc_preds', rc_preds, 'labels',
                                                 labels)
            def_acc = np.sum(rc_preds == labels) / inputs.shape[0]
            if def_acc < raw_acc - self._degrade_limit:
                break
            radius += self._search_step

        # `radius` now holds the first value that failed (or was never
        # tested), so step back once. Clamp to initial_radius so that an
        # immediate failure or an empty search range cannot yield a radius
        # below the configured minimum (possibly negative). The float cast
        # keeps the type check in detect() satisfied for integer settings.
        self._radius = float(max(radius - self._search_step,
                                 self._initial_radius))
        LOGGER.debug(TAG, 'best radius is: %s', self._radius)
        LOGGER.debug(TAG,
                     'time used to train detector of %d samples is: %s seconds',
                     inputs.shape[0],
                     time.time() - time_start)
        return self._radius

    def _generate_hyper_cube(self, inputs, radius):
        """
        Generate random samples in the hyper cube around one input sample.

        Args:
            inputs (numpy.ndarray): A single input sample.
            radius (float): The scope to generate hyper cubes around input samples.

        Returns:
            numpy.ndarray, randomly chosen samples in the hyper cube, stacked
            along a new leading axis of length number_points. Values are
            clipped to [0.0, 1.0] and cast to the dtype of the input.
        """
        LOGGER.debug(TAG, 'enter _generate_hyper_cube().')
        # Draw independent noise for every element of every point. Sizing the
        # noise by the full sample shape (instead of its first dimension)
        # avoids sharing noise across axes and broadcast errors.
        noise = np.random.uniform(
            -radius, radius, (self._number_points,) + tuple(inputs.shape))
        return np.clip(inputs + noise, 0.0, 1.0).astype(inputs.dtype)

    def _rc_forward(self, inputs, radius):
        """
        Generate region-based predictions for input samples.

        Args:
            inputs (numpy.ndarray): Input samples.
            radius (float): The scope to generate hyper cubes around input samples.

        Returns:
            numpy.ndarray, classification result for input samples.
        """
        LOGGER.debug(TAG, 'enter _rc_forward().')
        res = []
        for elem in inputs:
            hyper_cube_x = self._generate_hyper_cube(elem, radius)
            # Each point is predicted on its own as a batch of size one.
            hyper_cube_preds = [
                self._model.predict(
                    Tensor(np.expand_dims(point, axis=0))).asnumpy()[0]
                for point in hyper_cube_x
            ]
            pred_labels = np.argmax(hyper_cube_preds, axis=1)
            # count the number of different class and choose the max one
            # as final class
            bin_count = np.bincount(pred_labels)
            res.append(np.argmax(bin_count, axis=0))
        return np.asarray(res)

    def detect(self, inputs):
        """
        Tell whether input samples are adversarial or not.

        The radius has to be set beforehand through fit() or set_radius().

        Args:
            inputs (numpy.ndarray): Suspicious samples to be judged.

        Returns:
            list[int], whether a sample is adversarial. if res[i]=1, then the
            input sample with index i is adversarial.
        """
        LOGGER.debug(TAG, 'enter detect().')
        self._radius = check_param_type('radius', self._radius, float)
        inputs = check_numpy_param('inputs', inputs)
        time_start = time.time()
        raw_preds = np.argmax(self._model.predict(Tensor(inputs)).asnumpy(),
                              axis=1)
        rc_preds = self._rc_forward(inputs, self._radius)
        # A sample is flagged when raw and region-based labels disagree.
        res = [0 if raw == rc else 1 for raw, rc in zip(raw_preds, rc_preds)]
        LOGGER.debug(TAG,
                     'time used to detect %d samples is : %s seconds',
                     inputs.shape[0],
                     time.time() - time_start)
        return res

    def detect_diff(self, inputs):
        """
        Return raw prediction results and region-based prediction results.

        The radius has to be set beforehand through fit() or set_radius().

        Args:
            inputs (numpy.ndarray): Input samples.

        Returns:
            - numpy.ndarray, raw model outputs of input samples.

            - numpy.ndarray, region-based predicted labels of input samples.
        """
        LOGGER.debug(TAG, 'enter detect_diff().')
        # Same radius validation as detect(), so an unset radius is reported
        # by the parameter check instead of failing inside the sampling code.
        self._radius = check_param_type('radius', self._radius, float)
        inputs = check_numpy_param('inputs', inputs)
        raw_preds = self._model.predict(Tensor(inputs))
        rc_preds = self._rc_forward(inputs, self._radius)
        return raw_preds.asnumpy(), rc_preds