【PyTorch】现代卷积神经网络

1. 理论介绍

1.1. 深度卷积神经网络(AlexNet)

1.1.1. 概述

  • 特征本身应该被学习,而且在合理地复杂性前提下,特征应该由多个共同学习的神经网络层组成,每个层都有可学习的参数。在机器视觉中,最底层可能检测边缘、颜色和纹理,更高层建立在这些底层表示的基础上,以表示更大的特征。
  • 包含许多特征的深度模型需要大量的有标签数据,才能显著优于基于凸优化的传统方法(如线性方法和核方法)。ImageNet挑战赛为研究人员提高超大规模的数据集。
  • 深度学习对计算资源要求很高,训练可能需要数百个迭代轮数,每次迭代都需要通过代价高昂的许多线性代数层传递数据。用GPU训练神经网络大大提高了训练速度,相比于CPU,GPU由100~1000个小的处理单元(核心)组成,庞大的核心数量使GPU比CPU快几个数量级;GPU核心更简单,功耗更低;GPU拥有更高的内存带宽;卷积和矩阵乘法,都是可以在GPU上并行化的操作。
  • AlexNet首次证明了学习到的特征可以超越手工设计的特征。

1.1.2. 模型设计

模型设计

  • AlexNet使用ReLU而不是sigmoid作为其激活函数。
  • 在最后一个卷积层后有两个全连接层,分别有4096个输出。 这两个巨大的全连接层拥有将近1GB的模型参数。 由于早期GPU显存有限,原版的AlexNet采用了双数据流设计,使得每个GPU只负责存储和计算模型的一半参数。
  • AlexNet通过暂退法控制全连接层的模型复杂度。
  • 为了进一步扩充数据,AlexNet在训练时增加了大量的图像增强数据,如翻转、裁切和变色。 这使得模型更健壮,更大的样本量有效地减少了过拟合。

1.2. 使用块的网络(VGG)

  • 经典卷积神经网络的基本组成部分
    • 带填充以保持分辨率的卷积层
    • 非线性激活函数
    • 池化层
  • VGG块由一系列卷积层组成,后面再加上用于空间下采样的最大池化层。深层且窄的卷积(即 3 × 3 3\times3 3×3)比较浅层且宽的卷积更有效。不同的VGG模型可通过每个VGG块中卷积层数量和输出通道数量的差异来定义。
  • VGG网络由几个VGG块和全连接层组成。超参数变量conv_arch指定了每个VGG块里卷积层个数和输出通道数。全连接模块则与AlexNet中的相同。
  • VGG-11有5个VGG块,其中前两个块各有一个卷积层,后三个块各包含两个卷积层,共8个卷积层。 第一个模块有64个输出通道,每个后续模块将输出通道数量翻倍,直到该数字达到512。

1.3. 网络中的网络(NiN)

  • 网络中的网络(NiN)在每个像素的通道上分别使用多层感知机
  • NiN块以一个普通卷积层开始,后面是两个 1 × 1 1\times1 1×1的卷积层。这两个 1 × 1 1\times1 1×1卷积层充当带有ReLU激活函数的逐像素全连接层。
  • NiN网络使用窗口形状为 11 × 11 11\times11 11×11 5 × 5 5\times5 5×5 3 × 3 3\times3 3×3的卷积层,输出通道数量与AlexNet中的相同。 每个NiN块后有一个最大池化层,池化窗口形状为 3 × 3 3\times3 3×3,步幅为2。 NiN和AlexNet之间的一个显著区别是NiN完全取消了全连接层。NiN使用一个NiN块,其输出通道数等于标签类别的数量。最后放一个全局平均池化层,生成一个对数几率 (logits)。
  • 移除全连接层可减少过拟合,同时显著减少NiN的参数。

1.4. 含并行连结的网络(GoogLeNet)

  • 使用不同大小的卷积核组合是有利的,因为可以有效地识别不同范围的图像细节。
  • Inception块由四条并行路径组成。四条路径都使用合适的填充来使输入与输出的高和宽一致。超参数是每层输出通道数
    Inception
  • GoogLeNet一共使用9个Inception块和全局平均汇聚层的堆叠来生成其估计值。Inception块之间的最大汇聚层可降低维度。
    GoogLeNet

