coreml-whisper / whisper_convert.py
lithium0003's picture
initial commit
ca32d55
#!/usr/bin/env python3
from whisper import whisper
import torch
from torch import Tensor, nn
import torch.nn.functional as F
from typing import Optional, Iterable
from dataclasses import dataclass
import json
@dataclass
class ModelDimensions:
    """Size hyper-parameters of a model, grouped by audio side and text side.

    The generated ``__init__`` accepts the values positionally in the order
    the fields are declared below, so the order must not be changed.
    """

    # --- audio side ---
    n_mels: int          # size of the second axis of the encoder's log-mel input
    n_audio_ctx: int     # sequence length of the audio features / cross-attention cache
    n_audio_state: int   # feature width of the audio features
    n_audio_head: int
    n_audio_layer: int

    # --- text side ---
    n_vocab: int         # number of token ids (upper bound used for random test tokens)
    n_text_ctx: int      # sequence length of the self-attention cache
    n_text_state: int    # feature width of the text decoder
    n_text_head: int
    n_text_layer: int
class LayerNorm(nn.LayerNorm):
    """``nn.LayerNorm`` that always normalises in float32.

    The input is up-cast with ``.float()`` before the parent implementation
    runs, and the result is converted back to the dtype the caller passed in.
    """

    def forward(self, x: Tensor) -> Tensor:
        caller_dtype = x.dtype
        normalised = super().forward(x.float())
        return normalised.type(caller_dtype)
class Linear(nn.Linear):
    """``nn.Linear`` that casts its weight (and bias, if any) to the input dtype."""

    def forward(self, x: Tensor) -> Tensor:
        target_dtype = x.dtype
        weight = self.weight.to(target_dtype)
        # The layer may have been built with bias=False.
        bias = self.bias.to(target_dtype) if self.bias is not None else None
        return F.linear(x, weight, bias)
class Conv1d(nn.Conv1d):
    """``nn.Conv1d`` that casts its weight (and bias, if any) to the input dtype."""

    def _conv_forward(
        self, x: Tensor, weight: Tensor, bias: Optional[Tensor]
    ) -> Tensor:
        target_dtype = x.dtype
        cast_weight = weight.to(target_dtype)
        cast_bias = bias.to(target_dtype) if bias is not None else None
        return super()._conv_forward(x, cast_weight, cast_bias)
