昇思25天学习打卡营第14天|ResNet50图像分类

ResNet50图像分类

本实验使用ResNet50网络模型对CIFAR-10数据集进行分类

背景知识

图像分类

这是一种基础的计算机视觉任务,目标是将输入的图像分配到一个或多个预定义的类别中。在开始训练之前,需要大量已标注(即每张图像都已归类到某个特定类别)的数据。这类的应用非常广泛,可以用于人脸识别、自动驾驶、医学影像分析等。

ResNet50

ResNet系列网络的推出是为了解决随着神经网络层数增加而出现的梯度消失(梯度越来越小)和梯度爆炸(梯度越来越大)的问题。它使用残差块,通过跳跃链接(skip connection)解决这些问题,使网络可以更深(可以突破1000层)。残差块的目的是让每一层都学习到输出和输入之间的残差(而不是输出本身),跳跃连接就是在残差块上,输入可以跳过若干层后直接加到输出上。而ResNet也就是50层的网络,具体的网络架构如下:
1. 卷积层:7x7卷积,64个滤波器,步幅为2。
2. 最大池化层:3x3,步幅为2。
3. 残差块组1:3个残差块。
4. 残差块组2:4个残差块。
5. 残差块组3:6个残差块。
6. 残差块组4:3个残差块。
7. 全局平均池化层。
8. 全连接层:输出图片的具体类别。
其中,每个残差块都有三个卷积层。

CIFAR-10数据集

这是一个广泛使用的图像数据集,它包含六万张彩色32*32的图片,其中5万训练集、1万测试集,每批10000张图,因此构成了5个训练批,1个测试批。共分成十个类别,包括:airplane, automobile, bird, cat, deer, dog, frog, horse, ship, truck.

在这里插入图片描述

实验

数据加载

# 数据集下载
from download import download
url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/cifar-10-binary.tar.gz"
download(url, "./datasets-cifar10-bin", kind="tar.gz", replace=True)

使用mindspore.dataset.Cifar10Dataset接口来加载数据集,并进行相关图像增强操作。

data_dir = "./datasets-cifar10-bin/cifar-10-batches-bin"  # 数据集根目录
batch_size = 256  # 批量大小
image_size = 32  # 训练图像空间大小
workers = 4  # 并行线程个数
num_classes = 10  # 分类数量

def create_dataset_cifar10(dataset_dir, usage, resize, batch_size, workers):
    data_set = ds.Cifar10Dataset(dataset_dir=dataset_dir,
                                 usage=usage,
                                 num_parallel_workers=workers,
                                 shuffle=True)
    trans = []
    # 若为训练集,则增加额外的数据增强操作
    # 随机裁剪图像为32*32,边框填充为4个像素。
    # 以50%的概率对图片进行翻转
    if usage == "train":
        trans += [
            vision.RandomCrop((32, 32), (4, 4, 4, 4)),
            vision.RandomHorizontalFlip(prob=0.5)
        ]
	# 统一的预处理操作
	# 调整图片尺寸
	# 归一化
	# 使用给定的均值和标准差对图像标准化
	# 改变通道顺序
    trans += [
        vision.Resize(resize),
        vision.Rescale(1.0 / 255.0, 0.0),
        vision.Normalize([0.4914, 0.4822, 0.4465], [0.2023, 0.1994, 0.2010]),
        vision.HWC2CHW()
    ]

    target_trans = transforms.TypeCast(mstype.int32)

    # 数据映射操作
    data_set = data_set.map(operations=trans,
                            input_columns='image',
                            num_parallel_workers=workers)

    data_set = data_set.map(operations=target_trans,
                            input_columns='label',
                            num_parallel_workers=workers)

    # 批量操作
    data_set = data_set.batch(batch_size)

    return data_set

获取处理后的训练与测试数据集

dataset_train = create_dataset_cifar10(dataset_dir=data_dir,
                                       usage="train",
                                       resize=image_size,
                                       batch_size=batch_size,
                                       workers=workers)
step_size_train = dataset_train.get_dataset_size()

dataset_val = create_dataset_cifar10(dataset_dir=data_dir,
                                     usage="test",
                                     resize=image_size,
                                     batch_size=batch_size,
                                     workers=workers)
step_size_val = dataset_val.get_dataset_size()

网络构建

