|
from transformers import PreTrainedModel
|
|
|
|
import math
|
|
import pandas as pd
|
|
import torch
|
|
from torch import nn
|
|
from d2l import torch as d2l
|
|
from transformers import PretrainedConfig
|
|
|
|
|
|
|
|
class PositionWiseFFN(nn.Module):
    """Position-wise feed-forward network.

    Two linear layers with a ReLU between them. ``nn.Linear`` only acts on
    the last axis, so every leading index is transformed by the same weights.

    Args:
        ffn_num_input: Size of the last axis of the input.
        ffn_num_hiddens: Width of the hidden layer.
        ffn_num_outputs: Size of the last axis of the output.
        **kwargs: Forwarded to ``nn.Module.__init__``.
    """

    def __init__(self, ffn_num_input, ffn_num_hiddens, ffn_num_outputs,
                 **kwargs):
        super().__init__(**kwargs)
        # Creation order is kept so seeded parameter initialisation is stable.
        self.dense1 = nn.Linear(ffn_num_input, ffn_num_hiddens)
        self.relu = nn.ReLU()
        self.dense2 = nn.Linear(ffn_num_hiddens, ffn_num_outputs)

    def forward(self, X):
        """Return ``dense2(relu(dense1(X)))``."""
        hidden = self.dense1(X)
        activated = self.relu(hidden)
        return self.dense2(activated)
|
|
|
|
def transpose_qkv(X, num_heads):
    """Reshape a tensor so that all attention heads can be computed in parallel.

    The first two axes of ``X`` are kept, the remaining elements are split
    into ``num_heads`` equal groups, and the head axis is then folded into
    the leading axis.

    Args:
        X: Tensor whose per-position element count is divisible by
            ``num_heads``.
        num_heads: Number of attention heads.

    Returns:
        Tensor of shape ``(X.shape[0] * num_heads, X.shape[1], -1)``, where
        the heads belonging to one leading index are adjacent.
    """
    lead, steps = X.shape[0], X.shape[1]
    # (lead, steps, num_heads, per_head)
    per_head = X.reshape(lead, steps, num_heads, -1)
    # (lead, num_heads, steps, per_head)
    heads_first = per_head.permute(0, 2, 1, 3)
    # (lead * num_heads, steps, per_head)
    return heads_first.reshape(-1, heads_first.shape[2], heads_first.shape[3])
|
|
|
|
def transpose_output(X, num_heads):
    """Undo the reshaping performed by ``transpose_qkv``.

    Args:
        X: 3-D tensor whose leading axis is a multiple of ``num_heads``
            (heads of one sample adjacent).
        num_heads: Number of attention heads.

    Returns:
        Tensor of shape ``(X.shape[0] // num_heads, X.shape[1], -1)`` in
        which the per-head features are concatenated along the last axis.
    """
    steps, per_head = X.shape[1], X.shape[2]
    # (lead, num_heads, steps, per_head)
    unfolded = X.reshape(-1, num_heads, steps, per_head)
    # (lead, steps, num_heads, per_head)
    steps_first = unfolded.permute(0, 2, 1, 3)
    # Merge the head axis back into the feature axis.
    return steps_first.reshape(steps_first.shape[0], steps_first.shape[1], -1)
|
|
|
|
def sequence_mask(X, valid_len, value=0):
    """Overwrite entries beyond each row's valid length.

    NOTE: ``X`` is modified in place and the same tensor object is returned.

    Args:
        X: Tensor whose axis 0 indexes sequences and axis 1 indexes positions.
        valid_len: 1-D tensor with one length per row of ``X``.
        value: Fill value written to every position ``>= valid_len[row]``.

    Returns:
        ``X`` itself, after masking.
    """
    positions = torch.arange(X.size(1), dtype=torch.float32, device=X.device)
    # Broadcast (1, maxlen) against (rows, 1) to get a per-row keep mask.
    keep = positions[None, :] < valid_len[:, None]
    X[torch.logical_not(keep)] = value
    return X