class MultiHeadAttention(nn.Module):
    """Multi-head attention that reads/writes caller-supplied key/value caches.

    The constructor flags select one of three configurations:

    * default (both flags ``False``): self-attention. All four projections
      exist. ``forward`` writes the keys/values of the current tokens into the
      caches at ``offset`` and attends over the cache prefix ending at
      ``offset + q_len``.
    * ``no_cross=True``: only ``query`` and ``out`` exist. Keys and values are
      taken from the caches as they are.
    * ``cross_only=True``: only ``key`` and ``value`` exist. ``forward`` just
      fills the caches and returns its input unchanged.
    """

    def __init__(self, n_state: int, n_head: int, no_cross: bool = False, cross_only: bool = False):
        super().__init__()
        self.no_cross = no_cross
        self.cross_only = cross_only
        self.n_head = n_head
        if not cross_only:
            self.query = Linear(n_state, n_state)
            self.out = Linear(n_state, n_state)
        if not no_cross:
            self.key = Linear(n_state, n_state, bias=False)
            self.value = Linear(n_state, n_state)

    def forward(
        self,
        x: Tensor,
        mask: Optional[Tensor] = None,
        k_cache: Optional[Tensor] = None,
        v_cache: Optional[Tensor] = None,
        offset: Optional[int] = None,
    ):
        """Run the configuration chosen at construction time.

        Args:
            x: rank-3 input, ``(batch, seq, n_state)``.
            mask: additive attention mask; only used by the self-attention
                configuration (see ``qkv_attention``).
            k_cache, v_cache: rank-3 cache tensors indexed as
                ``[batch, position, n_state]``. They are written in place in
                the ``cross_only`` and self-attention configurations and only
                read in the ``no_cross`` configuration.
            offset: position in the cache at which the current tokens start;
                required by the self-attention configuration.

        Returns:
            ``x`` itself when ``cross_only`` is set, otherwise the output
            projection of the attention result.
        """
        if self.cross_only:
            # Only populate the caches; nothing is attended here.
            k = self.key(x)
            v = self.value(x)
            k_len = k.shape[-2]
            k_cache[:, :k_len, :] = k
            v_len = v.shape[-2]
            v_cache[:, :v_len, :] = v
            return x

        q = self.query(x)
        if self.no_cross:
            # Copy the cache contents into fresh tensors before attending.
            # NOTE(review): presumably this copy (rather than a direct read)
            # is there for the benefit of the Core ML state conversion --
            # confirm before simplifying.
            k = torch.zeros_like(k_cache)
            k_len = k.shape[-2]
            k[:, :k_len, :] = k_cache
            v = torch.zeros_like(v_cache)
            v_len = v.shape[-2]
            # Use v's own length (the original reused k_len and left v_len unused).
            v[:, :v_len, :] = v_cache
        else:
            k = self.key(x)
            v = self.value(x)
            q_len = q.shape[-2]
            end_step = offset + q_len
            # Append the new keys/values, then attend over everything cached so far.
            k_cache[:, offset:end_step, :] = k
            v_cache[:, offset:end_step, :] = v
            k = k_cache[:, :end_step, :]
            v = v_cache[:, :end_step, :]

        wv = self.qkv_attention(q, k, v, mask)
        return self.out(wv)

    def qkv_attention(
        self, q: Tensor, k: Tensor, v: Tensor, mask: Optional[Tensor] = None
    ):
        """Scaled dot-product attention over ``self.n_head`` heads.

        Args:
            q: ``(batch, n_ctx, n_state)`` queries.
            k, v: ``(batch, n_keys, n_state)`` keys and values.
            mask: optional square additive mask, assumed causal (zeros on and
                below the diagonal). The queries are treated as the *last*
                ``n_ctx`` of the ``n_keys`` positions, so rows
                ``n_keys - n_ctx .. n_keys`` and columns ``.. n_keys`` are
                applied.

        Returns:
            ``(batch, n_ctx, n_state)`` tensor with the heads concatenated.
        """
        n_batch, n_ctx, n_state = q.shape
        n_keys = k.shape[1]
        # Applied to both q and k, so the product is scaled by head_dim ** -0.5.
        scale = (n_state // self.n_head) ** -0.25
        q = q.view(*q.shape[:2], self.n_head, -1).permute(0, 2, 1, 3) * scale
        k = k.view(*k.shape[:2], self.n_head, -1).permute(0, 2, 3, 1) * scale
        v = v.view(*v.shape[:2], self.n_head, -1).permute(0, 2, 1, 3)

        qk = q @ k
        if mask is not None:
            # Slice by absolute position. The previous mask[:n_ctx, :n_ctx]
            # only broadcast when n_ctx == n_keys or n_ctx == 1; for a causal
            # mask this slice gives the same values in those two cases and
            # also works for several new tokens on top of a non-empty cache.
            qk = qk + mask[n_keys - n_ctx:n_keys, :n_keys]
        qk = qk.float()

        w = F.softmax(qk, dim=-1).to(q.dtype)
        return (w @ v).permute(0, 2, 1, 3).flatten(start_dim=2)
class ResidualAttentionBlock(nn.Module):
    """One decoder layer: self-attention, optional cross-attention, then an MLP.

    With ``cross_only=True`` the block holds nothing but a cache-filling
    ``cross_attn`` module (no self-attention, layer norms or MLP), and its
    ``forward`` returns whatever that module returns.
    """

    def __init__(self, n_state: int, n_head: int, cross_attention: bool = False, cross_only: bool = False):
        super().__init__()
        self.cross_only = cross_only

        if cross_only:
            self.cross_attn = MultiHeadAttention(n_state, n_head, cross_only=True)
            return

        self.attn = MultiHeadAttention(n_state, n_head)
        self.attn_ln = LayerNorm(n_state)

        if cross_attention:
            # Queries only; keys/values come from the cache at call time.
            self.cross_attn = MultiHeadAttention(n_state, n_head, no_cross=True)
            self.cross_attn_ln = LayerNorm(n_state)
        else:
            self.cross_attn = None
            self.cross_attn_ln = None

        hidden_width = n_state * 4
        self.mlp = nn.Sequential(
            Linear(n_state, hidden_width),
            nn.GELU(),
            Linear(hidden_width, n_state),
        )
        self.mlp_ln = LayerNorm(n_state)

    def forward(
        self,
        x: Tensor,
        offset: Optional[int] = None,
        mask: Optional[Tensor] = None,
        k_cache1: Optional[Tensor] = None,
        v_cache1: Optional[Tensor] = None,
        k_cache2: Optional[Tensor] = None,
        v_cache2: Optional[Tensor] = None,
    ):
        """Apply the block.

        ``k_cache1``/``v_cache1`` are handed to the self-attention module
        together with ``mask`` and ``offset``; ``k_cache2``/``v_cache2`` are
        handed to the cross-attention module.
        """
        if self.cross_only:
            return self.cross_attn(x, k_cache=k_cache2, v_cache=v_cache2)

        self_attended = self.attn(
            self.attn_ln(x),
            mask=mask,
            k_cache=k_cache1,
            v_cache=v_cache1,
            offset=offset,
        )
        x = x + self_attended

        if self.cross_attn is not None:
            cross_attended = self.cross_attn(
                self.cross_attn_ln(x), k_cache=k_cache2, v_cache=v_cache2
            )
            x = x + cross_attended

        x = x + self.mlp(self.mlp_ln(x))
        return x
class TextDecoder_first(nn.Module):
    """First decoder stage: fills the cross-attention caches from audio features.

    Holds one cache-filling (``cross_only``) block per layer and four cache
    buffers. ``n_vocab`` is accepted but not used by this class.
    """

    def __init__(
        self, n_batch: int, n_vocab: int, n_text_ctx: int, n_audio_ctx: int, n_state: int, n_head: int, n_layer: int
    ):
        super().__init__()

        cache_fillers = [
            ResidualAttentionBlock(n_state, n_head, cross_attention=True, cross_only=True)
            for _ in range(n_layer)
        ]
        self.blocks: Iterable[ResidualAttentionBlock] = nn.ModuleList(cache_fillers)

        # One slab per layer: "1" is sized by the text context, "2" by the audio context.
        self.kvcache_shape1 = (n_layer, n_batch, n_text_ctx, n_state)
        self.kvcache_shape2 = (n_layer, n_batch, n_audio_ctx, n_state)
        buffer_layout = (
            ("k_cache1", self.kvcache_shape1),
            ("v_cache1", self.kvcache_shape1),
            ("k_cache2", self.kvcache_shape2),
            ("v_cache2", self.kvcache_shape2),
        )
        for buffer_name, buffer_shape in buffer_layout:
            self.register_buffer(buffer_name, torch.zeros(buffer_shape))

    def forward(self, xa: Tensor):
        """Reset the text caches and store per-layer keys/values of ``xa``.

        xa : torch.Tensor, shape = (batch_size, n_audio_ctx, n_audio_state)
            the encoded audio features to be attended on

        Returns ``xa`` as passed through the blocks; each cache-filling block
        returns its input, so the result carries no new information.
        """
        self.k_cache1[:,:,:,:] = 0
        self.v_cache1[:,:,:,:] = 0

        x = xa
        for layer_idx, block in enumerate(self.blocks):
            x = block(
                x,
                k_cache2=self.k_cache2[layer_idx],
                v_cache2=self.v_cache2[layer_idx],
            )
        return x
class TextDecoder_second(nn.Module):
    """Second decoder stage: turns tokens into logits using the cached keys/values.

    Each layer is a ``ResidualAttentionBlock`` with cross-attention. Layer
    ``i`` is given slice ``i`` of the four cache buffers: ``k_cache1`` /
    ``v_cache1`` for self-attention and ``k_cache2`` / ``v_cache2`` for
    cross-attention.
    """

    def __init__(
        self, n_batch: int, n_vocab: int, n_text_ctx: int, n_audio_ctx: int, n_state: int, n_head: int, n_layer: int
    ):
        super().__init__()
        self.token_embedding = nn.Embedding(n_vocab, n_state)
        self.positional_embedding = nn.Parameter(torch.empty(n_text_ctx, n_state))
        self.blocks: Iterable[ResidualAttentionBlock] = nn.ModuleList(
            [
                ResidualAttentionBlock(n_state, n_head, cross_attention=True)
                for _ in range(n_layer)
            ]
        )
        self.ln = LayerNorm(n_state)

        # Causal mask: -inf strictly above the diagonal, 0 elsewhere.
        # Built with torch only, so the class does not rely on numpy having
        # been imported further down the module.
        mask = torch.full((n_text_ctx, n_text_ctx), float("-inf")).triu_(1)
        self.register_buffer("mask", mask, persistent=False)

        self.kvcache_shape1 = (n_layer, n_batch, n_text_ctx, n_state)
        self.kvcache_shape2 = (n_layer, n_batch, n_audio_ctx, n_state)
        self.register_buffer("k_cache1", torch.zeros(self.kvcache_shape1))
        self.register_buffer("v_cache1", torch.zeros(self.kvcache_shape1))
        self.register_buffer("k_cache2", torch.zeros(self.kvcache_shape2))
        self.register_buffer("v_cache2", torch.zeros(self.kvcache_shape2))

    def forward(self, x: Tensor, offset_mask: Tensor):
        """
        x : torch.LongTensor, shape = (batch_size, <= n_ctx)
            the text tokens
        offset_mask : torch.Tensor, shape = (batch_size, end_step)
            only the size of its last dimension is read: it gives the
            position just past the last token of ``x``. The number of
            already-cached positions is ``end_step - x.shape[-1]``.

        Returns float logits of shape (batch_size, n_tokens, n_vocab),
        computed against the transposed token embedding matrix.
        """
        end_step = offset_mask.shape[-1]
        offset = end_step - x.shape[-1]
        x = (
            self.token_embedding(x)
            + self.positional_embedding[offset:end_step]
        )

        for i, block in enumerate(self.blocks):
            x = block(
                x,
                offset=offset,
                mask=self.mask,
                k_cache1=self.k_cache1[i],
                v_cache1=self.v_cache1[i],
                k_cache2=self.k_cache2[i],
                v_cache2=self.v_cache2[i],
            )

        x = self.ln(x)
        logits = (
            x @ torch.transpose(self.token_embedding.weight.to(x.dtype), 0, 1)
        ).float()
        return logits
import numpy as np
import coremltools as ct
def converter_encoder(model: whisper.Whisper, split: bool = False):
    """Trace ``model.encoder`` and save it as ``encoder.mlpackage``.

    The encoder is traced with a random ``(1, n_mels, 3000)`` input and
    converted with an iOS 18 deployment target. The Core ML input is named
    ``logmel_data`` and the output ``output``.

    Args:
        model: the loaded model; put into eval mode as a side effect.
        split: when true, additionally bisect the converted model into
            ``./encoder/`` with the chunks merged into a pipeline.
    """
    model.eval()
    audio_encoder = model.encoder
    dims = model.dims

    logmel_shape = (1, dims.n_mels, 3000)
    example_input = torch.randn(logmel_shape)
    traced_encoder = torch.jit.trace(audio_encoder, example_input)

    logmel_spec = ct.TensorType(name="logmel_data", shape=logmel_shape)
    output_spec = ct.TensorType(name="output")
    coreml_model = ct.convert(
        traced_encoder,
        inputs=[logmel_spec],
        outputs=[output_spec],
        minimum_deployment_target=ct.target.iOS18,
    )
    coreml_model.save("encoder.mlpackage")

    if split:
        ct.models.utils.bisect_model(
            coreml_model,
            "./encoder/",
            merge_chunks_to_pipeline=True,
        )

    del coreml_model
def converter_decoder(model: whisper.Whisper):
    """Convert ``model.decoder`` into two stateful Core ML packages.

    * ``decoder_first.mlpackage`` wraps ``TextDecoder_first``: input
      ``audio_data``, output ``dummy``.
    * ``decoder_second.mlpackage`` wraps ``TextDecoder_second``: inputs
      ``token_data`` and ``offset_mask``, output ``logits``.

    Both packages declare the same four float16 states (``k_cache1``,
    ``v_cache1``, ``k_cache2``, ``v_cache2``) and are built for batch size 1.
    ``model`` is put into eval mode as a side effect.
    """
    model.eval()
    source_decoder = model.decoder
    dims = model.dims
    batch_size = 1

    # Both stages take the same constructor arguments.
    decoder_args = (
        batch_size,
        dims.n_vocab,
        dims.n_text_ctx,
        dims.n_audio_ctx,
        dims.n_text_state,
        dims.n_text_head,
        dims.n_text_layer,
    )
    tokens_shape = (batch_size, 1)

    # ---- stage 1: cache filler -------------------------------------------
    decoder1 = TextDecoder_first(*decoder_args)
    # strict=False: the two state dicts are not expected to match key for key
    # (presumably the checkpoint has no cache buffers and this stage holds
    # only part of the decoder weights -- verify against the checkpoint).
    decoder1.load_state_dict(source_decoder.state_dict(), strict=False)
    decoder1.eval()

    audio_data = torch.randn((batch_size, dims.n_audio_ctx, dims.n_audio_state))
    traced_model1 = torch.jit.trace(decoder1, [audio_data])

    audio_length = ct.RangeDim(lower_bound=1, upper_bound=dims.n_audio_ctx, default=1)
    inputs = [
        ct.TensorType(
            shape=(batch_size, audio_length, dims.n_audio_state),
            dtype=np.float16,
            name="audio_data",
        ),
    ]
    outputs = [ct.TensorType(dtype=np.float16, name="dummy")]

    # The same state declarations are reused for the second stage below.
    state_layout = (
        ("k_cache1", decoder1.kvcache_shape1),
        ("v_cache1", decoder1.kvcache_shape1),
        ("k_cache2", decoder1.kvcache_shape2),
        ("v_cache2", decoder1.kvcache_shape2),
    )
    states = [
        ct.StateType(
            wrapped_type=ct.TensorType(shape=state_shape, dtype=np.float16),
            name=state_name,
        )
        for state_name, state_shape in state_layout
    ]

    converted_model = ct.convert(
        traced_model1,
        inputs=inputs,
        outputs=outputs,
        states=states,
        minimum_deployment_target=ct.target.iOS18,
    )
    converted_model.save("decoder_first.mlpackage")
    # Drop the first stage's artefacts before building the second stage.
    del traced_model1
    del converted_model

    # ---- stage 2: token decoder ------------------------------------------
    decoder2 = TextDecoder_second(*decoder_args)
    decoder2.load_state_dict(source_decoder.state_dict(), strict=False)
    decoder2.eval()

    token_data = torch.randint(dims.n_vocab, tokens_shape).long()
    offset_mask = torch.zeros(tokens_shape)
    traced_model2 = torch.jit.trace(decoder2, [token_data, offset_mask])

    query_length = ct.RangeDim(lower_bound=1, upper_bound=dims.n_text_ctx, default=1)
    end_step_dim = ct.RangeDim(lower_bound=1, upper_bound=dims.n_text_ctx, default=1)
    inputs = [
        ct.TensorType(shape=(batch_size, query_length), dtype=np.int32, name="token_data"),
        ct.TensorType(shape=(batch_size, end_step_dim), dtype=np.float16, name="offset_mask"),
    ]
    outputs = [ct.TensorType(dtype=np.float16, name="logits")]

    converted_model = ct.convert(
        traced_model2,
        inputs=inputs,
        outputs=outputs,
        states=states,
        minimum_deployment_target=ct.target.iOS18,
    )
    converted_model.save("decoder_second.mlpackage")
    del traced_model2
    del converted_model
def test_model(hparams: ModelDimensions):
    """Smoke-test the three saved packages in the current directory.

    Runs the encoder on random input, fills the decoder state with the first
    decoder stage, then feeds random tokens to the second stage: five tokens
    at once, followed by one token per call while
    ``past_kv_len + 1 < hparams.n_text_ctx``. Every second-stage output is
    printed; nothing is returned or asserted.
    """
    prompt_len = 5  # number of tokens fed in the first second-stage call

    encoder = ct.models.MLModel("encoder.mlpackage")
    logmel = np.random.rand(1, hparams.n_mels, 3000)
    audio_data = encoder.predict({'logmel_data': logmel})['output']

    decoder1 = ct.models.MLModel("decoder_first.mlpackage")
    decoder2 = ct.models.MLModel("decoder_second.mlpackage")

    # One state object is shared by both stages.
    decoder_state = decoder1.make_state()
    decoder1.predict({'audio_data': audio_data}, decoder_state)

    def decode_step(past_len, n_new):
        """Feed ``n_new`` random tokens after ``past_len`` cached ones; return the new length."""
        step_input = {
            'token_data': np.random.randint(hparams.n_vocab, size=(1, n_new), dtype=np.int32),
            # Only the width of offset_mask matters: it encodes the end position.
            'offset_mask': np.zeros((1, past_len + n_new)),
        }
        print(decoder2.predict(step_input, decoder_state))
        return past_len + n_new

    past_kv_len = decode_step(0, prompt_len)
    while past_kv_len + 1 < hparams.n_text_ctx:
        past_kv_len = decode_step(past_kv_len, 1)
def print_dims(model: whisper.Whisper):
    """Write the attributes of ``model.dims`` to ``model_dims.json``.

    Despite the name nothing is printed: the file is created (or overwritten)
    in the current working directory, as JSON indented by two spaces.
    """
    with open('model_dims.json', 'w') as out_file:
        dims_as_dict = model.dims.__dict__
        out_file.write(json.dumps(dims_as_dict, indent=2))
if __name__ == '__main__':
    import os

    # Everything is written below ./work/<model_size>/. The converters save
    # to relative paths, hence the chdir into each directory and back out.
    os.makedirs("work", exist_ok=True)
    os.chdir("work")

    for model_size in ('tiny', 'base', 'small', 'medium', 'large-v2', 'large-v3'):
        print(model_size)
        os.makedirs(model_size, exist_ok=True)
        os.chdir(model_size)

        model = whisper.load_model(model_size)
        print_dims(model)
        # Only the "large" encoders are additionally bisected.
        is_large = model_size.startswith('large')
        converter_encoder(model, split=is_large)
        converter_decoder(model)
        # test_model(model.dims)
        del model

        os.chdir(os.pardir)

    os.chdir(os.pardir)