昇思MindSpore 应用学习-ResNet50图像分类-CSDN

日期

心得

昇思MindSpore 应用学习-ResNet50图像分类学习 (AI 代码解析)

图像分类是最基础的计算机视觉应用,属于有监督学习类别,如给定一张图像(猫、狗、飞机、汽车等等),判断图像所属的类别。本章将介绍使用ResNet50网络对CIFAR-10数据集进行分类。

ResNet网络介绍

ResNet50网络是2015年由微软实验室的何恺明提出,获得ILSVRC2015图像分类竞赛第一名。在ResNet网络提出之前,传统的卷积神经网络都是将一系列的卷积层和池化层堆叠得到的,但当网络堆叠到一定深度时,就会出现退化问题。下图是在CIFAR-10数据集上使用56层网络与20层网络训练误差和测试误差图,由图中数据可以看出,56层网络比20层网络训练误差和测试误差更大,随着网络的加深,其误差并没有如预想的一样减小。

ResNet网络提出了残差网络结构(Residual Network)来减轻退化问题,使用ResNet网络可以实现搭建较深的网络结构(突破1000层)。论文中使用ResNet网络在CIFAR-10数据集上的训练误差与测试误差图如下图所示,图中虚线表示训练误差,实线表示测试误差。由图中数据可以看出,ResNet网络层数越深,其训练误差和测试误差越小。

了解ResNet网络更多详细内容,参见ResNet论文

数据集准备与加载

CIFAR-10数据集共有60000张32*32的彩色图像,分为10个类别,每类有6000张图,数据集一共有50000张训练图片和10000张评估图片。首先,如下示例使用download接口下载并解压,目前仅支持解析二进制版本的CIFAR-10文件(CIFAR-10 binary version)。

from download import download  # 从download模块导入download函数

url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/cifar-10-binary.tar.gz"
# 定义一个字符串变量url,表示需要下载的文件的URL地址

download(url, "./datasets-cifar10-bin", kind="tar.gz", replace=True)
# 调用download函数,下载指定URL的文件到本地目录"./datasets-cifar10-bin"
# 参数解析:
# 1. url: 要下载文件的URL
# 2. "./datasets-cifar10-bin": 本地保存下载文件的目录
# 3. kind="tar.gz": 指定下载文件的类型为.tar.gz,表示下载后需要进行解压
# 4. replace=True: 如果本地目录已经存在同名文件,是否替换(True表示替换)
  1. 导入模块
from download import download

在代码开始部分,从download模块中导入了download函数,这个函数负责下载文件。

  1. 定义URL变量
url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/cifar-10-binary.tar.gz"

这一行代码定义了一个字符串变量url,表示要下载的文件的URL地址。这里的文件是CIFAR-10数据集的tar.gz压缩包。

  1. 调用download函数
download(url, "./datasets-cifar10-bin", kind="tar.gz", replace=True)

这行代码调用了download函数,并传入了四个参数:

  • url:待下载文件的URL地址。

  • "./datasets-cifar10-bin":下载文件保存的本地目录路径。

  • kind="tar.gz":指定下载文件的类型为tar.gz,指示函数在下载完成后进行解压。

  • replace=True:如果本地已经存在同名文件,是否替换它。True表示替换,False表示不替换。

  • download(url, path, kind, replace)download函数是一个常用的文件下载函数。其具体功能是从指定的URL下载文件,并将其保存到指定的本地目录。该函数还支持对下载的文件进行解压缩(取决于kind参数的值),以及在本地存在同名文件时选择是否替换。

    • url:字符串类型,表示要下载文件的URL。
    • path:字符串类型,表示下载文件的保存路径。
    • kind:字符串类型,表示下载文件的类型,如tar.gzzip等,函数会根据类型进行相应的解压缩操作。
    • replace:布尔类型,表示是否替换本地已有的同名文件。

‘./datasets-cifar10-bin’
下载后的数据集目录结构如下:

datasets-cifar10-bin/cifar-10-batches-bin
├── batches.meta.text
├── data_batch_1.bin
├── data_batch_2.bin
├── data_batch_3.bin
├── data_batch_4.bin
├── data_batch_5.bin
├── readme.html
└── test_batch.bin

然后,使用mindspore.dataset.Cifar10Dataset接口来加载数据集,并进行相关图像增强操作。

import mindspore as ms  # 导入MindSpore框架
import mindspore.dataset as ds  # 导入MindSpore的数据集模块
import mindspore.dataset.vision as vision  # 导入MindSpore的视觉处理模块
import mindspore.dataset.transforms as transforms  # 导入MindSpore的数据变换模块
from mindspore import dtype as mstype  # 导入MindSpore的数据类型模块

data_dir = "./datasets-cifar10-bin/cifar-10-batches-bin"  # 数据集根目录
batch_size = 256  # 批量大小
image_size = 32  # 训练图像空间大小
workers = 4  # 并行线程个数
num_classes = 10  # 分类数量

def create_dataset_cifar10(dataset_dir, usage, resize, batch_size, workers):
    """
    创建CIFAR-10数据集
    
    参数:
    dataset_dir: str, 数据集路径
    usage: str, 数据集用途("train"或"test")
    resize: int, 图像调整大小
    batch_size: int, 批量大小
    workers: int, 并行线程个数
    
    返回:
    data_set: MindSpore数据集对象
    """

    # 加载CIFAR-10数据集
    data_set = ds.Cifar10Dataset(dataset_dir=dataset_dir,
                                 usage=usage,
                                 num_parallel_workers=workers,
                                 shuffle=True)

    trans = []  # 图像变换操作列表
    if usage == "train":
        # 如果是训练集,添加随机裁剪和随机水平翻转
        trans += [
            vision.RandomCrop((32, 32), (4, 4, 4, 4)),
            vision.RandomHorizontalFlip(prob=0.5)
        ]

    # 通用图像变换操作
    trans += [
        vision.Resize(resize),
        vision.Rescale(1.0 / 255.0, 0.0),
        vision.Normalize([0.4914, 0.4822, 0.4465], [0.2023, 0.1994, 0.2010]),
        vision.HWC2CHW()
    ]

    target_trans = transforms.TypeCast(mstype.int32)  # 标签变换操作

    # 对图像数据进行变换
    data_set = data_set.map(operations=trans,
                            input_columns='image',
                            num_parallel_workers=workers)

    # 对标签数据进行类型转换
    data_set = data_set.map(operations=target_trans,
                            input_columns='label',
                            num_parallel_workers=workers)

    # 批量操作
    data_set = data_set.batch(batch_size)

    return data_set

# 获取处理后的训练与测试数据集
dataset_train = create_dataset_cifar10(dataset_dir=data_dir,
                                       usage="train",
                                       resize=image_size,
                                       batch_size=batch_size,
                                       workers=workers)
step_size_train = dataset_train.get_dataset_size()

dataset_val = create_dataset_cifar10(dataset_dir=data_dir,
                                     usage="test",
                                     resize=image_size,
                                     batch_size=batch_size,
                                     workers=workers)
step_size_val = dataset_val.get_dataset_size()
  1. 导入模块
import mindspore as ms
import mindspore.dataset as ds
import mindspore.dataset.vision as vision
import mindspore.dataset.transforms as transforms
from mindspore import dtype as mstype

导入了MindSpore框架及其相关的数据集、视觉处理和数据变换模块。

  1. 定义数据集路径和参数
data_dir = "./datasets-cifar10-bin/cifar-10-batches-bin"  # 数据集根目录
batch_size = 256  # 批量大小
image_size = 32  # 训练图像空间大小
workers = 4  # 并行线程个数
num_classes = 10  # 分类数量

定义了数据集路径和一些参数,包括批量大小、图像尺寸、并行线程个数和分类数量。

  1. 创建数据集函数
def create_dataset_cifar10(dataset_dir, usage, resize, batch_size, workers):
    """
    创建CIFAR-10数据集
    
    参数:
    dataset_dir: str, 数据集路径
    usage: str, 数据集用途("train"或"test")
    resize: int, 图像调整大小
    batch_size: int, 批量大小
    workers: int, 并行线程个数
    
    返回:
    data_set: MindSpore数据集对象
    """

定义了一个函数create_dataset_cifar10,用于创建CIFAR-10数据集,函数参数包括数据集路径、用途(训练或测试)、图像调整大小、批量大小和并行线程个数。

  1. 加载和变换数据
data_set = ds.Cifar10Dataset(dataset_dir=dataset_dir,
                             usage=usage,
                             num_parallel_workers=workers,
                             shuffle=True)

加载CIFAR-10数据集,并根据用途添加相应的图像变换操作。

  1. 图像变换操作
trans = []
if usage == "train":
    trans += [
        vision.RandomCrop((32, 32), (4, 4, 4, 4)),
        vision.RandomHorizontalFlip(prob=0.5)
    ]
trans += [
    vision.Resize(resize),
    vision.Rescale(1.0 / 255.0, 0.0),
    vision.Normalize([0.4914, 0.4822, 0.4465], [0.2023, 0.1994, 0.2010]),
    vision.HWC2CHW()
]
target_trans = transforms.TypeCast(mstype.int32)

根据用途(训练或测试)定义图像变换操作,包括随机裁剪、随机水平翻转、调整大小、归一化和通道变换。

  1. 应用图像和标签变换
data_set = data_set.map(operations=trans,
                        input_columns='image',
                        num_parallel_workers=workers)
data_set = data_set.map(operations=target_trans,
                        input_columns='label',
                        num_parallel_workers=workers)

对图像数据应用定义的变换操作,对标签数据进行类型转换。

  1. 批量操作
data_set = data_set.batch(batch_size)

将数据集按批量大小分批处理。

  1. 获取训练和测试数据集
dataset_train = create_dataset_cifar10(dataset_dir=data_dir,
                                       usage="train",
                                       resize=image_size,
                                       batch_size=batch_size,
                                       workers=workers)
step_size_train = dataset_train.get_dataset_size()

dataset_val = create_dataset_cifar10(dataset_dir=data_dir,
                                     usage="test",
                                     resize=image_size,
                                     batch_size=batch_size,
                                     workers=workers)
step_size_val = dataset_val.get_dataset_size()

调用create_dataset_cifar10函数,获取处理后的训练和测试数据集,并获取数据集大小。
对CIFAR-10训练数据集进行可视化。