|
|
|
|
def masked_softmax(X, valid_lens):
    """Softmax over the last axis, ignoring positions past the valid length.

    Args:
        X: Score tensor; softmax is taken over its last axis.
        valid_lens: ``None`` (plain softmax), a 1-D tensor (one length per
            index of axis 0, repeated ``X.shape[1]`` times), or a tensor that
            is flattened to one length per row of ``X.reshape(-1, last)``.

    Returns:
        Tensor with the same shape as ``X``.
    """
    if valid_lens is None:
        return nn.functional.softmax(X, dim=-1)

    shape = X.shape
    if valid_lens.dim() == 1:
        # One length per leading index: reuse it for each of the shape[1] rows.
        row_lens = torch.repeat_interleave(valid_lens, shape[1])
    else:
        row_lens = valid_lens.reshape(-1)

    # Masked scores are set to a large negative number so that their
    # exponentials are negligible in the softmax.
    flat_scores = X.reshape(-1, shape[-1])
    masked = sequence_mask(flat_scores, row_lens, value=-1e4)
    return nn.functional.softmax(masked.reshape(shape), dim=-1)
|
|
|
|
class DotProductAttention(nn.Module):
    """Scaled dot-product attention.

    Args:
        dropout: Dropout probability applied to the attention weights.
        **kwargs: Forwarded to ``nn.Module.__init__``.
    """

    def __init__(self, dropout, **kwargs):
        super().__init__(**kwargs)
        self.dropout = nn.Dropout(dropout)

    def forward(self, queries, keys, values, valid_lens=None):
        """Attend from ``queries`` to ``keys``/``values``.

        All three inputs must be 3-D (``torch.bmm`` is used), and queries and
        keys must share their last-axis size. The computed weights are stored
        on ``self.attention_weights`` before dropout is applied.

        Args:
            queries: Query tensor.
            keys: Key tensor.
            values: Value tensor.
            valid_lens: Optional valid lengths passed to ``masked_softmax``.

        Returns:
            The batched product of the (dropped-out) weights and ``values``.
        """
        scale = math.sqrt(queries.shape[-1])
        scores = torch.bmm(queries, keys.transpose(1, 2)) / scale
        self.attention_weights = masked_softmax(scores, valid_lens)
        dropped = self.dropout(self.attention_weights)
        return torch.bmm(dropped, values)
|
|
|
|
class MultiHeadAttention(nn.Module):
    """Multi-head attention built on scaled dot-product attention.

    Args:
        key_size: Feature size of the incoming keys.
        query_size: Feature size of the incoming queries.
        value_size: Feature size of the incoming values.
        num_hiddens: Output size of every projection; it is split evenly
            across heads, so it has to be divisible by ``num_heads``.
        num_heads: Number of attention heads.
        dropout: Dropout probability for the attention weights.
        bias: Whether the four linear projections use a bias term.
        **kwargs: Forwarded to ``nn.Module.__init__``.
    """

    def __init__(self, key_size, query_size, value_size, num_hiddens,
                 num_heads, dropout, bias=False, **kwargs):
        super().__init__(**kwargs)
        self.num_heads = num_heads
        self.attention = DotProductAttention(dropout)
        self.W_q = nn.Linear(query_size, num_hiddens, bias=bias)
        self.W_k = nn.Linear(key_size, num_hiddens, bias=bias)
        self.W_v = nn.Linear(value_size, num_hiddens, bias=bias)
        self.W_o = nn.Linear(num_hiddens, num_hiddens, bias=bias)

    def forward(self, queries, keys, values, valid_lens):
        """Project, split into heads, attend, merge heads and project again.

        Args:
            queries: Query tensor.
            keys: Key tensor.
            values: Value tensor.
            valid_lens: ``None`` or a tensor of valid lengths whose axis 0
                lines up with axis 0 of the inputs.

        Returns:
            Output of ``W_o`` applied to the concatenated head outputs.
        """
        heads = self.num_heads
        # After this, the leading axis is (original leading axis * heads).
        q = transpose_qkv(self.W_q(queries), heads)
        k = transpose_qkv(self.W_k(keys), heads)
        v = transpose_qkv(self.W_v(values), heads)

        if valid_lens is not None:
            # Each entry is copied `heads` times along axis 0 so the lengths
            # stay aligned with the head-expanded leading axis.
            valid_lens = torch.repeat_interleave(
                valid_lens, repeats=heads, dim=0)

        per_head_out = self.attention(q, k, v, valid_lens)
        merged = transpose_output(per_head_out, heads)
        return self.W_o(merged)
