import torch
from torch import nn
from typing import Optional, Tuple
import math

from ..utils.kv_cache import KVCache
from .language_config import LanguageModelConfig
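
# Gemma-style RMSNorm: normalize by the root-mean-square of the features, then scale by a
# learned per-channel gain. The gain is zero-initialized and applied as (1 + weight), so the
# layer starts out as a plain normalization; the computation runs in float32 for stability.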
class RMSNorm(nn.Module):
    def __init__(self, dim: int, eps: float = 1e-6):
        super().__init__()
        self.eps = eps
        self.weight = nn.Parameter(torch.zeros(dim))

    def _norm(self, x):
        return x * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + self.eps)

    def forward(self, x):
        output = self._norm(x.float())
        output = output * (1.0 + self.weight.float())
        return output.type_as(x)
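
# Rotary position embedding (RoPE): precomputes the per-channel-pair inverse frequencies and,
# given position ids, returns the cos/sin tables used to rotate the query and key vectors.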
class RotaryEmbedding(nn.Module):
    def __init__(self, dim, max_position_embeddings=2048, base=10000, device=None):
        super().__init__()
        self.dim = dim
        self.max_position_embeddings = max_position_embeddings
        self.base = base
        inv_freq = 1.0 / (self.base ** (torch.arange(0, self.dim, 2, dtype=torch.int64).float() / self.dim))
        self.register_buffer("inv_freq", tensor=inv_freq, persistent=False)

    def forward(self, x, position_ids, seq_len=None):
        # inv_freq is a registered buffer; move it to the input's device and broadcast over the batch.
        inv_freq_expanded = self.inv_freq.to(x.device)[None, :, None].float().expand(position_ids.shape[0], -1, 1)
        position_ids_expanded = position_ids[:, None, :].float()
        device_type = x.device.type
        device_type = device_type if isinstance(device_type, str) and device_type != "mps" else "cpu"
        with torch.autocast(device_type=device_type, enabled=False):
            freqs = (inv_freq_expanded.float() @ position_ids_expanded.float()).transpose(1, 2)
            emb = torch.cat((freqs, freqs), dim=-1)
            cos = emb.cos()
            sin = emb.sin()
        return cos.to(dtype=x.dtype), sin.to(dtype=x.dtype)
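
# "Rotate-half" RoPE helpers: the head dimension is split into two halves and each query/key
# vector is rotated by a position-dependent angle via q * cos + rotate_half(q) * sin.
# unsqueeze_dim=1 broadcasts the cos/sin tables over the head axis.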
def rotate_half(x):
    x1 = x[..., : x.shape[-1] // 2]
    x2 = x[..., x.shape[-1] // 2 :]
    return torch.cat((-x2, x1), dim=-1)


def apply_rotary_pos_emb(q, k, cos, sin, unsqueeze_dim=1):
    cos = cos.unsqueeze(unsqueeze_dim)
    sin = sin.unsqueeze(unsqueeze_dim)
    q_embed = (q * cos) + (rotate_half(q) * sin)
    k_embed = (k * cos) + (rotate_half(k) * sin)
    return q_embed, k_embed
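
# Gated feed-forward block (GeGLU-style): the gate projection passes through a tanh-approximated
# GELU and is multiplied element-wise with the up projection before being projected back down.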
class MLP(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.config = config
        self.hidden_size = config.hidden_size
        self.intermediate_size = config.intermediate_size
        self.gate_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False)
        self.up_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False)
        self.down_proj = nn.Linear(self.intermediate_size, self.hidden_size, bias=False)

    def forward(self, x):
        return self.down_proj(nn.functional.gelu(self.gate_proj(x), approximate="tanh") * self.up_proj(x))
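
# Grouped-query attention helper: repeat each key/value head n_rep times so the KV heads line up
# with the query heads. Equivalent to torch.repeat_interleave(hidden_states, n_rep, dim=1).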
def repeat_kv(hidden_states: torch.Tensor, n_rep: int) -> torch.Tensor:
    batch, num_key_value_heads, slen, head_dim = hidden_states.shape
    if n_rep == 1:
        return hidden_states
    hidden_states = hidden_states[:, :, None, :, :].expand(batch, num_key_value_heads, n_rep, slen, head_dim)
    return hidden_states.reshape(batch, num_key_value_heads * n_rep, slen, head_dim)
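
# Multi-head self-attention with grouped-query KV heads, rotary position embeddings, and an
# optional per-layer KV cache. The attention mask is additive and required by this implementation.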
class Attention(nn.Module):
    def __init__(self, config: LanguageModelConfig, layer_idx: Optional[int] = None):
        super().__init__()
        self.config = config
        self.layer_idx = layer_idx
        self.attention_dropout = config.attention_dropout
        self.hidden_size = config.hidden_size
        self.num_heads = config.num_attention_heads
        self.head_dim = config.head_dim
        self.num_key_value_heads = config.num_key_value_heads
        self.num_key_value_groups = self.num_heads // self.num_key_value_heads
        self.max_position_embeddings = config.max_position_embeddings
        self.rope_theta = config.rope_theta
        self.is_causal = True

        assert self.hidden_size % self.num_heads == 0

        self.q_proj = nn.Linear(self.hidden_size, self.num_heads * self.head_dim, bias=config.attention_bias)
        self.k_proj = nn.Linear(self.hidden_size, self.num_key_value_heads * self.head_dim, bias=config.attention_bias)
        self.v_proj = nn.Linear(self.hidden_size, self.num_key_value_heads * self.head_dim, bias=config.attention_bias)
        self.o_proj = nn.Linear(self.num_heads * self.head_dim, self.hidden_size, bias=config.attention_bias)
        self.rotary_emb = RotaryEmbedding(
            self.head_dim,
            max_position_embeddings=self.max_position_embeddings,
            base=self.rope_theta,
        )
    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        kv_cache: Optional[KVCache] = None,
        **kwargs,
    ) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
        bsz, q_len, _ = hidden_states.size()

        query_states = self.q_proj(hidden_states)
        key_states = self.k_proj(hidden_states)
        value_states = self.v_proj(hidden_states)

        query_states = query_states.view(bsz, q_len, self.num_heads, self.head_dim).transpose(1, 2)
        key_states = key_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2)
        value_states = value_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2)

        cos, sin = self.rotary_emb(value_states, position_ids, seq_len=None)
        query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin)

        if kv_cache is not None:
            key_states, value_states = kv_cache.update(key_states, value_states, self.layer_idx)

        key_states = repeat_kv(key_states, self.num_key_value_groups)
        value_states = repeat_kv(value_states, self.num_key_value_groups)

        attn_weights = torch.matmul(query_states, key_states.transpose(2, 3)) / math.sqrt(self.head_dim)

        assert attention_mask is not None
        attn_weights = attn_weights + attention_mask

        attn_weights = nn.functional.softmax(attn_weights, dim=-1, dtype=torch.float32).to(query_states.dtype)
        attn_weights = nn.functional.dropout(attn_weights, p=self.attention_dropout, training=self.training)
        attn_output = torch.matmul(attn_weights, value_states)

        if attn_output.size() != (bsz, self.num_heads, q_len, self.head_dim):
            raise ValueError(
                f"`attn_output` should be of size {(bsz, self.num_heads, q_len, self.head_dim)}, but is"
                f" {attn_output.size()}"
            )

        attn_output = attn_output.transpose(1, 2).contiguous()
        attn_output = attn_output.view(bsz, q_len, -1)
        attn_output = self.o_proj(attn_output)

        return attn_output, attn_weights
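
# Pre-norm transformer block: RMSNorm -> self-attention -> residual add, then
# RMSNorm -> gated MLP -> residual add.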
class DecoderLayer(nn.Module):
    def __init__(self, config: LanguageModelConfig, layer_idx: int):
        super().__init__()
        self.hidden_size = config.hidden_size
        self.self_attn = Attention(config=config, layer_idx=layer_idx)
        self.mlp = MLP(config)
        self.input_layernorm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps)
        self.post_attention_layernorm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps)
    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        kv_cache: Optional[KVCache] = None,
    ) -> torch.Tensor:
        residual = hidden_states
        hidden_states = self.input_layernorm(hidden_states)
        hidden_states, _ = self.self_attn(
            hidden_states=hidden_states,
            attention_mask=attention_mask,
            position_ids=position_ids,
            kv_cache=kv_cache,
        )
        hidden_states = residual + hidden_states

        residual = hidden_states
        hidden_states = self.post_attention_layernorm(hidden_states)
        hidden_states = self.mlp(hidden_states)
        hidden_states = residual + hidden_states

        return hidden_states
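

if __name__ == "__main__":
    # Minimal smoke-test sketch (not part of the original module). Because of the relative
    # imports above, run it from the package root with `python -m <package>.<module>`.
    # A SimpleNamespace stands in for LanguageModelConfig here; the field names and values
    # below are illustrative assumptions, taken only from the attributes this file reads.
    from types import SimpleNamespace

    cfg = SimpleNamespace(
        hidden_size=64,
        intermediate_size=128,
        num_attention_heads=4,
        num_key_value_heads=2,
        head_dim=16,
        max_position_embeddings=128,
        rope_theta=10000.0,
        attention_bias=False,
        attention_dropout=0.0,
        rms_norm_eps=1e-6,
    )
    layer = DecoderLayer(cfg, layer_idx=0)

    bsz, seq_len = 2, 8
    x = torch.randn(bsz, seq_len, cfg.hidden_size)
    position_ids = torch.arange(seq_len).unsqueeze(0).expand(bsz, -1)
    # Additive causal mask: 0 where attention is allowed, -inf strictly above the diagonal.
    causal_mask = torch.full((seq_len, seq_len), float("-inf")).triu(1)
    causal_mask = causal_mask[None, None, :, :].expand(bsz, 1, seq_len, seq_len)

    out = layer(x, attention_mask=causal_mask, position_ids=position_ids, kv_cache=None)
    print(out.shape)  # expected: torch.Size([2, 8, 64])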