SnowflakeCore-Demo-Inteface / modeling_snowflake.py
FlameF0X's picture
Create modeling_snowflake.py
4276930 verified
raw
history blame
4.99 kB
import torch
import torch.nn as nn
import math
# --- Submodule: FusedQKVAttention ---
class FusedQKVAttention(nn.Module):
    """Multi-head self-attention with a single fused Q/K/V projection.

    One ``nn.Linear`` produces queries, keys and values together; the result
    is split per head, scaled dot-product attention is applied, and the heads
    are merged again through the output projection ``wo``.

    NOTE(review): only the optional ``attention_mask`` is applied to the
    scores. No causal (look-ahead) mask is built here, so every query can
    attend to every unmasked key position.

    Args:
        d_model: Width of the input and output features.
        num_heads: Number of attention heads; must divide ``d_model`` evenly.

    Raises:
        ValueError: If ``d_model`` is not divisible by ``num_heads``.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        # Fail early with a clear message instead of an opaque reshape error
        # on the first forward pass.
        if d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
            )
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads

        # Fused QKV projection followed by the output projection.
        self.qkv_proj = nn.Linear(d_model, 3 * d_model)
        self.wo = nn.Linear(d_model, d_model)

        # Xavier weights and zero biases for training stability.
        nn.init.xavier_uniform_(self.qkv_proj.weight)
        nn.init.xavier_uniform_(self.wo.weight)
        nn.init.zeros_(self.qkv_proj.bias)
        nn.init.zeros_(self.wo.bias)

    def forward(self, x, attention_mask=None):
        """Apply self-attention to ``x``.

        Args:
            x: Tensor of shape ``(batch, seq_len, d_model)``.
            attention_mask: Optional tensor whose zero entries mark key
                positions to ignore. It is unsqueezed at dims 1 and 2 and
                broadcast against the ``(batch, heads, seq_len, seq_len)``
                scores, so a ``(batch, seq_len)`` mask is presumably
                expected -- confirm against callers.

        Returns:
            Tensor of shape ``(batch, seq_len, d_model)``.
        """
        batch_size, seq_len, _ = x.shape

        # Fused projection, then split into per-head q, k, v.
        qkv = self.qkv_proj(x).reshape(batch_size, seq_len, 3, self.num_heads, self.head_dim)
        qkv = qkv.permute(2, 0, 3, 1, 4)  # [3, batch, heads, seq_len, head_dim]
        q, k, v = qkv[0], qkv[1], qkv[2]

        # Scaled dot-product scores; the full [seq_len, seq_len] matrix is
        # materialized per head.
        attention_scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.head_dim)

        if attention_mask is not None:
            attention_mask = attention_mask.unsqueeze(1).unsqueeze(2)
            # Use the most negative finite value rather than -inf: masked keys
            # still receive zero weight, but a row in which every key is
            # masked (e.g. an all-padding sequence) yields finite output
            # instead of NaN from softmax over all -inf.
            mask_value = torch.finfo(attention_scores.dtype).min
            attention_scores = attention_scores.masked_fill(attention_mask == 0, mask_value)

        attention_weights = torch.softmax(attention_scores, dim=-1)

        # Weighted sum of values, then merge heads back into d_model.
        context = torch.matmul(attention_weights, v)
        context = context.transpose(1, 2).reshape(batch_size, seq_len, self.d_model)
        return self.wo(context)
# --- Submodule: EnhancedFeedForward ---
class EnhancedFeedForward(nn.Module):
    """Position-wise feed-forward network.

    Computes ``dropout2(linear2(dropout1(GELU(linear1(x)))))``: an expansion
    from ``d_model`` to ``ff_dim``, a GELU non-linearity, and a projection
    back to ``d_model``, with dropout after the activation and after the
    final projection.

    Args:
        d_model: Width of the input and output features.
        ff_dim: Width of the hidden layer.
        dropout: Dropout probability used by both dropout layers.
    """

    def __init__(self, d_model, ff_dim, dropout=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, ff_dim)
        self.dropout1 = nn.Dropout(dropout)
        self.linear2 = nn.Linear(ff_dim, d_model)
        self.dropout2 = nn.Dropout(dropout)
        self.activation = nn.GELU()

        # Xavier weights and zero biases on both projections.
        for projection in (self.linear1, self.linear2):
            nn.init.xavier_uniform_(projection.weight)
            nn.init.zeros_(projection.bias)

    def forward(self, x):
        """Return the feed-forward transform of ``x`` (last dim ``d_model``)."""
        hidden = self.linear1(x)
        hidden = self.activation(hidden)
        hidden = self.dropout1(hidden)
        projected = self.linear2(hidden)
        return self.dropout2(projected)
# --- Submodule: EnhancedTransformerBlock ---
class EnhancedTransformerBlock(nn.Module):
    """Pre-norm transformer block: attention and feed-forward sub-layers.

    Each sub-layer is applied to a layer-normalized copy of the running
    activations, passed through dropout, and added back as a residual.

    Args:
        d_model: Width of the input and output features.
        num_heads: Number of heads passed to ``FusedQKVAttention``.
        ff_dim: Hidden width passed to ``EnhancedFeedForward``.
        dropout: Dropout probability for both residual branches and the
            feed-forward sub-layer.
    """

    def __init__(self, d_model, num_heads, ff_dim, dropout=0.1):
        super().__init__()
        # Attention sub-layer with its norm and residual dropout.
        self.attention = FusedQKVAttention(d_model, num_heads)
        self.norm1 = nn.LayerNorm(d_model, eps=1e-6)
        self.dropout1 = nn.Dropout(dropout)
        # Feed-forward sub-layer with its norm and residual dropout.
        self.feed_forward = EnhancedFeedForward(d_model, ff_dim, dropout)
        self.norm2 = nn.LayerNorm(d_model, eps=1e-6)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, x, attention_mask=None):
        """Run both sub-layers on ``x``; ``attention_mask`` is forwarded to attention."""
        # Attention branch: normalize first, then add the result back.
        attended = self.attention(self.norm1(x), attention_mask)
        hidden = x + self.dropout1(attended)

        # Feed-forward branch, same pre-norm + residual pattern.
        transformed = self.feed_forward(self.norm2(hidden))
        return hidden + self.dropout2(transformed)
# --- Main Model Class: Snowflake4CausalLM ---
class Snowflake4CausalLM(nn.Module):
    """Transformer language model with tied input/output embeddings.

    Token embeddings are summed with a positional table, passed through
    ``num_layers`` ``EnhancedTransformerBlock`` layers and a final layer norm,
    then projected to vocabulary logits by ``fc_out``.

    NOTE(review): this class only forwards ``attention_mask`` to the layers;
    it does not construct a causal mask itself.

    Args:
        vocab_size: Number of tokens in the vocabulary.
        max_seq_length: Number of positions in the positional table; inputs
            are expected to be no longer than this.
        d_model: Width of the embeddings and hidden states (odd values are
            supported by the positional table).
        num_heads: Attention heads per layer.
        num_layers: Number of transformer blocks.
        ff_dim: Hidden width of each feed-forward sub-layer.
        dropout: Dropout probability for the embeddings and the blocks.
    """

    def __init__(self, vocab_size, max_seq_length, d_model, num_heads, num_layers, ff_dim, dropout=0.1):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)

        # Positional table: sinusoidal at initialization, registered as a
        # Parameter (so it is part of the state dict and is trainable).
        self.pos_encoding = nn.Parameter(self._sinusoidal_table(max_seq_length, d_model))

        self.layers = nn.ModuleList([
            EnhancedTransformerBlock(d_model, num_heads, ff_dim, dropout)
            for _ in range(num_layers)
        ])
        self.final_norm = nn.LayerNorm(d_model, eps=1e-6)
        self.dropout = nn.Dropout(dropout)
        self.fc_out = nn.Linear(d_model, vocab_size)

        # Tie embedding and output weights: both modules share one tensor, so
        # the normal init below also sets the output projection weight.
        self.fc_out.weight = self.embedding.weight
        nn.init.normal_(self.embedding.weight, mean=0, std=0.02)

    @staticmethod
    def _sinusoidal_table(max_seq_length, d_model):
        """Return a ``(1, max_seq_length, d_model)`` sin/cos positional table."""
        position = torch.arange(max_seq_length).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term  # [max_seq_length, ceil(d_model / 2)]

        table = torch.zeros(1, max_seq_length, d_model)
        table[0, :, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer odd-indexed column than
        # even-indexed, so trim the angles to match the cos slots.
        table[0, :, 1::2] = torch.cos(angles[:, : d_model // 2])
        return table

    def forward(self, input_ids, attention_mask=None):
        """Compute vocabulary logits for ``input_ids``.

        Args:
            input_ids: Integer tensor of shape ``(batch, seq_len)``.
            attention_mask: Optional mask passed unchanged to every layer.

        Returns:
            Tensor of shape ``(batch, seq_len, vocab_size)``.
        """
        seq_length = input_ids.size(1)
        # Add the first seq_length rows of the positional table.
        x = self.embedding(input_ids) + self.pos_encoding[:, :seq_length, :]
        x = self.dropout(x)
        for layer in self.layers:
            x = layer(x, attention_mask)
        x = self.final_norm(x)
        return self.fc_out(x)