|
|
|
|
|
|
|
|
|
|
class AddNorm(nn.Module):
    """Residual connection followed by layer normalization.

    Args:
        normalized_shape: Shape argument handed to ``nn.LayerNorm``.
        dropout: Dropout probability applied to the sub-layer output.
        **kwargs: Forwarded to ``nn.Module.__init__``.
    """

    def __init__(self, normalized_shape, dropout, **kwargs):
        super().__init__(**kwargs)
        self.dropout = nn.Dropout(dropout)
        self.ln = nn.LayerNorm(normalized_shape)

    def forward(self, X, Y):
        """Return ``LayerNorm(dropout(Y) + X)``.

        Args:
            X: Residual (sub-layer input).
            Y: Sub-layer output; dropout is applied to this term only.
        """
        residual_sum = self.dropout(Y) + X
        return self.ln(residual_sum)
|
|
|
|
|
|
|
|
class EncoderBlock(nn.Module):
    """One Transformer encoder block.

    Self-attention and a position-wise feed-forward network, each wrapped in
    an ``AddNorm`` (residual + layer norm).

    Args:
        key_size: Feature size of the keys given to the attention layer.
        query_size: Feature size of the queries given to the attention layer.
        value_size: Feature size of the values given to the attention layer.
        num_hiddens: Hidden size of the attention layer and FFN output size.
        norm_shape: Shape argument for both layer norms.
        ffn_num_input: Input size of the feed-forward network.
        ffn_num_hiddens: Hidden size of the feed-forward network.
        num_heads: Number of attention heads.
        dropout: Dropout probability used throughout the block.
        use_bias: Whether the attention projections use a bias.
        **kwargs: Forwarded to ``nn.Module.__init__``.
    """

    def __init__(self, key_size, query_size, value_size, num_hiddens,
                 norm_shape, ffn_num_input, ffn_num_hiddens, num_heads,
                 dropout, use_bias=False, **kwargs):
        super().__init__(**kwargs)
        self.attention = MultiHeadAttention(
            key_size, query_size, value_size, num_hiddens, num_heads,
            dropout, use_bias)
        self.addnorm1 = AddNorm(norm_shape, dropout)
        self.ffn = PositionWiseFFN(
            ffn_num_input, ffn_num_hiddens, num_hiddens)
        self.addnorm2 = AddNorm(norm_shape, dropout)

    def forward(self, X, valid_lens):
        """Apply self-attention then the feed-forward network to ``X``.

        Args:
            X: Block input; used as queries, keys and values.
            valid_lens: Valid lengths forwarded to the attention layer.
        """
        attended = self.attention(X, X, X, valid_lens)
        Y = self.addnorm1(X, attended)
        return self.addnorm2(Y, self.ffn(Y))