2. 实例解析

2.1. 实例描述

  • 在FashionMNIST数据集上训练AlexNet。注:输入图像调整到 224 × 224 224\times224 224×224,不使用跨GPU分解模型。
  • 在FashionMNIST数据集上训练VGG-11。注:输入图像调整到 224 × 224 224\times224 224×224,可以按比例缩小通道数以减少参数量,学习率可以设置得略高。
  • 在FashionMNIST数据集上训练NiN。注:输入图像调整到 224 × 224 224\times224 224×224
  • 在FashionMNIST数据集上训练GoogLeNet。注:输入图像调整到 96 × 96 96\times96 96×96

2.2. 代码实现

2.2.1. 在FashionMNIST数据集上训练AlexNet

2.2.1.1. 主要代码
AlexNet = nn.Sequential(
        nn.Conv2d(1, 96, kernel_size=11, stride=4, padding=1), nn.ReLU(),
        nn.MaxPool2d(kernel_size=3, stride=2),
        nn.Conv2d(96, 256, kernel_size=5, padding=2), nn.ReLU(),
        nn.MaxPool2d(kernel_size=3, stride=2),
        nn.Conv2d(256, 384, kernel_size=3, padding=1), nn.ReLU(),
        nn.Conv2d(384, 384, kernel_size=3, padding=1), nn.ReLU(),
        nn.Conv2d(384, 256, kernel_size=3, padding=1), nn.ReLU(),
        nn.MaxPool2d(kernel_size=3, stride=2),
        nn.Flatten(),
        nn.Linear(6400, 4096), nn.ReLU(),
        nn.Dropout(p=0.5),
        nn.Linear(4096, 4096), nn.ReLU(),
        nn.Dropout(p=0.5),
        nn.Linear(4096, 10)
).to(device, non_blocking=True)
2.2.1.2. 完整代码
import os, torch
from tensorboardX import SummaryWriter
from rich.progress import track
from torchvision.datasets import FashionMNIST
from torchvision.transforms import Compose, ToTensor, Resize
from torch.utils.data import DataLoader
from torch import nn, optim

def load_dataset():
    """加载数据集"""
    root = "./dataset"
    transform = Compose([Resize(224), ToTensor()])
    mnist_train = FashionMNIST(root, True, transform, download=True)
    mnist_test = FashionMNIST(root, False, transform, download=True)

    dataloader_train = DataLoader(mnist_train, batch_size, shuffle=True, 
        num_workers=num_workers, pin_memory=True,
    )
    dataloader_test = DataLoader(mnist_test, batch_size, shuffle=False,
        num_workers=num_workers, pin_memory=True,
    )
    return dataloader_train, dataloader_test