import matplotlib.pyplot as plt  # 导入matplotlib库用于绘图
import numpy as np  # 导入numpy库用于数值计算

data_iter = next(dataset_train.create_dict_iterator())  # 从训练数据集中获取下一个批次的数据

images = data_iter["image"].asnumpy()  # 将图像数据转换为numpy数组
labels = data_iter["label"].asnumpy()  # 将标签数据转换为numpy数组
print(f"Image shape: {images.shape}, Label shape: {labels.shape}")  # 打印图像和标签的形状

# 训练数据集中,前六张图片所对应的标签
print(f"Labels: {labels[:6]}")  # 打印前六张图片的标签

classes = []  # 存储类别名称的列表

with open(data_dir + "/batches.meta.txt", "r") as f:  # 打开类别元数据文件
    for line in f:
        line = line.rstrip()  # 去除行尾的空白字符
        if line:
            classes.append(line)  # 将类别名称添加到列表中

# 训练数据集的前六张图片
plt.figure()  # 创建一个新的图形
for i in range(6):
    plt.subplot(2, 3, i + 1)  # 创建子图
    image_trans = np.transpose(images[i], (1, 2, 0))  # 将图像从CHW格式转换为HWC格式
    mean = np.array([0.4914, 0.4822, 0.4465])  # 图像的均值
    std = np.array([0.2023, 0.1994, 0.2010])  # 图像的标准差
    image_trans = std * image_trans + mean  # 反标准化图像
    image_trans = np.clip(image_trans, 0, 1)  # 将像素值限制在0到1之间
    plt.title(f"{classes[labels[i]]}")  # 设置子图标题为对应的类别名称
    plt.imshow(image_trans)  # 显示图像
    plt.axis("off")  # 关闭坐标轴
plt.show()  # 显示图形
  1. 导入库
import matplotlib.pyplot as plt
import numpy as np

导入matplotlib.pyplot用于绘图,导入numpy用于数值计算。

  1. 获取数据
data_iter = next(dataset_train.create_dict_iterator())

从训练数据集中获取下一个批次的数据。

  1. 转换数据格式
images = data_iter["image"].asnumpy()
labels = data_iter["label"].asnumpy()

将图像和标签数据转换为numpy数组。

  1. 打印数据形状和标签
print(f"Image shape: {images.shape}, Label shape: {labels.shape}")
print(f"Labels: {labels[:6]}")

打印图像和标签的形状,以及前六张图片的标签。

  1. 读取类别名称
classes = []
with open(data_dir + "/batches.meta.txt", "r") as f:
    for line in f:
        line = line.rstrip()
        if line:
            classes.append(line)

从类别元数据文件中读取类别名称并存储在列表中。

  1. 绘制图像
plt.figure()
for i in range(6):
    plt.subplot(2, 3, i + 1)
    image_trans = np.transpose(images[i], (1, 2, 0))
    mean = np.array([0.4914, 0.4822, 0.4465])
    std = np.array([0.2023, 0.1994, 0.2010])
    image_trans = std * image_trans + mean
    image_trans = np.clip(image_trans, 0, 1)
    plt.title(f"{classes[labels[i]]}")
    plt.imshow(image_trans)
    plt.axis("off")
plt.show()

创建一个图形,并在其中绘制前六张图片。每张图片经过反标准化处理后显示,并设置对应的类别名称作为标题。最后显示图形。

构建网络

残差网络结构(Residual Network)是ResNet网络的主要亮点,ResNet使用残差网络结构后可有效地减轻退化问题,实现更深的网络结构设计,提高网络的训练精度。本节首先讲述如何构建残差网络结构,然后通过堆叠残差网络来构建ResNet50网络。

构建残差网络结构

残差网络结构图如下图所示,残差网络由两个分支构成:一个主分支,一个shortcuts(图中弧线表示)。主分支通过堆叠一系列的卷积操作得到,shortcuts从输入直接到输出,主分支输出的特征矩阵F(x)𝐹(𝑥)加上shortcuts输出的特征矩阵x𝑥得到F(x)+x𝐹(𝑥)+𝑥,通过Relu激活函数后即为残差网络最后的输出。

残差网络结构主要由两种,一种是Building Block,适用于较浅的ResNet网络,如ResNet18和ResNet34;另一种是Bottleneck,适用于层数较深的ResNet网络,如ResNet50、ResNet101和ResNet152。

Building Block

Building Block结构图如下图所示,主分支有两层卷积网络结构:

  • 主分支第一层网络以输入channel为64为例,首先通过一个3×33×3的卷积层,然后通过Batch Normalization层,最后通过Relu激活函数层,输出channel为64;
  • 主分支第二层网络的输入channel为64,首先通过一个3×33×3的卷积层,然后通过Batch Normalization层,输出channel为64。

最后将主分支输出的特征矩阵与shortcuts输出的特征矩阵相加,通过Relu激活函数即为Building Block最后的输出。

主分支与shortcuts输出的特征矩阵相加时,需要保证主分支与shortcuts输出的特征矩阵shape相同。如果主分支与shortcuts输出的特征矩阵shape不相同,如输出channel是输入channel的一倍时,shortcuts上需要使用数量与输出channel相等,大小为1×11×1的卷积核进行卷积操作;若输出的图像较输入图像缩小一倍,则要设置shortcuts中卷积操作中的stride为2,主分支第一层卷积操作的stride也需设置为2。
如下代码定义ResidualBlockBase类实现Building Block结构。

from typing import Type, Union, List, Optional  # 导入类型提示相关模块
import mindspore.nn as nn  # 导入MindSpore的神经网络模块
from mindspore.common.initializer import Normal  # 导入MindSpore的初始化器

# 初始化卷积层与BatchNorm的参数
weight_init = Normal(mean=0, sigma=0.02)  # 卷积层权重初始化
gamma_init = Normal(mean=1, sigma=0.02)  # BatchNorm层的gamma参数初始化

class ResidualBlockBase(nn.Cell):  # 定义残差块基础类,继承自MindSpore的nn.Cell
    expansion: int = 1  # 最后一个卷积核数量与第一个卷积核数量相等

    def __init__(self, in_channel: int, out_channel: int,
                 stride: int = 1, norm: Optional[nn.Cell] = None,
                 down_sample: Optional[nn.Cell] = None) -> None:
        """
        初始化残差块基础类

        参数:
        in_channel: int, 输入通道数
        out_channel: int, 输出通道数
        stride: int, 步幅大小
        norm: Optional[nn.Cell], 可选的归一化层
        down_sample: Optional[nn.Cell], 可选的下采样层
        """
        super(ResidualBlockBase, self).__init__()
        if not norm:
            self.norm = nn.BatchNorm2d(out_channel)  # 如果没有指定归一化层,使用BatchNorm2d
        else:
            self.norm = norm

        self.conv1 = nn.Conv2d(in_channel, out_channel,
                               kernel_size=3, stride=stride,
                               weight_init=weight_init)  # 第一个卷积层
        self.conv2 = nn.Conv2d(out_channel, out_channel,
                               kernel_size=3, weight_init=weight_init)  # 第二个卷积层
        self.relu = nn.ReLU()  # ReLU激活函数
        self.down_sample = down_sample  # 下采样层

    def construct(self, x):
        """
        前向传播函数

        参数:
        x: 输入张量

        返回:
        out: 输出张量
        """
        identity = x  # shortcuts分支

        out = self.conv1(x)  # 主分支第一层:3*3卷积层
        out = self.norm(out)
        out = self.relu(out)
        out = self.conv2(out)  # 主分支第二层:3*3卷积层
        out = self.norm(out)

        if self.down_sample is not None:
            identity = self.down_sample(x)  # 下采样操作
        out += identity  # 输出为主分支与shortcuts之和
        out = self.relu(out)

        return out
  1. 导入模块
from typing import Type, Union, List, Optional
import mindspore.nn as nn
from mindspore.common.initializer import Normal

导入了必要的模块和函数,其中包括类型提示相关模块、MindSpore的神经网络模块以及初始化器。

  1. 初始化参数
weight_init = Normal(mean=0, sigma=0.02)
gamma_init = Normal(mean=1, sigma=0.02)

定义了卷积层权重初始化和BatchNorm层的gamma参数初始化。

  1. 定义残差块基础类
class ResidualBlockBase(nn.Cell):
    expansion: int = 1

    def __init__(self, in_channel: int, out_channel: int,
                 stride: int = 1, norm: Optional[nn.Cell] = None,
                 down_sample: Optional[nn.Cell] = None) -> None:
        super(ResidualBlockBase, self).__init__()
        if not norm:
            self.norm = nn.BatchNorm2d(out_channel)
        else:
            self.norm = norm

        self.conv1 = nn.Conv2d(in_channel, out_channel,
                               kernel_size=3, stride=stride,
                               weight_init=weight_init)
        self.conv2 = nn.Conv2d(out_channel, out_channel,
                               kernel_size=3, weight_init=weight_init)
        self.relu = nn.ReLU()
        self.down_sample = down_sample

定义了一个残差块基础类,包含初始化函数和前向传播函数。

  1. 前向传播函数
def construct(self, x):
    identity = x

    out = self.conv1(x)
    out = self.norm(out)
    out = self.relu(out)
    out = self.conv2(out)
    out = self.norm(out)

    if self.down_sample is not None:
        identity = self.down_sample(x)
    out += identity
    out = self.relu(out)

    return out

定义了前向传播函数construct,该函数包括主分支和shortcuts分支,最后将主分支输出与shortcuts输出相加并通过ReLU激活函数。

Bottleneck

Bottleneck结构图如下图所示,在输入相同的情况下Bottleneck结构相对Building Block结构的参数数量更少,更适合层数较深的网络,ResNet50使用的残差结构就是Bottleneck。该结构的主分支有三层卷积结构,分别为1×11×1的卷积层、3×33×3卷积层和1×11×1的卷积层,其中1×11×1的卷积层分别起降维和升维的作用。

  • 主分支第一层网络以输入channel为256为例,首先通过数量为64,大小为1×11×1的卷积核进行降维,然后通过Batch Normalization层,最后通过Relu激活函数层,其输出channel为64;
  • 主分支第二层网络通过数量为64,大小为3×33×3的卷积核提取特征,然后通过Batch Normalization层,最后通过Relu激活函数层,其输出channel为64;
  • 主分支第三层通过数量为256,大小1×11×1的卷积核进行升维,然后通过Batch Normalization层,其输出channel为256。

