# Keep transformerConfig and transformerModel in the same file to avoid errors
# caused by class mismatches when the model is loaded.
import math

import torch
from torch import nn
from transformers import PretrainedConfig, PreTrainedModel


class transformerConfig(PretrainedConfig):
    model_type = "IQsignal_transformer"

    def __init__(
        self,
        vocab_size: int = 32,
        key_size: int = 32,
        query_size: int = 32,
        value_size: int = 32,
        num_hiddens: int = 32,
        norm_shape: list = [32],
        ffn_num_input: int = 32,
        ffn_num_hiddens: int = 64,
        num_heads: int = 4,
        num_layers: int = 1,
        dropout: float = 0.1,
        **kwargs,
    ):
        self.vocab_size = vocab_size
        self.key_size = key_size
        self.query_size = query_size
        self.value_size = value_size
        self.num_hiddens = num_hiddens
        self.norm_shape = norm_shape
        self.ffn_num_input = ffn_num_input
        self.ffn_num_hiddens = ffn_num_hiddens
        self.num_heads = num_heads
        self.num_layers = num_layers
        self.dropout = dropout
        super().__init__(**kwargs)


class PositionWiseFFN(nn.Module):
    """Position-wise feed-forward network."""
    def __init__(self, ffn_num_input, ffn_num_hiddens, ffn_num_outputs, **kwargs):
        super(PositionWiseFFN, self).__init__(**kwargs)
        self.dense1 = nn.Linear(ffn_num_input, ffn_num_hiddens)
        self.relu = nn.ReLU()
        self.dense2 = nn.Linear(ffn_num_hiddens, ffn_num_outputs)

    def forward(self, X):
        return self.dense2(self.relu(self.dense1(X)))


class AddNorm(nn.Module):
    """Residual connection followed by layer normalization."""
    def __init__(self, normalized_shape, dropout, **kwargs):
        super(AddNorm, self).__init__(**kwargs)
        self.dropout = nn.Dropout(dropout)
        self.ln = nn.LayerNorm(normalized_shape)

    def forward(self, X, Y):
        return self.ln(self.dropout(Y) + X)


def masked_softmax(X, valid_lens):
    """Perform softmax by masking elements on the last axis.

    Defined in :numref:`sec_attention-scoring-functions`"""
    # X: 3D tensor; valid_lens: 1D or 2D tensor
    if valid_lens is None:
        return nn.functional.softmax(X, dim=-1)
    else:
        shape = X.shape
        if valid_lens.dim() == 1:
            valid_lens = torch.repeat_interleave(valid_lens, shape[1])
        else:
            valid_lens = valid_lens.reshape(-1)
        # Masked elements on the last axis are replaced with a very large negative
        # value, so their softmax output is 0.
        X = sequence_mask(X.reshape(-1, shape[-1]), valid_lens, value=-1e6)
        return nn.functional.softmax(X.reshape(shape), dim=-1)


def transpose_qkv(X, num_heads):
    """Reshape for the parallel computation of multiple attention heads.

    Defined in :numref:`sec_multihead-attention`"""
    # Input X shape: (batch_size, number of queries or key-value pairs, num_hiddens)
    # Output X shape: (batch_size, number of queries or key-value pairs, num_heads,
    # num_hiddens / num_heads)
    X = X.reshape(X.shape[0], X.shape[1], num_heads, -1)

    # Output X shape: (batch_size, num_heads, number of queries or key-value pairs,
    # num_hiddens / num_heads)
    X = X.permute(0, 2, 1, 3)

    # Final output shape: (batch_size * num_heads, number of queries or key-value
    # pairs, num_hiddens / num_heads)
    return X.reshape(-1, X.shape[2], X.shape[3])


def transpose_output(X, num_heads):
    """Reverse the operation of transpose_qkv.

    Defined in :numref:`sec_multihead-attention`"""
    X = X.reshape(-1, num_heads, X.shape[1], X.shape[2])
    X = X.permute(0, 2, 1, 3)
    return X.reshape(X.shape[0], X.shape[1], -1)


def sequence_mask(X, valid_len, value=0):
    """Mask irrelevant entries in sequences.

    Defined in :numref:`sec_seq2seq_decoder`"""
    maxlen = X.size(1)
    mask = torch.arange((maxlen), dtype=torch.float32,
                        device=X.device)[None, :] < valid_len[:, None]
    X[~mask] = value
    return X


class DotProductAttention(nn.Module):
    """Scaled dot-product attention.

    Defined in :numref:`subsec_additive-attention`"""
    def __init__(self, dropout, **kwargs):
        super(DotProductAttention, self).__init__(**kwargs)
        self.dropout = nn.Dropout(dropout)

    # Shape of queries: (batch_size, number of queries, d)
    # Shape of keys: (batch_size, number of key-value pairs, d)
    # Shape of values: (batch_size, number of key-value pairs, value dimension)
    # Shape of valid_lens: (batch_size,) or (batch_size, number of queries)
    def forward(self, queries, keys, values, valid_lens=None):
        d = queries.shape[-1]
        # Swap the last two dimensions of keys (the equivalent of transpose_b=True).
        scores = torch.bmm(queries, keys.transpose(1, 2)) / math.sqrt(d)
        self.attention_weights = masked_softmax(scores, valid_lens)
        return torch.bmm(self.dropout(self.attention_weights), values)
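# Illustrative sketch (not part of the original model): exercises the helpers above.
# The shapes are assumptions: batch_size=2, 4 positions, num_hiddens=32, num_heads=4.
def _demo_attention_helpers():
    """Show masked_softmax zeroing padded positions and the head-reshaping round trip."""
    # With valid_lens=[2, 3], attention weights beyond each valid length are ~0.
    weights = masked_softmax(torch.rand(2, 4, 4), torch.tensor([2, 3]))
    # transpose_qkv splits (2, 4, 32) into 4 heads -> (8, 4, 8);
    # transpose_output restores the original (2, 4, 32) layout.
    X = torch.zeros(2, 4, 32)
    assert transpose_output(transpose_qkv(X, 4), 4).shape == X.shape
    return weights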
class MultiHeadAttention(nn.Module):
    """Multi-head attention.

    Defined in :numref:`sec_multihead-attention`"""
    def __init__(self, key_size, query_size, value_size, num_hiddens,
                 num_heads, dropout, bias=False, **kwargs):
        super(MultiHeadAttention, self).__init__(**kwargs)
        self.num_heads = num_heads
        self.attention = DotProductAttention(dropout)
        self.W_q = nn.Linear(query_size, num_hiddens, bias=bias)
        self.W_k = nn.Linear(key_size, num_hiddens, bias=bias)
        self.W_v = nn.Linear(value_size, num_hiddens, bias=bias)
        self.W_o = nn.Linear(num_hiddens, num_hiddens, bias=bias)

    def forward(self, queries, keys, values, valid_lens):
        # Shapes of queries, keys, values:
        # (batch_size, number of queries or key-value pairs, num_hiddens)
        # Shape of valid_lens:
        # (batch_size,) or (batch_size, number of queries)
        # After the transformation, the shapes of queries, keys, values are:
        # (batch_size * num_heads, number of queries or key-value pairs,
        # num_hiddens / num_heads)
        queries = transpose_qkv(self.W_q(queries), self.num_heads)
        keys = transpose_qkv(self.W_k(keys), self.num_heads)
        values = transpose_qkv(self.W_v(values), self.num_heads)

        if valid_lens is not None:
            # On axis 0, copy the first item (scalar or vector) num_heads times,
            # then copy the second item, and so on.
            valid_lens = torch.repeat_interleave(
                valid_lens, repeats=self.num_heads, dim=0)

        # Shape of output: (batch_size * num_heads, number of queries,
        # num_hiddens / num_heads)
        output = self.attention(queries, keys, values, valid_lens)

        # Shape of output_concat: (batch_size, number of queries, num_hiddens)
        output_concat = transpose_output(output, self.num_heads)
        return self.W_o(output_concat)


class EncoderBlock(nn.Module):
    """Transformer encoder block."""
    def __init__(self, key_size, query_size, value_size, num_hiddens,
                 norm_shape, ffn_num_input, ffn_num_hiddens, num_heads,
                 dropout, use_bias=False, **kwargs):
        super(EncoderBlock, self).__init__(**kwargs)
        self.attention = MultiHeadAttention(
            key_size, query_size, value_size, num_hiddens, num_heads, dropout,
            use_bias)
        self.addnorm1 = AddNorm(norm_shape, dropout)
        self.ffn = PositionWiseFFN(
            ffn_num_input, ffn_num_hiddens, num_hiddens)
        self.addnorm2 = AddNorm(norm_shape, dropout)

    def forward(self, X, valid_lens):
        Y = self.addnorm1(X, self.attention(X, X, X, valid_lens))
        return self.addnorm2(Y, self.ffn(Y))


class Encoder(nn.Module):
    """Base encoder interface for the encoder-decoder architecture."""
    def __init__(self, **kwargs):
        super(Encoder, self).__init__(**kwargs)

    def forward(self, X, *args):
        raise NotImplementedError


class transformerModel(PreTrainedModel):
    config_class = transformerConfig

    def __init__(self, config):
        super().__init__(config)
        self.num_hiddens = config.num_hiddens
        self.Linear = nn.Linear(config.vocab_size, config.vocab_size)
        # self.embedding = nn.Embedding(vocab_size, num_hiddens)  # map the vocab_size
        # input dimension to the desired num_hiddens dimension
        # self.pos_encoding = d2l.PositionalEncoding(num_hiddens, dropout)
        self.ln = nn.LayerNorm(config.norm_shape)
        self.blks = nn.Sequential()
        for i in range(config.num_layers):
            self.blks.add_module(
                "block" + str(i),
                EncoderBlock(config.key_size, config.query_size, config.value_size,
                             config.num_hiddens, config.norm_shape,
                             config.ffn_num_input, config.ffn_num_hiddens,
                             config.num_heads, config.dropout))

        # Classification head: flatten each sample to 64 features, then map to 4 classes.
        self.l1 = nn.Linear(64, 16)
        self.l2 = nn.Linear(16, 4)

    def forward(self, X, valid_lens, *args):
        # In the original d2l design, the embedding values are scaled by the square
        # root of the embedding dimension before the positional encoding (whose values
        # lie in [-1, 1]) is added. Here the embedding and positional encoding are
        # disabled; a Linear projection followed by LayerNorm is used instead.
        X = self.ln(self.Linear(X).to(torch.float32))
        self.attention_weights = [None] * len(self.blks)
        for i, blk in enumerate(self.blks):
            X = blk(X, valid_lens)
            self.attention_weights[i] = blk.attention.attention.attention_weights

        # Flatten (positions, features) per sample; with 2 positions of 32 features
        # this yields the 64 inputs expected by l1.
        X = self.l1(X.reshape(X.shape[0], -1))
        X = self.l2(X)
        return X
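# Optional registration sketch (an assumption, not in the original file): since the
# custom config and model classes live in the same module, they can be registered with
# the Auto* factories so that AutoModel.from_pretrained resolves the
# "IQsignal_transformer" model type. Left commented out because it depends on the
# installed transformers version.
# from transformers import AutoConfig, AutoModel
# AutoConfig.register("IQsignal_transformer", transformerConfig)
# AutoModel.register(transformerConfig, transformerModel)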
# class TransformerEncoder(nn.Module):
#     """Transformer encoder."""
#     def __init__(self, vocab_size, key_size, query_size, value_size,
#                  num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens,
#                  num_heads, num_layers, dropout, use_bias=False, **kwargs):
#         super(TransformerEncoder, self).__init__(**kwargs)
#         self.num_hiddens = num_hiddens
#         self.Linear = nn.Linear(vocab_size, vocab_size)
#         # self.embedding = nn.Embedding(vocab_size, num_hiddens)  # map the
#         # vocab_size input dimension to the desired num_hiddens dimension
#         # self.pos_encoding = d2l.PositionalEncoding(num_hiddens, dropout)
#         self.ln = nn.LayerNorm(vocab_size)
#         self.blks = nn.Sequential()
#         for i in range(num_layers):
#             self.blks.add_module("block" + str(i),
#                                  EncoderBlock(key_size, query_size, value_size,
#                                               num_hiddens, norm_shape, ffn_num_input,
#                                               ffn_num_hiddens, num_heads, dropout,
#                                               use_bias))
#
#         self.l1 = nn.Linear(64, 16)
#         self.l2 = nn.Linear(16, 5)
#
#     def forward(self, X, valid_lens, *args):
#         # The embedding values would be scaled by the square root of the embedding
#         # dimension before adding the positional encoding (whose values lie in [-1, 1]).
#         X = self.ln(self.Linear(X))
#         self.attention_weights = [None] * len(self.blks)
#         for i, blk in enumerate(self.blks):
#             X = blk(X, valid_lens)
#             self.attention_weights[
#                 i] = blk.attention.attention.attention_weights
#
#         X = self.l1(torch.reshape(X, [8, 64]))
#         X = self.l2(X)
#         return X
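# Minimal usage sketch (assumed shapes, not part of the original file): the IQ signal
# is taken to be 2 positions (e.g. the I and Q channels) of 32 features each, so that
# flattening yields the 64 features expected by the classification head.
if __name__ == "__main__":
    config = transformerConfig()
    model = transformerModel(config)
    X = torch.randn(8, 2, config.vocab_size)  # (batch_size, positions, features)
    logits = model(X, valid_lens=None)        # no padding, so no attention mask is needed
    print(logits.shape)                       # expected: torch.Size([8, 4])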