if __name__ == "__main__":
    # 全局参数设置
    num_epochs = 10
    batch_size = 128
    num_workers = 3
    lr = 0.01
    device = torch.device('cuda')

    # 创建记录器
    def log_dir():
        root = "runs"
        if not os.path.exists(root):
            os.mkdir(root)
        order = len(os.listdir(root)) + 1
        return f'{
     root}/exp{
     order}'
    writer = SummaryWriter(log_dir=log_dir())

    # 数据集配置
    dataloader_train, dataloader_test = load_dataset()

    # 模型配置
    AlexNet = nn.Sequential(
        nn.Conv2d(1, 96, kernel_size=11, stride=4, padding=1), nn.ReLU(),
        nn.MaxPool2d(kernel_size=3, stride=2),
        nn.Conv2d(96, 256, kernel_size=5, padding=2), nn.ReLU(),
        nn.MaxPool2d(kernel_size=3, stride=2),
        nn.Conv2d(256, 384, kernel_size=3, padding=1), nn.ReLU(),
        nn.Conv2d(384, 384, kernel_size=3, padding=1), nn.ReLU(),
        nn.Conv2d(384, 256, kernel_size=3, padding=1), nn.ReLU(),
        nn.MaxPool2d(kernel_size=3, stride=2),
        nn.Flatten(),
        nn.Linear(6400, 4096), nn.ReLU(),
        nn.Dropout(p=0.5),
        nn.Linear(4096, 4096), nn.ReLU(),
        nn.Dropout(p=0.5),
        nn.Linear(4096, 10)
    ).to(device, non_blocking=True)
    criterion = nn.CrossEntropyLoss(reduction='none')
    optimizer = optim.SGD(AlexNet.parameters(), lr=lr)

    # 训练循环
    metrics_train = torch.zeros(3, device=device)  # 训练损失、训练准确度、样本数
    metrics_test = torch.zeros(2, device=device)   # 测试准确度、样本数
    for epoch in track(range(num_epochs), description='AlexNet'):
        AlexNet.train()
        for X, y in dataloader_train:
            X, y = X.to(device, non_blocking=True), y.to(device, non_blocking=True)
            loss = criterion(AlexNet(X), y)
            optimizer.zero_grad()
            loss.mean().backward()
            optimizer.step()

        AlexNet.eval()
        with torch.no_grad():
            metrics_train.zero_()
            for X, y in dataloader_train:
                X, y = X.to(device, non_blocking=True), y.to(device, non_blocking=True)
                y_hat = AlexNet(X)
                loss = criterion(y_hat, y)
                metrics_train[0] += loss.sum()
                metrics_train[1] += (y_hat.argmax(dim=1) == y).sum()
                metrics_train[2] += y.numel()
            metrics_train[0] /= metrics_train[2]
            metrics_train[1] /= metrics_train[2]

            metrics_test.zero_()
            for X, y in dataloader_test:
                X, y = X.to(device, non_blocking=True), y.to(device, non_blocking=True)
                y_hat = AlexNet(X)
                metrics_test[0] += (y_hat.argmax(dim=1) == y).sum()
                metrics_test[1] += y.numel()
            metrics_test[0] /= metrics_test[1]

            _metrics_train = metrics_train.cpu().numpy()
            _metrics_test = metrics_test.cpu().numpy()
            writer.add_scalars('metrics', {
   
                'train_loss': _metrics_train[0],
                'train_acc': _metrics_train[1],
                'test_acc': _metrics_test[0]
            }, epoch)

    writer.close()
2.2.1.3. 输出结果

AlexNet

2.2.2. 在FashionMNIST数据集上训练VGG-11

2.2.2.1. 主要代码
def vgg_block(num_convs, in_channels, out_channels):
        """定义VGG块"""
        layers = []
        for _ in range(num_convs):
            layers.append(nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1))
            layers.append(nn.ReLU())
            in_channels = out_channels
        layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
        return nn.Sequential(*layers)

def vgg(conv_arch):
    """定义VGG网络"""
    vgg_blocks = []
    in_channels = 1
    out_channels = 0

    for num_convs, out_channels in conv_arch:
        vgg_blocks.append(vgg_block(num_convs, in_channels, out_channels))
        in_channels = out_channels

    return nn.Sequential(
        *vgg_blocks, 
        nn.Flatten(),
        nn.Linear(out_channels * 7 * 7, 4096), nn.ReLU(),
        nn.Dropout(p=0.5),
        nn.Linear(4096, 4096), nn.ReLU(),
        nn.Dropout(p=0.5),
        nn.Linear(4096, 10),
    )
2.2.2.2. 完整代码
import os
from tensorboardX import SummaryWriter
from rich.progress import track
import torch
from torch import nn, optim
from torch.utils.data import DataLoader
from torchvision.transforms import Compose, Resize, ToTensor
from torchvision.datasets import FashionMNIST

def load_dataset():
    """加载数据集"""
    root = "./dataset"
    transform = Compose([Resize(224), ToTensor()])
    mnist_train = FashionMNIST(root, True, transform, download=True)
    mnist_test = FashionMNIST(root, False, transform, download=True)

    dataloader_train = DataLoader(mnist_train, batch_size, shuffle=True, 
        num_workers=num_workers, pin_memory=True,
    )
    dataloader_test = DataLoader(mnist_test, batch_size, shuffle=False,
        num_workers=num_workers, pin_memory=True,
    )
    return dataloader_train, dataloader_test