最后将主分支输出的特征矩阵与shortcuts输出的特征矩阵相加,通过Relu激活函数即为Bottleneck最后的输出。

主分支与shortcuts输出的特征矩阵相加时,需要保证主分支与shortcuts输出的特征矩阵shape相同。如果主分支与shortcuts输出的特征矩阵shape不相同,如输出channel是输入channel的一倍时,shortcuts上需要使用数量与输出channel相等,大小为1×11×1的卷积核进行卷积操作;若输出的图像较输入图像缩小一倍,则要设置shortcuts中卷积操作中的stride为2,主分支第二层卷积操作的stride也需设置为2。
如下代码定义ResidualBlock类实现Bottleneck结构。

class ResidualBlock(nn.Cell):
    expansion = 4  # 最后一个卷积核的数量是第一个卷积核数量的4倍

    def __init__(self, in_channel: int, out_channel: int,
                 stride: int = 1, down_sample: Optional[nn.Cell] = None) -> None:
        """
        初始化残差块类
        
        参数:
        in_channel: int, 输入通道数
        out_channel: int, 输出通道数
        stride: int, 步幅大小
        down_sample: Optional[nn.Cell], 可选的下采样层
        """
        super(ResidualBlock, self).__init__()

        # 主分支的卷积层和归一化层
        self.conv1 = nn.Conv2d(in_channel, out_channel,
                               kernel_size=1, weight_init=weight_init)  # 1*1 卷积层
        self.norm1 = nn.BatchNorm2d(out_channel)  # 归一化层

        self.conv2 = nn.Conv2d(out_channel, out_channel,
                               kernel_size=3, stride=stride,
                               weight_init=weight_init)  # 3*3 卷积层
        self.norm2 = nn.BatchNorm2d(out_channel)  # 归一化层

        self.conv3 = nn.Conv2d(out_channel, out_channel * self.expansion,
                               kernel_size=1, weight_init=weight_init)  # 1*1 卷积层
        self.norm3 = nn.BatchNorm2d(out_channel * self.expansion)  # 归一化层

        self.relu = nn.ReLU()  # ReLU激活函数
        self.down_sample = down_sample  # 下采样层

    def construct(self, x):
        """
        前向传播函数
        
        参数:
        x: 输入张量
        
        返回:
        out: 输出张量
        """
        identity = x  # shortcuts分支

        # 主分支第一层:1*1卷积层
        out = self.conv1(x)
        out = self.norm1(out)
        out = self.relu(out)

        # 主分支第二层:3*3卷积层
        out = self.conv2(out)
        out = self.norm2(out)
        out = self.relu(out)

        # 主分支第三层:1*1卷积层
        out = self.conv3(out)
        out = self.norm3(out)

        if self.down_sample is not None:
            identity = self.down_sample(x)  # 对shortcuts分支进行下采样

        out += identity  # 输出为主分支与shortcuts之和
        out = self.relu(out)  # 通过ReLU激活函数

        return out  # 返回输出张量
  1. 定义残差块类
class ResidualBlock(nn.Cell):
    expansion = 4  # 最后一个卷积核的数量是第一个卷积核数量的4倍

    def __init__(self, in_channel: int, out_channel: int,
                 stride: int = 1, down_sample: Optional[nn.Cell] = None) -> None:
        super(ResidualBlock, self).__init__()
        ...

定义一个残差块类ResidualBlock,继承自MindSpore的nn.Cell。该类的expansion属性设置为4,表示输出通道数是输入通道数的4倍。

  1. 初始化函数
def __init__(self, in_channel: int, out_channel: int,
             stride: int = 1, down_sample: Optional[nn.Cell] = None) -> None:
    super(ResidualBlock, self).__init__()

    # 主分支的卷积层和归一化层
    self.conv1 = nn.Conv2d(in_channel, out_channel,
                           kernel_size=1, weight_init=weight_init)
    self.norm1 = nn.BatchNorm2d(out_channel)

    self.conv2 = nn.Conv2d(out_channel, out_channel,
                           kernel_size=3, stride=stride,
                           weight_init=weight_init)
    self.norm2 = nn.BatchNorm2d(out_channel)

    self.conv3 = nn.Conv2d(out_channel, out_channel * self.expansion,
                           kernel_size=1, weight_init=weight_init)
    self.norm3 = nn.BatchNorm2d(out_channel * self.expansion)

    self.relu = nn.ReLU()
    self.down_sample = down_sample

初始化函数定义了主分支的卷积层和归一化层,分别是一个1x1卷积层、一个3x3卷积层和一个1x1卷积层,并且使用ReLU激活函数。down_sample参数用于处理短连接分支。

  1. 前向传播函数
def construct(self, x):
    identity = x  # shortcuts分支

    # 主分支第一层:1*1卷积层
    out = self.conv1(x)
    out = self.norm1(out)
    out = self.relu(out)

    # 主分支第二层:3*3卷积层
    out = self.conv2(out)
    out = self.norm2(out)
    out = self.relu(out)

    # 主分支第三层:1*1卷积层
    out = self.conv3(out)
    out = self.norm3(out)

    if self.down_sample is not None:
        identity = self.down_sample(x)  # 对shortcuts分支进行下采样

    out += identity  # 输出为主分支与shortcuts之和
    out = self.relu(out)  # 通过ReLU激活函数

    return out  # 返回输出张量

前向传播函数construct包括主分支和shortcuts分支。主分支依次经过三个卷积层和归一化层,再通过ReLU激活函数。如果down_sample不为空,则对shortcuts分支进行下采样。最后将主分支和shortcuts分支相加,并通过ReLU激活函数,返回最终输出。

构建ResNet50网络

ResNet网络层结构如下图所示,以输入彩色图像224×224为例,首先通过数量64,卷积核大小为7×7,stride为2的卷积层conv1,该层输出图片大小为112×112,输出channel为64;然后通过一个3×3的最大下采样池化层,该层输出图片大小为56×56,输出channel为64;再堆叠4个残差网络块(conv2_x、conv3_x、conv4_x和conv5_x),此时输出图片大小为7×7,输出channel为2048;最后通过一个平均池化层、全连接层和softmax,得到分类概率。

对于每个残差网络块,以ResNet50网络中的conv2_x为例,其由3个Bottleneck结构堆叠而成,每个Bottleneck输入的channel为64,输出channel为256。
如下示例定义make_layer实现残差块的构建,其参数如下所示:

  • last_out_channel:上一个残差网络输出的通道数。
  • block:残差网络的类别,分别为ResidualBlockBaseResidualBlock
  • channel:残差网络输入的通道数。
  • block_nums:残差网络块堆叠的个数。
  • stride:卷积移动的步幅。
def make_layer(last_out_channel, block: Type[Union[ResidualBlockBase, ResidualBlock]],
               channel: int, block_nums: int, stride: int = 1):
    """
    构建残差层函数

    参数:
    last_out_channel: int, 上一层的输出通道数
    block: Type[Union[ResidualBlockBase, ResidualBlock]], 残差块类型,可以是ResidualBlockBase或ResidualBlock
    channel: int, 当前层的通道数
    block_nums: int, 残差块的数量
    stride: int, 步幅大小

    返回:
    nn.SequentialCell, 包含多个残差块的顺序层
    """
    down_sample = None  # shortcuts分支

    if stride != 1 or last_out_channel != channel * block.expansion:
        down_sample = nn.SequentialCell([
            nn.Conv2d(last_out_channel, channel * block.expansion,
                      kernel_size=1, stride=stride, weight_init=weight_init),
            nn.BatchNorm2d(channel * block.expansion, gamma_init=gamma_init)
        ])
        # 如果步幅不等于1或者输入输出通道数不匹配,通过1x1卷积和BatchNorm实现下采样

    layers = []
    layers.append(block(last_out_channel, channel, stride=stride, down_sample=down_sample))
    # 添加第一个残差块,并进行下采样(如果需要)

    in_channel = channel * block.expansion
    # 更新输入通道数为扩展后的通道数

    for _ in range(1, block_nums):
        layers.append(block(in_channel, channel))
        # 堆叠剩余的残差块

    return nn.SequentialCell(layers)
    # 返回包含多个残差块的顺序层
  1. 函数定义
def make_layer(last_out_channel, block: Type[Union[ResidualBlockBase, ResidualBlock]],
               channel: int, block_nums: int, stride: int = 1):

定义了一个函数make_layer,用于构建包含多个残差块的层。参数包括上一层的输出通道数、残差块类型(可以是ResidualBlockBaseResidualBlock)、当前层的通道数、残差块的数量和步幅大小。

  1. 初始化down_sample
down_sample = None  # shortcuts分支

if stride != 1或 last_out_channel != channel * block.expansion:
    down_sample = nn.SequentialCell([
        nn.Conv2d(last_out_channel, channel * block.expansion,
                  kernel_size=1, stride=stride, weight_init=weight_init),
        nn.BatchNorm2d(channel * block.expansion, gamma_init=gamma_init)
    ])

初始化down_sampleNone。如果步幅不等于1或者输入通道数和输出通道数不匹配,通过1x1卷积和BatchNorm实现下采样,确保输入和输出尺寸匹配。

  1. 构建残差块层
layers = []
layers.append(block(last_out_channel, channel, stride=stride, down_sample=down_sample))

创建一个列表layers用于存储残差块。首先添加一个残差块,并进行下采样(如果需要)。

  1. 更新输入通道数
in_channel = channel * block.expansion

更新输入通道数为扩展后的通道数。

  1. 堆叠残差块
for _ in range(1, block_nums):
    layers.append(block(in_channel, channel))

使用循环堆叠剩余数量的残差块,构建完整的残差层。

  1. 返回顺序层
return nn.SequentialCell(layers)

返回一个包含多个残差块的顺序层nn.SequentialCell。这个顺序层可以作为一个整体应用于输入张量上,实现残差网络的多层结构。