在这里插入图片描述
这张图展示了残差快的结构
输入:x
x首先通过权重层,再经过ReLU激活函数,再经过第二个权重层,最后得到F(x)
另外,输入x直接加到F(x)上,这个操作叫跳跃连接。得到F(x) + x
最终得到的结果再经过一个ReLU,得到残差快的输出。

残差网络(ResNet)的两种主要结构是Building Block和Bottleneck,它们分别用于不同深度的ResNet网络。

Building Block

在这里插入图片描述
以图中为例,经过33卷积层,再通过ReLU激活函数,再经过33卷积层。输入输出通道都是64。得到结果后再和跳跃连接的特征矩阵相加,经过ReLU后作为最后输出。

代码实现示例如下:

from typing import Type, Union, List, Optional
import mindspore.nn as nn
from mindspore.common.initializer import Normal

# 初始化卷积层与BatchNorm的参数
weight_init = Normal(mean=0, sigma=0.02)
gamma_init = Normal(mean=1, sigma=0.02)

class ResidualBlockBase(nn.Cell):
# expansion为扩展因子
# 指定为1时,输出通道数与输入通道数相同
    expansion: int = 1  
# 参数:输入通道数、输出通道数、步幅、归一化层、下采样层(用于调整输入的尺寸和通道数)
    def __init__(self, in_channel: int, out_channel: int,
                 stride: int = 1, norm: Optional[nn.Cell] = None,
                 down_sample: Optional[nn.Cell] = None) -> None:
        super(ResidualBlockBase, self).__init__()
        # 若未指定归一化层,则使用批归一化
        if not norm:
            self.norm = nn.BatchNorm2d(out_channel)
        else:
            self.norm = norm
# 定义了两个3*3的卷积层
        self.conv1 = nn.Conv2d(in_channel, out_channel,
                               kernel_size=3, stride=stride,
                               weight_init=weight_init)
        self.conv2 = nn.Conv2d(in_channel, out_channel,
                               kernel_size=3, weight_init=weight_init)
        self.relu = nn.ReLU()
        self.down_sample = down_sample

# 定义前向传播过程
    def construct(self, x):
    #跳跃连接(shortcuts)
        identity = x 

        out = self.conv1(x)  # 主分支第一层:3*3卷积层
        out = self.norm(out)
        out = self.relu(out)
        out = self.conv2(out)  # 主分支第二层:3*3卷积层
        out = self.norm(out)

        if self.down_sample is not None:
            identity = self.down_sample(x)
        out += identity  # 输出为主分支与shortcuts之和
        out = self.relu(out)

        return out

这里的输入和输出图像尺寸相同。假设输出图像比输入图像小一辈,就要将跳跃连接时的步幅和主分支第一层卷积的步幅都设为2。

Bottleneck

它适用于较深的网络,通过1*1卷积核减少计算量和参数数量。ResNet50使用的残差结构就是Bottleneck。

在这里插入图片描述
输入通道数为256,主分支中使用三层卷积层。第一层使用通道64的11卷积核进行降维。再通过33的卷积核特征提取,最后通过通道64的1*1卷积核进行升维,输出通道数为256。

以下为代码实现示例,

class ResidualBlock(nn.Cell):
    expansion = 4  # 最后一个卷积核的数量是第一个卷积核数量的4倍

    def __init__(self, in_channel: int, out_channel: int,
                 stride: int = 1, down_sample: Optional[nn.Cell] = None) -> None:
        super(ResidualBlock, self).__init__()

        self.conv1 = nn.Conv2d(in_channel, out_channel,
                               kernel_size=1, weight_init=weight_init)
        self.norm1 = nn.BatchNorm2d(out_channel)
        self.conv2 = nn.Conv2d(out_channel, out_channel,
                               kernel_size=3, stride=stride,
                               weight_init=weight_init)
        self.norm2 = nn.BatchNorm2d(out_channel)
        self.conv3 = nn.Conv2d(out_channel, out_channel * self.expansion,
                               kernel_size=1, weight_init=weight_init)
        self.norm3 = nn.BatchNorm2d(out_channel * self.expansion)

        self.relu = nn.ReLU()
        self.down_sample = down_sample

    def construct(self, x):

        identity = x  # shortscuts分支

        out = self.conv1(x)  # 主分支第一层:1*1卷积层
        out = self.norm1(out)
        out = self.relu(out)
        out = self.conv2(out)  # 主分支第二层:3*3卷积层
        out = self.norm2(out)
        out = self.relu(out)
        out = self.conv3(out)  # 主分支第三层:1*1卷积层
        out = self.norm3(out)

        if self.down_sample is not None:
            identity = self.down_sample(x)

        out += identity  # 输出为主分支与shortcuts之和
        out = self.relu(out)

        return out

