Source code for mindarmour.fuzz_testing.sensitivity_convergence_coverage

# Copyright 2019 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Source code of SensitivityConvergenceCoverage class.
"""
import time
import numpy as np

from mindspore import Tensor
from mindspore.train.summary.summary_record import _get_summary_tensor_data
from mindarmour.fuzz_testing import CoverageMetrics
from mindarmour.utils._check_param import check_numpy_param
from mindarmour.utils.logger import LogUtil



LOGGER = LogUtil.get_instance()
TAG = 'CoverageMetrics'


class SensitivityConvergenceCoverage(CoverageMetrics):
    '''
    Get the metric of sensitivity convergence coverage: the proportion of neurons whose
    sensitivity has converged to the threshold. Sensitivity convergence coverage evaluates
    whether the changes of a neuron's output have converged to a Normal distribution.

    Args:
        model (Model): Model to be evaluated.
        threshold (float): Threshold of sensitivity convergence coverage. Default: ``0.5``.
        incremental (bool): Whether to use incremental mode. Default: ``False``.
        batch_size (int): Batch size. Default: ``32``.
        selected_neurons_num (int): Number of neurons selected for sensitivity convergence
            coverage. Default: ``100``.
        n_iter (int): Number of iterations. Default: ``1000``.
    '''

    def __init__(self, model, threshold=0.5, incremental=False, batch_size=32,
                 selected_neurons_num=100, n_iter=1000):
        super().__init__(model, incremental, batch_size)
        self.threshold = threshold
        self.total_converged = 0
        self.total_size = 0
        self.n_iter = n_iter
        self.selected_neurons_num = selected_neurons_num
        self.sensitive_neuron_idx = {}
        self.initial_samples = []

    def get_metrics(self, dataset):
        '''
        Obtain the metric of sensitivity convergence coverage. SCC measures the proportion
        of neuron output changes that converge to a Normal distribution.

        Args:
            dataset (numpy.ndarray): Dataset for evaluation.

        Returns:
            float, the proportion of neurons that have converged to the threshold.

        Examples:
            >>> import numpy as np
            >>> from mindspore import nn
            >>> from mindspore.common.initializer import TruncatedNormal
            >>> from mindspore.ops import operations as P
            >>> from mindspore.train import Model
            >>> from mindspore.ops import TensorSummary
            >>> from mindarmour.fuzz_testing import SensitivityConvergenceCoverage
            >>> class Net(nn.Cell):
            ...     def __init__(self):
            ...         super(Net, self).__init__()
            ...         self.conv1 = nn.Conv2d(1, 6, 5, padding=0, weight_init=TruncatedNormal(0.02), pad_mode="valid")
            ...         self.conv2 = nn.Conv2d(6, 16, 5, padding=0, weight_init=TruncatedNormal(0.02), pad_mode="valid")
            ...         self.fc1 = nn.Dense(16 * 5 * 5, 120, TruncatedNormal(0.02), TruncatedNormal(0.02))
            ...         self.fc2 = nn.Dense(120, 84, TruncatedNormal(0.02), TruncatedNormal(0.02))
            ...         self.fc3 = nn.Dense(84, 10, TruncatedNormal(0.02), TruncatedNormal(0.02))
            ...         self.relu = nn.ReLU()
            ...         self.max_pool2d = nn.MaxPool2d(kernel_size=2, stride=2)
            ...         self.reshape = P.Reshape()
            ...         self.summary = TensorSummary()
            ...     def construct(self, x):
            ...         x = self.conv1(x)
            ...         x = self.relu(x)
            ...         self.summary('conv1', x)
            ...         x = self.max_pool2d(x)
            ...         x = self.conv2(x)
            ...         x = self.relu(x)
            ...         self.summary('conv2', x)
            ...         x = self.max_pool2d(x)
            ...         x = self.reshape(x, (-1, 16 * 5 * 5))
            ...         x = self.fc1(x)
            ...         x = self.relu(x)
            ...         self.summary('fc1', x)
            ...         x = self.fc2(x)
            ...         x = self.relu(x)
            ...         self.summary('fc2', x)
            ...         x = self.fc3(x)
            ...         self.summary('fc3', x)
            ...         return x
            >>> batch_size = 32
            >>> model = Model(Net())
            >>> test_images = np.random.rand(batch_size, 1, 32, 32).astype(np.float32)
            >>> SCC = SensitivityConvergenceCoverage(model, batch_size=batch_size)
            >>> metrics = SCC.get_metrics(test_images)
        '''
        inputs = check_numpy_param('dataset', dataset)
        if not self.sensitive_neuron_idx:
            self._get_sensitive_neuron_idx(dataset)
        self._model.predict(Tensor(inputs))
        # Give the asynchronous summary collector time to record the layer outputs.
        time.sleep(0.01)
        layer_out = _get_summary_tensor_data()
        for layer, tensor in layer_out.items():
            tensor = tensor.asnumpy().reshape(tensor.shape[0], -1)
            # Split the batch in half and measure pairwise per-neuron output changes.
            clean, benign = tensor[:tensor.shape[0] // 2], tensor[tensor.shape[0] // 2:]
            sensitivity = abs(clean - benign)
            try:
                sensitivity = sensitivity[:, self.sensitive_neuron_idx[layer]]
            except KeyError:
                raise RuntimeError('The layer {} is not in sensitive_neuron_idx.'.format(layer))
            converged, size = self._scc(sensitivity, sensitivity.shape[1], self.threshold)
            self.total_converged += converged
            self.total_size += size
        scc_value = self.total_converged / self.total_size
        return scc_value

    def _get_sensitive_neuron_idx(self, dataset):
        '''
        Select the most sensitive neurons of each layer.

        Args:
            dataset (numpy.ndarray): Dataset for evaluation.
        '''
        inputs = check_numpy_param('dataset', dataset)
        self._model.predict(Tensor(inputs))
        # Give the asynchronous summary collector time to record the layer outputs.
        time.sleep(0.01)
        layer_out = _get_summary_tensor_data()
        for layer, tensor in layer_out.items():
            tensor = tensor.asnumpy().reshape(tensor.shape[0], -1)
            clean, benign = tensor[:tensor.shape[0] // 2], tensor[tensor.shape[0] // 2:]
            sensitivity = abs(clean - benign)
            total_sensitivity = np.sum(sensitivity, axis=0)
            # Keep the indices of the selected_neurons_num most sensitive neurons.
            top_k = min(self.selected_neurons_num, len(total_sensitivity))
            self.sensitive_neuron_idx[layer] = np.argsort(total_sensitivity)[-top_k:]

    def _scc(self, sensitivity_list, size, threshold=0):
        '''
        Args:
            sensitivity_list (numpy.ndarray): The sensitivity of each neuron.
            size (int): The number of neurons.
            threshold (float): The threshold of sensitivity convergence coverage.

        Returns:
            - int, the number of neurons that have converged to the threshold.

            - int, the number of neurons.
        '''
        converged = 0
        for i in range(sensitivity_list.shape[1]):
            _, acceptance_rate = self._build_mh_chain(sensitivity_list[:, i],
                                                      np.mean(sensitivity_list[:, i]),
                                                      self.n_iter,
                                                      self._log_prob)
            # A high acceptance rate means the sensitivities already lie close to the
            # standard Normal target, i.e. the neuron counts as converged.
            if acceptance_rate > threshold:
                converged += 1
        return converged, size

    def _proposal(self, x, stepsize):
        '''
        Args:
            x (numpy.ndarray): The input of the proposal function.
            stepsize (float): The stepsize of the proposal function.

        Returns:
            numpy.ndarray, the output of the proposal function.
        '''
        return np.random.uniform(low=x - 0.5 * stepsize, high=x + 0.5 * stepsize, size=x.shape)

    def _p_acc_mh(self, x_new, x_old, log_prob):
        '''
        Args:
            x_new (numpy.ndarray): The new state.
            x_old (numpy.ndarray): The old state.
            log_prob (function): The log probability function.

        Returns:
            float, the acceptance probability.
        '''
        return min(1, np.exp(log_prob(x_new) - log_prob(x_old)))

    def _log_prob(self, x):
        '''
        Args:
            x (numpy.ndarray): The input of the log probability function.

        Returns:
            float, the output of the log probability function.
        '''
        # Log density of a standard Normal distribution, up to an additive constant.
        return -0.5 * np.sum(x ** 2)

    def _sample_mh(self, x_old, log_prob, stepsize):
        '''
        Determine whether to accept the proposed state: draw a random number uniformly
        from [0, 1) and compare it with the acceptance probability.

        Args:
            x_old (numpy.ndarray): The old state.
            log_prob (function): The log probability function.
            stepsize (float): The stepsize of the proposal function.

        Returns:
            - bool, whether the new state is accepted.

            - numpy.ndarray, the new state if accepted, otherwise the old state.
        '''
        x_new = self._proposal(x_old, stepsize)
        accept = np.random.random() < self._p_acc_mh(x_new, x_old, log_prob)
        if accept:
            return accept, x_new
        return accept, x_old

    def _build_mh_chain(self, init, stepsize, n_total, log_prob):
        '''
        Args:
            init (numpy.ndarray): The initial state.
            stepsize (float): The stepsize of the proposal function.
            n_total (int): The total number of samples.
            log_prob (function): The log probability function.

        Returns:
            - list, the chain of samples.

            - float, the acceptance rate of the chain.
        '''
        n_accepted = 0
        chain = [init]
        for _ in range(n_total):
            accept, state = self._sample_mh(chain[-1], log_prob, stepsize)
            chain.append(state)
            n_accepted += accept
        acceptance_rate = n_accepted / float(n_total)
        return chain, acceptance_rate
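

# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original module. The convergence test in
# _scc is a Metropolis-Hastings acceptance-rate check against a standard Normal
# target: a chain started at a neuron's sensitivity vector accepts almost every
# proposal when the sensitivities are already small, and rejects most proposals
# when they are large. The standalone demo below (a hypothetical helper, added
# here only for illustration) replays that check on synthetic sensitivity
# vectors, so the behaviour can be inspected without a MindSpore model.
def _demo_mh_acceptance_rate(n_iter=1000, seed=0):
    '''Compare MH acceptance rates for small and large sensitivity vectors.'''
    rng = np.random.default_rng(seed)

    def log_prob(x):
        # Standard Normal log density up to an additive constant, as in _log_prob.
        return -0.5 * np.sum(x ** 2)

    def acceptance_rate(init, stepsize):
        # Mirrors _build_mh_chain/_sample_mh/_proposal: uniform proposal around
        # the current state, Metropolis acceptance on the log-density ratio.
        n_accepted = 0
        state = init
        for _ in range(n_iter):
            proposal = rng.uniform(low=state - 0.5 * stepsize,
                                   high=state + 0.5 * stepsize,
                                   size=state.shape)
            if rng.random() < min(1.0, np.exp(log_prob(proposal) - log_prob(state))):
                state = proposal
                n_accepted += 1
        return n_accepted / n_iter

    # Near-zero sensitivities sit at the mode of the target, so the acceptance
    # rate stays close to 1 and the neuron counts as converged under the default
    # threshold of 0.5; large sensitivities sit far out in the tail and score low.
    small = np.abs(rng.normal(0.0, 0.05, size=16))
    large = np.abs(rng.normal(5.0, 1.0, size=16))
    print('small sensitivities:', acceptance_rate(small, float(np.mean(small))))
    print('large sensitivities:', acceptance_rate(large, float(np.mean(large))))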