ResNet50网络共有5个卷积结构,一个平均池化层,一个全连接层,以CIFAR-10数据集为例:

  • conv1:输入图片大小为32×3232×32,输入channel为3。首先经过一个卷积核数量为64,卷积核大小为7×77×7,stride为2的卷积层;然后通过一个Batch Normalization层;最后通过Reul激活函数。该层输出feature map大小为16×1616×16,输出channel为64。
  • conv2_x:输入feature map大小为16×1616×16,输入channel为64。首先经过一个卷积核大小为3×33×3,stride为2的最大下采样池化操作;然后堆叠3个[1×1,64;3×3,64;1×1,256][1×1,64;3×3,64;1×1,256]结构的Bottleneck。该层输出feature map大小为8×88×8,输出channel为256。
  • conv3_x:输入feature map大小为8×88×8,输入channel为256。该层堆叠4个[1×1,128;3×3,128;1×1,512]结构的Bottleneck。该层输出feature map大小为4×44×4,输出channel为512。
  • conv4_x:输入feature map大小为4×44×4,输入channel为512。该层堆叠6个[1×1,256;3×3,256;1×1,1024]结构的Bottleneck。该层输出feature map大小为2×22×2,输出channel为1024。
  • conv5_x:输入feature map大小为2×22×2,输入channel为1024。该层堆叠3个[1×1,512;3×3,512;1×1,2048]结构的Bottleneck。该层输出feature map大小为1×11×1,输出channel为2048。
  • average pool & fc:输入channel为2048,输出channel为分类的类别数。

如下示例代码实现ResNet50模型的构建,通过用调函数resnet50即可构建ResNet50模型,函数resnet50参数如下:

  • num_classes:分类的类别数,默认类别数为1000。
  • pretrained:下载对应的训练模型,并加载预训练模型中的参数到网络中。
from mindspore import load_checkpoint, load_param_into_net

class ResNet(nn.Cell):
    def __init__(self, block: Type[Union[ResidualBlockBase, ResidualBlock]],
                 layer_nums: List[int], num_classes: int, input_channel: int) -> None:
        """
        初始化ResNet模型

        参数:
        block: Type[Union[ResidualBlockBase, ResidualBlock]], 残差块类型
        layer_nums: List[int], 每个残差层的残差块数量
        num_classes: int, 分类数
        input_channel: int, 输入通道数
        """
        super(ResNet, self).__init__()

        self.relu = nn.ReLU()
        # 第一个卷积层,输入channel为3(彩色图像),输出channel为64
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, weight_init=weight_init)
        self.norm = nn.BatchNorm2d(64)
        # 最大池化层,缩小图片的尺寸
        self.max_pool = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode='same')
        # 各个残差网络结构块定义
        self.layer1 = make_layer(64, block, 64, layer_nums[0])
        self.layer2 = make_layer(64 * block.expansion, block, 128, layer_nums[1], stride=2)
        self.layer3 = make_layer(128 * block.expansion, block, 256, layer_nums[2], stride=2)
        self.layer4 = make_layer(256 * block.expansion, block, 512, layer_nums[3], stride=2)
        # 平均池化层
        self.avg_pool = nn.AvgPool2d()
        # flattern层
        self.flatten = nn.Flatten()
        # 全连接层
        self.fc = nn.Dense(in_channels=input_channel, out_channels=num_classes)

    def construct(self, x):
        """
        前向传播函数

        参数:
        x: 输入张量

        返回:
        x: 输出张量
        """
        x = self.conv1(x)
        x = self.norm(x)
        x = self.relu(x)
        x = self.max_pool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avg_pool(x)
        x = self.flatten(x)
        x = self.fc(x)

        return x
  1. 导入模块
from mindspore import load_checkpoint, load_param_into_net

导入MindSpore的检查点加载和参数加载功能。

  1. 定义ResNet类
class ResNet(nn.Cell):
    def __init__(self, block: Type[Union[ResidualBlockBase, ResidualBlock]],
                 layer_nums: List[int], num_classes: int, input_channel: int) -> None:
        super(ResNet, self).__init__()
        ...

定义ResNet类,继承自MindSpore的nn.Cell。初始化函数接受残差块类型、每个残差层的残差块数量、分类数和输入通道数作为参数。

  1. 初始化网络层
self.relu = nn.ReLU()
self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, weight_init=weight_init)
self.norm = nn.BatchNorm2d(64)
self.max_pool = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode='same')
self.layer1 = make_layer(64, block, 64, layer_nums[0])
self.layer2 = make_layer(64 * block.expansion, block, 128, layer_nums[1], stride=2)
self.layer3 = make_layer(128 * block.expansion, block, 256, layer_nums[2], stride=2)
self.layer4 = make_layer(256 * block.expansion, block, 512, layer_nums[3], stride=2)
self.avg_pool = nn.AvgPool2d()
self.flatten = nn.Flatten()
self.fc = nn.Dense(in_channels=input_channel, out_channels=num_classes)

初始化网络的各个层,包括卷积层、归一化层、池化层、残差层、平均池化层、展平层和全连接层。

  1. 前向传播函数
def construct(self, x):
    x = self.conv1(x)
    x = self.norm(x)
    x = self.relu(x)
    x = self.max_pool(x)

    x = self.layer1(x)
    x = self.layer2(x)
    x = self.layer3(x)
    x = self.layer4(x)

    x = self.avg_pool(x)
    x = self.flatten(x)
    x = self.fc(x)

    return x

定义前向传播函数construct,依次通过各个层,最终输出分类结果。

def _resnet(model_url: str, block: Type[Union[ResidualBlockBase, ResidualBlock]],
            layers: List[int], num_classes: int, pretrained: bool, pretrained_ckpt: str,
            input_channel: int):
    """
    构建ResNet模型,并根据需要加载预训练模型

    参数:
    model_url: str, 预训练模型的下载地址
    block: Type[Union[ResidualBlockBase, ResidualBlock]], 残差块类型
    layers: List[int], 每个残差层的残差块数量
    num_classes: int, 分类数
    pretrained: bool, 是否加载预训练模型
    pretrained_ckpt: str, 预训练模型的本地存储路径
    input_channel: int, 输入通道数

    返回:
    model: ResNet模型
    """
    model = ResNet(block, layers, num_classes, input_channel)

    if pretrained:
        # 加载预训练模型
        download(url=model_url, path=pretrained_ckpt, replace=True)
        param_dict = load_checkpoint(pretrained_ckpt)
        load_param_into_net(model, param_dict)

    return model

def resnet50(num_classes: int = 1000, pretrained: bool = False):
    """ResNet50模型

    参数:
    num_classes: int, 分类数,默认为1000
    pretrained: bool, 是否加载预训练模型,默认为False

    返回:
    ResNet50模型
    """
    resnet50_url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/models/application/resnet50_224_new.ckpt"
    resnet50_ckpt = "./LoadPretrainedModel/resnet50_224_new.ckpt"
    return _resnet(resnet50_url, ResidualBlock, [3, 4, 6, 3], num_classes,
                   pretrained, resnet50_ckpt, 2048)
  1. 定义 _resnet 函数
def _resnet(model_url: str, block: Type[Union[ResidualBlockBase, ResidualBlock]],
            layers: List[int], num_classes: int, pretrained: bool, pretrained_ckpt: str,
            input_channel: int):

定义一个内部函数 _resnet,用于构建 ResNet 模型,并根据需要加载预训练模型。参数包括预训练模型的下载地址、残差块类型、每个残差层的残差块数量、分类数、是否加载预训练模型、预训练模型的本地存储路径和输入通道数。

  1. 初始化 ResNet 模型
model = ResNet(block, layers, num_classes, input_channel)

使用给定的参数初始化 ResNet 模型。

  1. 加载预训练模型
if pretrained:
    # 加载预训练模型
    download(url=model_url, path=pretrained_ckpt, replace=True)
    param_dict = load_checkpoint(pretrained_ckpt)
    load_param_into_net(model, param_dict)

如果 pretrained 参数为 True,则下载预训练模型,并加载到当前模型中。

  1. 返回模型
return model

返回构建好的 ResNet 模型。

  1. 定义 resnet50 函数
def resnet50(num_classes: int = 1000, pretrained: bool = False):
    """ResNet50模型

    参数:
    num_classes: int, 分类数,默认为1000
    pretrained: bool, 是否加载预训练模型,默认为False

    返回:
    ResNet50模型
    """
    resnet50_url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/models/application/resnet50_224_new.ckpt"
    resnet50_ckpt = "./LoadPretrainedModel/resnet50_224_new.ckpt"
    return _resnet(resnet50_url, ResidualBlock, [3, 4, 6, 3], num_classes,
                   pretrained, resnet50_ckpt, 2048)

定义 resnet50 函数,用于构建 ResNet-50 模型。默认分类数为 1000,是否加载预训练模型默认为 False。构建 ResNet-50 模型时,调用 _resnet 函数,并传入 ResNet-50 相应的参数,包括预训练模型的下载地址和本地存储路径。

模型训练与评估

本节使用ResNet50预训练模型进行微调。调用resnet50构造ResNet50模型,并设置pretrained参数为True,将会自动下载ResNet50预训练模型,并加载预训练模型中的参数到网络中。然后定义优化器和损失函数,逐个epoch打印训练的损失值和评估精度,并保存评估精度最高的ckpt文件(resnet50-best.ckpt)到当前路径的./BestCheckPoint下。
由于预训练模型全连接层(fc)的输出大小(对应参数num_classes)为1000, 为了成功加载预训练权重,我们将模型的全连接输出大小设置为默认的1000。CIFAR10数据集共有10个分类,在使用该数据集进行训练时,需要将加载好预训练权重的模型全连接层输出大小重置为10。
此处我们展示了5个epochs的训练过程,如果想要达到理想的训练效果,建议训练80个epochs。

# 定义ResNet50网络
network = resnet50(pretrained=True)

# 全连接层输入层的大小
in_channel = network.fc.in_channels
fc = nn.Dense(in_channels=in_channel, out_channels=10)
# 重置全连接层
network.fc = fc
  1. 定义ResNet50网络
network = resnet50(pretrained=True)

调用 resnet50 函数,并设置 pretrained=True,以加载预训练的 ResNet-50 模型。

  1. 获取全连接层输入层的大小
in_channel = network.fc.in_channels

获取当前 ResNet-50 模型中全连接层(fc)的输入通道数(in_channels)。

  1. 定义新的全连接层
fc = nn.Dense(in_channels=in_channel, out_channels=10)

创建一个新的全连接层(nn.Dense),输入通道数为 in_channel,输出通道数为 10(假设分类数为 10)。

  1. 重置全连接层