|
|
|
|
class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding.

    A table ``P`` of shape ``(1, max_len, num_hiddens)`` is precomputed with

        P[0, pos, 2j]     = sin(pos / 10000 ** (2j / num_hiddens))
        P[0, pos, 2j + 1] = cos(pos / 10000 ** (2j / num_hiddens))

    ``P`` is kept as a plain tensor attribute (not a parameter or buffer), so
    it never appears in ``state_dict`` and is moved to the input's device on
    every forward call.

    Args:
        num_hiddens: Size of the feature axis; may be even or odd.
        dropout: Dropout probability applied after adding the encoding.
        max_len: Number of positions to precompute.
    """

    def __init__(self, num_hiddens, dropout, max_len=1000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(dropout)

        self.P = torch.zeros((1, max_len, num_hiddens))
        # One angle column per even feature index: ceil(num_hiddens / 2).
        X = torch.arange(max_len, dtype=torch.float32).reshape(
            -1, 1) / torch.pow(10000, torch.arange(
                0, num_hiddens, 2, dtype=torch.float32) / num_hiddens)
        self.P[:, :, 0::2] = torch.sin(X)
        # There are only num_hiddens // 2 odd feature indices. For an odd
        # num_hiddens that is one fewer than the angle columns, so trim the
        # table instead of failing with a shape mismatch.
        self.P[:, :, 1::2] = torch.cos(X[:, :num_hiddens // 2])

    def forward(self, X):
        """Add the encoding for the first ``X.shape[1]`` positions to ``X``.

        Args:
            X: Tensor whose axis 1 indexes positions (at most ``max_len``)
                and whose last axis has size ``num_hiddens``.

        Returns:
            ``dropout(X + P[:, :X.shape[1], :])``.
        """
        X = X + self.P[:, :X.shape[1], :].to(X.device)
        return self.dropout(X)
|
|
|
|
class Encoder(nn.Module):
    """Base encoder interface for the encoder-decoder architecture.

    Subclasses are expected to override ``forward``.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def forward(self, X, *args):
        """Encode ``X``; not implemented in the base class.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError
|
|
|
|
|
|
|
|
|
|
class DecoderBlock(nn.Module):
    """The ``i``-th block of the decoder.

    Contains decoder self-attention, encoder-decoder attention and a
    position-wise feed-forward network, each followed by ``AddNorm``.

    Args:
        key_size: Feature size of the keys given to both attention layers.
        query_size: Feature size of the queries given to both attention layers.
        value_size: Feature size of the values given to both attention layers.
        num_hiddens: Hidden size of the attention layers and FFN output size.
        norm_shape: Shape argument for the three layer norms.
        ffn_num_input: Input size of the feed-forward network.
        ffn_num_hiddens: Hidden size of the feed-forward network.
        num_heads: Number of attention heads.
        dropout: Dropout probability used throughout the block.
        i: Index of this block; selects its slot in ``state[2]``.
        **kwargs: Forwarded to ``nn.Module.__init__``.
    """

    def __init__(self, key_size, query_size, value_size, num_hiddens,
                 norm_shape, ffn_num_input, ffn_num_hiddens, num_heads,
                 dropout, i, **kwargs):
        super().__init__(**kwargs)
        self.i = i
        self.attention1 = MultiHeadAttention(
            key_size, query_size, value_size, num_hiddens, num_heads, dropout)
        self.addnorm1 = AddNorm(norm_shape, dropout)
        self.attention2 = MultiHeadAttention(
            key_size, query_size, value_size, num_hiddens, num_heads, dropout)
        self.addnorm2 = AddNorm(norm_shape, dropout)
        self.ffn = PositionWiseFFN(ffn_num_input, ffn_num_hiddens,
                                   num_hiddens)
        self.addnorm3 = AddNorm(norm_shape, dropout)

    def forward(self, X, state):
        """Run the block and update this block's cache slot in ``state``.

        Args:
            X: Decoder input for this block, 3-D with positions on axis 1.
            state: ``[enc_outputs, enc_valid_lens, per_block_cache]``; slot
                ``per_block_cache[self.i]`` is overwritten with the
                concatenation (along axis 1) of its previous content and ``X``.

        Returns:
            Tuple ``(block_output, state)``.
        """
        enc_outputs, enc_valid_lens = state[0], state[1]

        # Keys/values for self-attention: everything this block has seen so
        # far (cached earlier inputs followed by the current input).
        cached = state[2][self.i]
        key_values = X if cached is None else torch.cat((cached, X), dim=1)
        state[2][self.i] = key_values

        dec_valid_lens = None
        if self.training:
            batch_size, num_steps, _ = X.shape
            # Every row is [1, 2, ..., num_steps]: used as valid lengths, this
            # lets the query at position t see only the first t keys.
            dec_valid_lens = torch.arange(
                1, num_steps + 1, device=X.device).repeat(batch_size, 1)

        # Decoder self-attention.
        self_attended = self.attention1(
            X, key_values, key_values, dec_valid_lens)
        Y = self.addnorm1(X, self_attended)

        # Encoder-decoder attention over the encoder outputs.
        cross_attended = self.attention2(
            Y, enc_outputs, enc_outputs, enc_valid_lens)
        Z = self.addnorm2(Y, cross_attended)

        return self.addnorm3(Z, self.ffn(Z)), state
|
|
|
|
class Decoder(nn.Module):
    """Base decoder interface for the encoder-decoder architecture.

    Subclasses are expected to override ``init_state`` and ``forward``.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def init_state(self, enc_outputs, *args):
        """Build the initial decoder state; not implemented in the base class.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    def forward(self, X, state):
        """Decode ``X`` given ``state``; not implemented in the base class.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError
|
|
|
|
class AttentionDecoder(Decoder):
    """Base interface for decoders that expose attention weights.

    Subclasses are expected to override the ``attention_weights`` property.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    @property
    def attention_weights(self):
        """Attention weights of the decoder; not implemented in the base class.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError
|
|
|
|
|
|
|
|
class TransformerEncoder(Encoder):
    """Transformer encoder.

    Token embedding scaled by ``sqrt(num_hiddens)``, positional encoding, then
    ``num_layers`` ``EncoderBlock`` modules registered as ``block0``,
    ``block1``, ... inside ``self.blks``.

    Args:
        vocab_size: Number of rows in the embedding table.
        key_size: Key feature size for every block.
        query_size: Query feature size for every block.
        value_size: Value feature size for every block.
        num_hiddens: Embedding size and hidden size of every block.
        norm_shape: Layer-norm shape for every block.
        ffn_num_input: Feed-forward input size for every block.
        ffn_num_hiddens: Feed-forward hidden size for every block.
        num_heads: Number of attention heads per block.
        num_layers: Number of encoder blocks.
        dropout: Dropout probability.
        use_bias: Whether attention projections use a bias.
        **kwargs: Forwarded to the parent constructor.
    """

    def __init__(self, vocab_size, key_size, query_size, value_size,
                 num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens,
                 num_heads, num_layers, dropout, use_bias=False, **kwargs):
        super().__init__(**kwargs)
        self.num_hiddens = num_hiddens
        self.embedding = nn.Embedding(vocab_size, num_hiddens)
        self.pos_encoding = PositionalEncoding(num_hiddens, dropout)
        self.blks = nn.Sequential()
        for layer_idx in range(num_layers):
            block = EncoderBlock(
                key_size, query_size, value_size, num_hiddens, norm_shape,
                ffn_num_input, ffn_num_hiddens, num_heads, dropout, use_bias)
            self.blks.add_module(f"block{layer_idx}", block)

    def forward(self, X, valid_lens, *args):
        """Encode token ids ``X``.

        The attention weights of each block are stored, in block order, in
        ``self.attention_weights``.

        Args:
            X: Integer token ids fed to the embedding layer.
            valid_lens: Valid lengths passed to every block.
            *args: Ignored.

        Returns:
            Output of the last encoder block.
        """
        # Embeddings are multiplied by sqrt(num_hiddens) before the positional
        # encoding is added.
        scaled = self.embedding(X) * math.sqrt(self.num_hiddens)
        X = self.pos_encoding(scaled)
        self.attention_weights = [None] * len(self.blks)
        for idx, blk in enumerate(self.blks):
            X = blk(X, valid_lens)
            self.attention_weights[idx] = (
                blk.attention.attention.attention_weights)
        return X
|
|
|
|
|
|
class TransformerDecoder(AttentionDecoder):
    """Transformer decoder.

    Token embedding scaled by ``sqrt(num_hiddens)``, positional encoding,
    ``num_layers`` ``DecoderBlock`` modules registered as ``block0``,
    ``block1``, ... and a final linear layer producing ``vocab_size`` scores.

    Args:
        vocab_size: Target vocabulary size (embedding rows and output size).
        key_size: Key feature size for every block.
        query_size: Query feature size for every block.
        value_size: Value feature size for every block.
        num_hiddens: Embedding size and hidden size of every block.
        norm_shape: Layer-norm shape for every block.
        ffn_num_input: Feed-forward input size for every block.
        ffn_num_hiddens: Feed-forward hidden size for every block.
        num_heads: Number of attention heads per block.
        num_layers: Number of decoder blocks.
        dropout: Dropout probability.
        **kwargs: Forwarded to the parent constructor.
    """

    def __init__(self, vocab_size, key_size, query_size, value_size,
                 num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens,
                 num_heads, num_layers, dropout, **kwargs):
        super().__init__(**kwargs)
        self.num_hiddens = num_hiddens
        self.num_layers = num_layers
        self.embedding = nn.Embedding(vocab_size, num_hiddens)
        self.pos_encoding = PositionalEncoding(num_hiddens, dropout)
        self.blks = nn.Sequential()
        for layer_idx in range(num_layers):
            block = DecoderBlock(
                key_size, query_size, value_size, num_hiddens, norm_shape,
                ffn_num_input, ffn_num_hiddens, num_heads, dropout, layer_idx)
            self.blks.add_module(f"block{layer_idx}", block)
        self.dense = nn.Linear(num_hiddens, vocab_size)

    def init_state(self, enc_outputs, enc_valid_lens, *args):
        """Return ``[enc_outputs, enc_valid_lens, cache]``.

        ``cache`` holds one ``None`` slot per decoder block.
        """
        per_block_cache = [None] * self.num_layers
        return [enc_outputs, enc_valid_lens, per_block_cache]

    def forward(self, X, state):
        """Decode token ids ``X`` given ``state``.

        Side effect: ``self._attention_weights`` is rebuilt as two lists,
        index 0 for each block's ``attention1`` weights and index 1 for each
        block's ``attention2`` weights.

        Returns:
            Tuple ``(scores, state)`` where ``scores`` is the output of the
            final linear layer.
        """
        scaled = self.embedding(X) * math.sqrt(self.num_hiddens)
        X = self.pos_encoding(scaled)
        self._attention_weights = [[None] * len(self.blks) for _ in range(2)]
        self_attn, cross_attn = self._attention_weights
        for idx, blk in enumerate(self.blks):
            X, state = blk(X, state)
            # Decoder self-attention weights.
            self_attn[idx] = blk.attention1.attention.attention_weights
            # Encoder-decoder attention weights.
            cross_attn[idx] = blk.attention2.attention.attention_weights
        return self.dense(X), state

    @property
    def attention_weights(self):
        """Attention weights recorded by the most recent ``forward`` call."""
        return self._attention_weights
