NLP(15)-序列标注任务

前言

仅记录学习过程,有问题欢迎讨论

什么时候应该使用Pooling层:

  • 如果针对每个字做标注,无需;若是针对整句话做分类,则需要pooling

NER(数据标注):B/M/E (A/O/P) --左/中/右 边界(地址/机构/人名) O–无关字

CRF -转移矩阵: 如果一个字已经是B_location 那么它大概率是E_location 或者 M_location 和其他的BEM 基本无关

  • shape为 label * label

发射矩阵:shape为 sen_len * tag_size ;相当于输出每个字对应tag的概率矩阵

采用bert:

PERSON类实体,准确率:0.480000, 召回率: 0.208092, F1: 0.290318
LOCATION类实体,准确率:0.624113, 召回率: 0.458333, F1: 0.528524
TIME类实体,准确率:0.739130, 召回率: 0.625767, F1: 0.677736
ORGANIZATION类实体,准确率:0.500000, 召回率: 0.282353, F1: 0.360898
Macro-F1: 0.464369
Micro-F1 0.492606

采用LSTM:

PERSON类实体,准确率:0.432000, 召回率: 0.312139, F1: 0.362411
LOCATION类实体,准确率:0.512987, 召回率: 0.411458, F1: 0.456642
TIME类实体,准确率:0.721804, 召回率: 0.588957, F1: 0.648644
ORGANIZATION类实体,准确率:0.450000, 召回率: 0.423529, F1: 0.436359
Macro-F1: 0.476014
Micro-F1 0.479633
`

代码

实现一个NER代码,划分每一句话的B/M/E/O

config.py

"""
配置参数信息
"""
Config = {
    "model_path": "./output/",
    "model_name": "model.pt",
    "schema_path": r"E:\Anlp\week9 序列标注问题\ner\ner_data\schema.json",
    "train_data_path": r"E:\Anlp\week9 序列标注问题\ner\ner_data\train.txt",
    "valid_data_path": r"E:\Anlp\week9 序列标注问题\ner\ner_data\test.txt",
    "vocab_path": r"E:\Anlp\week9 序列标注问题\ner\chars.txt",
    "model_type": "bert",
    # 数据标注中计算loss
    "use_crf": True,
    # 文本向量大小
    "char_dim": 20,
    # 文本长度
    "max_len": 50,
    # 词向量大小
    "hidden_size": 64,
    # 训练 轮数
    "epoch_size": 15,
    # 批量大小
    "batch_size": 25,
    # 训练集大小
    "simple_size": 300,
    # 学习率
    "lr": 0.001,
    # dropout
    "dropout": 0.5,
    # 优化器
    "optimizer": "adam",
    # 卷积核
    "kernel_size": 3,
    # 最大池 or 平均池
    "pooling_style": "max",
    # 模型层数
    "num_layers": 2,
    "bert_model_path": r"E:\Anlp\week6语言模型和预训练\bert-base-chinese",
    # 输出层大小
    "output_size": 9,
    # 随机数种子
    "seed": 987
}

load.py j加载数据文件

"""
数据加载
"""
"""
数据加载
"""
import os
import numpy as np
import json
import re
import os
import torch
import torch.utils.data as Data
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer


# 获取字表集
def load_vocab(path):
    vocab = {}
    with open(path, 'r', encoding='utf-8') as f:
        for index, line in enumerate(f):
            word = line.strip()
            # 0留给padding位置,所以从1开始
            vocab[word] = index + 1
        vocab['unk'] = len(vocab) + 1
    return vocab


class DataGenerator:
    def __init__(self, data_path, config):
        self.data_path = data_path
        self.config = config
        self.schema = self.load_schema(config["schema_path"])
        if self.config["model_type"] == "bert":
            self.tokenizer = BertTokenizer.from_pretrained(config["bert_model_path"])
        self.vocab = load_vocab(config["vocab_path"])
        self.config["vocab_size"] = len(self.vocab)
        # 中文的语句list
        self.sentence_list = []
        self.data = self.load_data()


    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]

    def load_schema(self, path):
        with open(path, encoding="utf8") as f:
            return json.load(f)

    def load_data(self):
        dataset_x = []
        dataset_y = []
        with open(self.data_path, 'r', encoding='utf-8') as f:
            # 每句话
            segments = f.read().split("\n\n")
            # 每句话字符 如: 你 0
            for segment in segments:
                sentences = []
                labels = []
                for line in segment.split("\n"):

                    if line.strip() == "":
                        continue
                    char, label = line.split()
                    sentences.append(char)
                    labels.append(self.schema[label])
                self.sentence_list.append(' '.join(sentences))
                input_id = self.sentence_to_index(sentences)
                # labels 也需要padding相同长度
                labels = self.padding(labels)
                # 标签和文本组成一个样本
                dataset_x.append(input_id)
                dataset_y.append(labels)
            data = Data.TensorDataset(torch.tensor(dataset_x), torch.tensor(dataset_y))

        return data

    # 文本预处理
    # 转化为向量
    def sentence_to_index(self, text):
        input_ids = []
        vocab = self.vocab

        for char in text:
            input_ids.append(vocab.get(char, vocab['unk']))
        # 填充or裁剪
        input_ids = self.padding(input_ids)
        return input_ids

    # 数据预处理 裁剪or填充
    def padding(self, input_ids):
        length = self.config["max_len"]
        if len(input_ids) >= length:
            return input_ids[:length]
        else:
            padded_input_ids = input_ids + [0] * (length - len(input_ids))
            return padded_input_ids


# 用torch自带的DataLoader类封装数据
def load_data_batch(data_path, config, shuffle=True):
    dg = DataGenerator(data_path, config)
    # DataLoader 类封装数据 dg除了data 还包含其他信息(后面需要使用)
    dl = DataLoader(dg, batch_size=config["batch_size"], shuffle=shuffle)
    return dl


if __name__ == '__main__':
    from config import Config

    dg = DataGenerator(Config["train_data_path"], Config)
    print(len(dg))
    print(dg[0])

main.py 主方法

import torch
import os
import random
import os
import numpy as np
import logging
from config import Config
from model import TorchModel, choose_optimizer
from loader import load_data_batch
from evaluate import Evaluator

# [DEBUG, INFO, WARNING, ERROR, CRITICAL]


logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

"""
模型训练主程序
"""
# 通过设置随机种子来复现上一次的结果(避免随机性)
seed = Config["seed"]
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)


def main(config):
    # 保存模型的目录
    if not os.path.isdir(config["model_path"]):
        os.mkdir(config["model_path"])
    # 加载数据
    dataset = load_data_batch(config["train_data_path"], config)
    # 加载模型
    model = TorchModel(config)
    # 是否使用gpu
    if torch.cuda.is_available():
        logger.info("gpu可以使用,迁移模型至gpu")
        model.cuda()
    # 选择优化器
    optim = choose_optimizer(config, model)
    # 加载效果测试类
    evaluator = Evaluator(config, model, logger)
    for epoch in range(config["epoch_size"]):
        epoch += 1
        logger.info("epoch %d begin" % epoch)
        epoch_loss = []
        # 训练模型
        model.train()
        for index, batch_data in enumerate(dataset):
            if torch.cuda.is_available():
                batch_data = [d.cuda() for d in batch_data]
            # x, y = dataiter
            # 反向传播
            optim.zero_grad()
            x, y = batch_data     # 输入变化时这里需要修改,比如多输入,多输出的情况
            # 计算梯度
            loss = model(x, y)
            # 梯度更新
            loss.backward()
            # 优化器更新模型
            optim.step()
            # 记录损失
            epoch_loss.append(loss.item())
        logger.info("epoch average loss: %f" % np.mean(epoch_loss))
        # 测试模型效果
        acc = evaluator.eval(epoch)
    # 可以用model_type model_path epoch 三个参数来保存模型
    # model_path = os.path.join(config["model_path"], "epoch_%d_%s.pth" % (epoch, config["model_type"]))
    # torch.save(model.state_dict(), model_path)  # 保存模型权重
    return


if __name__ == "__main__":
    main(Config)

    # for model in ["cnn"]:
    #     Config["model_type"] = model
    #     print("最后一轮准确率:", main(Config), "当前配置:", Config["model_type"])

    # 对比所有模型
    # 中间日志可以关掉,避免输出过多信息
    # 超参数的网格搜索
    # for model in ["gated_cnn"]:
    #     Config["model_type"] = model
    #     for lr in [1e-3, 1e-4]:
    #         Config["learning_rate"] = lr
    #         for hidden_size in [128]:
    #             Config["hidden_size"] = hidden_size
    #             for batch_size in [64, 128]:
    #                 Config["batch_size"] = batch_size
    #                 for pooling_style in ["avg"]:
    #                     Config["pooling_style"] = pooling_style
    # 可以把输出放入文件中 便于查看
    #                     print("最后一轮准确率:", main(Config), "当前配置:", Config)


evaluate.py 评估模型文件

"""
模型效果测试
"""
import re
from collections import defaultdict

import numpy as np
import torch
from loader import load_data_batch


class Evaluator:
    def __init__(self, config, model, logger):
        self.config = config
        self.model = model
        self.logger = logger
        # 选择验证集合
        self.dataset = load_data_batch(config["valid_data_path"], config, shuffle=False)
        # self.stats_dict = {"correct": 0, "wrong": 0}  # 用于存储测试结果

    def eval(self, epoch):
        self.logger.info("开始测试第%d轮模型效果:" % epoch)
        # 测试模式
        self.model.eval()
        self.logger.info("开始测试第%d轮模型效果:" % epoch)
        self.stats_dict = {"LOCATION": defaultdict(int),
                           "TIME": defaultdict(int),
                           "PERSON": defaultdict(int),
                           "ORGANIZATION": defaultdict(int)}
        for index, batch_data in enumerate(self.dataset):
            # 取batch_size 句话
            sentences = self.dataset.dataset.sentence_list[
                        index * self.config["batch_size"]: (index + 1) * self.config["batch_size"]]
            if torch.cuda.is_available():
                batch_data = [d.cuda() for d in batch_data]
            input_id, labels = batch_data
            with torch.no_grad():
                pred_results = self.model(input_id)  # 不输入labels,使用模型当前参数进行预测
            self.write_stats(labels, pred_results, sentences)
            self.show_stats()
        return

    def write_stats(self, labels, pred_results, sentences):
        assert len(labels) == len(pred_results) == len(sentences)
        if not self.config['use_crf']:
            pred_results = torch.argmax(pred_results, dim=-1)
        for true_label, pred_label, sentence in zip(labels, pred_results, sentences):
            if not self.config["use_crf"]:
                pred_label = pred_label.cpu().detach().tolist()
            true_label = true_label.cpu().detach().tolist()
            true_entities = self.decode(sentence, true_label)
            pred_entities = self.decode(sentence, pred_label)
            # 正确率 = 识别出的正确实体数 / 识别出的实体数
            # 召回率 = 识别出的正确实体数 / 样本的实体数
            for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]:
                self.stats_dict[key]["正确识别"] += len([ent for ent in pred_entities[key] if ent in true_entities[key]])
                self.stats_dict[key]["样本实体数"] += len(true_entities[key])
                self.stats_dict[key]["识别出实体数"] += len(pred_entities[key])
        return

    def show_stats(self):
        F1_scores = []
        for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]:
            # 正确率 = 识别出的正确实体数 / 识别出的实体数
            # 召回率 = 识别出的正确实体数 / 样本的实体数
            precision = self.stats_dict[key]["正确识别"] / (1e-5 + self.stats_dict[key]["识别出实体数"])
            recall = self.stats_dict[key]["正确识别"] / (1e-5 + self.stats_dict[key]["样本实体数"])
            F1 = (2 * precision * recall) / (precision + recall + 1e-5)
            F1_scores.append(F1)
            self.logger.info("%s类实体,准确率:%f, 召回率: %f, F1: %f" % (key, precision, recall, F1))
        self.logger.info("Macro-F1: %f" % np.mean(F1_scores))
        correct_pred = sum([self.stats_dict[key]["正确识别"] for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]])
        total_pred = sum([self.stats_dict[key]["识别出实体数"] for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]])
        true_enti = sum([self.stats_dict[key]["样本实体数"] for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]])
        micro_precision = correct_pred / (total_pred + 1e-5)
        micro_recall = correct_pred / (true_enti + 1e-5)
        micro_f1 = (2 * micro_precision * micro_recall) / (micro_precision + micro_recall + 1e-5)
        self.logger.info("Micro-F1 %f" % micro_f1)
        self.logger.info("--------------------")
        return

    # 相当于截取对应的句子
    def decode(self, sentence, labels):
        labels = "".join([str(x) for x in labels[:len(sentence)]])
        results = defaultdict(list)
        for location in re.finditer("(04+)", labels):
            s, e = location.span()
            results["LOCATION"].append(sentence[s:e])
        for location in re.finditer("(15+)", labels):
            s, e = location.span()
            results["ORGANIZATION"].append(sentence[s:e])
        for location in re.finditer("(26+)", labels):
            s, e = location.span()
            results["PERSON"].append(sentence[s:e])
        for location in re.finditer("(37+)", labels):
            s, e = location.span()
            results["TIME"].append(sentence[s:e])
        return results

model.py

import torch
import torch.nn as nn
from torch.optim import Adam, SGD
from transformers import BertModel
from torchcrf import CRF

"""
建立网络模型结构
"""


class TorchModel(nn.Module):
    def __init__(self, config):
        super(TorchModel, self).__init__()
        hidden_size = config["hidden_size"]
        vocab_size = config["vocab_size"] + 1
        output_size = config["output_size"]
        self.model_type = config["model_type"]
        num_layers = config["num_layers"]
        # self.use_bert = config["use_bert"]
        self.use_crf = config["use_crf"]

        self.emb = nn.Embedding(vocab_size + 1, hidden_size, padding_idx=0)
        if self.model_type == 'rnn':
            self.encoder = nn.RNN(input_size=hidden_size, hidden_size=hidden_size, num_layers=num_layers,
                                  batch_first=True)
        elif self.model_type == 'lstm':
            # 双向lstm,输出的是 hidden_size * 2(num_layers 要写2)
            self.encoder = nn.LSTM(hidden_size, hidden_size, num_layers=num_layers, bidirectional=True, batch_first=True)
            hidden_size = hidden_size * 2

        elif self.model_type == 'bert':
            self.encoder = BertModel.from_pretrained(config["bert_model_path"])
            # 需要使用预训练模型的hidden_size
            hidden_size = self.encoder.config.hidden_size
        elif self.model_type == 'cnn':
            self.encoder = CNN(config)
        elif self.model_type == "gated_cnn":
            self.encoder = GatedCNN(config)
        elif self.model_type == "bert_lstm":
            self.encoder = BertLSTM(config)
            # 需要使用预训练模型的hidden_size
            hidden_size = self.encoder.config.hidden_size
        self.classify = nn.Linear(hidden_size, output_size)
        self.pooling_style = config["pooling_style"]
        self.crf_layer = CRF(output_size, batch_first=True)

        self.loss = nn.functional.cross_entropy  # loss采用交叉熵损失

    def forward(self, x, y=None):
        if self.model_type == 'bert':
            # 输入x为[batch_size, seq_len]
            # bert返回的结果是 (sequence_output, pooler_output)
            # sequence_output:batch_size, max_len, hidden_size
            # pooler_output:batch_size, hidden_size
            x = self.encoder(x)[0]
        else:
            x = self.emb(x)
            x = self.encoder(x)
        # 判断x是否是tuple
        if isinstance(x, tuple):
            x = x[0]

        # # 池化层
        # if self.pooling_style == "max":
        #     # shape[1]代表列数,shape是行和列数构成的元组
        #     self.pooling_style = nn.MaxPool1d(x.shape[1])
        # elif self.pooling_style == "avg":
        #     self.pooling_style = nn.AvgPool1d(x.shape[1])
        # x = self.pooling_style(x.transpose(1, 2)).squeeze()

        y_pred = self.classify(x)
        if y is not None:
            # 是否使用crf:
            if self.use_crf:
                mask = y.gt(-1)
                return - self.crf_layer(y_pred, y, mask, reduction="mean")
            else:
                # (number, class_num), (number)
                return self.loss(y_pred.view(-1, y.shape[-1]), y.view(-1))
        else:
            if self.use_crf:
                return self.crf_layer.decode(y_pred)
            else:
                return y_pred

# 优化器的选择
def choose_optimizer(config, model):
    optimizer = config["optimizer"]
    learning_rate = config["lr"]
    if optimizer == "adam":
        return Adam(model.parameters(), lr=learning_rate)
    elif optimizer == "sgd":
        return SGD(model.parameters(), lr=learning_rate)


# 定义CNN模型
class CNN(nn.Module):
    def __init__(self, config):
        super(CNN, self).__init__()
        hidden_size = config["hidden_size"]
        kernel_size = config["kernel_size"]
        pad = int((kernel_size - 1) / 2)
        self.cnn = nn.Conv1d(hidden_size, hidden_size, kernel_size, bias=False, padding=pad)

    def forward(self, x):  # x : (batch_size, max_len, embeding_size)
        return self.cnn(x.transpose(1, 2)).transpose(1, 2)


# 定义GatedCNN模型
class GatedCNN(nn.Module):
    def __init__(self, config):
        super(GatedCNN, self).__init__()
        self.cnn = CNN(config)
        self.gate = CNN(config)

    # 定义前向传播函数 比普通cnn多了一次sigmoid 然后互相卷积
    def forward(self, x):
        a = self.cnn(x)
        b = self.gate(x)
        b = torch.sigmoid(b)
        return torch.mul(a, b)


# 定义BERT-LSTM模型
class BertLSTM(nn.Module):
    def __init__(self, config):
        super(BertLSTM, self).__init__()
        self.bert = BertModel.from_pretrained(config["bert_model_path"], return_dict=False)
        self.rnn = nn.LSTM(self.bert.config.hidden_size, self.bert.config.hidden_size, batch_first=True)

    def forward(self, x):
        x = self.bert(x)[0]
        x, _ = self.rnn(x)
        return x

# if __name__ == "__main__":
#     from config import Config
#
#     Config["output_size"] = 2
#     Config["vocab_size"] = 20
#     Config["max_length"] = 5
#     Config["model_type"] = "bert"
#     Config["use_bert"] = True
#     # model = BertModel.from_pretrained(Config["bert_model_path"], return_dict=False)
#     x = torch.LongTensor([[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]])
#     # sequence_output, pooler_output = model(x)
#     # print(x[1], type(x[2]), len(x[2]))
#
#     model = TorchModel(Config)
#     label = torch.LongTensor([0,1])
#     print(model(x, label))

相关推荐

  1. NLP(15)-序列标注任务

    2024-05-14 18:18:08       23 阅读
  2. NLP13)--文本分类任务

    2024-05-14 18:18:08       32 阅读
  3. NLP(14)--文本匹配任务

    2024-05-14 18:18:08       33 阅读
  4. 序列标注任务 - CRF条件随机场

    2024-05-14 18:18:08       19 阅读
  5. NLP10 NLP总结

    2024-05-14 18:18:08       26 阅读
  6. 昇思25天学习打卡营第18天 | LSTM+CRF序列标注

    2024-05-14 18:18:08       23 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-05-14 18:18:08       91 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-05-14 18:18:08       97 阅读
  3. 在Django里面运行非项目文件

    2024-05-14 18:18:08       78 阅读
  4. Python语言-面向对象

    2024-05-14 18:18:08       88 阅读

热门阅读

  1. 单链表与双链表

    2024-05-14 18:18:08       25 阅读
  2. 蓝桥杯单片机组——国赛1 各模块的基础模板

    2024-05-14 18:18:08       29 阅读
  3. 微信小程序-禁止页面下拉回弹

    2024-05-14 18:18:08       31 阅读
  4. Frida逆向与利用自动化

    2024-05-14 18:18:08       33 阅读
  5. NIUKE SQL:大厂面试真题(四) 【某滴打车】

    2024-05-14 18:18:08       30 阅读
  6. 回溯算法(Backtracking Algorithm)

    2024-05-14 18:18:08       28 阅读
  7. react生命周期及用法

    2024-05-14 18:18:08       23 阅读
  8. 【贪心算法】【Python实现】最优装载问题

    2024-05-14 18:18:08       28 阅读