构建ResNet50网络

在这里插入图片描述
网络结构如上,
第一层通道64,卷积核77,步长2,输出64
第二层3
3最大池化层,步长2,再堆叠3个残差块,输出256
第三层堆叠4个残差块,输出512
第四层堆叠6个残差块,输出1024
第五层堆叠3个残差块,输出2948
最后加上平均池化层、全连接层、softmax输出分类概率

以下定义残差块的构建

# 参数:上一个网络输出的通道数、残差网络的类别、残差网络的输入通道数、残差块堆叠个数、步幅。
def make_layer(last_out_channel, block: Type[Union[ResidualBlockBase, ResidualBlock]],
               channel: int, block_nums: int, stride: int = 1):
    down_sample = None  # shortcuts分支
# 如果步幅不为1或上一层的通道数与当前层的通道数不相等,则需要下采样。
    if stride != 1 or last_out_channel != channel * block.expansion:
# 下采样模块由1*1卷积层和批归一化组成。
        down_sample = nn.SequentialCell([
            nn.Conv2d(last_out_channel, channel * block.expansion,
                      kernel_size=1, stride=stride, weight_init=weight_init),
            nn.BatchNorm2d(channel * block.expansion, gamma_init=gamma_init)
        ])

    layers = []
    layers.append(block(last_out_channel, channel, stride=stride, down_sample=down_sample))

    in_channel = channel * block.expansion
    # 在循环中堆叠残差网络
    for _ in range(1, block_nums):
        layers.append(block(in_channel, channel))

    return nn.SequentialCell(layers)

构建ResNet50网络模型

from mindspore import load_checkpoint, load_param_into_net

class ResNet(nn.Cell):
    def __init__(self, block: Type[Union[ResidualBlockBase, ResidualBlock]],
                 layer_nums: List[int], num_classes: int, input_channel: int) -> None:
        super(ResNet, self).__init__()

        self.relu = nn.ReLU()
        # 第一个卷积层,输入channel为3(彩色图像),输出channel为64
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, weight_init=weight_init)
        self.norm = nn.BatchNorm2d(64)
        # 最大池化层,缩小图片的尺寸
        self.max_pool = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode='same')
        # 各个残差网络结构块定义
        self.layer1 = make_layer(64, block, 64, layer_nums[0])
        self.layer2 = make_layer(64 * block.expansion, block, 128, layer_nums[1], stride=2)
        self.layer3 = make_layer(128 * block.expansion, block, 256, layer_nums[2], stride=2)
        self.layer4 = make_layer(256 * block.expansion, block, 512, layer_nums[3], stride=2)
        # 平均池化层
        self.avg_pool = nn.AvgPool2d()
        # flattern层
        self.flatten = nn.Flatten()
        # 全连接层
        self.fc = nn.Dense(in_channels=input_channel, out_channels=num_classes)

    def construct(self, x):

        x = self.conv1(x)
        x = self.norm(x)
        x = self.relu(x)
        x = self.max_pool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avg_pool(x)
        x = self.flatten(x)
        x = self.fc(x)

        return x

def _resnet(model_url: str, block: Type[Union[ResidualBlockBase, ResidualBlock]],
            layers: List[int], num_classes: int, pretrained: bool, pretrained_ckpt: str,
            input_channel: int):
    model = ResNet(block, layers, num_classes, input_channel)

    if pretrained:
        # 加载预训练模型
        download(url=model_url, path=pretrained_ckpt, replace=True)
        param_dict = load_checkpoint(pretrained_ckpt)
        load_param_into_net(model, param_dict)

    return model


def resnet50(num_classes: int = 1000, pretrained: bool = False):
    """ResNet50模型"""
    resnet50_url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/models/application/resnet50_224_new.ckpt"
    resnet50_ckpt = "./LoadPretrainedModel/resnet50_224_new.ckpt"
    return _resnet(resnet50_url, ResidualBlock, [3, 4, 6, 3], num_classes,
                   pretrained, resnet50_ckpt, 2048)

模型评估

# 定义ResNet50网络
network = resnet50(pretrained=True)

