{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# 对抗示例生成\n", "\n", "[![](https://gitee.com/mindspore/docs/raw/r1.3/resource/_static/logo_source.png)](https://gitee.com/mindspore/docs/blob/r1.3/tutorials/source_zh_cn/intermediate/image_and_video/adversarial_example_generation.ipynb) [![](https://gitee.com/mindspore/docs/raw/r1.3/resource/_static/logo_notebook.png)](https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/r1.3/tutorials/zh_cn/mindspore_adversarial_example_generation.ipynb) [![](https://gitee.com/mindspore/docs/raw/r1.3/resource/_static/logo_modelarts.png)](https://authoring-modelarts-cnnorth4.huaweicloud.com/console/lab?share-url-b64=aHR0cHM6Ly9taW5kc3BvcmUtd2Vic2l0ZS5vYnMuY24tbm9ydGgtNC5teWh1YXdlaWNsb3VkLmNvbS9ub3RlYm9vay9yMS4zL3R1dG9yaWFscy96aF9jbi9taW5kc3BvcmVfYWR2ZXJzYXJpYWxfZXhhbXBsZV9nZW5lcmF0aW9uLmlweW5i&imageid=59a6e9f5-93c0-44dd-85b0-82f390c5d53b)\n", "\n", "近年来随着数据、计算能力、理论的不断发展演进,深度学习在图像、文本、语音、自动驾驶等众多领域都得到了广泛应用。与此同时,人们也越来越关注各类模型在使用过程中的安全问题,因为AI模型很容易受到外界有意无意的攻击而产生错误的结果。在本案例中,我们将以梯度符号攻击FGSM(Fast Gradient Sign Method)为例,演示此类攻击是如何误导模型的。\n", "\n", "> 本篇基于MindSpore v1.3.0,CPU/GPU/Ascend环境运行。\n", "\n", "## 对抗样本定义\n", "\n", "Szegedy在2013年最早提出对抗样本的概念:在原始样本处加入人类无法察觉的微小扰动,使得深度模型性能下降,这种样本即对抗样本。如下图所示,本来预测为“panda”的图像在添加噪声之后,模型就将其预测为“gibbon”,右边的样本就是一个对抗样本:\n", "\n", "![fgsm-panda-image](images/panda.png)\n", "\n", "> 图片来自[Explaining and Harnessing Adversarial Examples](https://arxiv.org/abs/1412.6572)。\n", "\n", "\n", "## 攻击方法\n", "\n", "对模型的攻击方法可以按照以下方法分类:\n", "\n", "1. 攻击者掌握的信息多少:\n", "\n", "1.1 白盒攻击:攻击者具有对模型的全部知识和访问权限,包括模型结构、权重、输入、输出。攻击者在产生对抗性攻击数据的过程中能够与模型系统有所交互。攻击者可以针对被攻击模型的特性设计特定的攻击算法。\n", "\n", "1.2 黑盒攻击:与白盒攻击相反,攻击者仅具有关于模型的有限知识。攻击者对模型的结构权重一无所知,仅了解部分输入输出。\n", "\n", "2. 攻击者的目的:\n", "\n", "2.1 有目标的攻击:攻击者将模型结果误导为特定分类。\n", "\n", "2.2 无目标的攻击:攻击者只想产生错误结果,而不在乎新结果是什么。\n", "\n", "本案例中用到的FGSM是一种白盒攻击方法,既可以是有目标也可以是无目标攻击。\n", "\n", "更多的模型安全功能可参考[MindArmour](https://www.mindspore.cn/mindarmour),现支持FGSM、LLC、Substitute Attack等多种对抗样本生成方法,并提供对抗样本鲁棒性模块、Fuzz Testing模块、隐私保护与评估模块,帮助用户增强模型安全性。\n", "\n", "### 快速梯度符号攻击(FGSM)\n", "\n", "正常分类网络的训练会定义一个损失函数,用于衡量模型输出值与样本真实标签的距离,通过反向传播计算模型梯度,梯度下降更新网络参数,减小损失值,提升模型精度。\n", "\n", "FGSM(Fast Gradient Sign Method)是一种简单高效的对抗样本生成方法。不同于正常分类网络的训练过程,FGSM通过计算loss对于输入的梯度$\\nabla_x J(\\theta ,x ,y)$,这个梯度表征了loss对于输入变化的敏感性。然后在原始输入加上上述梯度,使得loss增大,模型对于改造后的输入样本分类效果变差,达到攻击效果。对抗样本的另一要求是生成样本与原始样本的差异要尽可能的小,使用sign函数可以使得修改图片时尽可能的均匀。\n", "\n", "产生的对抗扰动用公式可以表示为:\n", "\n", "$$ \\eta = \\varepsilon sign(\\nabla_x J(\\theta))$$\n", "\n", "对抗样本可公式化为:\n", "\n", "$$ x' = x + \\epsilon \\times sign(\\nabla_x J(\\theta ,x ,y)) $$\n", "\n", "其中,\n", "- $x$:正确分类为“Pandas”的原始输入图像。\n", "- $y$:是$x$的输出。\n", "- $\\theta$:模型参数。\n", "- $\\varepsilon$:攻击系数。\n", "- $J(\\theta, x, y)$:训练网络的损失。\n", "- $\\nabla_x J(\\theta)$:反向传播梯度。\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 实验前准备\n", "\n", "### 导入模型训练需要的库\n", "\n", "本案例将使用MNIST训练一个精度达标的LeNet网络,然后运行上文中所提到的FGSM攻击方法,实现错误分类的效果。\n", "\n", "首先导入模型训练需要的库" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import os\n", "import numpy as np\n", "\n", "from mindspore import Tensor, context, Model, load_checkpoint, load_param_into_net\n", "import mindspore.nn as nn\n", "import mindspore.ops as ops\n", "from mindspore.common.initializer import Normal\n", "from mindspore.train.callback import LossMonitor, ModelCheckpoint, CheckpointConfig\n", "import mindspore.dataset as ds\n", "import mindspore.dataset.transforms.c_transforms as C\n", "import mindspore.dataset.vision.c_transforms as CV\n", "from mindspore.dataset.vision import Inter\n", "from mindspore import dtype as mstype" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 下载数据集\n", "\n", "下载MINIST数据集:" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\u001b[01;34m./datasets/MNIST_Data\u001b[00m\r\n", "├── \u001b[01;34mtest\u001b[00m\r\n", "│   ├── t10k-images-idx3-ubyte\r\n", "│   └── t10k-labels-idx1-ubyte\r\n", "└── \u001b[01;34mtrain\u001b[00m\r\n", " ├── train-images-idx3-ubyte\r\n", " └── train-labels-idx1-ubyte\r\n", "\r\n", "2 directories, 4 files\r\n" ] } ], "source": [ "!mkdir -p ./datasets/MNIST_Data/train ./datasets/MNIST_Data/test\n", "!wget -NP ./datasets/MNIST_Data/train https://mindspore-website.obs.myhuaweicloud.com/notebook/datasets/mnist/train-labels-idx1-ubyte\n", "!wget -NP ./datasets/MNIST_Data/train https://mindspore-website.obs.myhuaweicloud.com/notebook/datasets/mnist/train-images-idx3-ubyte\n", "!wget -NP ./datasets/MNIST_Data/test https://mindspore-website.obs.myhuaweicloud.com/notebook/datasets/mnist/t10k-labels-idx1-ubyte\n", "!wget -NP ./datasets/MNIST_Data/test https://mindspore-website.obs.myhuaweicloud.com/notebook/datasets/mnist/t10k-images-idx3-ubyte\n", "!tree ./datasets/MNIST_Data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 攻击准备\n", "\n", "在完成准备工作之后,开始训练精度达标的LeNet网络。" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "采用`GRAPH_MODE`在CPU/GPU/Ascend中运行本案例,下面将硬件设定为Ascend:" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "context.set_context(mode=context.GRAPH_MODE, device_target='Ascend')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 训练LeNet网络\n", "\n", "实验中使用LeNet作为演示模型完成图像分类,这里先定义网络并使用MNIST数据集进行训练。\n", "\n", "定义LeNet网络:" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "class LeNet5(nn.Cell):\n", "\n", " def __init__(self, num_class=10, num_channel=1):\n", " super(LeNet5, self).__init__()\n", " self.conv1 = nn.Conv2d(num_channel, 6, 5, pad_mode='valid')\n", " self.conv2 = nn.Conv2d(6, 16, 5, pad_mode='valid')\n", " self.fc1 = nn.Dense(16 * 5 * 5, 120, weight_init=Normal(0.02))\n", " self.fc2 = nn.Dense(120, 84, weight_init=Normal(0.02))\n", " self.fc3 = nn.Dense(84, num_class, weight_init=Normal(0.02))\n", " self.relu = nn.ReLU()\n", " self.max_pool2d = nn.MaxPool2d(kernel_size=2, stride=2)\n", " self.flatten = nn.Flatten()\n", "\n", " def construct(self, x):\n", " x = self.conv1(x)\n", " x = self.relu(x)\n", " x = self.max_pool2d(x)\n", " x = self.conv2(x)\n", " x = self.relu(x)\n", " x = self.max_pool2d(x)\n", " x = self.flatten(x)\n", " x = self.fc1(x)\n", " x = self.relu(x)\n", " x = self.fc2(x)\n", " x = self.relu(x)\n", " x = self.fc3(x)\n", " return x\n", "\n", "net = LeNet5()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "进行数据处理:" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "def create_dataset(data_path, batch_size=1, num_parallel_workers=1):\n", "\n", " # 定义数据集\n", " mnist_ds = ds.MnistDataset(data_path)\n", " resize_height, resize_width = 32, 32\n", " rescale = 1.0 / 255.0\n", " shift = 0.0\n", " rescale_nml = 1 / 0.3081\n", " shift_nml = -1 * 0.1307 / 0.3081\n", "\n", " # 定义所需要操作的map映射\n", " resize_op = CV.Resize((resize_height, resize_width), interpolation=Inter.LINEAR)\n", " rescale_nml_op = CV.Rescale(rescale_nml, shift_nml)\n", " rescale_op = CV.Rescale(rescale, shift)\n", " hwc2chw_op = CV.HWC2CHW()\n", " type_cast_op = C.TypeCast(mstype.int32)\n", "\n", " # 使用map映射函数,将数据操作应用到数据集\n", " mnist_ds = mnist_ds.map(operations=type_cast_op, input_columns=\"label\", num_parallel_workers=num_parallel_workers)\n", " mnist_ds = mnist_ds.map(operations=resize_op, input_columns=\"image\", num_parallel_workers=num_parallel_workers)\n", " mnist_ds = mnist_ds.map(operations=rescale_op, input_columns=\"image\", num_parallel_workers=num_parallel_workers)\n", " mnist_ds = mnist_ds.map(operations=rescale_nml_op, input_columns=\"image\", num_parallel_workers=num_parallel_workers)\n", " mnist_ds = mnist_ds.map(operations=hwc2chw_op, input_columns=\"image\", num_parallel_workers=num_parallel_workers)\n", "\n", " # 进行shuffle、batch操作\n", " buffer_size = 10000\n", " mnist_ds = mnist_ds.shuffle(buffer_size=buffer_size)\n", " mnist_ds = mnist_ds.batch(batch_size, drop_remainder=True)\n", "\n", " return mnist_ds" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "定义优化器与损失函数:" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "net_loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')\n", "net_opt = nn.Momentum(net.trainable_params(), learning_rate=0.01, momentum=0.9)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "定义网络参数:" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "config_ck = CheckpointConfig(save_checkpoint_steps=1875, keep_checkpoint_max=10)\n", "ckpoint = ModelCheckpoint(prefix=\"checkpoint_lenet\", config=config_ck)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "定义LeNet网络的训练函数和测试函数:" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "def test_net(model, data_path):\n", " ds_eval = create_dataset(os.path.join(data_path, \"test\"))\n", " acc = model.eval(ds_eval, dataset_sink_mode=False)\n", " print(\"{}\".format(acc))\n", " \n", "def train_net(model, epoch_size, data_path, ckpoint_cb, sink_mode):\n", " ds_train = create_dataset(os.path.join(data_path, \"train\"), 32)\n", " model.train(epoch_size, ds_train, callbacks=[ckpoint_cb, LossMonitor(125)], dataset_sink_mode=sink_mode)" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "train_epoch = 1\n", "mnist_path = \"./datasets/MNIST_Data/\"\n", "model = Model(net, net_loss, net_opt, metrics={\"Accuracy\": nn.Accuracy()})" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "训练LeNet网络:" ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "scrolled": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "epoch: 1 step: 125, loss is 2.3018382\n", "epoch: 1 step: 250, loss is 2.2910337\n", "epoch: 1 step: 375, loss is 2.2876282\n", "epoch: 1 step: 500, loss is 2.293197\n", "epoch: 1 step: 625, loss is 2.2983356\n", "epoch: 1 step: 750, loss is 0.73134214\n", "epoch: 1 step: 875, loss is 0.39000687\n", "epoch: 1 step: 1000, loss is 0.12004304\n", "epoch: 1 step: 1125, loss is 0.10009943\n", "epoch: 1 step: 1250, loss is 0.31425583\n", "epoch: 1 step: 1375, loss is 0.14330618\n", "epoch: 1 step: 1500, loss is 0.05759584\n", "epoch: 1 step: 1625, loss is 0.18315211\n", "epoch: 1 step: 1750, loss is 0.19758298\n", "epoch: 1 step: 1875, loss is 0.0815863\n" ] } ], "source": [ "train_net(model, train_epoch, mnist_path, ckpoint, False)" ] }, { "cell_type": "markdown", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "测试此时的网络,可以观察到LeNet已经达到比较高的精度:" ] }, { "cell_type": "code", "execution_count": 11, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{'Accuracy': 0.9691}\n" ] } ], "source": [ "test_net(model, mnist_path)" ] }, { "cell_type": "markdown", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "加载已经训练好的LeNet模型:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "param_dict = load_checkpoint(\"checkpoint_lenet-1_1875.ckpt\")\n", "load_param_into_net(net, param_dict)" ] }, { "cell_type": "markdown", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "### 实现FGSM\n", "\n", "在得到精准的LeNet网络之后,下面将会采用FSGM攻击方法,在图像中加载噪声后重新进行测试。\n", "\n", "先通过损失函数求取反向梯度:" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "class WithLossCell(nn.Cell):\n", " \"\"\"\n", " 包装网络与损失函数\n", " \"\"\"\n", " def __init__(self, network, loss_fn):\n", " super(WithLossCell, self).__init__()\n", " self._network = network\n", " self._loss_fn = loss_fn\n", "\n", " def construct(self, data, label):\n", " out = self._network(data)\n", " return self._loss_fn(out, label)\n", "\n", "\n", "class GradWrapWithLoss(nn.Cell):\n", " \"\"\"\n", " 通过loss求反向梯度\n", " \"\"\"\n", " def __init__(self, network):\n", " super(GradWrapWithLoss, self).__init__()\n", " self._grad_all = ops.composite.GradOperation(get_all=True, sens_param=False)\n", " self._network = network\n", "\n", " def construct(self, inputs, labels):\n", " gout = self._grad_all(self._network)(inputs, labels)\n", " return gout[0]\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "然后根据公式实现FGSM攻击:" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [], "source": [ "class FastGradientSignMethod:\n", " \"\"\"\n", " 实现FGSM攻击\n", " \"\"\"\n", " def __init__(self, network, eps=0.07, loss_fn=None):\n", " # 变量初始化\n", " self._network = network\n", " self._eps = eps\n", " with_loss_cell = WithLossCell(self._network, loss_fn)\n", " self._grad_all = GradWrapWithLoss(with_loss_cell)\n", " self._grad_all.set_train()\n", "\n", "\n", " def _gradient(self, inputs, labels):\n", " # 求取梯度\n", " out_grad = self._grad_all(inputs, labels)\n", " gradient = out_grad.asnumpy()\n", " gradient = np.sign(gradient)\n", " return gradient\n", " \n", " def generate(self, inputs, labels):\n", " # 实现FGSM\n", " inputs_tensor = Tensor(inputs)\n", " labels_tensor = Tensor(labels) \n", " gradient = self._gradient(inputs_tensor, labels_tensor)\n", " # 产生扰动\n", " perturbation = self._eps*gradient\n", " # 生成受到扰动的图片\n", " adv_x = inputs + perturbation\n", " return adv_x\n", " \n", " def batch_generate(self, inputs, labels, batch_size=32):\n", " # 对数据集进行处理\n", " arr_x = inputs\n", " arr_y = labels\n", " len_x = len(inputs)\n", " batches = int(len_x / batch_size)\n", " rest = len_x - batches*batch_size\n", " res = []\n", " for i in range(batches):\n", " x_batch = arr_x[i*batch_size: (i + 1)*batch_size]\n", " y_batch = arr_y[i*batch_size: (i + 1)*batch_size]\n", " adv_x = self.generate(x_batch, y_batch)\n", " res.append(adv_x)\n", " adv_x = np.concatenate(res, axis=0)\n", " return adv_x \n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "再次处理MINIST数据集中测试集的图片:" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [], "source": [ "images = []\n", "labels = []\n", "test_images = []\n", "test_labels = []\n", "predict_labels = []\n", "\n", "ds_test = create_dataset(os.path.join(mnist_path, \"test\"), batch_size=32).create_dict_iterator(output_numpy=True)\n", "\n", "for data in ds_test:\n", " images = data['image'].astype(np.float32)\n", " labels = data['label']\n", " test_images.append(images)\n", " test_labels.append(labels)\n", " pred_labels = np.argmax(model.predict(Tensor(images)).asnumpy(), axis=1)\n", " predict_labels.append(pred_labels)\n", " \n", "test_images = np.concatenate(test_images)\n", "predict_labels = np.concatenate(predict_labels)\n", "true_labels = np.concatenate(test_labels)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 运行攻击\n", "\n", "由FGSM攻击公式中可以看出,攻击系数$\\varepsilon$越大,对梯度的改变就越大。当$\\varepsilon$ 为零时则攻击效果不体现。\n", "\n", "$$ \\eta = \\varepsilon sign(\\nabla_x J(\\theta)) $$\n", "\n", "现在先观察当$\\varepsilon$为零时的攻击效果:" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "0.9602363782051282\n" ] } ], "source": [ "fgsm = FastGradientSignMethod(net, eps=0.0, loss_fn=net_loss)\n", "advs = fgsm.batch_generate(test_images, true_labels, batch_size=32)\n", "\n", "adv_predicts = model.predict(Tensor(advs)).asnumpy()\n", "adv_predicts = np.argmax(adv_predicts, axis=1)\n", "accuracy = np.mean(np.equal(adv_predicts, true_labels))\n", "print(accuracy)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "再将$\\varepsilon$设定为0.5,尝试运行攻击:" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "0.3212139423076923\n" ] } ], "source": [ "fgsm = FastGradientSignMethod(net, eps=0.5, loss_fn=net_loss)\n", "advs = fgsm.batch_generate(test_images, true_labels, batch_size=32)\n", "\n", "adv_predicts = model.predict(Tensor(advs)).asnumpy()\n", "adv_predicts = np.argmax(adv_predicts, axis=1)\n", "accuracy = np.mean(np.equal(adv_predicts, true_labels))\n", "print(accuracy)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "此时LeNet模型的精度大幅降低。\n", "\n", "下面演示受攻击照片现在的实际形态,可以看出图片并没有发生明显的改变,然而在精度测试中却有了不一样的结果:" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "import matplotlib.pyplot as plt\n", "\n", "adv_examples = np.transpose(advs[:10],[0,2,3,1])\n", "ori_examples = np.transpose(test_images[:10],[0,2,3,1]) \n", "\n", "plt.figure() \n", "for i in range(10):\n", " plt.subplot(2,10,i+1) \n", " plt.imshow(ori_examples[i])\n", " plt.subplot(2,10,i+11) \n", " plt.imshow(adv_examples[i])\n" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.5" } }, "nbformat": 4, "nbformat_minor": 4 }