|
|
|
|
|
from whisper import whisper |
|
|
|
import torch |
|
from torch import Tensor, nn |
|
import torch.nn.functional as F |
|
from typing import Optional, Iterable |
|
from dataclasses import dataclass |
|
import json |
|
|
|
@dataclass
class ModelDimensions:
    """Integer hyper-parameters describing a model's audio and text halves.

    The conversion code in this file reads these attributes from ``model.dims``.
    """

    # Audio-side dimensions.
    n_mels: int
    n_audio_ctx: int
    n_audio_state: int
    n_audio_head: int
    n_audio_layer: int

    # Text-side dimensions.
    n_vocab: int
    n_text_ctx: int
    n_text_state: int
    n_text_head: int
    n_text_layer: int
|
|
|
class LayerNorm(nn.LayerNorm):
    """``nn.LayerNorm`` that normalises in float32 and returns the caller's dtype."""

    def forward(self, x: Tensor) -> Tensor:
        """Normalise ``x.float()`` and cast the result back to ``x.dtype``."""
        input_dtype = x.dtype
        normalized = super().forward(x.float())
        return normalized.type(input_dtype)
|
|
|
|
|
class Linear(nn.Linear):
    """``nn.Linear`` whose parameters are cast to the input's dtype on every call."""

    def forward(self, x: Tensor) -> Tensor:
        """Apply the affine map with weight (and bias, if present) cast to ``x.dtype``."""
        weight = self.weight.to(x.dtype)
        bias = self.bias.to(x.dtype) if self.bias is not None else None
        return F.linear(x, weight, bias)
|
|
|
class Conv1d(nn.Conv1d):
    """``nn.Conv1d`` whose weight and bias are cast to the input's dtype."""

    def _conv_forward(
        self, x: Tensor, weight: Tensor, bias: Optional[Tensor]
    ) -> Tensor:
        """Delegate to the parent convolution after matching parameter dtype to ``x``."""
        cast_weight = weight.to(x.dtype)
        cast_bias = bias.to(x.dtype) if bias is not None else None
        return super()._conv_forward(x, cast_weight, cast_bias)