# 全连接层输入层的大小
in_channel = network.fc.in_channels
fc = nn.Dense(in_channels=in_channel, out_channels=10)
# 重置全连接层
network.fc = fc
# 设置学习率
num_epochs = 5
lr = nn.cosine_decay_lr(min_lr=0.00001, max_lr=0.001, total_step=step_size_train * num_epochs,
                        step_per_epoch=step_size_train, decay_epoch=num_epochs)
# 定义优化器和损失函数
opt = nn.Momentum(params=network.trainable_params(), learning_rate=lr, momentum=0.9)
loss_fn = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')

def forward_fn(inputs, targets):
    logits = network(inputs)
    loss = loss_fn(logits, targets)
    return loss

grad_fn = ms.value_and_grad(forward_fn, None, opt.parameters)

def train_step(inputs, targets):
    loss, grads = grad_fn(inputs, targets)
    opt(grads)
    return loss
import os

# 创建迭代器
data_loader_train = dataset_train.create_tuple_iterator(num_epochs=num_epochs)
data_loader_val = dataset_val.create_tuple_iterator(num_epochs=num_epochs)

# 最佳模型存储路径
best_acc = 0
best_ckpt_dir = "./BestCheckpoint"
best_ckpt_path = "./BestCheckpoint/resnet50-best.ckpt"

if not os.path.exists(best_ckpt_dir):
    os.mkdir(best_ckpt_dir)
import mindspore.ops as ops

def train(data_loader, epoch):
    """模型训练"""
    losses = []
    network.set_train(True)

    for i, (images, labels) in enumerate(data_loader):
        loss = train_step(images, labels)
        if i % 100 == 0 or i == step_size_train - 1:
            print('Epoch: [%3d/%3d], Steps: [%3d/%3d], Train Loss: [%5.3f]' %
                  (epoch + 1, num_epochs, i + 1, step_size_train, loss))
        losses.append(loss)

    return sum(losses) / len(losses)


def evaluate(data_loader):
    """模型验证"""
    network.set_train(False)

    correct_num = 0.0  # 预测正确个数
    total_num = 0.0  # 预测总数

    for images, labels in data_loader:
        logits = network(images)
        pred = logits.argmax(axis=1)  # 预测结果
        correct = ops.equal(pred, labels).reshape((-1, ))
        correct_num += correct.sum().asnumpy()
        total_num += correct.shape[0]

    acc = correct_num / total_num  # 准确率

    return acc
# 开始循环训练
print("Start Training Loop ...")

for epoch in range(num_epochs):
    curr_loss = train(data_loader_train, epoch)
    curr_acc = evaluate(data_loader_val)

    print("-" * 50)
    print("Epoch: [%3d/%3d], Average Train Loss: [%5.3f], Accuracy: [%5.3f]" % (
        epoch+1, num_epochs, curr_loss, curr_acc
    ))
    print("-" * 50)

    # 保存当前预测准确率最高的模型
    if curr_acc > best_acc:
        best_acc = curr_acc
        ms.save_checkpoint(network, best_ckpt_path)

print("=" * 80)
print(f"End of validation the best Accuracy is: {best_acc: 5.3f}, "
      f"save the best ckpt file in {best_ckpt_path}", flush=True)

总结

本节完整的构建了ResNet50的网络,并完成了模型的训练与评估。

打卡凭证

在这里插入图片描述

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-15 04:24:02       52 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-15 04:24:02       54 阅读
  3. 在Django里面运行非项目文件

    2024-07-15 04:24:02       45 阅读
  4. Python语言-面向对象

    2024-07-15 04:24:02       55 阅读

热门阅读

  1. 如何评价一个AI系统

    2024-07-15 04:24:02       18 阅读
  2. 查找运行中 sql中bind variable value 绑定变量值

    2024-07-15 04:24:02       18 阅读
  3. appium 实战问题 播放视频时无法定位到元素

    2024-07-15 04:24:02       21 阅读
  4. Django模板语言(简略教程)

    2024-07-15 04:24:02       22 阅读
  5. std::async和std::future异步编程

    2024-07-15 04:24:02       20 阅读
  6. C++智能指针

    2024-07-15 04:24:02       17 阅读
  7. Python面试题:如何在 Python 中进行单元测试?

    2024-07-15 04:24:02       19 阅读
  8. git使用

    2024-07-15 04:24:02       17 阅读