network.fc = fc

将 ResNet-50 模型中的全连接层替换为新定义的全连接层。
通过这些步骤,你可以加载预训练的 ResNet-50 模型,并根据需要调整其全连接层以适应新的分类任务。

# 设置学习率
num_epochs = 5
lr = nn.cosine_decay_lr(min_lr=0.00001, max_lr=0.001, total_step=step_size_train * num_epochs,
                        step_per_epoch=step_size_train, decay_epoch=num_epochs)
# 定义优化器和损失函数
opt = nn.Momentum(params=network.trainable_params(), learning_rate=lr, momentum=0.9)
loss_fn = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')

def forward_fn(inputs, targets):
    logits = network(inputs)
    loss = loss_fn(logits, targets)
    return loss

grad_fn = ms.value_and_grad(forward_fn, None, opt.parameters)

def train_step(inputs, targets):
    loss, grads = grad_fn(inputs, targets)
    opt(grads)
    return loss
  1. 设置学习率
num_epochs = 5
lr = nn.cosine_decay_lr(min_lr=0.00001, max_lr=0.001, total_step=step_size_train * num_epochs,
                        step_per_epoch=step_size_train, decay_epoch=num_epochs)

使用余弦退火学习率调度器(nn.cosine_decay_lr)来设置学习率。min_lrmax_lr 分别是学习率的最小值和最大值,total_step 是总的训练步数,step_per_epoch 是每个 epoch 的步数,decay_epoch 是退火的 epoch 数。

  1. 定义优化器和损失函数
opt = nn.Momentum(params=network.trainable_params(), learning_rate=lr, momentum=0.9)
loss_fn = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')

定义优化器(nn.Momentum)和损失函数(nn.SoftmaxCrossEntropyWithLogits)。优化器使用动量法,学习率为 lr,动量参数为 0.9。损失函数为稀疏的 softmax 交叉熵损失,结果取平均值。

  1. 定义前向传播函数
def forward_fn(inputs, targets):
    logits = network(inputs)
    loss = loss_fn(logits, targets)
    return loss

定义前向传播函数 forward_fn,输入为数据和标签,输出为损失值。

  1. 定义梯度计算函数
grad_fn = ms.value_and_grad(forward_fn, None, opt.parameters)

使用 MindSpore 的 value_and_grad 函数来定义梯度计算函数 grad_fn,该函数会计算前向传播的损失值和对应的梯度。

  1. 定义训练步骤函数
def train_step(inputs, targets):
    loss, grads = grad_fn(inputs, targets)
    opt(grads)
    return loss

定义训练步骤函数 train_step,输入为数据和标签,输出为损失值。该函数会计算损失和梯度,并使用优化器更新模型参数。
通过这些步骤,你可以设置学习率调度器、定义优化器和损失函数,并实现模型的训练步骤。

import os

# 创建迭代器
data_loader_train = dataset_train.create_tuple_iterator(num_epochs=num_epochs)
data_loader_val = dataset_val.create_tuple_iterator(num_epochs=num_epochs)

# 最佳模型存储路径
best_acc = 0
best_ckpt_dir = "./BestCheckpoint"
best_ckpt_path = "./BestCheckpoint/resnet50-best.ckpt"

if not os.path.exists(best_ckpt_dir):
    os.mkdir(best_ckpt_dir)
  1. 创建迭代器
data_loader_train = dataset_train.create_tuple_iterator(num_epochs=num_epochs)
data_loader_val = dataset_val.create_tuple_iterator(num_epochs=num_epochs)

使用 create_tuple_iterator 方法为训练集和验证集创建迭代器,num_epochs 参数指定迭代器在训练过程中迭代的 epoch 数。

  1. 最佳模型存储路径
best_acc = 0
best_ckpt_dir = "./BestCheckpoint"
best_ckpt_path = "./BestCheckpoint/resnet50-best.ckpt"

定义变量 best_acc 用于记录最佳准确率,best_ckpt_dirbest_ckpt_path 用于指定最佳模型的存储目录和路径。

  1. 检查并创建目录
if not os.path.exists(best_ckpt_dir):
    os.mkdir(best_ckpt_dir)

检查 best_ckpt_dir 目录是否存在,如果不存在则使用 os.mkdir 方法创建该目录。
通过这些步骤,你可以为训练集和验证集创建迭代器,并设置最佳模型的存储路径和目录。

import mindspore.ops as ops

def train(data_loader, epoch):
    """模型训练"""
    losses = []
    network.set_train(True)

    for i, (images, labels) in enumerate(data_loader):
        loss = train_step(images, labels)
        if i % 100 == 0 or i == step_size_train - 1:
            print('Epoch: [%3d/%3d], Steps: [%3d/%3d], Train Loss: [%5.3f]' %
                  (epoch + 1, num_epochs, i + 1, step_size_train, loss))
        losses.append(loss)

    return sum(losses) / len(losses)

def evaluate(data_loader):
    """模型验证"""
    network.set_train(False)

    correct_num = 0.0  # 预测正确个数
    total_num = 0.0  # 预测总数

    for images, labels in data_loader:
        logits = network(images)
        pred = logits.argmax(axis=1)  # 预测结果
        correct = ops.equal(pred, labels).reshape((-1,))
        correct_num += correct.sum().asnumpy()
        total_num += correct.shape[0]

    acc = correct_num / total_num  # 准确率

    return acc
  1. 导入必要的库
import mindspore.ops as ops

导入 MindSpore 的操作函数库 ops,用于模型的验证过程。

  1. **训练函数 **train
def train(data_loader, epoch):
    """模型训练"""
    losses = []
    network.set_train(True)

    for i, (images, labels) in enumerate(data_loader):
        loss = train_step(images, labels)
        if i % 100 == 0 or i == step_size_train - 1:
            print('Epoch: [%3d/%3d], Steps: [%3d/%3d], Train Loss: [%5.3f]' %
                  (epoch + 1, num_epochs, i + 1, step_size_train, loss))
        losses.append(loss)

    return sum(losses) / len(losses)
  • 模型设置为训练模式
network.set_train(True)

设置模型为训练模式。

  • 训练数据迭代
for i, (images, labels) in enumerate(data_loader):
    loss = train_step(images, labels)
    if i % 100 == 0 or i == step_size_train - 1:
        print('Epoch: [%3d/%3d], Steps: [%3d/%3d], Train Loss: [%5.3f]' %
              (epoch + 1, num_epochs, i + 1, step_size_train, loss))
    losses.append(loss)

使用训练数据迭代器 data_loader 进行迭代训练,每完成100个步骤或最后一个步骤时打印当前的训练损失。

  • 返回平均损失
return sum(losses) / len(losses)

计算并返回本 epoch 的平均损失。

  1. **验证函数 **evaluate
def evaluate(data_loader):
    """模型验证"""
    network.set_train(False)

    correct_num = 0.0
    total_num = 0.0

    for images, labels in data_loader:
        logits = network(images)
        pred = logits.argmax(axis=1)
        correct = ops.equal(pred, labels).reshape((-1,))
        correct_num += correct.sum().asnumpy()
        total_num += correct.shape[0]

    acc = correct_num / total_num

    return acc
  • 模型设置为评估模式
network.set_train(False)

设置模型为评估模式。

  • 验证数据迭代
correct_num = 0.0
total_num = 0.0

for images, labels in data_loader:
    logits = network(images)
    pred = logits.argmax(axis=1)
    correct = ops.equal(pred, labels).reshape((-1,))
    correct_num += correct.sum().asnumpy()
    total_num += correct.shape[0]

使用验证数据迭代器 data_loader 进行迭代验证。计算预测结果 pred 和真实标签 labels 是否相等,对应正确预测的个数累加到 correct_num,累加总的样本数到 total_num

  • 计算并返回准确率
acc = correct_num / total_num
return acc

计算并返回准确率 acc
通过这些步骤,你可以实现模型的训练和验证过程。训练函数 train 用于模型的训练,每个 epoch 打印部分训练损失。验证函数 evaluate 用于模型的评估,返回评估准确率。

# 开始循环训练
print("Start Training Loop ...")

for epoch in range(num_epochs):
    curr_loss = train(data_loader_train, epoch)
    curr_acc = evaluate(data_loader_val)

    print("-" * 50)
    print("Epoch: [%3d/%3d], Average Train Loss: [%5.3f], Accuracy: [%5.3f]" % (
        epoch + 1, num_epochs, curr_loss, curr_acc
    ))
    print("-" * 50)

    # 保存当前预测准确率最高的模型
    if curr_acc > best_acc:
        best_acc = curr_acc
        ms.save_checkpoint(network, best_ckpt_path)

print("=" * 80)
print(f"End of validation the best Accuracy is: {best_acc: 5.3f}, "
      f"save the best ckpt file in {best_ckpt_path}", flush=True)
  1. 开始训练循环
print("Start Training Loop ...")

打印开始训练循环的提示信息。

  1. 训练和验证循环
for epoch in range(num_epochs):
    curr_loss = train(data_loader_train, epoch)
    curr_acc = evaluate(data_loader_val)

    print("-" * 50)
    print("Epoch: [%3d/%3d], Average Train Loss: [%5.3f], Accuracy: [%5.3f]" % (
        epoch + 1, num_epochs, curr_loss, curr_acc
    ))
    print("-" * 50)
  • 使用 range(num_epochs) 创建一个循环,迭代指定的 epoch 数。
  • 每个 epoch 调用 train 函数进行训练,并调用 evaluate 函数进行验证。
  • 打印当前 epoch 的平均训练损失和验证准确率。
  1. 保存最佳模型
if curr_acc > best_acc:
    best_acc = curr_acc
    ms.save_checkpoint(network, best_ckpt_path)
  • 如果当前 epoch 的验证准确率 curr_acc 高于之前记录的最佳准确率 best_acc,更新 best_acc
  • 使用 ms.save_checkpoint 函数保存当前的模型参数到 best_ckpt_path
  1. 结束训练循环
print("=" * 80)
print(f"End of validation the best Accuracy is: {best_acc: 5.3f}, "
      f"save the best ckpt file in {best_ckpt_path}", flush=True)
  • 打印提示信息,显示所有 epoch 训练完成后的最佳准确率 best_acc,以及保存的最佳模型文件路径 best_ckpt_path
  • flush=True 确保立即刷新输出缓冲区,将信息打印到控制台。