|
|
|
|
class transformerConfig(PretrainedConfig):
    """Configuration for the custom encoder-decoder transformer.

    Every argument is stored unchanged as an attribute of the same name.

    Args:
        src_vocab_len: Source vocabulary size.
        tgt_vocab: Target vocabulary size.
        num_hiddens: Hidden (embedding) size.
        num_layers: Number of encoder blocks and of decoder blocks.
        dropout: Dropout probability.
        batch_size: Stored on the config; not read by the model classes in
            this file.
        num_steps: Stored on the config; not read by the model classes in
            this file.
        lr: Stored on the config; not read by the model classes in this file.
        num_epochs: Stored on the config; not read by the model classes in
            this file.
        ffn_num_input: Input size of the position-wise feed-forward networks.
        ffn_num_hiddens: Hidden size of the position-wise feed-forward
            networks.
        num_heads: Number of attention heads.
        key_size: Key feature size.
        query_size: Query feature size.
        value_size: Value feature size.
        norm_shape: Shape argument for the layer norms. ``None`` (the
            default) means ``[32]``.
        **kwargs: Forwarded to ``PretrainedConfig.__init__``.
    """

    model_type = "custom_transformer"

    def __init__(
        self,
        src_vocab_len: int = 184,
        tgt_vocab: int = 201,
        num_hiddens: int = 32,
        num_layers: int = 2,
        dropout: float = 0.1,
        batch_size: int = 64,
        num_steps: int = 10,
        lr: float = 0.005,
        num_epochs: int = 200,
        ffn_num_input: int = 32,
        ffn_num_hiddens: int = 64,
        num_heads: int = 4,
        key_size: int = 32,
        query_size: int = 32,
        value_size: int = 32,
        norm_shape=None,
        **kwargs,
    ):
        self.src_vocab_len = src_vocab_len
        self.tgt_vocab = tgt_vocab
        self.num_hiddens = num_hiddens
        self.num_layers = num_layers
        self.dropout = dropout
        self.batch_size = batch_size
        self.num_steps = num_steps
        self.lr = lr
        self.num_epochs = num_epochs
        self.ffn_num_input = ffn_num_input
        self.ffn_num_hiddens = ffn_num_hiddens
        self.num_heads = num_heads
        self.key_size = key_size
        self.query_size = query_size
        self.value_size = value_size
        # A fresh list per instance: a list literal in the signature would be
        # one object shared (and mutable) across every config created with
        # the default.
        self.norm_shape = [32] if norm_shape is None else norm_shape

        super().__init__(**kwargs)
