IQsignal_transformer_15db / modeling_IQtransformer.py
bu1's picture
Upload model
398490c verified
from transformers import PreTrainedModel
import torch
from torch import nn
import math
from transformers import PretrainedConfig
# 把transformerConfig和transformerModel都放在一个文件中,避免类别不匹配引起的错误
class transformerConfig(PretrainedConfig):
model_type = "IQsignal_transformer"
def __init__(
self,
vocab_size : int = 32,
key_size : int = 32,
query_size : int = 32,
value_size : int = 32,
num_hiddens : int = 32,
norm_shape : int = [32],
ffn_num_input : int = 32,
ffn_num_hiddens : int = 64,
num_heads : int = 4,
num_layers : int = 1,
dropout : int = 0.1,
**kwargs,
):
self.vocab_size = vocab_size
self.key_size = key_size
self.query_size = query_size
self.value_size = value_size
self.num_hiddens = num_hiddens
self.norm_shape = norm_shape
self.ffn_num_input = ffn_num_input
self.ffn_num_hiddens = ffn_num_hiddens
self.num_heads = num_heads
self.num_layers = num_layers
self.dropout = dropout
super().__init__(**kwargs)
class PositionWiseFFN(nn.Module):
"""基于位置的前馈网络"""
def __init__(self, ffn_num_input, ffn_num_hiddens, ffn_num_outputs,
**kwargs):
super(PositionWiseFFN, self).__init__(**kwargs)
self.dense1 = nn.Linear(ffn_num_input, ffn_num_hiddens)
self.relu = nn.ReLU()
self.dense2 = nn.Linear(ffn_num_hiddens, ffn_num_outputs)
def forward(self, X):
return self.dense2(self.relu(self.dense1(X)))
class AddNorm(nn.Module):
"""残差连接后进行层规范化"""
def __init__(self, normalized_shape, dropout, **kwargs):
super(AddNorm, self).__init__(**kwargs)
self.dropout = nn.Dropout(dropout)
self.ln = nn.LayerNorm(normalized_shape)
def forward(self, X, Y):
return self.ln(self.dropout(Y) + X)
def masked_softmax(X, valid_lens):
"""通过在最后一个轴上掩蔽元素来执行softmax操作
Defined in :numref:`sec_attention-scoring-functions`"""
# X:3D张量,valid_lens:1D或2D张量
if valid_lens is None:
return nn.functional.softmax(X, dim=-1)
else:
shape = X.shape
if valid_lens.dim() == 1:
valid_lens = torch.repeat_interleave(valid_lens, shape[1])
else:
valid_lens = valid_lens.reshape(-1)
# 最后一轴上被掩蔽的元素使用一个非常大的负值替换,从而其softmax输出为0
X = sequence_mask(X.reshape(-1, shape[-1]), valid_lens,
value=-1e6)
return nn.functional.softmax(X.reshape(shape), dim=-1)
def transpose_qkv(X, num_heads):
"""为了多注意力头的并行计算而变换形状
Defined in :numref:`sec_multihead-attention`"""
# 输入X的形状:(batch_size,查询或者“键-值”对的个数,num_hiddens)
# 输出X的形状:(batch_size,查询或者“键-值”对的个数,num_heads,
# num_hiddens/num_heads)
X = X.reshape(X.shape[0], X.shape[1], num_heads, -1)
# 输出X的形状:(batch_size,num_heads,查询或者“键-值”对的个数,
# num_hiddens/num_heads)
X = X.permute(0, 2, 1, 3)
# 最终输出的形状:(batch_size*num_heads,查询或者“键-值”对的个数,
# num_hiddens/num_heads)
return X.reshape(-1, X.shape[2], X.shape[3])
def transpose_output(X, num_heads):
"""逆转transpose_qkv函数的操作
Defined in :numref:`sec_multihead-attention`"""
X = X.reshape(-1, num_heads, X.shape[1], X.shape[2])
X = X.permute(0, 2, 1, 3)
return X.reshape(X.shape[0], X.shape[1], -1)
def sequence_mask(X, valid_len, value=0):
"""在序列中屏蔽不相关的项
Defined in :numref:`sec_seq2seq_decoder`"""
maxlen = X.size(1)
mask = torch.arange((maxlen), dtype=torch.float32,
device=X.device)[None, :] < valid_len[:, None]
X[~mask] = value
return X
class DotProductAttention(nn.Module):
"""缩放点积注意力
Defined in :numref:`subsec_additive-attention`"""
def __init__(self, dropout, **kwargs):
super(DotProductAttention, self).__init__(**kwargs)
self.dropout = nn.Dropout(dropout)
# queries的形状:(batch_size,查询的个数,d)
# keys的形状:(batch_size,“键-值”对的个数,d)
# values的形状:(batch_size,“键-值”对的个数,值的维度)
# valid_lens的形状:(batch_size,)或者(batch_size,查询的个数)
def forward(self, queries, keys, values, valid_lens=None):
d = queries.shape[-1]
# 设置transpose_b=True为了交换keys的最后两个维度
scores = torch.bmm(queries, keys.transpose(1,2)) / math.sqrt(d)
self.attention_weights = masked_softmax(scores, valid_lens)
return torch.bmm(self.dropout(self.attention_weights), values)
class MultiHeadAttention(nn.Module):
"""多头注意力
Defined in :numref:`sec_multihead-attention`"""
def __init__(self, key_size, query_size, value_size, num_hiddens,
num_heads, dropout, bias=False, **kwargs):
super(MultiHeadAttention, self).__init__(**kwargs)
self.num_heads = num_heads
self.attention = DotProductAttention(dropout)
self.W_q = nn.Linear(query_size, num_hiddens, bias=bias)
self.W_k = nn.Linear(key_size, num_hiddens, bias=bias)
self.W_v = nn.Linear(value_size, num_hiddens, bias=bias)
self.W_o = nn.Linear(num_hiddens, num_hiddens, bias=bias)
def forward(self, queries, keys, values, valid_lens):
# queries,keys,values的形状:
# (batch_size,查询或者“键-值”对的个数,num_hiddens)
# valid_lens 的形状:
# (batch_size,)或(batch_size,查询的个数)
# 经过变换后,输出的queries,keys,values 的形状:
# (batch_size*num_heads,查询或者“键-值”对的个数,
# num_hiddens/num_heads)
queries = transpose_qkv(self.W_q(queries), self.num_heads)
keys = transpose_qkv(self.W_k(keys), self.num_heads)
values = transpose_qkv(self.W_v(values), self.num_heads)
if valid_lens is not None:
# 在轴0,将第一项(标量或者矢量)复制num_heads次,
# 然后如此复制第二项,然后诸如此类。
valid_lens = torch.repeat_interleave(
valid_lens, repeats=self.num_heads, dim=0)
# output的形状:(batch_size*num_heads,查询的个数,
# num_hiddens/num_heads)
output = self.attention(queries, keys, values, valid_lens)
# output_concat的形状:(batch_size,查询的个数,num_hiddens)
output_concat = transpose_output(output, self.num_heads)
return self.W_o(output_concat)
class EncoderBlock(nn.Module):
"""Transformer编码器块"""
def __init__(self, key_size, query_size, value_size, num_hiddens,
norm_shape, ffn_num_input, ffn_num_hiddens, num_heads,
dropout, use_bias=False, **kwargs):
super(EncoderBlock, self).__init__(**kwargs)
self.attention = MultiHeadAttention(
key_size, query_size, value_size, num_hiddens, num_heads, dropout,
use_bias)
self.addnorm1 = AddNorm(norm_shape, dropout)
self.ffn = PositionWiseFFN(
ffn_num_input, ffn_num_hiddens, num_hiddens)
self.addnorm2 = AddNorm(norm_shape, dropout)
def forward(self, X, valid_lens):
Y = self.addnorm1(X, self.attention(X, X, X, valid_lens))
return self.addnorm2(Y, self.ffn(Y))
class Encoder(nn.Module):
"""编码器-解码器架构的基本编码器接口"""
def __init__(self, **kwargs):
super(Encoder, self).__init__(**kwargs)
def forward(self, X, *args):
raise NotImplementedError
class transformerModel(PreTrainedModel):
config_class = transformerConfig
def __init__(self, config):
super().__init__(config)
self.num_hiddens = config.num_hiddens
self.Linear = nn.Linear(config.vocab_size, config.vocab_size)
# self.embedding = nn.Embedding(vocab_size, num_hiddens) # 将输入vocab_size的维度 转化为 想要的num_hiddens维度
# self.pos_encoding = d2l.PositionalEncoding(num_hiddens, dropout)
self.ln = nn.LayerNorm(config.norm_shape)
self.blks = nn.Sequential()
for i in range(config.num_layers):
self.blks.add_module("block" + str(i),
EncoderBlock(config.key_size, config.query_size, config.value_size, config.num_hiddens,
config.norm_shape, config.ffn_num_input, config.ffn_num_hiddens,
config.num_heads, config.dropout))
self.l1 = nn.Linear(64, 16)
self.l2 = nn.Linear(16, 4)
def forward(self, X, valid_lens, *args):
# 因为位置编码值在-1和1之间,
# 因此嵌入值乘以嵌入维度的平方根进行缩放,
# 然后再与位置编码相加。
X = self.ln(self.Linear(X).to(torch.float32))
self.attention_weights = [None] * len(self.blks)
for i, blk in enumerate(self.blks):
X = blk(X, valid_lens)
self.attention_weights[
i] = blk.attention.attention.attention_weights
X = self.l1(torch.reshape(X, [8, 64]))
X = self.l2(X)
return X
# class TransformerEncoder(nn.Module):
# """Transformer编码器"""
# def __init__(self, vocab_size, key_size, query_size, value_size,
# num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens,
# num_heads, num_layers, dropout, use_bias=False, **kwargs):
# super(TransformerEncoder, self).__init__(**kwargs)
# self.num_hiddens = num_hiddens
# self.Linear = nn.Linear(vocab_size,vocab_size)
# # self.embedding = nn.Embedding(vocab_size, num_hiddens) # 将输入vocab_size的维度 转化为 想要的num_hiddens维度
# # self.pos_encoding = d2l.PositionalEncoding(num_hiddens, dropout)
# self.ln = nn.LayerNorm(vocab_size)
# self.blks = nn.Sequential()
# for i in range(num_layers):
# self.blks.add_module("block"+str(i),
# EncoderBlock(key_size, query_size, value_size, num_hiddens,
# norm_shape, ffn_num_input, ffn_num_hiddens,
# num_heads, dropout, use_bias))
#
# self.l1 = nn.Linear(64, 16)
# self.l2 = nn.Linear(16, 5)
#
# def forward(self, X, valid_lens, *args):
# # 因为位置编码值在-1和1之间,
# # 因此嵌入值乘以嵌入维度的平方根进行缩放,
# # 然后再与位置编码相加。
# X = self.ln(self.Linear(X))
# self.attention_weights = [None] * len(self.blks)
# for i, blk in enumerate(self.blks):
# X = blk(X, valid_lens)
# self.attention_weights[
# i] = blk.attention.attention.attention_weights
#
# X = self.l1(torch.reshape(X,[8, 64]))
# X = self.l2(X)
# return X