通过这些步骤,你可以实现模型的训练和验证循环。每个 epoch 完成后,打印训练损失和验证准确率,并保存最佳准确率的模型文件。

可视化模型预测
定义visualize_model函数,使用上述验证精度最高的模型对CIFAR-10测试数据集进行预测,并将预测结果可视化。若预测字体颜色为蓝色表示为预测正确,预测字体颜色为红色则表示预测错误。
由上面的结果可知,5个epochs下模型在验证数据集的预测准确率在70%左右,即一般情况下,6张图片中会有2张预测失败。如果想要达到理想的训练效果,建议训练80个epochs。

import matplotlib.pyplot as plt
import mindspore as ms
import numpy as np
from resnet import resnet50  # 确保已定义或导入resnet50模型

def visualize_model(best_ckpt_path, dataset_val):
    num_class = 10  # 对10类图像进行分类
    # 初始化模型
    net = resnet50(num_class)
    # 加载模型参数
    param_dict = ms.load_checkpoint(best_ckpt_path)
    ms.load_param_into_net(net, param_dict)
    
    # 加载验证集的数据进行验证
    data = next(dataset_val.create_dict_iterator())
    images = data["image"]
    labels = data["label"]
    
    # 预测图像类别
    output = net(images)
    pred = np.argmax(output.asnumpy(), axis=1)

    # 图像分类标签
    classes = []

    with open(data_dir + "/batches.meta.txt", "r") as f:
        for line in f:
            line = line.rstrip()
            if line:
                classes.append(line)

    # 显示图像及图像的预测值
    plt.figure()
    for i in range(6):
        plt.subplot(2, 3, i + 1)
        # 若预测正确,显示为蓝色;若预测错误,显示为红色
        color = 'blue' if pred[i] == labels.asnumpy()[i] else 'red'
        plt.title('predict:{}'.format(classes[pred[i]]), color=color)
        picture_show = np.transpose(images.asnumpy()[i], (1, 2, 0))
        mean = np.array([0.4914, 0.4822, 0.4465])
        std = np.array([0.2023, 0.1994, 0.2010])
        picture_show = std * picture_show + mean
        picture_show = np.clip(picture_show, 0, 1)
        plt.imshow(picture_show)
        plt.axis('off')

    plt.show()

# 使用测试数据集进行验证
visualize_model(best_ckpt_path=best_ckpt_path, dataset_val=dataset_val)
  1. 导入必要的库
import matplotlib.pyplot as plt
import mindspore as ms
import numpy as np
from resnet import resnet50  # 确保已定义或导入resnet50模型

导入 matplotlib.pyplot 用于绘图,mindspore 用于加载和使用模型,numpy 进行数值计算,resnet50 模型用于图像分类。

  1. **定义模型可视化函数 **visualize_model
def visualize_model(best_ckpt_path, dataset_val):
    num_class = 10  # 对10类图像进行分类
    net = resnet50(num_class)
    param_dict = ms.load_checkpoint(best_ckpt_path)
    ms.load_param_into_net(net, param_dict)
  • 定义函数 visualize_model,参数包括最佳模型路径 best_ckpt_path 和验证数据集 dataset_val
  • 初始化 resnet50 模型,并加载指定路径的模型参数。
  1. 加载验证集数据
data = next(dataset_val.create_dict_iterator())
images = data["image"]
labels = data["label"]

使用验证数据集的迭代器获取数据,包括图像和标签。

  1. 预测图像类别
output = net(images)
pred = np.argmax(output.asnumpy(), axis=1)

使用模型对图像进行预测,得到输出后,使用 argmax 获取预测的类别。

  1. 读取图像分类标签
classes = []

with open(data_dir + "/batches.meta.txt", "r") as f:
    for line in f:
        line = line.rstrip()
        if line:
            classes.append(line)

从文件中读取分类标签,存储在 classes 列表中。

  1. 显示图像及其预测结果
plt.figure()
for i in range(6):
    plt.subplot(2, 3, i + 1)
    color = 'blue' if pred[i] == labels.asnumpy()[i] else 'red'
    plt.title('predict:{}'.format(classes[pred[i]]), color=color)
    picture_show = np.transpose(images.asnumpy()[i], (1, 2, 0))
    mean = np.array([0.4914, 0.4822, 0.4465])
    std = np.array([0.2023, 0.1994, 0.2010])
    picture_show = std * picture_show + mean
    picture_show = np.clip(picture_show, 0, 1)
    plt.imshow(picture_show)
    plt.axis('off')

plt.show()
  • 创建一个图形窗口,循环显示6张图像。
  • 如果预测正确,标题颜色为蓝色;如果预测错误,标题颜色为红色。
  • 对图像进行标准化还原并显示。
  1. 调用可视化函数
visualize_model(best_ckpt_path=best_ckpt_path, dataset_val=dataset_val)

调用 visualize_model 函数,使用测试数据集进行验证。
通过这些步骤,你可以加载最佳模型参数,并可视化模型在验证集上的预测结果。每张图像将显示其预测类别及其实际类别的颜色标注。

整体代码

#!/usr/bin/env python
# coding: utf-8

# # ResNet50图像分类

# [![下载Notebook](https://qingyun-test.oss-cn-hangzhou.aliyuncs.com/images/2024/07/20/image14539567018573547155.png)](https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/r2.3/tutorials/application/zh_cn/cv/mindspore_resnet50.ipynb) [![下载样例代码](https://qingyun-test.oss-cn-hangzhou.aliyuncs.com/images/2024/07/20/image10706878759923307322.png)](https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/r2.3/tutorials/application/zh_cn/cv/mindspore_resnet50.py) [![查看源文件](https://qingyun-test.oss-cn-hangzhou.aliyuncs.com/images/2024/07/20/image4568957979974431533.png)](https://gitee.com/mindspore/docs/blob/r2.3/tutorials/application/source_zh_cn/cv/resnet50.ipynb)

# 图像分类是最基础的计算机视觉应用,属于有监督学习类别,如给定一张图像(猫、狗、飞机、汽车等等),判断图像所属的类别。本章将介绍使用ResNet50网络对CIFAR-10数据集进行分类。

# ## ResNet网络介绍

# ResNet50网络是2015年由微软实验室的何恺明提出,获得ILSVRC2015图像分类竞赛第一名。在ResNet网络提出之前,传统的卷积神经网络都是将一系列的卷积层和池化层堆叠得到的,但当网络堆叠到一定深度时,就会出现退化问题。下图是在CIFAR-10数据集上使用56层网络与20层网络训练误差和测试误差图,由图中数据可以看出,56层网络比20层网络训练误差和测试误差更大,随着网络的加深,其误差并没有如预想的一样减小。

# ![resnet-1](https://qingyun-test.oss-cn-hangzhou.aliyuncs.com/images/2024/07/20/image13235098901558598171.png)

# ResNet网络提出了残差网络结构(Residual Network)来减轻退化问题,使用ResNet网络可以实现搭建较深的网络结构(突破1000层)。论文中使用ResNet网络在CIFAR-10数据集上的训练误差与测试误差图如下图所示,图中虚线表示训练误差,实线表示测试误差。由图中数据可以看出,ResNet网络层数越深,其训练误差和测试误差越小。

# ![resnet-4](https://qingyun-test.oss-cn-hangzhou.aliyuncs.com/images/2024/07/20/image3983657854719650342.png)

# > 了解ResNet网络更多详细内容,参见[ResNet论文](https://arxiv.org/pdf/1512.03385.pdf)。


# ## 数据集准备与加载

# [CIFAR-10数据集](http://www.cs.toronto.edu/~kriz/cifar.html)共有60000张32*32的彩色图像,分为10个类别,每类有6000张图,数据集一共有50000张训练图片和10000张评估图片。首先,如下示例使用`download`接口下载并解压,目前仅支持解析二进制版本的CIFAR-10文件(CIFAR-10 binary version)。

# In[1]:
from download import download

url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/cifar-10-binary.tar.gz"
download(url, "./datasets-cifar10-bin", kind="tar.gz", replace=True)

# 下载后的数据集目录结构如下:
# 
# ```text
# datasets-cifar10-bin/cifar-10-batches-bin
# ├── batches.meta.text
# ├── data_batch_1.bin
# ├── data_batch_2.bin
# ├── data_batch_3.bin
# ├── data_batch_4.bin
# ├── data_batch_5.bin
# ├── readme.html
# └── test_batch.bin
# 
# ```
# 
# 然后,使用`mindspore.dataset.Cifar10Dataset`接口来加载数据集,并进行相关图像增强操作。

# In[2]:
import mindspore as ms
import mindspore.dataset as ds
import mindspore.dataset.vision as vision
import mindspore.dataset.transforms as transforms
from mindspore import dtype as mstype

data_dir = "./datasets-cifar10-bin/cifar-10-batches-bin"  # 数据集根目录
batch_size = 256  # 批量大小
image_size = 32  # 训练图像空间大小
workers = 4  # 并行线程个数
num_classes = 10  # 分类数量


def create_dataset_cifar10(dataset_dir, usage, resize, batch_size, workers):
    data_set = ds.Cifar10Dataset(dataset_dir=dataset_dir,
                                 usage=usage,
                                 num_parallel_workers=workers,
                                 shuffle=True)

    trans = []
    if usage == "train":
        trans += [
            vision.RandomCrop((32, 32), (4, 4, 4, 4)),
            vision.RandomHorizontalFlip(prob=0.5)
        ]

    trans += [
        vision.Resize(resize),
        vision.Rescale(1.0 / 255.0, 0.0),
        vision.Normalize([0.4914, 0.4822, 0.4465], [0.2023, 0.1994, 0.2010]),
        vision.HWC2CHW()
    ]

    target_trans = transforms.TypeCast(mstype.int32)

    # 数据映射操作
    data_set = data_set.map(operations=trans,
                            input_columns='image',
                            num_parallel_workers=workers)

    data_set = data_set.map(operations=target_trans,
                            input_columns='label',
                            num_parallel_workers=workers)

    # 批量操作
    data_set = data_set.batch(batch_size)

    return data_set


# 获取处理后的训练与测试数据集
dataset_train = create_dataset_cifar10(dataset_dir=data_dir,
                                       usage="train",
                                       resize=image_size,
                                       batch_size=batch_size,
                                       workers=workers)