def vgg_block(num_convs, in_channels, out_channels):
        """定义VGG块"""
        layers = []
        for _ in range(num_convs):
            layers.append(nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1))
            layers.append(nn.ReLU())
            in_channels = out_channels
        layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
        return nn.Sequential(*layers)

def vgg(conv_arch):
    """定义VGG网络"""
    vgg_blocks = []
    in_channels = 1
    out_channels = 0

    for num_convs, out_channels in conv_arch:
        vgg_blocks.append(vgg_block(num_convs, in_channels, out_channels))
        in_channels = out_channels

    return nn.Sequential(
        *vgg_blocks, 
        nn.Flatten(),
        nn.Linear(out_channels * 7 * 7, 4096), nn.ReLU(),
        nn.Dropout(p=0.5),
        nn.Linear(4096, 4096), nn.ReLU(),
        nn.Dropout(p=0.5),
        nn.Linear(4096, 10),
    )


if __name__ == '__main__':
    # 全局参数配置
    num_epochs = 10
    batch_size = 128
    num_workers = 3
    lr = 0.05
    device = torch.device('cuda')

    # 记录器配置
    def log_dir():
        root = "runs"
        if not os.path.exists(root):
            os.mkdir(root)
        order = len(os.listdir(root)) + 1
        return f'{
     root}/exp{
     order}'
    writer = SummaryWriter(log_dir=log_dir())

    # 数据集配置
    dataloader_train, dataloader_test = load_dataset()

    # 模型配置
    ratio = 4
    conv_arch = ((1, 64), (1, 128), (2, 256), (2, 512), (2, 512))
    small_conv_arch =  [(pair[0], pair[1] // ratio) for pair in conv_arch]
    net = vgg(small_conv_arch).to(device, non_blocking=True)
    criterion = nn.CrossEntropyLoss(reduction='none')
    optimizer = optim.SGD(net.parameters(), lr=lr)

    # 训练循环
    metrics_train = torch.zeros(3, device=device)  # 训练损失、训练准确度、样本数
    metrics_test = torch.zeros(2, device=device)   # 测试准确度、样本数
    for epoch in track(range(num_epochs), description='VGG'):
        net.train()
        for X, y in dataloader_train:
            X, y = X.to(device, non_blocking=True), y.to(device, non_blocking=True)
            loss = criterion(net(X), y)
            optimizer.zero_grad()
            loss.mean().backward()
            optimizer.step()
    
        net.eval()
        with torch.no_grad():
            metrics_train.zero_()
            for X, y in dataloader_train:
                X, y = X.to(device, non_blocking=True), y.to(device, non_blocking=True)
                y_hat = net(X)
                loss = criterion(y_hat, y)
                metrics_train[0] += loss.sum()
                metrics_train[1] += (y_hat.argmax(dim=1) == y).sum()
                metrics_train[2] += y.numel()
            metrics_train[0] /= metrics_train[2]
            metrics_train[1] /= metrics_train[2]
    
            metrics_test.zero_()
            for X, y in dataloader_test:
                X, y = X.to(device, non_blocking=True), y.to(device, non_blocking=True)
                y_hat = net(X)
                metrics_test[0] += (y_hat.argmax(dim=1) == y).sum()
                metrics_test[1] += y.numel()
            metrics_test[0] /= metrics_test[1]
    
            _metrics_train = metrics_train.cpu().numpy()
            _metrics_test = metrics_test.cpu().numpy()
            writer.add_scalars('metrics', {
   
                'train_loss': _metrics_train[0],
                'train_acc': _metrics_train[1],
                'test_acc': _metrics_test[0]
            }, epoch)
    
    writer.close()
2.2.2.3. 输出结果

VGG

2.2.3. 在FashionMNIST数据集上训练NiN

2.2.3.1. 主要代码
def nin_block(in_channels, out_channels, kernel_size, strides, padding):
    """定义NiN块"""
    return nn.Sequential(
        nn.Conv2d(in_channels, out_channels, kernel_size, strides, padding), nn.ReLU(),
        nn.Conv2d(out_channels, out_channels, kernel_size=1), nn.ReLU(),
        nn.Conv2d(out_channels, out_channels, kernel_size=1), nn.ReLU()
    )
net = nn.Sequential(
    nin_block(1, 96, kernel_size=11, strides=4, padding=0),
    nn.MaxPool2d(kernel_size=3, stride=2),
    nin_block(96, 256, kernel_size=5, strides=1, padding=2),
    nn.MaxPool2d(kernel_size=3, stride=2),
    nin_block(256, 384, kernel_size=3, strides=1, padding=1),
    nn.MaxPool2d(kernel_size=3, stride=2),
    nn.Dropout(0.5),
    nin_block(384, 10, kernel_size=3, strides=1, padding=1),
    nn.AdaptiveAvgPool2d((1, 1)),
    nn.Flatten()
).to(device, non_blocking=True)
2.2.3.2. 完整代码
import os
from tensorboardX import SummaryWriter
from alive_progress import alive_bar
import torch
from torch import nn, optim
from torch.utils.data import DataLoader
from torchvision.transforms import Compose, ToTensor, Resize
from torchvision.datasets import FashionMNIST

def load_dataset():
    """加载数据集"""
    root = "./dataset"
    transform = Compose([Resize(224), ToTensor()])
    mnist_train = FashionMNIST(root, True, transform, download=True)
    mnist_test = FashionMNIST(root, False, transform, download=True)

    dataloader_train = DataLoader(mnist_train, batch_size, shuffle=True, 
        num_workers=num_workers, pin_memory=True,
    )
    dataloader_test = DataLoader(mnist_test, batch_size, shuffle=False,
        num_workers=num_workers, pin_memory=True,
    )
    return dataloader_train, dataloader_test

def train_on_FashionMNIST(title):
    """在FashionMNIST数据集上训练指定模型"""
    # 创建记录器
    def log_dir():
        root = "runs"
        if not os.path.exists(root):
            os.mkdir(root)
        order = len(os.listdir(root)) + 1
        return f'{
     root}/exp{
     order}'
    writer = SummaryWriter(log_dir=log_dir())

    # 数据集配置
    dataloader_train, dataloader_test = load_dataset()

    # 模型配置
    criterion = nn.CrossEntropyLoss(reduction='none')
    optimizer = optim.SGD(net.parameters(), lr=lr)

    # 训练循环
    metrics_train = torch.zeros(3, device=device)  # 训练损失、训练准确度、样本数
    metrics_test = torch.zeros(2, device=device)   # 测试准确度、样本数
    with alive_bar(num_epochs, theme='classic', title=title) as bar:
        for epoch in range(num_epochs):
            net.train()
            for X, y in dataloader_train:
                X, y = X.to(device, non_blocking=True), y.to(device, non_blocking=True)
                loss = criterion(net(X), y)
                optimizer.zero_grad()
                loss.mean().backward()
                optimizer.step()
        
            net.eval()
            with torch.no_grad():
                metrics_train.zero_()
                for X, y in dataloader_train:
                    X, y = X.to(device, non_blocking=True), y.to(device, non_blocking=True)
                    y_hat = net(X)
                    loss = criterion(y_hat, y)
                    metrics_train[0] += loss.sum()
                    metrics_train[1] += (y_hat.argmax(dim=1) == y).sum()
                    metrics_train[2] += y.numel()
                metrics_train[0] /= metrics_train[2]
                metrics_train[1] /= metrics_train[2]
        
                metrics_test.zero_()
                for X, y in dataloader_test:
                    X, y = X.to(device, non_blocking=True), y.to(device, non_blocking=True)
                    y_hat = net(X)
                    metrics_test[0] += (y_hat.argmax(dim=1) == y).sum()
                    metrics_test[1] += y.numel()
                metrics_test[0] /= metrics_test[1]
        
                _metrics_train = metrics_train.cpu().numpy()
                _metrics_test = metrics_test.cpu().numpy()
                writer.add_scalars('metrics', {
   
                    'train_loss': _metrics_train[0],
                    'train_acc': _metrics_train[1],
                    'test_acc': _metrics_test[0]
                }, epoch)
            bar()
    
    writer.close()

if __name__ == '__main__':
    # 全局参数配置
    num_epochs = 10
    batch_size = 128
    num_workers = 3
    lr = 0.1
    device = torch.device('cuda')

    # 模型配置
    def nin_block(in_channels, out_channels, kernel_size, strides, padding):
        """定义NiN块"""
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size, strides, padding), nn.ReLU(),
            nn.Conv2d(out_channels, out_channels, kernel_size=1), nn.ReLU(),
            nn.Conv2d(out_channels, out_channels, kernel_size=1), nn.ReLU()
        )
    net = nn.Sequential(
        nin_block(1, 96, kernel_size=11, strides=4, padding=0),
        nn.MaxPool2d(kernel_size=3, stride=2),
        nin_block(96, 256, kernel_size=5, strides=1, padding=2),
        nn.MaxPool2d(kernel_size=3, stride=2),
        nin_block(256, 384, kernel_size=3, strides=1, padding=1),
        nn.MaxPool2d(kernel_size=3, stride=2),
        nn.Dropout(0.5),
        nin_block(384, 10, kernel_size=3, strides=1, padding=1),
        nn.AdaptiveAvgPool2d((1, 1)),
        nn.Flatten()
    ).to(device, non_blocking=True)
    def init_weights(m):
        if type(m) in [nn.Linear, nn.Conv2d]:
            nn.init.xavier_uniform_(m.weight)
            nn.init.zeros_(m.bias)
    net.apply(init_weights)
    
    # 在FashionMNIST数据集上训练模型
    train_on_FashionMNIST('NiN')
