Pytorch复现 Transformer cssdn

一、多头注意力机制 

import torch
import torch.nn as nn

class MultiHeadAttention(nn.Module):
    def __init__(self, hid_dim, n_heads):
        super(MultiHeadAttention, self).__init__()
        assert hid_dim % n_heads == 0
        self.hid_dim = hid_dim
        self.n_heads = n_heads
        self.w_q = nn.Linear(hid_dim, hid_dim)
        self.w_k = nn.Linear(hid_dim, hid_dim)
        self.w_v = nn.Linear(hid_dim, hid_dim)
        self.fc = nn.Linear(hid_dim, hid_dim)
        self.scale = torch.sqrt(torch.FloatTensor([hid_dim // n_heads]))

    def forward(self, query, key, value, mask=None):
        bsz = query.shape[0]
        Q = self.w_q(query)
        K = self.w_k(key)
        V = self.w_v(value)

        Q = Q.view(bsz, -1, self.n_heads, self.hid_dim // self.n_heads).permute(0, 2, 1, 3)
        K = K.view(bsz, -1, self.n_heads, self.hid_dim // self.n_heads).permute(0, 2, 1, 3)
        V = V.view(bsz, -1, self.n_heads, self.hid_dim // self.n_heads).permute(0, 2, 1, 3)

        energy = torch.matmul(Q, K.permute(0, 1, 3, 2)) / self.scale

        if mask is not None:
            energy = energy.masked_fill(mask == 0, -1e10)

        attention = torch.softmax(energy, dim=-1)
        x = torch.matmul(attention, V)
        x = x.permute(0, 2, 1, 3).contiguous()
        x = x.view(bsz, -1, self.n_heads * (self.hid_dim // self.n_heads))
        x = self.fc(x)
        return x

二、前馈神经网络

class Feedforward(nn.Module):
    def __init__(self, d_model, d_ff, dropout=0.1):
        super(Feedforward, self).__init__()
        # 两层线性变换和ReLU激活函数
        self.linear1 = nn.Linear(d_model, d_ff)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        x = torch.nn.functional.relu(self.linear1(x))
        x = self.dropout(x)
        x = self.linear2(x)
        return x

三、位置编码 

class PositionalEncoding(nn.Module):
    "实现位置编码"
    def __init__(self, d_model, dropout, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        
        # 初始化一个Shape为(max_len, d_model)的位置编码矩阵
        pe = torch.zeros(max_len, d_model).to(device)
        
        # 初始化一个tensor [[0, 1, 2, 3, ...]]
        position = torch.arange(0, max_len).unsqueeze(1)
        # 这里计算sin和cos中的频率项,通过e的指数进行变换
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
        
        pe[:, 0::2] = torch.sin(position * div_term)  # 偶数位置填充sin
        pe[:, 1::2] = torch.cos(position * div_term)  # 奇数位置填充cos
        
        pe = pe.unsqueeze(0)  # 为了方便批处理,增加一个可以unsqueeze出一个`batch`
        
        # 将pe注册为一个不参与梯度反传,但又希望保存在model的state_dict中的持久化buffer
        # 这个操作使得pe可以在forward中使用,但是不会被视为模型的一个可训练参数
        self.register_buffer('pe', pe)

    def forward(self, x):
        """
        将embedding后的inputs,例如(1, 7, 128),batch size为1, 7个单词,每个单词的嵌入为128
        """
        # 将x与positional encoding相加。
        x = x + self.pe[:, :x.size(1)].requires_grad_(False)
        return self.dropout(x)

四、编码层

class EncoderLayer(nn.Module):
    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super(EncoderLayer, self).__init__()
        # 自注意力层和前馈神经网络层初始化及残差连接和层归一化
        self.self_attn = MultiHeadAttention(d_model, n_heads, dropout)
        self.feedforward = Feedforward(d_model, d_ff, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        # 自注意力机制
        attn_output = self.self_attn(x, x, x, mask)
        x = x + self.dropout(attn_output)
        x = self.norm1(x)

        # 前馈神经网络
        ff_output = self.feedforward(x)
        x = x + self.dropout(ff_output)
        x = self.norm2(x)

        return x

五、解码层

class DecoderLayer(nn.Module):
    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super(DecoderLayer, self).__init__()
        # 自注意力层和前馈神经网络层初始化及残差连接和层归一化
        self.self_attn = MultiHeadAttention(d_model, n_heads, dropout)
        self.enc_attn = MultiHeadAttention(d_model, n_heads, dropout)
        self.feedforward = Feedforward(d_model, d_ff, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_output, self_mask, context_mask):
        # 自注意力机制
        attn_output = self.self_attn(x, x, x, self_mask)
        x = x + self.dropout(attn_output)
        x = self.norm1(x)

        # 编码器-解码器注意力机制
        attn_output = self.enc_attn(x, enc_output, enc_output, context_mask)
        x = x + self.dropout(attn_output)
        x = self.norm2(x)

        # 前馈神经网络
        ff_output = self.feedforward(x)
        x = x + self.dropout(ff_output)
        x = self.norm3(x)

        return x

六、Transformer模型 

这段代码是构建Transformer模型的核心,通常用于机器翻译、文本摘要、问答系统等自然语言处理任务。

class Transformer(nn.Module):
    def __init__(self, vocab_size, d_model, n_heads, n_encoder_layers, n_decoder_layers, d_ff, dropout=0.1):
        super(Transformer, self).__init__()
        # Transformer模型包含嵌入层、位置编码、编码器和解码器层以及输出层
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, dropout)
        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, n_heads, d_ff, dropout) for _ in range(n_encoder_layers)])
        self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, n_heads, d_ff, dropout) for _ in range(n_decoder_layers)])
        self.fc_out = nn.Linear(d_model, vocab_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src, trg, src_mask, trg_mask):
        # 嵌入层和位置编码
        src = self.embedding(src)
        src = self.positional_encoding(src)
        trg = self.embedding(trg)
        trg = self.positional_encoding(trg)

        # 编码器层
        for layer in self.encoder_layers:
            src = layer(src, src_mask)

        # 解码器层
        for layer in self.decoder_layers:
            trg = layer(trg, src, trg_mask, src_mask)

        # 输出层
        output = self.fc_out(trg)
        return output
  • self.embedding: 嵌入层,用于将输入词汇转换成固定大小的向量。
  • self.positional_encoding: 位置编码层,给嵌入向量添加位置信息。
  • self.encoder_layers: 编码器层,是一个模块列表,其中包含了多个EncoderLayer
  • self.decoder_layers: 解码器层,是一个模块列表,其中包含了多个DecoderLayer
  • self.fc_out: 线性层,将解码器输出转换为最终的词汇大小输出。
  • self.dropout: 丢弃层,用于训练中的正则化。

七、初始化模型

使用PyTorch框架初始化一个Transformer模型,并为模型的源和目标序列生成随机数据以及对应的掩码。

# 模型示例
vocab_size = 10000  # 假设词汇表大小为10000
d_model = 512
n_heads = 8
n_encoder_layers = 6
n_decoder_layers = 6
d_ff = 2048
dropout = 0.1

transformer_model = Transformer(vocab_size, d_model, n_heads, n_encoder_layers, n_decoder_layers, d_ff, dropout)

# 生成输入,这里的输入是随机的,需要根据实际情况修改
src = torch.randint(0, vocab_size, (32, 10))  # 假设源序列长度为10
trg = torch.randint(0, vocab_size, (32, 20))  # 假设目标序列长度为20

# 生成掩码,用于屏蔽序列中的填充位置
src_mask = (src != 0).unsqueeze(1).unsqueeze(2)  # 源序列掩码
trg_mask = (trg != 0).unsqueeze(1).unsqueeze(2)  # 目标序列掩码

# 模型前向传播
output = transformer_model(src, trg, src_mask, trg_mask)
print(output.shape)

代码通过模型进行前向传播,并打印输出的形状。根据Transformer模型的设计,输出的形状应该是(batch_size, trg_seq_len, vocab_size),在这个例子中是(32, 20, 10000)。这表示对于每个批次中的32个样本的每个位置,模型都会输出一个10000维的向量,向量表示每个词汇的分数或概率。 

 

完整代码如下:

import torch
import torch.nn as nn
import math

# 指定设备
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class MultiHeadAttention(nn.Module):
    def __init__(self, hid_dim, n_heads):
        super(MultiHeadAttention, self).__init__()
        assert hid_dim % n_heads == 0
        self.hid_dim = hid_dim
        self.n_heads = n_heads
        self.w_q = nn.Linear(hid_dim, hid_dim)
        self.w_k = nn.Linear(hid_dim, hid_dim)
        self.w_v = nn.Linear(hid_dim, hid_dim)
        self.fc = nn.Linear(hid_dim, hid_dim)
        self.scale = torch.sqrt(torch.FloatTensor([hid_dim // n_heads]))

    def forward(self, query, key, value, mask=None):
        bsz = query.shape[0]
        Q = self.w_q(query)
        K = self.w_k(key)
        V = self.w_v(value)

        Q = Q.view(bsz, -1, self.n_heads, self.hid_dim // self.n_heads).permute(0, 2, 1, 3)
        K = K.view(bsz, -1, self.n_heads, self.hid_dim // self.n_heads).permute(0, 2, 1, 3)
        V = V.view(bsz, -1, self.n_heads, self.hid_dim // self.n_heads).permute(0, 2, 1, 3)

        energy = torch.matmul(Q, K.permute(0, 1, 3, 2)) / self.scale

        if mask is not None:
            energy = energy.masked_fill(mask == 0, -1e10)

        attention = torch.softmax(energy, dim=-1)
        x = torch.matmul(attention, V)
        x = x.permute(0, 2, 1, 3).contiguous()
        x = x.view(bsz, -1, self.n_heads * (self.hid_dim // self.n_heads))
        x = self.fc(x)
        return x

class Feedforward(nn.Module):
    def __init__(self, d_model, d_ff, dropout=0.1):
        super(Feedforward, self).__init__()
        # 两层线性变换和ReLU激活函数
        self.linear1 = nn.Linear(d_model, d_ff)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        x = torch.nn.functional.relu(self.linear1(x))
        x = self.dropout(x)
        x = self.linear2(x)
        return x


class PositionalEncoding(nn.Module):
    "实现位置编码"

    def __init__(self, d_model, dropout, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        # 初始化一个Shape为(max_len, d_model)的位置编码矩阵
        pe = torch.zeros(max_len, d_model).to(device)

        # 初始化一个tensor [[0, 1, 2, 3, ...]]
        position = torch.arange(0, max_len).unsqueeze(1)
        # 这里计算sin和cos中的频率项,通过e的指数进行变换
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))

        pe[:, 0::2] = torch.sin(position * div_term)  # 偶数位置填充sin
        pe[:, 1::2] = torch.cos(position * div_term)  # 奇数位置填充cos

        pe = pe.unsqueeze(0)  # 为了方便批处理,增加一个可以unsqueeze出一个`batch`

        # 将pe注册为一个不参与梯度反传,但又希望保存在model的state_dict中的持久化buffer
        # 这个操作使得pe可以在forward中使用,但是不会被视为模型的一个可训练参数
        self.register_buffer('pe', pe)

    def forward(self, x):
        """
        将embedding后的inputs,例如(1, 7, 128),batch size为1, 7个单词,每个单词的嵌入为128
        """
        # 将x与positional encoding相加。
        x = x + self.pe[:, :x.size(1)].requires_grad_(False)
        return self.dropout(x)

class EncoderLayer(nn.Module):
    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super(EncoderLayer, self).__init__()
        # 自注意力层和前馈神经网络层初始化及残差连接和层归一化
        self.self_attn = MultiHeadAttention(d_model, n_heads)
        self.feedforward = Feedforward(d_model, d_ff, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        # 自注意力机制
        attn_output = self.self_attn(x, x, x, mask)
        x = x + self.dropout(attn_output)
        x = self.norm1(x)

        # 前馈神经网络
        ff_output = self.feedforward(x)
        x = x + self.dropout(ff_output)
        x = self.norm2(x)

        return x

class DecoderLayer(nn.Module):
    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super(DecoderLayer, self).__init__()
        # 自注意力层和前馈神经网络层初始化及残差连接和层归一化
        self.self_attn = MultiHeadAttention(d_model, n_heads )
        self.enc_attn = MultiHeadAttention(d_model, n_heads )
        self.feedforward = Feedforward(d_model, d_ff, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_output, self_mask, context_mask):
        # 自注意力机制
        attn_output = self.self_attn(x, x, x, self_mask)
        x = x + self.dropout(attn_output)
        x = self.norm1(x)

        # 编码器-解码器注意力机制
        attn_output = self.enc_attn(x, enc_output, enc_output, context_mask)
        x = x + self.dropout(attn_output)
        x = self.norm2(x)

        # 前馈神经网络
        ff_output = self.feedforward(x)
        x = x + self.dropout(ff_output)
        x = self.norm3(x)

        return x
class Transformer(nn.Module):
    def __init__(self, vocab_size, d_model, n_heads, n_encoder_layers, n_decoder_layers, d_ff, dropout=0.1):
        super(Transformer, self).__init__()
        # Transformer模型包含嵌入层、位置编码、编码器和解码器层以及输出层
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, dropout)
        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, n_heads, d_ff, dropout) for _ in range(n_encoder_layers)])
        self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, n_heads, d_ff, dropout) for _ in range(n_decoder_layers)])
        self.fc_out = nn.Linear(d_model, vocab_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src, trg, src_mask, trg_mask):
        # 嵌入层和位置编码
        src = self.embedding(src)
        src = self.positional_encoding(src)
        trg = self.embedding(trg)
        trg = self.positional_encoding(trg)

        # 编码器层
        for layer in self.encoder_layers:
            src = layer(src, src_mask)

        # 解码器层
        for layer in self.decoder_layers:
            trg = layer(trg, src, trg_mask, src_mask)

        # 输出层
        output = self.fc_out(trg)
        return output

# 模型示例
vocab_size = 10000  # 假设词汇表大小为10000
d_model = 512
n_heads = 8
n_encoder_layers = 6
n_decoder_layers = 6
d_ff = 2048
dropout = 0.1

transformer_model = Transformer(vocab_size, d_model, n_heads, n_encoder_layers, n_decoder_layers, d_ff, dropout)

# 生成输入,这里的输入是随机的,需要根据实际情况修改
src = torch.randint(0, vocab_size, (32, 10))  # 假设源序列长度为10
trg = torch.randint(0, vocab_size, (32, 20))  # 假设目标序列长度为20

# 生成掩码,用于屏蔽序列中的填充位置
src_mask = (src != 0).unsqueeze(1).unsqueeze(2)  # 源序列掩码
trg_mask = (trg != 0).unsqueeze(1).unsqueeze(2)  # 目标序列掩码

# 模型前向传播
output = transformer_model(src, trg, src_mask, trg_mask)
print(output.shape)

相关推荐

  1. SSD (Pytorch Ubuntu20.04

    2024-03-30 09:56:02       9 阅读
  2. NAS with RL时pytorch的相关问题

    2024-03-30 09:56:02       43 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-03-30 09:56:02       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-03-30 09:56:02       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-03-30 09:56:02       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-03-30 09:56:02       20 阅读

热门阅读

  1. hanlp的使用

    2024-03-30 09:56:02       15 阅读
  2. MongoDB集成springboot

    2024-03-30 09:56:02       17 阅读
  3. 怎么学习写代码?

    2024-03-30 09:56:02       20 阅读
  4. Redisson源码研究

    2024-03-30 09:56:02       21 阅读
  5. 2024.3.23-25@spring框架学习笔记

    2024-03-30 09:56:02       17 阅读
  6. rectangle2

    2024-03-30 09:56:02       15 阅读
  7. UE4 quest3平台网络不稳定问题排查及解决

    2024-03-30 09:56:02       28 阅读
  8. vscode插件

    2024-03-30 09:56:02       19 阅读
  9. Python数据库编程全指南SQLite和MySQL实践

    2024-03-30 09:56:02       18 阅读
  10. 【统计】什么事 R 方

    2024-03-30 09:56:02       20 阅读