step_size_train = dataset_train.get_dataset_size()

dataset_val = create_dataset_cifar10(dataset_dir=data_dir,
                                     usage="test",
                                     resize=image_size,
                                     batch_size=batch_size,
                                     workers=workers)
step_size_val = dataset_val.get_dataset_size()

# 对CIFAR-10训练数据集进行可视化。

# In[3]:
import matplotlib.pyplot as plt
import numpy as np

data_iter = next(dataset_train.create_dict_iterator())

images = data_iter["image"].asnumpy()
labels = data_iter["label"].asnumpy()
print(f"Image shape: {images.shape}, Label shape: {labels.shape}")

# 训练数据集中,前六张图片所对应的标签
print(f"Labels: {labels[:6]}")

classes = []

with open(data_dir + "/batches.meta.txt", "r") as f:
    for line in f:
        line = line.rstrip()
        if line:
            classes.append(line)

# 训练数据集的前六张图片
plt.figure()
for i in range(6):
    plt.subplot(2, 3, i + 1)
    image_trans = np.transpose(images[i], (1, 2, 0))
    mean = np.array([0.4914, 0.4822, 0.4465])
    std = np.array([0.2023, 0.1994, 0.2010])
    image_trans = std * image_trans + mean
    image_trans = np.clip(image_trans, 0, 1)
    plt.title(f"{classes[labels[i]]}")
    plt.imshow(image_trans)
    plt.axis("off")
plt.show()

# ## 构建网络
# 
# 残差网络结构(Residual Network)是ResNet网络的主要亮点,ResNet使用残差网络结构后可有效地减轻退化问题,实现更深的网络结构设计,提高网络的训练精度。本节首先讲述如何构建残差网络结构,然后通过堆叠残差网络来构建ResNet50网络。
# 
# ### 构建残差网络结构
# 
# 残差网络结构图如下图所示,残差网络由两个分支构成:一个主分支,一个shortcuts(图中弧线表示)。主分支通过堆叠一系列的卷积操作得到,shortcuts从输入直接到输出,主分支输出的特征矩阵$F(x)$加上shortcuts输出的特征矩阵$x$得到$F(x)+x$,通过Relu激活函数后即为残差网络最后的输出。
# 
# ![residual](https://qingyun-test.oss-cn-hangzhou.aliyuncs.com/images/2024/07/20/image7215529231224791732.png)
# 
# 残差网络结构主要由两种,一种是Building Block,适用于较浅的ResNet网络,如ResNet18和ResNet34;另一种是Bottleneck,适用于层数较深的ResNet网络,如ResNet50、ResNet101和ResNet152。
# 
# #### Building Block
# 
# Building Block结构图如下图所示,主分支有两层卷积网络结构:
# 
# + 主分支第一层网络以输入channel为64为例,首先通过一个$3\times3$的卷积层,然后通过Batch Normalization层,最后通过Relu激活函数层,输出channel为64;
# + 主分支第二层网络的输入channel为64,首先通过一个$3\times3$的卷积层,然后通过Batch Normalization层,输出channel为64。
# 
# 最后将主分支输出的特征矩阵与shortcuts输出的特征矩阵相加,通过Relu激活函数即为Building Block最后的输出。
# 
# ![building-block-5](https://qingyun-test.oss-cn-hangzhou.aliyuncs.com/images/2024/07/20/image1825877107845952074.png)
# 
# 主分支与shortcuts输出的特征矩阵相加时,需要保证主分支与shortcuts输出的特征矩阵shape相同。如果主分支与shortcuts输出的特征矩阵shape不相同,如输出channel是输入channel的一倍时,shortcuts上需要使用数量与输出channel相等,大小为$1\times1$的卷积核进行卷积操作;若输出的图像较输入图像缩小一倍,则要设置shortcuts中卷积操作中的`stride`为2,主分支第一层卷积操作的`stride`也需设置为2。
# 
# 如下代码定义`ResidualBlockBase`类实现Building Block结构。

# In[4]:
from typing import Type, Union, List, Optional
import mindspore.nn as nn
from mindspore.common.initializer import Normal

# 初始化卷积层与BatchNorm的参数
weight_init = Normal(mean=0, sigma=0.02)
gamma_init = Normal(mean=1, sigma=0.02)

class ResidualBlockBase(nn.Cell):
    expansion: int = 1  # 最后一个卷积核数量与第一个卷积核数量相等

    def __init__(self, in_channel: int, out_channel: int,
                 stride: int = 1, norm: Optional[nn.Cell] = None,
                 down_sample: Optional[nn.Cell] = None) -> None:
        super(ResidualBlockBase, self).__init__()
        if not norm:
            self.norm = nn.BatchNorm2d(out_channel)
        else:
            self.norm = norm

        self.conv1 = nn.Conv2d(in_channel, out_channel,
                               kernel_size=3, stride=stride,
                               weight_init=weight_init)
        self.conv2 = nn.Conv2d(in_channel, out_channel,
                               kernel_size=3, weight_init=weight_init)
        self.relu = nn.ReLU()
        self.down_sample = down_sample

    def construct(self, x):
        """ResidualBlockBase construct."""
        identity = x  # shortcuts分支

        out = self.conv1(x)  # 主分支第一层:3*3卷积层
        out = self.norm(out)
        out = self.relu(out)
        out = self.conv2(out)  # 主分支第二层:3*3卷积层
        out = self.norm(out)

        if self.down_sample is not None:
            identity = self.down_sample(x)
        out += identity  # 输出为主分支与shortcuts之和
        out = self.relu(out)

        return out


# #### Bottleneck
# 
# Bottleneck结构图如下图所示,在输入相同的情况下Bottleneck结构相对Building Block结构的参数数量更少,更适合层数较深的网络,ResNet50使用的残差结构就是Bottleneck。该结构的主分支有三层卷积结构,分别为$1\times1$的卷积层、$3\times3$卷积层和$1\times1$的卷积层,其中$1\times1$的卷积层分别起降维和升维的作用。
# 
# + 主分支第一层网络以输入channel为256为例,首先通过数量为64,大小为$1\times1$的卷积核进行降维,然后通过Batch Normalization层,最后通过Relu激活函数层,其输出channel为64;
# + 主分支第二层网络通过数量为64,大小为$3\times3$的卷积核提取特征,然后通过Batch Normalization层,最后通过Relu激活函数层,其输出channel为64;
# + 主分支第三层通过数量为256,大小$1\times1$的卷积核进行升维,然后通过Batch Normalization层,其输出channel为256。
# 
# 最后将主分支输出的特征矩阵与shortcuts输出的特征矩阵相加,通过Relu激活函数即为Bottleneck最后的输出。
# 
# ![building-block-6](https://qingyun-test.oss-cn-hangzhou.aliyuncs.com/images/2024/07/20/image18106536310912139455.png)
# 
# 主分支与shortcuts输出的特征矩阵相加时,需要保证主分支与shortcuts输出的特征矩阵shape相同。如果主分支与shortcuts输出的特征矩阵shape不相同,如输出channel是输入channel的一倍时,shortcuts上需要使用数量与输出channel相等,大小为$1\times1$的卷积核进行卷积操作;若输出的图像较输入图像缩小一倍,则要设置shortcuts中卷积操作中的`stride`为2,主分支第二层卷积操作的`stride`也需设置为2。
# 
# 如下代码定义`ResidualBlock`类实现Bottleneck结构。

# In[5]:
class ResidualBlock(nn.Cell):
    expansion = 4  # 最后一个卷积核的数量是第一个卷积核数量的4倍

    def __init__(self, in_channel: int, out_channel: int,
                 stride: int = 1, down_sample: Optional[nn.Cell] = None) -> None:
        super(ResidualBlock, self).__init__()

        self.conv1 = nn.Conv2d(in_channel, out_channel,
                               kernel_size=1, weight_init=weight_init)
        self.norm1 = nn.BatchNorm2d(out_channel)
        self.conv2 = nn.Conv2d(out_channel, out_channel,
                               kernel_size=3, stride=stride,
                               weight_init=weight_init)
        self.norm2 = nn.BatchNorm2d(out_channel)
        self.conv3 = nn.Conv2d(out_channel, out_channel * self.expansion,
                               kernel_size=1, weight_init=weight_init)
        self.norm3 = nn.BatchNorm2d(out_channel * self.expansion)

        self.relu = nn.ReLU()
        self.down_sample = down_sample

    def construct(self, x):
        identity = x  # shortscuts分支

        out = self.conv1(x)  # 主分支第一层:1*1卷积层
        out = self.norm1(out)
        out = self.relu(out)
        out = self.conv2(out)  # 主分支第二层:3*3卷积层
        out = self.norm2(out)
        out = self.relu(out)
        out = self.conv3(out)  # 主分支第三层:1*1卷积层
        out = self.norm3(out)

        if self.down_sample is not None:
            identity = self.down_sample(x)

        out += identity  # 输出为主分支与shortcuts之和
        out = self.relu(out)

        return out


# #### 构建ResNet50网络
# 
# ResNet网络层结构如下图所示,以输入彩色图像$224\times224$为例,首先通过数量64,卷积核大小为$7\times7$,stride为2的卷积层conv1,该层输出图片大小为$112\times112$,输出channel为64;然后通过一个$3\times3$的最大下采样池化层,该层输出图片大小为$56\times56$,输出channel为64;再堆叠4个残差网络块(conv2_x、conv3_x、conv4_x和conv5_x),此时输出图片大小为$7\times7$,输出channel为2048;最后通过一个平均池化层、全连接层和softmax,得到分类概率。
# 
# ![resnet-layer](https://qingyun-test.oss-cn-hangzhou.aliyuncs.com/images/2024/07/20/image12829022581112085.png)
# 
# 对于每个```
## 构建ResNet50网络
## 在代码中定义了ResNet的架构,主要包括一个卷积层、四个残差块、一个池化层和一个全连接层
def make_layer(last_out_channel, block: Type[Union[ResidualBlockBase, ResidualBlock]],
               channel: int, block_nums: int, stride: int = 1):
    down_sample = None  # shortcuts分支

    if stride != 1或last_out_channel != channel * block.expansion:
        down_sample = nn.SequentialCell([
            nn.Conv2d(last_out_channel, channel * block.expansion,
                      kernel_size=1, stride=stride, weight_init=weight_init),
            nn.BatchNorm2d(channel * block.expansion, gamma_init=gamma_init)
        ])

    layers = []  # 存储残差块层
    layers.append(block(last_out_channel, channel, stride=stride, down_sample=down_sample))

    in_channel = channel * block.expansion
    # 堆叠残差网络
    for _ in range(1, block_nums):
        layers.append(block(in_channel, channel))

    return nn.SequentialCell(layers)