2.2.3.3. 输出结果

NiN

2.2.4. 在FashionMNIST数据集上训练GoogLeNet

2.2.4.1. 主要代码
class Inception(nn.Module):
    # c1--c4是每条路径的输出通道数
    def __init__(self, in_channels, c1, c2, c3, c4, **kwargs):
        super(Inception, self).__init__(**kwargs)
        # 线路1,单1x1卷积层
        self.p1_1 = nn.Conv2d(in_channels, c1, kernel_size=1)
        # 线路2,1x1卷积层后接3x3卷积层
        self.p2_1 = nn.Conv2d(in_channels, c2[0], kernel_size=1)
        self.p2_2 = nn.Conv2d(c2[0], c2[1], kernel_size=3, padding=1)
        # 线路3,1x1卷积层后接5x5卷积层
        self.p3_1 = nn.Conv2d(in_channels, c3[0], kernel_size=1)
        self.p3_2 = nn.Conv2d(c3[0], c3[1], kernel_size=5, padding=2)
        # 线路4,3x3最大汇聚层后接1x1卷积层
        self.p4_1 = nn.MaxPool2d(kernel_size=3, stride=1, padding=1)
        self.p4_2 = nn.Conv2d(in_channels, c4, kernel_size=1)

    def forward(self, x):
        p1 = F.relu(self.p1_1(x))
        p2 = F.relu(self.p2_2(F.relu(self.p2_1(x))))
        p3 = F.relu(self.p3_2(F.relu(self.p3_1(x))))
        p4 = F.relu(self.p4_2(self.p4_1(x)))
        # 在通道维度上连结输出
        return torch.cat((p1, p2, p3, p4), dim=1)
