|
import torch |
|
import torch.nn as nn |
|
import math |
|
|
|
|
|
class FusedQKVAttention(nn.Module): |
|
def __init__(self, d_model, num_heads): |
|
super().__init__() |
|
self.d_model = d_model |
|
self.num_heads = num_heads |
|
self.head_dim = d_model // num_heads |
|
|
|
self.qkv_proj = nn.Linear(d_model, 3 * d_model) |
|
self.wo = nn.Linear(d_model, d_model) |
|
|
|
nn.init.xavier_uniform_(self.qkv_proj.weight) |
|
nn.init.xavier_uniform_(self.wo.weight) |
|
nn.init.zeros_(self.qkv_proj.bias) |
|
nn.init.zeros_(self.wo.bias) |
|
|
|
def forward(self, x, attention_mask=None): |
|
batch_size, seq_len, _ = x.shape |
|
|
|
qkv = self.qkv_proj(x).reshape(batch_size, seq_len, 3, self.num_heads, self.head_dim) |
|
qkv = qkv.permute(2, 0, 3, 1, 4) |
|
q, k, v = qkv[0], qkv[1], qkv[2] |
|
|
|
attention_scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.head_dim) |
|
if attention_mask is not None: |
|
attention_mask = attention_mask.unsqueeze(1).unsqueeze(2) |
|
attention_scores = attention_scores.masked_fill(attention_mask == 0, float('-inf')) |
|
attention_weights = torch.softmax(attention_scores, dim=-1) |
|
|
|
context = torch.matmul(attention_weights, v) |
|
context = context.transpose(1, 2).reshape(batch_size, seq_len, self.d_model) |
|
return self.wo(context) |
|
|
|
|
|
|
|
class EnhancedFeedForward(nn.Module): |
|
def __init__(self, d_model, ff_dim, dropout=0.1): |
|
super().__init__() |
|
self.linear1 = nn.Linear(d_model, ff_dim) |
|
self.dropout1 = nn.Dropout(dropout) |
|
self.linear2 = nn.Linear(ff_dim, d_model) |
|
self.dropout2 = nn.Dropout(dropout) |
|
self.activation = nn.GELU() |
|
|
|
nn.init.xavier_uniform_(self.linear1.weight) |
|
nn.init.xavier_uniform_(self.linear2.weight) |
|
nn.init.zeros_(self.linear1.bias) |
|
nn.init.zeros_(self.linear2.bias) |
|
|
|
def forward(self, x): |
|
return self.dropout2(self.linear2(self.dropout1(self.activation(self.linear1(x))))) |
|
|
|
|
|
|
|
class EnhancedTransformerBlock(nn.Module): |
|
def __init__(self, d_model, num_heads, ff_dim, dropout=0.1): |
|
super().__init__() |
|
self.attention = FusedQKVAttention(d_model, num_heads) |
|
self.norm1 = nn.LayerNorm(d_model, eps=1e-6) |
|
self.dropout1 = nn.Dropout(dropout) |
|
self.feed_forward = EnhancedFeedForward(d_model, ff_dim, dropout) |
|
self.norm2 = nn.LayerNorm(d_model, eps=1e-6) |
|
self.dropout2 = nn.Dropout(dropout) |
|
|
|
def forward(self, x, attention_mask=None): |
|
|
|
attn_input = self.norm1(x) |
|
attn_output = self.attention(attn_input, attention_mask) |
|
x = x + self.dropout1(attn_output) |
|
ff_input = self.norm2(x) |
|
ff_output = self.feed_forward(ff_input) |
|
x = x + self.dropout2(ff_output) |
|
return x |
|
|
|
|
|
|
|
class Snowflake4CausalLM(nn.Module): |
|
def __init__(self, vocab_size, max_seq_length, d_model, num_heads, num_layers, ff_dim, dropout=0.1): |
|
super().__init__() |
|
self.embedding = nn.Embedding(vocab_size, d_model) |
|
|
|
self.pos_encoding = nn.Parameter(torch.zeros(1, max_seq_length, d_model)) |
|
position = torch.arange(max_seq_length).unsqueeze(1).float() |
|
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) |
|
pos_enc = torch.zeros(1, max_seq_length, d_model) |
|
pos_enc[0, :, 0::2] = torch.sin(position * div_term) |
|
pos_enc[0, :, 1::2] = torch.cos(position * div_term) |
|
self.pos_encoding.data = pos_enc.data |
|
self.layers = nn.ModuleList([ |
|
EnhancedTransformerBlock(d_model, num_heads, ff_dim, dropout) |
|
for _ in range(num_layers) |
|
]) |
|
self.final_norm = nn.LayerNorm(d_model, eps=1e-6) |
|
self.dropout = nn.Dropout(dropout) |
|
self.fc_out = nn.Linear(d_model, vocab_size) |
|
|
|
self.fc_out.weight = self.embedding.weight |
|
|
|
nn.init.normal_(self.embedding.weight, mean=0, std=0.02) |
|
|
|
def forward(self, input_ids, attention_mask=None): |
|
seq_length = input_ids.size(1) |
|
x = self.embedding(input_ids) + self.pos_encoding[:, :seq_length, :] |
|
x = self.dropout(x) |
|
for layer in self.layers: |
|
x = layer(x, attention_mask) |
|
x = self.final_norm(x) |
|
return self.fc_out(x) |