# Source code for mindarmour.adv_robustness.detectors.region_based_detector

# Copyright 2019 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Region-Based detector
"""
import time

import numpy as np

from mindspore import Model
from mindspore import Tensor

from mindarmour.utils.logger import LogUtil
from mindarmour.utils._check_param import check_numpy_param, check_param_type, \
    check_pair_numpy_param, check_model, check_int_positive, \
    check_value_positive, check_value_non_negative, check_param_in_range, \
    check_equal_shape
from .detector import Detector

LOGGER = LogUtil.get_instance()
TAG = 'RegionBasedDetector'


class RegionBasedDetector(Detector):
    """
    The region-based detector uses the fact that adversarial examples are
    close to the classification boundary, and ensembles information around
    the given example to predict whether it is an adversarial example or not.

    Reference: `Mitigating evasion attacks to deep neural networks via
    region-based classification <https://arxiv.org/abs/1709.05583>`_

    Args:
        model (Model): Target model.
        number_points (int): The number of samples generated from the
            hyper cube of the original sample. Default: 10.
        initial_radius (float): Initial radius of hyper cube. Default: 0.0.
        max_radius (float): Maximum radius of hyper cube. Default: 1.0.
        search_step (float): Increment used during the search of radius.
            Default: 0.01.
        degrade_limit (float): Acceptable decrease of classification
            accuracy. Default: 0.0.
        sparse (bool): If True, input labels are sparse-encoded. If False,
            input labels are one-hot-encoded. Default: False.

    Examples:
        >>> import numpy as np
        >>> from mindspore.ops.operations import Add
        >>> from mindspore.nn import Cell
        >>> from mindspore import Model
        >>> from mindarmour.adv_robustness.detectors import RegionBasedDetector
        >>> class Net(Cell):
        ...     def __init__(self):
        ...         super(Net, self).__init__()
        ...         self.add = Add()
        ...     def construct(self, inputs):
        ...         return self.add(inputs, inputs)
        >>> np.random.seed(5)
        >>> ori = np.random.rand(4, 4).astype(np.float32)
        >>> labels = np.array([[1, 0, 0, 0], [0, 0, 1, 0], [0, 0, 1, 0],
        ...                    [0, 1, 0, 0]]).astype(np.int32)
        >>> np.random.seed(6)
        >>> adv = np.random.rand(4, 4).astype(np.float32)
        >>> model = Model(Net())
        >>> detector = RegionBasedDetector(model)
        >>> radius = detector.fit(ori, labels)
        >>> detector.set_radius(radius)
        >>> adv_ids = detector.detect(adv)
    """

    def __init__(self, model, number_points=10, initial_radius=0.0,
                 max_radius=1.0, search_step=0.01, degrade_limit=0.0,
                 sparse=False):
        super(RegionBasedDetector, self).__init__()
        self._model = check_model('targeted model', model, Model)
        self._number_points = check_int_positive('number_points',
                                                 number_points)
        self._initial_radius = check_value_non_negative('initial_radius',
                                                        initial_radius)
        self._max_radius = check_value_positive('max_radius', max_radius)
        self._search_step = check_value_positive('search_step', search_step)
        self._degrade_limit = check_value_non_negative('degrade_limit',
                                                       degrade_limit)
        self._sparse = check_param_type('sparse', sparse, bool)
        # The radius stays unset until fit() or set_radius() is called.
        self._radius = None

    def set_radius(self, radius):
        """
        Set radius.

        Args:
            radius (float): Radius of region. It is checked against the range
                bounded by `initial_radius` and `max_radius`.
        """
        self._radius = check_param_in_range('radius', radius,
                                            self._initial_radius,
                                            self._max_radius)

    def fit(self, inputs, labels=None):
        """
        Train detector to decide the best radius.

        Starting from `initial_radius`, the radius grows by `search_step`
        as long as the region-based accuracy does not fall more than
        `degrade_limit` below the accuracy of the raw model. The last
        accepted radius is stored on the detector and returned.

        Args:
            inputs (numpy.ndarray): Benign samples.
            labels (numpy.ndarray): Ground truth labels of the input samples.
                Default:None.

        Returns:
            float, the best radius. It is never smaller than
            `initial_radius`.
        """
        inputs, labels = check_pair_numpy_param('inputs', inputs,
                                                'labels', labels)
        LOGGER.debug(TAG, 'enter fit() function.')
        time_start = time.time()
        search_iters = (self._max_radius
                        - self._initial_radius) / self._search_step
        search_iters = int(np.round(search_iters))

        pred = self._model.predict(Tensor(inputs))
        raw_preds = np.argmax(pred.asnumpy(), axis=1)
        if not self._sparse:
            # One-hot labels are converted to class indices.
            labels = np.argmax(labels, axis=1)
        raw_preds, labels = check_equal_shape('raw_preds', raw_preds,
                                              'labels', labels)
        num_samples = inputs.shape[0]
        raw_acc = np.sum(raw_preds == labels) / num_samples
        min_acc = raw_acc - self._degrade_limit

        radius = self._initial_radius
        # Track the last radius that passed the accuracy check. Falling back
        # to the initial radius (instead of computing `radius - search_step`)
        # keeps the result inside the configured range when the very first
        # probe is rejected, when the search runs zero iterations, or when
        # floating point round-off would land just below the initial radius.
        best_radius = self._initial_radius
        for _ in range(search_iters):
            rc_preds = self._rc_forward(inputs, radius)
            rc_preds, labels = check_equal_shape('rc_preds', rc_preds,
                                                 'labels', labels)
            def_acc = np.sum(rc_preds == labels) / num_samples
            if def_acc >= min_acc:
                best_radius = radius
                radius += self._search_step
                continue
            break

        # detect() requires a float radius, so normalise the type here.
        self._radius = float(best_radius)
        LOGGER.debug(TAG, 'best radius is: %s', self._radius)
        LOGGER.debug(TAG,
                     'time used to train detector of %d samples is: %s seconds',
                     inputs.shape[0],
                     time.time() - time_start)
        return self._radius

    def _check_radius_is_set(self):
        """
        Fail early with a clear message when no radius has been configured.

        Raises:
            TypeError: If the radius is still None, i.e. neither fit() nor
                set_radius() has been called.
        """
        if self._radius is None:
            raise TypeError('radius is not set, call fit() or set_radius() '
                            'before using the detector.')

    def _generate_hyper_cube(self, inputs, radius):
        """
        Generate random samples in the hyper cubes around input samples.

        Args:
            inputs (numpy.ndarray): Input samples.
            radius (float): The scope to generate hyper cubes around input
                samples.

        Returns:
            numpy.ndarray, randomly chosen samples in the hyper cubes. The
            leading axis has length `number_points`.
        """
        LOGGER.debug(TAG, 'enter _generate_hyper_cube().')
        # Draw one independent offset per element of the sample. Using
        # `inputs.shape` rather than `len(inputs)` makes this correct for
        # samples with more than one dimension; for 1-D samples both are
        # equivalent.
        noise_shape = inputs.shape
        res = [
            np.clip(inputs + np.random.uniform(-radius, radius, noise_shape),
                    0.0, 1.0).astype(inputs.dtype)
            for _ in range(self._number_points)
        ]
        return np.asarray(res)

    def _rc_forward(self, inputs, radius):
        """
        Generate region-based predictions for input samples.

        Args:
            inputs (numpy.ndarray): Input samples.
            radius (float): The scope to generate hyper cubes around input
                samples.

        Returns:
            numpy.ndarray, classification result for input samples.
        """
        LOGGER.debug(TAG, 'enter _rc_forward().')
        res = []
        for elem in inputs:
            hyper_cube_x = self._generate_hyper_cube(elem, radius)
            hyper_cube_preds = []
            for ite_hyper_cube_x in hyper_cube_x:
                # The model is queried with a batch of one sample.
                model_inputs = Tensor(np.expand_dims(ite_hyper_cube_x, axis=0))
                ite_preds = self._model.predict(model_inputs).asnumpy()[0]
                hyper_cube_preds.append(ite_preds)
            pred_labels = np.argmax(hyper_cube_preds, axis=1)
            bin_count = np.bincount(pred_labels)
            # count the number of different class and choose the max one
            # as final class
            hyper_cube_tag = np.argmax(bin_count, axis=0)
            res.append(hyper_cube_tag)
        return np.asarray(res)

    def detect(self, inputs):
        """
        Tell whether input samples are adversarial or not.

        Args:
            inputs (numpy.ndarray): Suspicious samples to be judged.

        Returns:
            list[int], whether a sample is adversarial. if res[i]=1, then the
            input sample with index i is adversarial.
        """
        LOGGER.debug(TAG, 'enter detect().')
        self._radius = check_param_type('radius', self._radius, float)
        inputs = check_numpy_param('inputs', inputs)
        time_start = time.time()
        raw_preds = np.argmax(self._model.predict(Tensor(inputs)).asnumpy(),
                              axis=1)
        rc_preds = self._rc_forward(inputs, self._radius)
        # A sample is flagged (1) when the raw prediction and the
        # region-based prediction disagree.
        res = [0 if raw_pred == rc_pred else 1
               for raw_pred, rc_pred in zip(raw_preds, rc_preds)]

        LOGGER.debug(TAG,
                     'time used to detect %d samples is : %s seconds',
                     inputs.shape[0],
                     time.time() - time_start)
        return res

    def detect_diff(self, inputs):
        """
        Return raw prediction results and region-based prediction results.

        Args:
            inputs (numpy.ndarray): Input samples.

        Returns:
            numpy.ndarray, raw prediction results and region-based prediction
            results of input samples.

        Raises:
            TypeError: If the radius has not been set.
        """
        LOGGER.debug(TAG, 'enter detect_diff().')
        inputs = check_numpy_param('inputs', inputs)
        self._check_radius_is_set()
        raw_preds = self._model.predict(Tensor(inputs))
        rc_preds = self._rc_forward(inputs, self._radius)
        return raw_preds.asnumpy(), rc_preds

    def transform(self, inputs):
        """
        Generate hyper cube for input samples.

        Args:
            inputs (numpy.ndarray): Input samples.

        Returns:
            numpy.ndarray, hyper cube corresponds to every sample.

        Raises:
            TypeError: If the radius has not been set.
        """
        LOGGER.debug(TAG, 'enter transform().')
        inputs = check_numpy_param('inputs', inputs)
        self._check_radius_is_set()
        res = [self._generate_hyper_cube(elem, self._radius)
               for elem in inputs]
        return np.asarray(res)