## 定义ResNet类
class ResNet(nn.Cell):
    def __init__(self, block: Type[Union[ResidualBlockBase, ResidualBlock]],
                 layer_nums: List[int], num_classes: int, input_channel: int) -> None:
        super(ResNet, self).__init__()

        self.relu = nn.ReLU()  # 激活函数
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, weight_init=weight_init)  # 第一个卷积层,输入通道为3(彩色图像),输出通道为64
        self.norm = nn.BatchNorm2d(64)  # 批标准化层
        self.max_pool = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode='same')  # 最大池化层

        # 定义各个残差块
        self.layer1 = make_layer(64, block, 64, layer_nums[0])
        self.layer2 = make_layer(64 * block.expansion, block, 128, layer_nums[1], stride=2)
        self.layer3 = make_layer(128 * block.expansion, block, 256, layer_nums[2], stride=2)
        self.layer4 = make_layer(256 * block.expansion, block, 512, layer_nums[3], stride=2)
        self.avg_pool = nn.AvgPool2d()  # 平均池化层
        self.flatten = nn.Flatten()  # 平坦层
        self.fc = nn.Dense(in_channels=input_channel, out_channels=num_classes)  # 全连接层

    def construct(self, x):
        x = self.conv1(x)
        x = self.norm(x)
        x = self.relu(x)
        x = self.max_pool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avg_pool(x)
        x = self.flatten(x)
        x = self.fc(x)

        return x


def _resnet(model_url: str, block: Type[Union[ResidualBlockBase, ResidualBlock]],
            layers: List[int], num_classes: int, pretrained: bool, pretrained_ckpt: str,
            input_channel: int):
    model = ResNet(block, layers, num_classes, input_channel)

    if pretrained:
        download(url=model_url, path=pretrained_ckpt, replace=True)
        param_dict = load_checkpoint(pretrained_ckpt)
        load_param_into_net(model, param_dict)

    return model


def resnet50(num_classes: int = 1000, pretrained: bool = False):
    resnet50_url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/models/application/resnet50_224_new.ckpt"
    resnet50_ckpt = "./LoadPretrainedModel/resnet50_224_new.ckpt"
    return _resnet(resnet50_url, ResidualBlock, [3, 4, 6, 3], num_classes, pretrained, resnet50_ckpt, 2048)


## 模型训练与评估
## 定义了调用ResNet50网络的函数,并加载预训练模型进行微调
network = resnet50(pretrained=True)

# 全连接层的输入大小
in_channel = network.fc.in_channels
fc = nn.Dense(in_channels=in_channel, out_channels=10)  # 重新定义全连接层
network.fc = fc

## 设置学习率
num_epochs = 5
lr = nn.cosine_decay_lr(min_lr=0.00001, max_lr=0.001, total_step=step_size_train * num_epochs,
                        step_per_epoch=step_size_train, decay_epoch=num_epochs)

## 定义优化器和损失函数
opt = nn.Momentum(params=network.trainable_params(), learning_rate=lr, momentum=0.9)
loss_fn = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')

def forward_fn(inputs, targets):
    logits = network(inputs)
    loss = loss_fn(logits, targets)
    return loss

grad_fn = ms.value_and_grad(forward_fn, None, opt.parameters)

def train_step(inputs, targets):
    loss, grads = grad_fn(inputs, targets)
    opt(grads)
    return loss

## 通过数据迭代器进行训练和验证
data_loader_train = dataset_train.create_tuple_iterator(num_epochs=num_epochs)
data_loader_val = dataset_val.create_tuple_iterator(num_epochs=num_epochs)

best_acc = 0
best_ckpt_dir = "./BestCheckpoint"
best_ckpt_path = "./BestCheckpoint/resnet50-best.ckpt"

if not os.path.exists(best_ckpt_dir):
    os.mkdir(best_ckpt_dir)

import mindspore.ops as ops

## 训练函数
def train(data_loader, epoch):
    losses = []
    network.set_train(True)

    for i, (images, labels) in enumerate(data_loader):
        loss = train_step(images, labels)
        if i % 100 == 0 or i == step_size_train - 1:
            print('Epoch: [%3d/%3d], Steps: [%3d/%3d], Train Loss: [%5.3f]' %
                  (epoch + 1, num_epochs, i + 1, step_size_train, loss))
        losses.append(loss)

    return sum(losses) / len(losses)

## 验证函数
def evaluate(data_loader):
    network.set_train(False)
    correct_num = 0.0  # 预测正确个数
    total_num = 0.0  # 预测总数

    for images, labels in data_loader:
        logits = network(images)
        pred = logits.argmax(axis=1)  # 预测结果
        correct = ops.equal(pred, labels).reshape((-1, ))
        correct_num += correct.sum().asnumpy()
        total_num += correct.shape[0]

    acc = correct_num / total_num  # 准确率
    return acc

## 开始循环训练
print("Start Training Loop ...")
for epoch in range(num_epochs):
    curr_loss = train(data_loader_train, epoch)
    curr_acc = evaluate(data_loader_val)

    print("-" * 50)
    print("Epoch: [%3d/%3d], Average Train Loss: [%5.3f], Accuracy: [%5.3f]" % (
        epoch+1, num_epochs, curr_loss, curr_acc
    ))
    print("-" * 50)

    ## 保存当前预测准确率最高的模型
    if curr_acc > best_acc:
        best_acc = curr_acc
        ms.save_checkpoint(network, best_ckpt_path)

print("=" * 80)
print(f"End of validation the best Accuracy is: {best_acc: 5.3f}, "
      f"save the best ckpt file in {best_ckpt_path}", flush=True)


## 模型预测可视化
## 定义了一个函数,用来加载最佳模型并对验证集进行预测,并将预测结果可视化显示
import matplotlib.pyplot as plt

def visualize_model(best_ckpt_path, dataset_val):
    num_class = 10  # 对10类图像进行分类
    net = resnet50(num_class)
    ## 加载模型参数
    param_dict = ms.load_checkpoint(best_ckpt_path)
    ms.load_param_into_net(net, param_dict)
    ## 加载验证集的数据进行验证
    data = next(dataset_val.create_dict_iterator())
    images = data["image"]
    labels = data["label"]
    ## 预测图像类别
    output = net(data['image'])
    pred = np.argmax(output.asnumpy(), axis=1)

    ## 图像分类标签
    classes = []
    with open(data_dir + "/batches.meta.txt", "r") as f:
        for line in f:
            line = line.rstrip()
            if line:
                classes.append(line)

    ## 显示图像及图像的预测值
    plt.figure()
    for i in range(6):
        plt.subplot(2, 3, i + 1)
        ## 若预测正确,显示为蓝色;若预测错误,显示为红色
        color = 'blue' if pred[i] == labels.asnumpy()[i] else 'red'
        plt.title('predict:{}'.format(classes[pred[i]]), color=color)
        picture_show = np.transpose(images.asnumpy()[i], (1, 2, 0))
        mean = np.array([0.4914, 0.4822, 0.4465])
        std = np.array([0.2023, 0.1994, 0.2010])
        picture_show = std * picture_show + mean
        picture_show = np.clip(picture_show, 0, 1)
        plt.imshow(picture_show)
        plt.axis('off')

    plt.show()

## 使用测试数据集进行验证
visualize_model(best_ckpt_path=best_ckpt_path, dataset_val=dataset_val)

解析

  1. 函数定义与初始化
    • 函数 make_layer 用于创建一个堆叠了多个残差块的网络层,接受参数包括上一个残差块的输出通道数、残差块类型、当前残差块的输入通道数、残差块的数量以及卷积步幅。
    • ResNet 定义了 ResNet 网络结构,包括初始化各层和前向传播逻辑。
  2. 加载预训练模型
    • 函数 _resnet 用于加载预训练模型。
    • 函数 resnet50 用于创建一个 ResNet50 模型,默认分类数为1000,并提供选择是否加载预训练模型的选项。
  3. 训练与评估
    • 设置了学习率、优化器和损失函数,并定义了训练步骤 train_step 和验证步骤 evaluate
    • train 函数用于在数据加载器中进行模型训练,evaluate 函数用于验证模型在验证集上的准确率。
    • 循环训练并保存最佳模型参数。
  4. 模型预测可视化
    • 函数 visualize_model 用于加载最佳模型并对验证集进行预测,将预测结果可视化显示。预测正确的显示为蓝色,预测错误的显示为红色。

通过这些步骤,你可以实现对 CIFAR-10 数据集的训练,并使用 ResNet50 模型进行图像分类和预测可视化。预测结果显示在图形窗口中,能够直观地看到模型的预测效果以及错误预测。

相关推荐

  1. MindSpore 应用学习-ResNet50迁移学习-CSDN

    2024-07-21 01:46:03       17 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-21 01:46:03       50 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-21 01:46:03       54 阅读
  3. 在Django里面运行非项目文件

    2024-07-21 01:46:03       43 阅读
  4. Python语言-面向对象

    2024-07-21 01:46:03       54 阅读

热门阅读

  1. DAY05 CSS

    DAY05 CSS

    2024-07-21 01:46:03      16 阅读
  2. MacOS命令行运行fortran程序|编程私教解答

    2024-07-21 01:46:03       17 阅读
  3. 类与对象-多态-案例3-电脑组装具体实现

    2024-07-21 01:46:03       17 阅读
  4. OpenPyXL 写入 Excel 文件

    2024-07-21 01:46:03       15 阅读
  5. 量化机器人如何实现无缝交易?

    2024-07-21 01:46:03       16 阅读
  6. Redis 深度历险:核心原理与应用实践 - 读书笔记

    2024-07-21 01:46:03       15 阅读
  7. Head size 160 is not supported by PagedAttention.

    2024-07-21 01:46:03       14 阅读
  8. 数据仓库中的数据治理

    2024-07-21 01:46:03       18 阅读