相关知识
BIO标注
将实体的开头标注为B,实体的其他部分标注为I,非实体标注为O.
序列标注
指的是给输入序列的每个token标注标签的过程。应用包括分词、词性标注、命名实体识别等。
条件随机场Conditional random field,CRF
随机场是由若干个位置组合成的整体。当给每一个位置按照某种分布随机赋予一个值之后,整体就叫做随机场。那词性标注来举例子,一个句子有十个词语,那这个句子在随机选择完对应的词性就形成了一个随机场。
马尔科夫随机场指随机场里的某一个位置的赋值只和他相邻位置的赋值有关,而和不相邻位置的无关。
条件随机场是马尔可夫随机场的特例。它假设马尔可夫随机场只有两种变量X和Y。X一般是给定的,Y是在给定X的条件下我们的输出。在词性标注的例子里, X是词,Y是词性,用数学语言描述CRF就是:
设X与Y是随机变量,P(Y|X)是给定X时Y的条件概率分布,若随机变量Y构成的是一个马尔科夫随机场,则称条件概率分布P(Y|X)是条件随机场。因此CRF是一种概率图模型。
输出序列y的概率为:
P ( y ∣ x ) = exp ( Score ( x , y ) ) ∑ y ′ ∈ Y exp ( Score ( x , y ′ ) ) ( 1 ) \begin{align}P(y|x) = \frac{\exp{(\text{Score}(x, y)})}{\sum_{y' \in Y} \exp{(\text{Score}(x, y')})} \qquad (1)\end{align} P(y∣x)=∑y′∈Yexp(Score(x,y′))exp(Score(x,y))(1)
这里的score是衡量序列与标签之间的关系的。
设 x i x_i xi, y i y_i yi为序列的第 i i i个Token和对应的标签,则 Score \text{Score} Score需要能够在计算 x i x_i xi和 y i y_i yi的映射的同时,捕获相邻标签 y i − 1 y_{i-1} yi−1和 y i y_{i} yi之间的关系,因此我们定义两个概率函数:
- 发射概率函数 ψ EMIT \psi_\text{EMIT} ψEMIT:表示 x i → y i x_i \rightarrow y_i xi→yi的概率。这个指输入序列$x_i 被标注为标签 被标注为标签 被标注为标签y_i$的概率
- 转移概率函数 ψ TRANS \psi_\text{TRANS} ψTRANS:表示 y i − 1 → y i y_{i-1} \rightarrow y_i yi−1→yi的概率。这里描述的是输出标签之阿吉你的相互依赖关系。
则可以得到 Score \text{Score} Score的计算公式:
Score ( x , y ) = ∑ i log ψ EMIT ( x i → y i ) + log ψ TRANS ( y i − 1 → y i ) ( 2 ) \begin{align}\text{Score}(x,y) = \sum_i \log \psi_\text{EMIT}(x_i \rightarrow y_i) + \log \psi_\text{TRANS}(y_{i-1} \rightarrow y_i) \qquad (2)\end{align} Score(x,y)=i∑logψEMIT(xi→yi)+logψTRANS(yi−1→yi)(2)
设标签集合为 T T T,构造大小为 ∣ T ∣ x ∣ T ∣ |T|x|T| ∣T∣x∣T∣的矩阵 P \textbf{P} P,用于存储标签间的转移概率;由编码层(可以为Dense、LSTM等)输出的隐状态 h h h可以直接视作发射概率,此时 Score \text{Score} Score的计算公式可以转化为:
Score ( x , y ) = ∑ i h i [ y i ] + P y i − 1 , y i ( 3 ) \begin{align}\text{Score}(x,y) = \sum_i h_i[y_i] + \textbf{P}_{y_{i-1}, y_{i}} \qquad (3)\end{align} Score(x,y)=i∑hi[yi]+Pyi−1,yi(3)
设标签集合为 T T T,构造大小为 ∣ T ∣ x ∣ T ∣ |T|x|T| ∣T∣x∣T∣的矩阵 P \textbf{P} P,用于存储标签间的转移概率;由编码层输出的隐状态 h h h可以直接视作发射概率,此时 Score \text{Score} Score的计算公式可以转化为:
Score ( x , y ) = ∑ i h i [ y i ] + P y i − 1 , y i ( 3 ) \begin{align}\text{Score}(x,y) = \sum_i h_i[y_i] + \textbf{P}_{y_{i-1}, y_{i}} \qquad (3)\end{align} Score(x,y)=i∑hi[yi]+Pyi−1,yi(3)
损失函数计算
分类问题常用的损失函数是负对数似然函数(Negative Log Likelihood)
Loss = − l o g ( P ( y ∣ x ) ) ( 4 ) \begin{align}\text{Loss} = -log(P(y|x)) \qquad (4)\end{align} Loss=−log(P(y∣x))(4)
带入到第一个公式可得:
Loss = − l o g ( exp ( Score ( x , y ) ) ∑ y ′ ∈ Y exp ( Score ( x , y ′ ) ) ) ( 5 ) \begin{align}\text{Loss} = -log(\frac{\exp{(\text{Score}(x, y)})}{\sum_{y' \in Y} \exp{(\text{Score}(x, y')})}) \qquad (5)\end{align} Loss=−log(∑y′∈Yexp(Score(x,y′))exp(Score(x,y)))(5)
= l o g ( ∑ y ′ ∈ Y exp ( Score ( x , y ′ ) ) − Score ( x , y ) \begin{align}= log(\sum_{y' \in Y} \exp{(\text{Score}(x, y')}) - \text{Score}(x, y) \end{align} =log(y′∈Y∑exp(Score(x,y′))−Score(x,y)
我们称被减数为Normalizer,减数为Score,分别实现后相减得到最终Loss。
代码实现
Score计算
其中mask是掩码矩阵,将多个序列打包为一个Batch时填充的值忽略,使得ScoreScore计算仅包含有效的Token。
def compute_score(emissions, tags, seq_ends, mask, trans, start_trans, end_trans):
# emissions: (seq_length, batch_size, num_tags)
# tags: (seq_length, batch_size)
# mask: (seq_length, batch_size)
seq_length, batch_size = tags.shape
mask = mask.astype(emissions.dtype)
# 将score设置为初始转移概率
# shape: (batch_size,)
score = start_trans[tags[0]]
# score += 第一次发射概率
# shape: (batch_size,)
score += emissions[0, mnp.arange(batch_size), tags[0]]
for i in range(1, seq_length):
# 标签由i-1转移至i的转移概率(当mask == 1时有效)
# shape: (batch_size,)
score += trans[tags[i - 1], tags[i]] * mask[i]
# 预测tags[i]的发射概率(当mask == 1时有效)
# shape: (batch_size,)
score += emissions[i, mnp.arange(batch_size), tags[i]] * mask[i]
# 结束转移
# shape: (batch_size,)
last_tags = tags[seq_ends, mnp.arange(batch_size)]
# score += 结束转移概率
# shape: (batch_size,)
score += end_trans[last_tags]
return score
Normalizer计算
Normalizer是 𝑥 对应的所有可能的输出序列的Score的对数指数和。
假设需要计算从第 0 0 0至第 i i i个Token所有可能的输出序列得分 Score i \text{Score}_{i} Scorei,则可以先计算出从第 0 0 0至第 i − 1 i-1 i−1个Token所有可能的输出序列得分 Score i − 1 \text{Score}_{i-1} Scorei−1。因此,Normalizer可以改写为以下形式:
l o g ( ∑ y 0 , i ′ ∈ Y exp ( Score i ) ) = l o g ( ∑ y 0 , i − 1 ′ ∈ Y exp ( Score i − 1 + h i + P ) ) ( 6 ) log(\sum_{y'_{0,i} \in Y} \exp{(\text{Score}_i})) = log(\sum_{y'_{0,i-1} \in Y} \exp{(\text{Score}_{i-1} + h_{i} + \textbf{P}})) \qquad (6) log(y0,i′∈Y∑exp(Scorei))=log(y0,i−1′∈Y∑exp(Scorei−1+hi+P))(6)
其中 h i h_i hi为第 i i i个Token的发射概率, P \textbf{P} P是转移矩阵。由于发射概率矩阵 h h h和转移概率矩阵 P \textbf{P} P独立于 y y y的序列路径计算,可以将其提出,可得:
l o g ( ∑ y 0 , i ′ ∈ Y exp ( Score i ) ) = l o g ( ∑ y 0 , i − 1 ′ ∈ Y exp ( Score i − 1 ) ) + h i + P ( 7 ) log(\sum_{y'_{0,i} \in Y} \exp{(\text{Score}_i})) = log(\sum_{y'_{0,i-1} \in Y} \exp{(\text{Score}_{i-1}})) + h_{i} + \textbf{P} \qquad (7) log(y0,i′∈Y∑exp(Scorei))=log(y0,i−1′∈Y∑exp(Scorei−1))+hi+P(7)
def compute_normalizer(emissions, mask, trans, start_trans, end_trans):
# emissions: (seq_length, batch_size, num_tags)
# mask: (seq_length, batch_size)
seq_length = emissions.shape[0]
# 将score设置为初始转移概率,并加上第一次发射概率
# shape: (batch_size, num_tags)
score = start_trans + emissions[0]
for i in range(1, seq_length):
# 扩展score的维度用于总score的计算
# shape: (batch_size, num_tags, 1)
broadcast_score = score.expand_dims(2)
# 扩展emission的维度用于总score的计算
# shape: (batch_size, 1, num_tags)
broadcast_emissions = emissions[i].expand_dims(1)
# 根据最后一个公式,计算score_i
# 此时broadcast_score是由第0个到当前Token所有可能路径
# 对应score的log_sum_exp
# shape: (batch_size, num_tags, num_tags)
next_score = broadcast_score + trans + broadcast_emissions
# 对score_i做log_sum_exp运算,用于下一个Token的score计算
# shape: (batch_size, num_tags)
next_score = ops.logsumexp(next_score, axis=1)
# 当mask == 1时,score才会变化
# shape: (batch_size, num_tags)
score = mnp.where(mask[i].expand_dims(1), next_score, score)
# 最后加结束转移概率
# shape: (batch_size, num_tags)
score += end_trans
# 对所有可能的路径得分求log_sum_exp
# shape: (batch_size,)
return ops.logsumexp(score, axis=1)
Viterbi算法
该算法用于求解序列最优路径。
取得最大概率得分 Score \text{Score} Score,以及每个Token对应的标签历史 History \text{History} History后,根据Viterbi算法可以得到公式:
P 0 , i = m a x ( P 0 , i − 1 ) + P i − 1 , i P_{0,i} = max(P_{0, i-1}) + P_{i-1, i} P0,i=max(P0,i−1)+Pi−1,i
从第0个至第 i i i个Token对应概率最大的序列,只需要考虑从第0个至第 i − 1 i-1 i−1个Token对应概率最大的序列,以及从第 i i i个至第 i − 1 i-1 i−1个概率最大的标签即可。因此我们逆序求解每一个概率最大的标签,构成最佳的预测序列。
def viterbi_decode(emissions, mask, trans, start_trans, end_trans):
# emissions: (seq_length, batch_size, num_tags)
# mask: (seq_length, batch_size)
seq_length = mask.shape[0]
score = start_trans + emissions[0]
history = ()
for i in range(1, seq_length):
broadcast_score = score.expand_dims(2)
broadcast_emission = emissions[i].expand_dims(1)
next_score = broadcast_score + trans + broadcast_emission
# 求当前Token对应score取值最大的标签,并保存
indices = next_score.argmax(axis=1)
history += (indices,)
next_score = next_score.max(axis=1)
score = mnp.where(mask[i].expand_dims(1), next_score, score)
score += end_trans
return score, history
def post_decode(score, history, seq_length):
# 使用Score和History计算最佳预测序列
batch_size = seq_length.shape[0]
seq_ends = seq_length - 1
# shape: (batch_size,)
best_tags_list = []
# 依次对一个Batch中每个样例进行解码
for idx in range(batch_size):
# 查找使最后一个Token对应的预测概率最大的标签,
# 并将其添加至最佳预测序列存储的列表中
best_last_tag = score[idx].argmax(axis=0)
best_tags = [int(best_last_tag.asnumpy())]
# 重复查找每个Token对应的预测概率最大的标签,加入列表
for hist in reversed(history[:seq_ends[idx]]):
best_last_tag = hist[idx][best_tags[-1]]
best_tags.append(int(best_last_tag.asnumpy()))
# 将逆序求解的序列标签重置为正序
best_tags.reverse()
best_tags_list.append(best_tags)
return best_tags_list
搭建CRF层
def sequence_mask(seq_length, max_length, batch_first=False):
"""根据序列实际长度和最大长度生成mask矩阵"""
range_vector = mnp.arange(0, max_length, 1, seq_length.dtype)
result = range_vector < seq_length.view(seq_length.shape + (1,))
if batch_first:
return result.astype(ms.int64)
return result.astype(ms.int64).swapaxes(0, 1)
class CRF(nn.Cell):
def __init__(self, num_tags: int, batch_first: bool = False, reduction: str = 'sum') -> None:
if num_tags <= 0:
raise ValueError(f'invalid number of tags: {num_tags}')
super().__init__()
if reduction not in ('none', 'sum', 'mean', 'token_mean'):
raise ValueError(f'invalid reduction: {reduction}')
self.num_tags = num_tags
self.batch_first = batch_first
self.reduction = reduction
self.start_transitions = ms.Parameter(initializer(Uniform(0.1), (num_tags,)), name='start_transitions')
self.end_transitions = ms.Parameter(initializer(Uniform(0.1), (num_tags,)), name='end_transitions')
self.transitions = ms.Parameter(initializer(Uniform(0.1), (num_tags, num_tags)), name='transitions')
def construct(self, emissions, tags=None, seq_length=None):
if tags is None:
return self._decode(emissions, seq_length)
return self._forward(emissions, tags, seq_length)
def _forward(self, emissions, tags=None, seq_length=None):
if self.batch_first:
batch_size, max_length = tags.shape
emissions = emissions.swapaxes(0, 1)
tags = tags.swapaxes(0, 1)
else:
max_length, batch_size = tags.shape
if seq_length is None:
seq_length = mnp.full((batch_size,), max_length, ms.int64)
mask = sequence_mask(seq_length, max_length)
# shape: (batch_size,)
numerator = compute_score(emissions, tags, seq_length-1, mask, self.transitions, self.start_transitions, self.end_transitions)
# shape: (batch_size,)
denominator = compute_normalizer(emissions, mask, self.transitions, self.start_transitions, self.end_transitions)
# shape: (batch_size,)
llh = denominator - numerator
if self.reduction == 'none':
return llh
if self.reduction == 'sum':
return llh.sum()
if self.reduction == 'mean':
return llh.mean()
return llh.sum() / mask.astype(emissions.dtype).sum()
def _decode(self, emissions, seq_length=None):
if self.batch_first:
batch_size, max_length = emissions.shape[:2]
emissions = emissions.swapaxes(0, 1)
else:
batch_size, max_length = emissions.shape[:2]
if seq_length is None:
seq_length = mnp.full((batch_size,), max_length, ms.int64)
mask = sequence_mask(seq_length, max_length)
return viterbi_decode(emissions, mask, self.transitions, self.start_transitions, self.end_transitions)
构建双向LSTM+CRF模型
这里的模型结构使用 nn.Embedding -> nn.LSTM -> nn.Dense -> CRF
来完成实体识别任务。
其中LSTM提取序列特征,经过Dense层变换获得发射概率矩阵,最后送入CRF层。
class BiLSTM_CRF(nn.Cell):
def __init__(self, vocab_size, embedding_dim, hidden_dim, num_tags, padding_idx=0):
super().__init__()
self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=padding_idx)
self.lstm = nn.LSTM(embedding_dim, hidden_dim // 2, bidirectional=True, batch_first=True)
self.hidden2tag = nn.Dense(hidden_dim, num_tags, 'he_uniform')
self.crf = CRF(num_tags, batch_first=True)
def construct(self, inputs, seq_length, tags=None):
embeds = self.embedding(inputs)
outputs, _ = self.lstm(embeds, seq_length=seq_length)
feats = self.hidden2tag(outputs)
crf_outs = self.crf(feats, tags, seq_length)
return crf_outs
构建词表和标签表
embedding_dim = 16
hidden_dim = 32
training_data = [(
"清 华 大 学 坐 落 于 首 都 北 京".split(),
"B I I I O O O O O B I".split()
), (
"重 庆 是 一 个 魔 幻 城 市".split(),
"B I O O O O O O O".split()
)]
word_to_idx = {}
word_to_idx['<pad>'] = 0
for sentence, tags in training_data:
for word in sentence:
if word not in word_to_idx:
word_to_idx[word] = len(word_to_idx)
tag_to_idx = {"B": 0, "I": 1, "O": 2}
实例化模型
model = BiLSTM_CRF(len(word_to_idx), embedding_dim, hidden_dim, len(tag_to_idx))
optimizer = nn.SGD(model.trainable_params(), learning_rate=0.01, weight_decay=1e-4)
grad_fn = ms.value_and_grad(model, None, optimizer.parameters)
def train_step(data, seq_length, label):
loss, grads = grad_fn(data, seq_length, label)
optimizer(grads)
return loss
构建数据
将生成的数据打包成Batch,按照序列最大长度,对长度不足的序列进行填充,分别返回输入序列、输出标签和序列长度构成的Tensor。
def prepare_sequence(seqs, word_to_idx, tag_to_idx):
seq_outputs, label_outputs, seq_length = [], [], []
max_len = max([len(i[0]) for i in seqs])
for seq, tag in seqs:
seq_length.append(len(seq))
idxs = [word_to_idx[w] for w in seq]
labels = [tag_to_idx[t] for t in tag]
idxs.extend([word_to_idx['<pad>'] for i in range(max_len - len(seq))])
labels.extend([tag_to_idx['O'] for i in range(max_len - len(seq))])
seq_outputs.append(idxs)
label_outputs.append(labels)
return ms.Tensor(seq_outputs, ms.int64), \
ms.Tensor(label_outputs, ms.int64), \
ms.Tensor(seq_length, ms.int64)
模型预测
data, label, seq_length = prepare_sequence(training_data, word_to_idx, tag_to_idx)
data.shape, label.shape, seq_length.shape
from tqdm import tqdm
steps = 500
with tqdm(total=steps) as t:
for i in range(steps):
loss = train_step(data, seq_length, label)
t.set_postfix(loss=loss)
t.update(1)
score, history = model(data, seq_length)
Score
Tensor(shape=[2, 3], dtype=Float32, value= [[ 3.15928860e+01,
3.63119812e+01, 3.17248516e+01], [ 2.81416149e+01, 2.61749763e+01, 3.24760780e+01]])
predict = post_decode(score, history, seq_length)
Predict
[[0, 1, 1, 1, 2, 2, 2, 2, 2, 0, 1], [0, 1, 2, 2, 2, 2, 2, 2, 2]]
idx_to_tag = {idx: tag for tag, idx in tag_to_idx.items()}
def sequence_to_tag(sequences, idx_to_tag):
outputs = []
for seq in sequences:
outputs.append([idx_to_tag[i] for i in seq])
return outputs
sequence_to_tag(predict, idx_to_tag)
[[‘B’, ‘I’, ‘I’, ‘I’, ‘O’, ‘O’, ‘O’, ‘O’, ‘O’, ‘B’, ‘I’],
[‘B’, ‘I’, ‘O’, ‘O’, ‘O’, ‘O’, ‘O’, ‘O’, ‘O’]]
总结
本章使用LSTM和CRF完成了序列标注任务