|
|
|
|
class transformerModel(PreTrainedModel):
    """Encoder-decoder transformer wrapped as a ``PreTrainedModel``.

    Builds a ``TransformerEncoder`` and a ``TransformerDecoder`` from the same
    ``transformerConfig``; they differ only in vocabulary size.
    """

    config_class = transformerConfig

    def __init__(self, config):
        super().__init__(config)
        # Positional arguments shared by the encoder and decoder, in the
        # order both constructors expect after the vocabulary size.
        shared_args = (
            config.key_size,
            config.query_size,
            config.value_size,
            config.num_hiddens,
            config.norm_shape,
            config.ffn_num_input,
            config.ffn_num_hiddens,
            config.num_heads,
            config.num_layers,
            config.dropout,
        )
        self.encoder = TransformerEncoder(config.src_vocab_len, *shared_args)
        self.decoder = TransformerDecoder(config.tgt_vocab, *shared_args)

    def forward(self, enc_X, dec_X, *args):
        """Encode ``enc_X`` and decode ``dec_X`` against the result.

        Args:
            enc_X: Encoder input token ids.
            dec_X: Decoder input token ids.
            *args: Extra positional arguments passed both to the encoder and
                to ``decoder.init_state`` (e.g. the source valid lengths).

        Returns:
            Whatever the decoder returns: a ``(scores, state)`` tuple.
        """
        memory = self.encoder(enc_X, *args)
        initial_state = self.decoder.init_state(memory, *args)
        return self.decoder(dec_X, initial_state)
|
|
|
|
def predict_seq2seq(net, src_sentence, src_vocab, tgt_vocab, num_steps,
                    device, save_attention_weights=False):
    """Greedily decode a translation of ``src_sentence``.

    The sentence is lower-cased, split on single spaces, terminated with
    ``<eos>`` and truncated/padded to ``num_steps``. Decoding starts from
    ``<bos>`` and feeds the arg-max token of each step back in, stopping at
    ``<eos>`` or after ``num_steps`` steps.

    All forward passes run under ``torch.no_grad()``: this is inference only,
    and otherwise the decoder's growing key/value cache would keep an
    autograd graph alive across steps.

    Args:
        net: Model exposing ``encoder``, ``decoder`` and ``eval()``.
        src_sentence: Source sentence as a single string.
        src_vocab: Source vocabulary; indexing with a token or a list of
            tokens is expected to return id(s) — assumed d2l ``Vocab``-like.
        tgt_vocab: Target vocabulary; also needs ``to_tokens``.
        num_steps: Source length after padding and maximum number of decoding
            steps.
        device: Device on which the input tensors are created.
        save_attention_weights: If true, collect
            ``net.decoder.attention_weights`` after every decoding step.

    Returns:
        Tuple ``(translation, attention_weight_seq)``: the predicted tokens
        joined by spaces, and the collected attention weights (empty list
        when ``save_attention_weights`` is false).
    """
    # Switch to evaluation mode for prediction.
    net.eval()
    src_tokens = src_vocab[src_sentence.lower().split(' ')] + [
        src_vocab['<eos>']]
    # Valid length is measured before truncation/padding.
    enc_valid_len = torch.tensor([len(src_tokens)], device=device)
    src_tokens = d2l.truncate_pad(src_tokens, num_steps, src_vocab['<pad>'])

    # Add the batch axis.
    enc_X = torch.unsqueeze(
        torch.tensor(src_tokens, dtype=torch.long, device=device), dim=0)
    dec_X = torch.unsqueeze(torch.tensor(
        [tgt_vocab['<bos>']], dtype=torch.long, device=device), dim=0)

    eos_id = tgt_vocab['<eos>']  # loop-invariant lookup
    output_seq, attention_weight_seq = [], []
    with torch.no_grad():
        enc_outputs = net.encoder(enc_X, enc_valid_len)
        dec_state = net.decoder.init_state(enc_outputs, enc_valid_len)
        for _ in range(num_steps):
            Y, dec_state = net.decoder(dec_X, dec_state)
            # The highest-scoring token becomes the next decoder input.
            dec_X = Y.argmax(dim=2)
            pred = dec_X.squeeze(dim=0).type(torch.int32).item()
            if save_attention_weights:
                attention_weight_seq.append(net.decoder.attention_weights)
            # Stop once end-of-sequence is predicted; <eos> itself is not
            # added to the output.
            if pred == eos_id:
                break
            output_seq.append(pred)
    return ' '.join(tgt_vocab.to_tokens(output_seq)), attention_weight_seq