|
|
|
class MultiHeadAttention(nn.Module):
    """Multi-head attention that reads and writes caller-supplied key/value caches.

    The constructor flags select one of three behaviours:

    * default (neither flag): query/key/value/out projections are created.
      ``forward`` writes the new keys/values into the caches at
      ``[offset, offset + q_len)`` and attends over the cache prefix
      ``[:offset + q_len]``.
    * ``no_cross=True``: only ``query`` and ``out`` are created. ``forward``
      attends over the full contents of the supplied caches without
      modifying them.
    * ``cross_only=True``: only ``key`` and ``value`` are created. ``forward``
      fills the leading rows of the caches and returns ``x`` unchanged.
    """

    def __init__(self, n_state: int, n_head: int, no_cross: bool = False, cross_only: bool = False):
        super().__init__()
        self.no_cross = no_cross
        self.cross_only = cross_only
        self.n_head = n_head
        if not cross_only:
            self.query = Linear(n_state, n_state)
            self.out = Linear(n_state, n_state)
        if not no_cross:
            self.key = Linear(n_state, n_state, bias=False)
            self.value = Linear(n_state, n_state)

    def forward(
        self,
        x: Tensor,
        mask: Optional[Tensor] = None,
        k_cache: Optional[Tensor] = None,
        v_cache: Optional[Tensor] = None,
        offset: Optional[int] = None,
    ):
        """Run the attention variant selected at construction time.

        Args:
            x: input features; indexed as (batch, sequence, state).
            mask: optional additive mask forwarded to ``qkv_attention``.
            k_cache: key cache, indexed as (batch, position, state). Written
                in place except in ``no_cross`` mode.
            v_cache: value cache, same layout and treatment as ``k_cache``.
            offset: first cache row to write in the default mode; required
                there and ignored in the other two modes.

        Returns:
            ``x`` itself in ``cross_only`` mode, otherwise the output
            projection of the attention result.
        """
        if self.cross_only:
            # Only populate the caches; the projections are not consumed here.
            k = self.key(x)
            v = self.value(x)
            k_len = k.shape[-2]
            k_cache[:, :k_len, :] = k
            v_len = v.shape[-2]
            v_cache[:, :v_len, :] = v
            return x

        q = self.query(x)

        if self.no_cross:
            # Copy the caches into fresh tensors instead of using them directly.
            # NOTE(review): presumably this shape of copy is what the Core ML
            # conversion needs to read the state buffers -- confirm before
            # simplifying it.
            k = torch.zeros_like(k_cache)
            k_len = k.shape[-2]
            k[:, :k_len, :] = k_cache
            v = torch.zeros_like(v_cache)
            v_len = v.shape[-2]
            # Slice with the value length (the original reused k_len here).
            v[:, :v_len, :] = v_cache
        else:
            k = self.key(x)
            v = self.value(x)

            q_len = q.shape[-2]
            end_step = offset + q_len

            # Append the new keys/values, then attend over everything cached so far.
            k_cache[:, offset:end_step, :] = k
            v_cache[:, offset:end_step, :] = v

            k = k_cache[:, :end_step, :]
            v = v_cache[:, :end_step, :]

        wv = self.qkv_attention(q, k, v, mask)
        return self.out(wv)

    def qkv_attention(
        self, q: Tensor, k: Tensor, v: Tensor, mask: Optional[Tensor] = None
    ):
        """Scaled dot-product attention over ``self.n_head`` heads.

        Args:
            q: (batch, q_len, state) queries.
            k: (batch, kv_len, state) keys.
            v: (batch, kv_len, state) values.
            mask: optional 2-D additive mask indexed as
                [query position, key position]. The queries are taken to be
                the last ``q_len`` of the ``kv_len`` positions.

        Returns:
            Tensor of shape (batch, q_len, state).
        """
        n_batch, n_ctx, n_state = q.shape
        n_kv = k.shape[1]
        # The scale is applied to both q and k, so the product is scaled by
        # head_dim ** -0.5 overall.
        scale = (n_state // self.n_head) ** -0.25
        q = q.view(*q.shape[:2], self.n_head, -1).permute(0, 2, 1, 3) * scale
        k = k.view(*k.shape[:2], self.n_head, -1).permute(0, 2, 3, 1) * scale
        v = v.view(*v.shape[:2], self.n_head, -1).permute(0, 2, 1, 3)

        qk = q @ k
        if mask is not None:
            # Rows are the absolute positions of the queries (the trailing
            # n_ctx of n_kv) and columns are all n_kv key positions. The
            # previous mask[:n_ctx, :n_ctx] only matched this when n_ctx == n_kv
            # or n_ctx == 1, and failed to broadcast for a multi-token step at
            # a non-zero offset.
            qk = qk + mask[n_kv - n_ctx:n_kv, :n_kv]
        qk = qk.float()

        w = F.softmax(qk, dim=-1).to(q.dtype)
        return (w @ v).permute(0, 2, 1, 3).flatten(start_dim=2)
|
|
|
class ResidualAttentionBlock(nn.Module):
    """One decoder layer, in either its full form or a cache-filling form.

    With ``cross_only=True`` the block holds a single ``cross_attn`` module
    built with ``cross_only=True`` (``cross_attention`` is not consulted) and
    ``forward`` returns whatever that module returns.

    Otherwise the block holds self-attention, an optional cross-attention
    built with ``no_cross=True``, and an MLP, each preceded by a LayerNorm and
    added back to the input as a residual.
    """

    def __init__(self, n_state: int, n_head: int, cross_attention: bool = False, cross_only: bool = False):
        super().__init__()
        self.cross_only = cross_only

        if cross_only:
            # Cache-filling variant: no other sub-modules are created.
            self.cross_attn = MultiHeadAttention(n_state, n_head, cross_only=True)
            return

        self.attn = MultiHeadAttention(n_state, n_head)
        self.attn_ln = LayerNorm(n_state)

        if cross_attention:
            self.cross_attn = MultiHeadAttention(n_state, n_head, no_cross=True)
            self.cross_attn_ln = LayerNorm(n_state)
        else:
            self.cross_attn = None
            self.cross_attn_ln = None

        hidden_width = 4 * n_state
        self.mlp = nn.Sequential(
            Linear(n_state, hidden_width),
            nn.GELU(),
            Linear(hidden_width, n_state),
        )
        self.mlp_ln = LayerNorm(n_state)

    def forward(
        self,
        x: Tensor,
        offset: Optional[int] = None,
        mask: Optional[Tensor] = None,
        k_cache1: Optional[Tensor] = None,
        v_cache1: Optional[Tensor] = None,
        k_cache2: Optional[Tensor] = None,
        v_cache2: Optional[Tensor] = None,
    ):
        """Apply the layer.

        Args:
            x: input features.
            offset: passed to self-attention as its cache write offset.
            mask: passed to self-attention only.
            k_cache1, v_cache1: caches handed to self-attention.
            k_cache2, v_cache2: caches handed to cross-attention.

        Returns:
            The transformed features (or the cross-attention module's return
            value when ``cross_only`` is set).
        """
        if self.cross_only:
            return self.cross_attn(x, k_cache=k_cache2, v_cache=v_cache2)

        self_attended = self.attn(
            self.attn_ln(x),
            mask=mask,
            k_cache=k_cache1,
            v_cache=v_cache1,
            offset=offset,
        )
        x = x + self_attended

        if self.cross_attn is not None:
            cross_attended = self.cross_attn(
                self.cross_attn_ln(x), k_cache=k_cache2, v_cache=v_cache2
            )
            x = x + cross_attended

        return x + self.mlp(self.mlp_ln(x))
|
|
|
class TextDecoder_first(nn.Module):
    """First decoder stage: resets the text caches and fills the audio caches.

    Holds ``n_layer`` cross-only blocks and four zero-initialised buffers:
    ``k_cache1``/``v_cache1`` of shape (n_layer, n_batch, n_text_ctx, n_state)
    and ``k_cache2``/``v_cache2`` of shape
    (n_layer, n_batch, n_audio_ctx, n_state). ``n_vocab`` is accepted but not
    used by this stage.
    """

    def __init__(
        self, n_batch: int, n_vocab: int, n_text_ctx: int, n_audio_ctx: int, n_state: int, n_head: int, n_layer: int
    ):
        super().__init__()

        layers = [
            ResidualAttentionBlock(n_state, n_head, cross_attention=True, cross_only=True)
            for _ in range(n_layer)
        ]
        self.blocks: Iterable[ResidualAttentionBlock] = nn.ModuleList(layers)

        self.kvcache_shape1 = (n_layer, n_batch, n_text_ctx, n_state)
        self.kvcache_shape2 = (n_layer, n_batch, n_audio_ctx, n_state)
        buffer_layout = (
            ("k_cache1", self.kvcache_shape1),
            ("v_cache1", self.kvcache_shape1),
            ("k_cache2", self.kvcache_shape2),
            ("v_cache2", self.kvcache_shape2),
        )
        for buffer_name, buffer_shape in buffer_layout:
            self.register_buffer(buffer_name, torch.zeros(buffer_shape))

    def forward(self, xa: Tensor):
        """
        xa : torch.Tensor, shape = (batch_size, n_audio_ctx, n_audio_state)
            the encoded audio features; each block receives them together with
            its own slice of ``k_cache2`` / ``v_cache2``

        Zeroes ``k_cache1`` and ``v_cache1`` in place first, then returns the
        output of the last block.
        """
        # In-place writes so the registered buffers themselves are updated.
        self.k_cache1[:, :, :, :] = 0
        self.v_cache1[:, :, :, :] = 0

        features = xa
        for layer_idx, layer in enumerate(self.blocks):
            features = layer(
                features,
                k_cache2=self.k_cache2[layer_idx],
                v_cache2=self.v_cache2[layer_idx],
            )

        return features
|
|
|
|
|
class TextDecoder_second(nn.Module):
    """Second decoder stage: turns tokens into logits using the four caches.

    Holds the token and positional embeddings, ``n_layer`` full blocks with
    cross-attention, a final LayerNorm, a non-persistent causal ``mask``
    buffer, and the same four cache buffers (names and shapes) as the first
    stage.
    """

    def __init__(
        self, n_batch: int, n_vocab: int, n_text_ctx: int, n_audio_ctx: int, n_state: int, n_head: int, n_layer: int
    ):
        super().__init__()

        self.token_embedding = nn.Embedding(n_vocab, n_state)
        self.positional_embedding = nn.Parameter(torch.empty(n_text_ctx, n_state))

        layers = [
            ResidualAttentionBlock(n_state, n_head, cross_attention=True)
            for _ in range(n_layer)
        ]
        self.blocks: Iterable[ResidualAttentionBlock] = nn.ModuleList(layers)
        self.ln = LayerNorm(n_state)

        # -inf strictly above the diagonal, 0 on and below it.
        causal_mask = torch.full((n_text_ctx, n_text_ctx), float("-inf")).triu_(1)
        self.register_buffer("mask", causal_mask, persistent=False)

        self.kvcache_shape1 = (n_layer, n_batch, n_text_ctx, n_state)
        self.kvcache_shape2 = (n_layer, n_batch, n_audio_ctx, n_state)
        buffer_layout = (
            ("k_cache1", self.kvcache_shape1),
            ("v_cache1", self.kvcache_shape1),
            ("k_cache2", self.kvcache_shape2),
            ("v_cache2", self.kvcache_shape2),
        )
        for buffer_name, buffer_shape in buffer_layout:
            self.register_buffer(buffer_name, torch.zeros(buffer_shape))

    def forward(self, x: Tensor, offset_mask: Tensor):
        """
        x : torch.LongTensor, shape = (batch_size, <= n_ctx)
            the text tokens
        offset_mask : torch.Tensor
            only the size of its last dimension is read; that size is the
            position just past the last token of ``x`` (the values are unused)

        Returns float32 logits computed against the transposed token
        embedding matrix.
        """
        end_step = offset_mask.shape[-1]
        # The new tokens occupy positions [offset, end_step).
        offset = end_step - x.shape[-1]

        token_features = self.token_embedding(x)
        position_features = self.positional_embedding[offset:end_step]
        hidden = token_features + position_features

        for layer_idx, layer in enumerate(self.blocks):
            hidden = layer(
                hidden,
                offset=offset,
                mask=self.mask,
                k_cache1=self.k_cache1[layer_idx],
                v_cache1=self.v_cache1[layer_idx],
                k_cache2=self.k_cache2[layer_idx],
                v_cache2=self.v_cache2[layer_idx],
            )

        hidden = self.ln(hidden)
        # Output projection shares its weights with the token embedding.
        unembedding = torch.transpose(self.token_embedding.weight.to(hidden.dtype), 0, 1)
        logits = (hidden @ unembedding).float()

        return logits
|
|
|
import numpy as np |
|
import coremltools as ct |
|
|
|
def converter_encoder(model: whisper.Whisper, split: bool = False):
    """Trace ``model.encoder`` and save it as ``encoder.mlpackage``.

    Args:
        model: loaded Whisper model; put into eval mode here.
        split: when true, additionally call ``bisect_model`` on the converted
            model, writing to ``./encoder/`` with the chunks merged into a
            pipeline.
    """
    model.eval()
    audio_encoder = model.encoder
    dims = model.dims

    # Fixed example input: one item, n_mels channels, 3000 frames.
    mel_shape = (1, dims.n_mels, 3000)
    example_mel = torch.randn(mel_shape)
    traced_encoder = torch.jit.trace(audio_encoder, example_mel)

    mel_input = ct.TensorType(name="logmel_data", shape=mel_shape)
    encoded_output = ct.TensorType(name="output")
    mlmodel = ct.convert(
        traced_encoder,
        inputs=[mel_input],
        outputs=[encoded_output],
        minimum_deployment_target=ct.target.iOS18,
    )
    mlmodel.save("encoder.mlpackage")

    if split:
        ct.models.utils.bisect_model(
            mlmodel,
            "./encoder/",
            merge_chunks_to_pipeline=True,
        )
    del mlmodel
|
|
|
def converter_decoder(model: whisper.Whisper):
    """Convert ``model.decoder`` into two stateful Core ML packages.

    ``decoder_first.mlpackage`` wraps ``TextDecoder_first`` (input
    ``audio_data``, output ``dummy``) and ``decoder_second.mlpackage`` wraps
    ``TextDecoder_second`` (inputs ``token_data`` and ``offset_mask``, output
    ``logits``). Both are converted with the same four float16 state
    descriptions: ``k_cache1``, ``v_cache1``, ``k_cache2``, ``v_cache2``.

    Args:
        model: loaded Whisper model; put into eval mode here.
    """
    model.eval()
    source_decoder = model.decoder
    dims = model.dims

    batch_size = 1
    # Both stages take the same constructor arguments.
    stage_args = (
        batch_size,
        dims.n_vocab,
        dims.n_text_ctx,
        dims.n_audio_ctx,
        dims.n_text_state,
        dims.n_text_head,
        dims.n_text_layer,
    )

    # ---- Stage 1: audio features -> cache contents -------------------------
    first_stage = TextDecoder_first(*stage_args)
    # strict=False tolerates keys present on only one side of the load.
    first_stage.load_state_dict(source_decoder.state_dict(), strict=False)
    first_stage.eval()

    tokens_shape = (batch_size, 1)
    audio_shape = (batch_size, dims.n_audio_ctx, dims.n_audio_state)

    example_audio = torch.randn(audio_shape)
    traced_first = torch.jit.trace(first_stage, [example_audio])

    audio_length = ct.RangeDim(lower_bound=1, upper_bound=dims.n_audio_ctx, default=1)
    inputs = [
        ct.TensorType(
            shape=(batch_size, audio_length, dims.n_audio_state),
            dtype=np.float16,
            name="audio_data",
        ),
    ]
    outputs = [ct.TensorType(dtype=np.float16, name="dummy")]

    # One state per cache buffer; shapes are taken from the first stage and
    # the resulting list is reused unchanged for the second conversion.
    cache_layout = (
        ("k_cache1", first_stage.kvcache_shape1),
        ("v_cache1", first_stage.kvcache_shape1),
        ("k_cache2", first_stage.kvcache_shape2),
        ("v_cache2", first_stage.kvcache_shape2),
    )
    states = [
        ct.StateType(
            wrapped_type=ct.TensorType(shape=cache_shape, dtype=np.float16),
            name=cache_name,
        )
        for cache_name, cache_shape in cache_layout
    ]

    mlmodel = ct.convert(
        traced_first,
        inputs=inputs,
        outputs=outputs,
        states=states,
        minimum_deployment_target=ct.target.iOS18,
    )
    mlmodel.save("decoder_first.mlpackage")
    del traced_first
    del mlmodel

    # ---- Stage 2: tokens -> logits -----------------------------------------
    second_stage = TextDecoder_second(*stage_args)
    second_stage.load_state_dict(source_decoder.state_dict(), strict=False)
    second_stage.eval()

    example_tokens = torch.randint(dims.n_vocab, tokens_shape).long()
    example_offset_mask = torch.zeros(tokens_shape)
    traced_second = torch.jit.trace(second_stage, [example_tokens, example_offset_mask])

    # Token count and end position vary independently, each in [1, n_text_ctx].
    query_length = ct.RangeDim(lower_bound=1, upper_bound=dims.n_text_ctx, default=1)
    end_step_dim = ct.RangeDim(lower_bound=1, upper_bound=dims.n_text_ctx, default=1)
    inputs = [
        ct.TensorType(shape=(batch_size, query_length), dtype=np.int32, name="token_data"),
        ct.TensorType(shape=(batch_size, end_step_dim), dtype=np.float16, name="offset_mask"),
    ]
    outputs = [ct.TensorType(dtype=np.float16, name="logits")]

    mlmodel = ct.convert(
        traced_second,
        inputs=inputs,
        outputs=outputs,
        states=states,
        minimum_deployment_target=ct.target.iOS18,
    )
    mlmodel.save("decoder_second.mlpackage")
    del traced_second
    del mlmodel
|
|
|
def test_model(hparams: ModelDimensions):
    """Smoke-run the three saved packages on random inputs.

    Loads ``encoder.mlpackage``, ``decoder_first.mlpackage`` and
    ``decoder_second.mlpackage`` from the current directory, runs the encoder
    and first decoder stage once, then calls the second stage with 5 random
    tokens followed by one random token at a time, printing every second-stage
    output.

    Args:
        hparams: dimensions used to size the random inputs and bound the loop.
    """
    logmel_shape = (1, hparams.n_mels, 3000)

    encoder = ct.models.MLModel("encoder.mlpackage")
    encoded = encoder.predict({'logmel_data': np.random.rand(*logmel_shape)})
    audio_data = encoded['output']

    decoder1 = ct.models.MLModel("decoder_first.mlpackage")
    decoder2 = ct.models.MLModel("decoder_second.mlpackage")
    # The state is created from the first stage and passed to both stages.
    decoder_state = decoder1.make_state()
    decoder1.predict({'audio_data': audio_data}, decoder_state)

    def decode_step(n_new: int, n_cached: int) -> int:
        """Feed ``n_new`` random tokens after ``n_cached`` earlier ones; return the new total."""
        tokens = np.random.randint(hparams.n_vocab, size=(1, n_new), dtype=np.int32)
        # Only the width of offset_mask carries information: the end position.
        offset_mask = np.zeros((1, n_cached + n_new))
        step_output = decoder2.predict(
            {
                'token_data': tokens,
                'offset_mask': offset_mask,
            },
            decoder_state,
        )
        print(step_output)
        return n_cached + n_new

    past_kv_len = decode_step(5, 0)
    while past_kv_len + 1 < hparams.n_text_ctx:
        past_kv_len = decode_step(1, past_kv_len)
|
|
|
def print_dims(model: whisper.Whisper):
    """Dump the attributes of ``model.dims`` to ``model_dims.json`` in the current directory."""
    with open('model_dims.json', 'w') as out_file:
        dims_as_dict = vars(model.dims)
        json.dump(dims_as_dict, out_file, indent=2)
|
|
|
if __name__ == "__main__":
    import os

    # Everything is written under work/<model size>/ relative to the start directory.
    os.makedirs("work", exist_ok=True)
    os.chdir("work")

    all_sizes = ("tiny", "base", "small", "medium", "large-v2", "large-v3")
    for size_name in all_sizes:
        print(size_name)
        os.makedirs(size_name, exist_ok=True)
        os.chdir(size_name)

        loaded = whisper.load_model(size_name)
        print_dims(loaded)
        # Only the large variants have their encoder bisected.
        converter_encoder(loaded, split=size_name.startswith("large"))
        converter_decoder(loaded)

        # Drop the reference before the next model is loaded.
        del loaded
        os.chdir("..")

    os.chdir("..")
|
|