# Copyright 2019 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Carlini-wagner Attack.
"""
import numpy as np
from mindspore import Tensor
from mindspore.nn import Cell
from mindarmour.attacks.attack import Attack
from mindarmour.utils.logger import LogUtil
from mindarmour.utils._check_param import check_numpy_param, check_model, \
check_pair_numpy_param, check_int_positive, check_param_type, \
check_param_multi_types, check_value_positive, check_equal_shape
from mindarmour.utils.util import GradWrap
from mindarmour.utils.util import jacobian_matrix
# Module-wide logger handle obtained from LogUtil.
LOGGER = LogUtil.get_instance()
# Tag passed as the first argument to every LOGGER call in this module.
TAG = 'CW'
def _best_logits_of_other_class(logits, target_class, value=1):
    """
    Find, for every sample, the class with the largest logit once the
    target class has been penalised.

    A penalty array of the same shape as `logits` is built with `value` at
    each sample's target-class position and zero elsewhere; it is subtracted
    from `logits` before taking the per-row argmax.

    Args:
        logits (numpy.ndarray): Predict logits of samples.
        target_class (numpy.ndarray): Target labels, one class index per
            sample.
        value (float): Amount subtracted from the target-class logit of each
            sample. Default: 1.

    Returns:
        numpy.ndarray, per-sample index of the largest penalised logit.

    Examples:
        >>> other_class = _best_logits_of_other_class([[0.2, 0.3, 0.5],
        >>>                                            [0.3, 0.4, 0.3]], [2, 1])
    """
    LOGGER.debug(TAG, "enter the func _best_logits_of_other_class.")
    logits, target_class = check_pair_numpy_param('logits', logits,
                                                  'target_class', target_class)
    penalty = np.zeros_like(logits)
    # Each row is a view into `penalty`, so writing to it updates the array.
    for sample_idx, penalty_row in enumerate(penalty):
        penalty_row[target_class[sample_idx]] = value
    penalised_logits = logits - penalty
    return np.argmax(penalised_logits, axis=1)
class CarliniWagnerL2Attack(Attack):
    """
    The Carlini & Wagner attack using L2 norm.

    References: `Nicholas Carlini, David Wagner: "Towards Evaluating
    the Robustness of Neural Networks" <https://arxiv.org/abs/1608.04644>`_

    Args:
        network (Cell): Target model.
        num_classes (int): Number of labels of model output, which should be
            greater than zero.
        box_min (float): Lower bound of input of the target model. Default: 0.
        box_max (float): Upper bound of input of the target model. Default: 1.0.
        bin_search_steps (int): The number of steps for the binary search
            used to find the optimal trade-off constant between distance
            and confidence. When it is at least 10, the last step re-runs
            the optimization at the current upper bound of the constant
            (capped at 1e10). Default: 5.
        max_iterations (int): The maximum number of iterations, which should be
            greater than zero. Default: 1000.
        confidence (float): Confidence of the output of adversarial examples.
            Default: 0.
        learning_rate (float): The learning rate for the attack algorithm.
            Default: 5e-3.
        initial_const (float): The initial trade-off constant to use to balance
            the relative importance of perturbation norm and confidence
            difference. Default: 1e-2.
        abort_early_check_ratio (float): Check loss progress every ratio of
            all iteration. Default: 5e-2.
        targeted (bool): If True, targeted attack. If False, untargeted attack.
            Default: False.
        fast (bool): If True, return the first found adversarial example.
            If False, return the adversarial samples with smaller
            perturbations. Default: True.
        abort_early (bool): If True, Adam will be aborted if the loss hasn't
            decreased for some time. If False, Adam will continue work until the
            max iterations is arrived. Default: True.
        sparse (bool): If True, input labels are sparse-coded. If False,
            input labels are onehot-coded. Default: True.

    Examples:
        >>> attack = CarliniWagnerL2Attack(network, num_classes=10)
    """

    def __init__(self, network, num_classes, box_min=0.0, box_max=1.0,
                 bin_search_steps=5, max_iterations=1000, confidence=0,
                 learning_rate=5e-3, initial_const=1e-2,
                 abort_early_check_ratio=5e-2, targeted=False,
                 fast=True, abort_early=True, sparse=True):
        LOGGER.info(TAG, "init CW object.")
        super(CarliniWagnerL2Attack, self).__init__()
        self._network = check_model('network', network, Cell)
        self._network.set_grad(True)
        self._num_classes = check_int_positive('num_classes', num_classes)
        self._min = check_param_type('box_min', box_min, float)
        self._max = check_param_type('box_max', box_max, float)
        # Label the check with the real parameter name, like every other
        # validation call in this constructor.
        self._bin_search_steps = check_int_positive('bin_search_steps',
                                                    bin_search_steps)
        self._max_iterations = check_int_positive('max_iterations',
                                                  max_iterations)
        self._confidence = check_param_multi_types('confidence', confidence,
                                                   [int, float])
        self._learning_rate = check_value_positive('learning_rate',
                                                   learning_rate)
        self._initial_const = check_value_positive('initial_const',
                                                   initial_const)
        self._abort_early = check_param_type('abort_early', abort_early, bool)
        self._fast = check_param_type('fast', fast, bool)
        self._abort_early_check_ratio = check_value_positive(
            'abort_early_check_ratio', abort_early_check_ratio)
        self._targeted = check_param_type('targeted', targeted, bool)
        self._net_grad = GradWrap(self._network)
        self._sparse = check_param_type('sparse', sparse, bool)
        # Set from the input dtype at the start of every generate() call.
        self._dtype = None

    def _loss_function(self, logits, new_x, org_x, org_or_target_class,
                       constant, confidence):
        """
        Calculate the value of loss function and gradients of loss w.r.t inputs.

        Args:
            logits (numpy.ndarray): The output of network before softmax.
            new_x (numpy.ndarray): Adversarial examples.
            org_x (numpy.ndarray): Original benign input samples.
            org_or_target_class (numpy.ndarray): Original/target labels.
            constant (numpy.ndarray): Per-sample trade-off constants used to
                balance loss and perturbation norm (indexed per sample).
            confidence (float): Confidence level of the output of adversarial
                examples.

        Returns:
            - numpy.ndarray, per-sample squared L2 distance between `new_x`
              and `org_x`.

            - numpy.ndarray, per-sample total loss (distance plus the
              weighted classification loss).

            - numpy.ndarray, gradient of the total loss w.r.t. `new_x`.

        Raises:
            ValueError: If loss is less than 0.

        Examples:
            >>> L2_loss, total_loss, dldx = self._loss_function([0.2 , 0.3,
            >>> 0.5], [0.1, 0.2, 0.2, 0.4], [0.12, 0.2, 0.25, 0.4], [1], 2, 0)
        """
        LOGGER.debug(TAG, "enter the func _loss_function.")
        logits = check_numpy_param('logits', logits)
        org_x = check_numpy_param('org_x', org_x)
        new_x, org_or_target_class = check_pair_numpy_param(
            'new_x', new_x, 'org_or_target_class', org_or_target_class)
        new_x, org_x = check_equal_shape('new_x', new_x, 'org_x', org_x)

        # Subtracting inf from the original/target logit guarantees the
        # argmax never selects that class.
        other_class_index = _best_logits_of_other_class(
            logits, org_or_target_class, value=np.inf)
        # Sum of squared differences over every axis except the batch axis.
        loss1 = np.sum((new_x - org_x)**2,
                       axis=tuple(range(len(new_x.shape))[1:]))
        loss2 = np.zeros_like(loss1, dtype=self._dtype)
        loss2_grade = np.zeros_like(new_x, dtype=self._dtype)
        # Indexed below as jaco_grad[class_index][sample_index].
        jaco_grad = jacobian_matrix(self._net_grad, new_x, self._num_classes)
        sample_count = org_or_target_class.shape[0]
        if self._targeted:
            # Targeted: push the target logit above the best other logit.
            for i in range(sample_count):
                loss2[i] = max(0, logits[i][other_class_index[i]]
                               - logits[i][org_or_target_class[i]]
                               + confidence)
                loss2_grade[i] = constant[i]*(
                    jaco_grad[other_class_index[i]][i]
                    - jaco_grad[org_or_target_class[i]][i])
        else:
            # Untargeted: push the best other logit above the original one.
            for i in range(sample_count):
                loss2[i] = max(0, logits[i][org_or_target_class[i]]
                               - logits[i][other_class_index[i]] + confidence)
                loss2_grade[i] = constant[i]*(
                    jaco_grad[org_or_target_class[i]][i]
                    - jaco_grad[other_class_index[i]][i])
        total_loss = loss1 + constant*loss2
        loss1_grade = 2*(new_x - org_x)
        for i in range(sample_count):
            if loss2[i] < 0:
                msg = 'loss value should greater than or equal to 0, ' \
                      'but got loss2 {}'.format(loss2[i])
                LOGGER.error(TAG, msg)
                raise ValueError(msg)
            if loss2[i] == 0:
                # The hinge is inactive for this sample, so it contributes
                # no gradient.
                loss2_grade[i, ...] = 0
        total_loss_grade = loss1_grade + loss2_grade
        return loss1, total_loss, total_loss_grade

    def _to_attack_space(self, inputs):
        """
        Transform input data into attack space.

        The data is rescaled from [box_min, box_max] to [-1, 1] and mapped
        through arctanh.

        Args:
            inputs (numpy.ndarray): Input data.

        Returns:
            numpy.ndarray, transformed data which belongs to attack space.

        Examples:
            >>> x_att = self._to_attack_space([0.2, 0.3, 0.3])
        """
        LOGGER.debug(TAG, "enter the func _to_attack_space.")
        inputs = check_numpy_param('inputs', inputs)
        mean = (self._min + self._max) / 2
        diff = (self._max - self._min) / 2
        inputs = (inputs - mean) / diff
        # Shrink slightly so values on the box boundary do not map to
        # +/-inf under arctanh.
        inputs = inputs*0.999999
        return np.arctanh(inputs)

    def _to_model_space(self, inputs):
        """
        Transform input data into model space.

        Applies tanh and rescales the result from [-1, 1] back to
        [box_min, box_max].

        Args:
            inputs (numpy.ndarray): Input data.

        Returns:
            - numpy.ndarray, transformed data which belongs to model space.

            - numpy.ndarray, the gradient of x_model w.r.t. x_att.

        Examples:
            >>> x_model, grad = self._to_model_space([10, 21, 9])
        """
        LOGGER.debug(TAG, "enter the func _to_model_space.")
        inputs = check_numpy_param('inputs', inputs)
        inputs = np.tanh(inputs)
        # d tanh(x)/dx = 1 - tanh(x)^2
        the_grad = 1 - np.square(inputs)
        mean = (self._min + self._max) / 2
        diff = (self._max - self._min) / 2
        inputs = inputs*diff + mean
        the_grad = the_grad*diff
        return inputs, the_grad

    def generate(self, inputs, labels):
        """
        Generate adversarial examples based on input data and targeted labels.

        Args:
            inputs (numpy.ndarray): Input samples.
            labels (numpy.ndarray): The ground truth label of input samples
                or target labels.

        Returns:
            numpy.ndarray, generated adversarial examples. Samples for which
            no adversarial example was found keep their original values.

        Examples:
            >>> advs = attack.generate([[0.1, 0.2, 0.6], [0.3, 0, 0.4]], [1, 2])
        """
        LOGGER.debug(TAG, "enter the func generate.")
        inputs, labels = check_pair_numpy_param('inputs', inputs,
                                                'labels', labels)
        if not self._sparse:
            labels = np.argmax(labels, axis=1)
        self._dtype = inputs.dtype
        att_original = self._to_attack_space(inputs)
        reconstructed_original, _ = self._to_model_space(att_original)

        # find an adversarial sample
        const = np.ones_like(labels, dtype=self._dtype)*self._initial_const
        lower_bound = np.zeros_like(labels, dtype=self._dtype)
        upper_bound = np.ones_like(labels, dtype=self._dtype)*np.inf
        adversarial_res = inputs.copy()
        adversarial_loss = np.ones_like(labels, dtype=self._dtype)*np.inf
        samples_num = labels.shape[0]
        # Cumulative success over the whole search; drives the `fast` return.
        adv_flag = np.zeros_like(labels, dtype=bool)
        # Iterations between two early-abort progress checks.
        check_interval = np.ceil(
            self._max_iterations*self._abort_early_check_ratio)

        for binary_search_step in range(self._bin_search_steps):
            if (binary_search_step == self._bin_search_steps - 1) and \
                    (self._bin_search_steps >= 10):
                # Element-wise minimum: the bounds are per-sample arrays, so
                # the builtin min() cannot be used here. np.minimum also
                # returns a new array, so `const` never aliases the bounds.
                const = np.minimum(upper_bound, 1e10)
            LOGGER.debug(TAG,
                         'starting optimization with const = %s',
                         str(const))

            att_perturbation = np.zeros_like(att_original, dtype=self._dtype)
            loss_at_previous_check = np.ones_like(labels,
                                                  dtype=self._dtype)*np.inf
            # Success with the *current* constants only. The binary search
            # must not treat a sample as successful at this constant just
            # because an earlier, different constant worked.
            step_adv_flag = np.zeros_like(labels, dtype=bool)

            # create a new optimizer to minimize the perturbation
            optimizer = _AdamOptimizer(att_perturbation.shape)

            for iteration in range(self._max_iterations):
                x_input, dxdp = self._to_model_space(
                    att_original + att_perturbation)
                logits = self._network(Tensor(x_input)).asnumpy()

                current_l2_loss, current_loss, dldx = self._loss_function(
                    logits, x_input, reconstructed_original,
                    labels, const, self._confidence)

                # check if attack success (include all examples)
                predictions = np.argmax(logits, axis=1)
                if self._targeted:
                    is_adv = (predictions == labels)
                else:
                    is_adv = (predictions != labels)

                # Keep, per sample, the adversarial example with the
                # smallest perturbation seen so far.
                for i in range(samples_num):
                    if is_adv[i]:
                        adv_flag[i] = True
                        step_adv_flag[i] = True
                        if current_l2_loss[i] < adversarial_loss[i]:
                            adversarial_res[i] = x_input[i]
                            adversarial_loss[i] = current_l2_loss[i]

                if self._fast and np.all(adv_flag):
                    LOGGER.debug(TAG, "succeed find adversarial examples.")
                    msg = 'iteration: {}, logits_att: {}, ' \
                          'loss: {}, l2_dist: {}' \
                        .format(iteration,
                                predictions,
                                current_loss, current_l2_loss)
                    LOGGER.debug(TAG, msg)
                    return adversarial_res

                dldx, inputs = check_equal_shape('dldx', dldx,
                                                 'inputs', inputs)

                # Chain rule: dL/dp = dL/dx * dx/dp (attack-space variable p).
                gradient = dldx*dxdp
                att_perturbation += \
                    optimizer(gradient, self._learning_rate)

                # check if should stop iteration early
                if self._abort_early and iteration % check_interval == 0:
                    # A sample has made progress if its loss dropped below
                    # 99.99% of its value at the previous check.
                    progressed = np.any(
                        current_loss <= .9999*loss_at_previous_check)
                    # stop Adam if all samples has no progress
                    if not progressed:
                        LOGGER.debug(TAG,
                                     'step:%d, no progress yet, stop iteration',
                                     binary_search_step)
                        break
                    loss_at_previous_check = current_loss

            for i in range(samples_num):
                # update bound based on search result
                if step_adv_flag[i]:
                    LOGGER.debug(TAG,
                                 'example %d, found adversarial with const=%f',
                                 i, const[i])
                    upper_bound[i] = const[i]
                else:
                    LOGGER.debug(TAG,
                                 'example %d, failed to find adversarial'
                                 ' with const=%f',
                                 i, const[i])
                    lower_bound[i] = const[i]

                if upper_bound[i] == np.inf:
                    # No successful constant yet: grow exponentially.
                    const[i] *= 10
                else:
                    # Bisect between the largest failing and the smallest
                    # succeeding constant.
                    const[i] = (lower_bound[i] + upper_bound[i]) / 2

        return adversarial_res
class _AdamOptimizer:
    """
    Adam update rule used to compute the attack step for each iteration.

    Keeps running estimates of the first and second moments of the gradient
    together with a step counter used for bias correction.

    Args:
        shape (tuple): The shape of perturbations.

    Examples:
        >>> optimizer = _AdamOptimizer(att_perturbation.shape)
    """

    def __init__(self, shape):
        # Step counter and moment estimates all start from zero.
        self._t = 0
        self._m = np.zeros(shape)
        self._v = np.zeros(shape)

    def __call__(self, gradient, learning_rate=0.001,
                 beta1=0.9, beta2=0.999, epsilon=1e-8):
        """
        Advance the optimizer by one step and return the resulting update.

        Args:
            gradient (numpy.ndarray): The gradient of the loss w.r.t. to the
                variable.
            learning_rate (float): The learning rate in the current iteration.
                Default: 0.001.
            beta1 (float): Decay rate for calculating the exponentially
                decaying average of past gradients. Default: 0.9.
            beta2 (float): Decay rate for calculating the exponentially
                decaying average of past squared gradients. Default: 0.999.
            epsilon (float): Small value to avoid division by zero.
                Default: 1e-8.

        Returns:
            numpy.ndarray, perturbations.

        Examples:
            >>> perturbs = optimizer([0.2, 0.1, 0.15], 0.005)
        """
        gradient = check_numpy_param('gradient', gradient)

        self._t += 1
        step = self._t

        # Exponentially decaying averages of the gradient and its square.
        self._m = beta1*self._m + (1 - beta1)*gradient
        self._v = beta2*self._v + (1 - beta2)*gradient**2

        # Learning rate with the bias correction of both moments folded in.
        corrected_rate = learning_rate*np.sqrt(1 - beta2**step)
        alpha = corrected_rate / (1 - beta1**step)

        denominator = np.sqrt(self._v) + epsilon
        # Negative sign: step against the gradient.
        return (-alpha*self._m) / denominator