b1 = nn.Sequential(nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3),
               nn.ReLU(),
               nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
b2 = nn.Sequential(nn.Conv2d(64, 64, kernel_size=1),
               nn.ReLU(),
               nn.Conv2d(64, 192, kernel_size=3, padding=1),
               nn.ReLU(),
               nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
b3 = nn.Sequential(Inception(192, 64, (96, 128), (16, 32), 32),
               Inception(256, 128, (128, 192), (32, 96), 64),
               nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
b4 = nn.Sequential(Inception(480, 192, (96, 208), (16, 48), 64),
               Inception(512, 160, (112, 224), (24, 64), 64),
               Inception(512, 128, (128, 256), (24, 64), 64),
               Inception(512, 112, (144, 288), (32, 64), 64),
               Inception(528, 256, (160, 320), (32, 128), 128),
               nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
b5 = nn.Sequential(Inception(832, 256, (160, 320), (32, 128), 128),
               Inception(832, 384, (192, 384), (48, 128), 128),
               nn.AdaptiveAvgPool2d((1,1)),
               nn.Flatten())

net = nn.Sequential(b1, b2, b3, b4, b5, nn.Linear(1024, 10)).to(device, non_blocking=True)
2.2.4.2. 完整代码
import os
from tensorboardX import SummaryWriter
from alive_progress import alive_bar
import torch
from torch import nn, optim
from torch.nn import functional as F
from torch.utils.data import DataLoader
from torchvision.transforms import Compose, ToTensor, Resize
from torchvision.datasets import FashionMNIST

def load_dataset():
    """加载数据集"""
    root = "./dataset"
    transform = Compose([Resize(96), ToTensor()])
    mnist_train = FashionMNIST(root, True, transform, download=True)
    mnist_test = FashionMNIST(root, False, transform, download=True)

    dataloader_train = DataLoader(mnist_train, batch_size, shuffle=True, 
        num_workers=num_workers, pin_memory=True,
    )
    dataloader_test = DataLoader(mnist_test, batch_size, shuffle=False,
        num_workers=num_workers, pin_memory=True,
    )
    return dataloader_train, dataloader_test

def train_on_FashionMNIST(title):
    """在FashionMNIST数据集上训练指定模型"""
    # 创建记录器
    def log_dir():
        root = "runs"
        if not os.path.exists(root):
            os.mkdir(root)
        order = len(os.listdir(root)) + 1
        return f'{
     root}/exp{
     order}'
    writer = SummaryWriter(log_dir=log_dir())

    # 数据集配置
    dataloader_train, dataloader_test = load_dataset()

    # 模型配置
    criterion = nn.CrossEntropyLoss(reduction='none')
    optimizer = optim.SGD(net.parameters(), lr=lr)

    # 训练循环
    metrics_train = torch.zeros(3, device=device)  # 训练损失、训练准确度、样本数
    metrics_test = torch.zeros(2, device=device)   # 测试准确度、样本数
    with alive_bar(num_epochs, theme='classic', title=title) as bar:
        for epoch in range(num_epochs):
            net.train()
            for X, y in dataloader_train:
                X, y = X.to(device, non_blocking=True), y.to(device, non_blocking=True)
                loss = criterion(net(X), y)
                optimizer.zero_grad()
                loss.mean().backward()
                optimizer.step()
        
            net.eval()
            with torch.no_grad():
                metrics_train.zero_()
                for X, y in dataloader_train:
                    X, y = X.to(device, non_blocking=True), y.to(device, non_blocking=True)
                    y_hat = net(X)
                    loss = criterion(y_hat, y)
                    metrics_train[0] += loss.sum()
                    metrics_train[1] += (y_hat.argmax(dim=1) == y).sum()
                    metrics_train[2] += y.numel()
                metrics_train[0] /= metrics_train[2]
                metrics_train[1] /= metrics_train[2]
        
                metrics_test.zero_()
                for X, y in dataloader_test:
                    X, y = X.to(device, non_blocking=True), y.to(device, non_blocking=True)
                    y_hat = net(X)
                    metrics_test[0] += (y_hat.argmax(dim=1) == y).sum()
                    metrics_test[1] += y.numel()
                metrics_test[0] /= metrics_test[1]
        
                _metrics_train = metrics_train.cpu().numpy()
                _metrics_test = metrics_test.cpu().numpy()
                writer.add_scalars('metrics', {
   
                    'train_loss': _metrics_train[0],
                    'train_acc': _metrics_train[1],
                    'test_acc': _metrics_test[0]
                }, epoch)
            bar()
    
    writer.close()

if __name__ == '__main__':
    # 全局参数配置
    num_epochs = 10
    batch_size = 128
    num_workers = 3
    lr = 0.1
    device = torch.device('cuda')

    # 模型配置
    class Inception(nn.Module):
        # c1--c4是每条路径的输出通道数
        def __init__(self, in_channels, c1, c2, c3, c4, **kwargs):
            super(Inception, self).__init__(**kwargs)
            # 线路1,单1x1卷积层
            self.p1_1 = nn.Conv2d(in_channels, c1, kernel_size=1)
            # 线路2,1x1卷积层后接3x3卷积层
            self.p2_1 = nn.Conv2d(in_channels, c2[0], kernel_size=1)
            self.p2_2 = nn.Conv2d(c2[0], c2[1], kernel_size=3, padding=1)
            # 线路3,1x1卷积层后接5x5卷积层
            self.p3_1 = nn.Conv2d(in_channels, c3[0], kernel_size=1)
            self.p3_2 = nn.Conv2d(c3[0], c3[1], kernel_size=5, padding=2)
            # 线路4,3x3最大汇聚层后接1x1卷积层
            self.p4_1 = nn.MaxPool2d(kernel_size=3, stride=1, padding=1)
            self.p4_2 = nn.Conv2d(in_channels, c4, kernel_size=1)

        def forward(self, x):
            p1 = F.relu(self.p1_1(x))
            p2 = F.relu(self.p2_2(F.relu(self.p2_1(x))))
            p3 = F.relu(self.p3_2(F.relu(self.p3_1(x))))
            p4 = F.relu(self.p4_2(self.p4_1(x)))
            # 在通道维度上连结输出
            return torch.cat((p1, p2, p3, p4), dim=1)
    b1 = nn.Sequential(nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3),
                   nn.ReLU(),
                   nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
    b2 = nn.Sequential(nn.Conv2d(64, 64, kernel_size=1),
                   nn.ReLU(),
                   nn.Conv2d(64, 192, kernel_size=3, padding=1),
                   nn.ReLU(),
                   nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
    b3 = nn.Sequential(Inception(192, 64, (96, 128), (16, 32), 32),
                   Inception(256, 128, (128, 192), (32, 96), 64),
                   nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
    b4 = nn.Sequential(Inception(480, 192, (96, 208), (16, 48), 64),
                   Inception(512, 160, (112, 224), (24, 64), 64),
                   Inception(512, 128, (128, 256), (24, 64), 64),
                   Inception(512, 112, (144, 288), (32, 64), 64),
                   Inception(528, 256, (160, 320), (32, 128), 128),
                   nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
    b5 = nn.Sequential(Inception(832, 256, (160, 320), (32, 128), 128),
                   Inception(832, 384, (192, 384), (48, 128), 128),
                   nn.AdaptiveAvgPool2d((1,1)),
                   nn.Flatten())

    net = nn.Sequential(b1, b2, b3, b4, b5, nn.Linear(1024, 10)).to(device, non_blocking=True)
    def init_weights(m):
        if type(m) in [nn.Linear, nn.Conv2d]:
            nn.init.xavier_uniform_(m.weight)
            nn.init.zeros_(m.bias)
    net.apply(init_weights)

    # 在FashionMNIST数据集上训练模型
    train_on_FashionMNIST('GoogLeNet')
2.2.4.3. 输出结果

GoogLeNet

相关推荐

  1. pytorch 神经网络CNN

    2023-12-11 23:12:04       41 阅读
  2. 神经网络Pytorch 08)

    2023-12-11 23:12:04       30 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2023-12-11 23:12:04       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2023-12-11 23:12:04       101 阅读
  3. 在Django里面运行非项目文件

    2023-12-11 23:12:04       82 阅读
  4. Python语言-面向对象

    2023-12-11 23:12:04       91 阅读

热门阅读

  1. Qt12.11

    2023-12-11 23:12:04       65 阅读
  2. 二叉搜索树的最近公共祖先【数据结构】

    2023-12-11 23:12:04       48 阅读
  3. 647.回文子串

    2023-12-11 23:12:04       48 阅读
  4. Hough算法数学原理

    2023-12-11 23:12:04       48 阅读
  5. Flex布局 实现元素排列 4列变2列?

    2023-12-11 23:12:04       49 阅读
  6. 离线环境下安装python库(推荐pip download)

    2023-12-11 23:12:04       64 阅读
  7. 什么是https 加密协议?

    2023-12-11 23:12:04       61 阅读
  8. 什么是多态

    2023-12-11 23:12:04       51 阅读
  9. 我的Android播放器封装经验

    2023-12-11 23:12:04       52 阅读
  10. python扩展内置类型

    2023-12-11 23:12:04       61 阅读