#!/usr/bin/env python3
"""Convert Whisper checkpoints into Core ML packages with stateful KV caches.

The decoder is exported as two Core ML models that share the same four state
buffers (``k_cache1``/``v_cache1`` for self-attention and
``k_cache2``/``v_cache2`` for cross-attention):

* ``decoder_first``  - projects the encoder output to cross-attention keys
  and values and stores them in the state buffers.
* ``decoder_second`` - consumes tokens, appends self-attention keys/values to
  the state buffers and returns logits.
"""
import json
from dataclasses import dataclass
from typing import Iterable, Optional

import coremltools as ct
import numpy as np
import torch
import torch.nn.functional as F
from torch import Tensor, nn

from whisper import whisper


@dataclass
class ModelDimensions:
    """Hyper-parameters describing the size of a Whisper model."""

    n_mels: int
    n_audio_ctx: int
    n_audio_state: int
    n_audio_head: int
    n_audio_layer: int
    n_vocab: int
    n_text_ctx: int
    n_text_state: int
    n_text_head: int
    n_text_layer: int


class LayerNorm(nn.LayerNorm):
    """LayerNorm that computes in float32 and casts back to the input dtype."""

    def forward(self, x: Tensor) -> Tensor:
        return super().forward(x.float()).type(x.dtype)


class Linear(nn.Linear):
    """Linear layer whose weight and bias are cast to the input dtype."""

    def forward(self, x: Tensor) -> Tensor:
        return F.linear(
            x,
            self.weight.to(x.dtype),
            None if self.bias is None else self.bias.to(x.dtype),
        )


class Conv1d(nn.Conv1d):
    """Conv1d whose weight and bias are cast to the input dtype."""

    def _conv_forward(
        self, x: Tensor, weight: Tensor, bias: Optional[Tensor]
    ) -> Tensor:
        return super()._conv_forward(
            x, weight.to(x.dtype), None if bias is None else bias.to(x.dtype)
        )


class MultiHeadAttention(nn.Module):
    """Multi-head attention that reads and writes externally owned KV caches.

    The module operates in one of three modes, selected at construction:

    * default (both flags ``False``): self-attention. Keys/values for the new
      tokens are written into the caches at ``offset`` and attention runs
      over everything cached so far.
    * ``cross_only=True``: only the key/value projections exist. ``forward``
      fills the caches from ``x`` and returns ``x`` unchanged.
    * ``no_cross=True``: only the query/output projections exist. Keys and
      values are read from the caches.
    """

    def __init__(
        self,
        n_state: int,
        n_head: int,
        no_cross: bool = False,
        cross_only: bool = False,
    ):
        super().__init__()
        self.no_cross = no_cross
        self.cross_only = cross_only
        self.n_head = n_head
        if not cross_only:
            self.query = Linear(n_state, n_state)
            self.out = Linear(n_state, n_state)
        if not no_cross:
            self.key = Linear(n_state, n_state, bias=False)
            self.value = Linear(n_state, n_state)

    def forward(
        self,
        x: Tensor,
        mask: Optional[Tensor] = None,
        k_cache: Optional[Tensor] = None,
        v_cache: Optional[Tensor] = None,
        offset: Optional[int] = None,
    ):
        """Run attention, or only fill the caches in ``cross_only`` mode.

        Args:
            x: input activations; the second-to-last axis is the sequence.
            mask: optional additive attention mask (self-attention only).
            k_cache: key cache, read and/or updated in place.
            v_cache: value cache, read and/or updated in place.
            offset: first cache position to write in self-attention mode.

        Returns:
            ``x`` unchanged in ``cross_only`` mode, otherwise the attention
            output after the output projection.
        """
        if self.cross_only:
            k = self.key(x)
            v = self.value(x)
            k_len = k.shape[-2]
            k_cache[:, :k_len, :] = k
            v_len = v.shape[-2]
            v_cache[:, :v_len, :] = v
            return x

        q = self.query(x)
        if self.no_cross:
            # NOTE(review): the caches are copied into fresh tensors instead
            # of being used directly; presumably this keeps the traced graph
            # from aliasing the state buffers - confirm before simplifying.
            k = torch.zeros_like(k_cache)
            k_len = k.shape[-2]
            k[:, :k_len, :] = k_cache
            v = torch.zeros_like(v_cache)
            v_len = v.shape[-2]
            v[:, :v_len, :] = v_cache
        else:
            k = self.key(x)
            v = self.value(x)
            q_len = q.shape[-2]
            end_step = offset + q_len
            # Append the new keys/values, then attend over the whole prefix.
            k_cache[:, offset:end_step, :] = k
            v_cache[:, offset:end_step, :] = v
            k = k_cache[:, :end_step, :]
            v = v_cache[:, :end_step, :]

        wv = self.qkv_attention(q, k, v, mask)
        return self.out(wv)

    def qkv_attention(
        self, q: Tensor, k: Tensor, v: Tensor, mask: Optional[Tensor] = None
    ):
        """Scaled dot-product attention over ``n_head`` heads.

        ``q`` is ``(batch, n_ctx, n_state)``; ``k`` and ``v`` share their
        first two axes ``(batch, k_len)``. When ``mask`` is given, the query
        rows are taken to be the last ``n_ctx`` of the ``k_len`` key
        positions, which is how the self-attention branch of ``forward``
        builds ``k``.
        """
        _, n_ctx, n_state = q.shape
        k_len = k.shape[1]
        # The scale is applied to both q and k, so their product is scaled
        # by head_dim ** -0.5 overall.
        scale = (n_state // self.n_head) ** -0.25
        q = q.view(*q.shape[:2], self.n_head, -1).permute(0, 2, 1, 3) * scale
        k = k.view(*k.shape[:2], self.n_head, -1).permute(0, 2, 3, 1) * scale
        v = v.view(*v.shape[:2], self.n_head, -1).permute(0, 2, 1, 3)

        qk = q @ k
        if mask is not None:
            # Select the mask rows of the absolute query positions and the
            # columns of every cached key. With no cached prefix this equals
            # mask[:n_ctx, :n_ctx]; with a cached prefix it keeps the shapes
            # compatible for multi-token queries as well.
            qk = qk + mask[k_len - n_ctx:k_len, :k_len]
        qk = qk.float()

        w = F.softmax(qk, dim=-1).to(q.dtype)
        return (w @ v).permute(0, 2, 1, 3).flatten(start_dim=2)


class ResidualAttentionBlock(nn.Module):
    """Decoder block: self-attention, optional cross-attention and an MLP.

    With ``cross_only=True`` the block holds just a key/value-projecting
    ``cross_attn`` and its ``forward`` only fills the cross-attention caches.
    """

    def __init__(
        self,
        n_state: int,
        n_head: int,
        cross_attention: bool = False,
        cross_only: bool = False,
    ):
        super().__init__()
        self.cross_only = cross_only
        if cross_only:
            self.cross_attn = MultiHeadAttention(
                n_state, n_head, cross_only=True
            )
        else:
            self.attn = MultiHeadAttention(n_state, n_head)
            self.attn_ln = LayerNorm(n_state)

            self.cross_attn = (
                MultiHeadAttention(n_state, n_head, no_cross=True)
                if cross_attention
                else None
            )
            self.cross_attn_ln = (
                LayerNorm(n_state) if cross_attention else None
            )

            n_mlp = n_state * 4
            self.mlp = nn.Sequential(
                Linear(n_state, n_mlp), nn.GELU(), Linear(n_mlp, n_state)
            )
            self.mlp_ln = LayerNorm(n_state)

    def forward(
        self,
        x: Tensor,
        offset: Optional[int] = None,
        mask: Optional[Tensor] = None,
        k_cache1: Optional[Tensor] = None,
        v_cache1: Optional[Tensor] = None,
        k_cache2: Optional[Tensor] = None,
        v_cache2: Optional[Tensor] = None,
    ):
        """Apply the block.

        ``k_cache1``/``v_cache1`` are handed to self-attention and
        ``k_cache2``/``v_cache2`` to cross-attention.
        """
        if self.cross_only:
            x = self.cross_attn(x, k_cache=k_cache2, v_cache=v_cache2)
        else:
            x = x + self.attn(
                self.attn_ln(x),
                mask=mask,
                k_cache=k_cache1,
                v_cache=v_cache1,
                offset=offset,
            )
            if self.cross_attn is not None:
                x = x + self.cross_attn(
                    self.cross_attn_ln(x), k_cache=k_cache2, v_cache=v_cache2
                )
            x = x + self.mlp(self.mlp_ln(x))
        return x


class TextDecoder_first(nn.Module):
    """First decoder stage: fill the cross-attention caches from the audio."""

    def __init__(
        self,
        n_batch: int,
        n_vocab: int,
        n_text_ctx: int,
        n_audio_ctx: int,
        n_state: int,
        n_head: int,
        n_layer: int,
    ):
        super().__init__()

        self.blocks: Iterable[ResidualAttentionBlock] = nn.ModuleList(
            [
                ResidualAttentionBlock(
                    n_state, n_head, cross_attention=True, cross_only=True
                )
                for _ in range(n_layer)
            ]
        )

        # One cache slab per layer; index 1 is self-attention, 2 is cross.
        self.kvcache_shape1 = (n_layer, n_batch, n_text_ctx, n_state)
        self.kvcache_shape2 = (n_layer, n_batch, n_audio_ctx, n_state)
        self.register_buffer("k_cache1", torch.zeros(self.kvcache_shape1))
        self.register_buffer("v_cache1", torch.zeros(self.kvcache_shape1))
        self.register_buffer("k_cache2", torch.zeros(self.kvcache_shape2))
        self.register_buffer("v_cache2", torch.zeros(self.kvcache_shape2))

    def forward(self, xa: Tensor):
        """
        xa : torch.Tensor, shape = (batch_size, n_audio_ctx, n_audio_state)
            the encoded audio features to be attended on

        Zeroes the self-attention caches, writes each layer's cross-attention
        keys/values into ``k_cache2``/``v_cache2`` and returns ``xa``
        unchanged.
        """
        self.k_cache1[:, :, :, :] = 0
        self.v_cache1[:, :, :, :] = 0

        x = xa
        for i, block in enumerate(self.blocks):
            x = block(x, k_cache2=self.k_cache2[i], v_cache2=self.v_cache2[i])

        return x


class TextDecoder_second(nn.Module):
    """Second decoder stage: turn tokens into logits using the KV caches."""

    def __init__(
        self,
        n_batch: int,
        n_vocab: int,
        n_text_ctx: int,
        n_audio_ctx: int,
        n_state: int,
        n_head: int,
        n_layer: int,
    ):
        super().__init__()

        self.token_embedding = nn.Embedding(n_vocab, n_state)
        self.positional_embedding = nn.Parameter(
            torch.empty(n_text_ctx, n_state)
        )

        self.blocks: Iterable[ResidualAttentionBlock] = nn.ModuleList(
            [
                ResidualAttentionBlock(n_state, n_head, cross_attention=True)
                for _ in range(n_layer)
            ]
        )
        self.ln = LayerNorm(n_state)

        # Additive causal mask: -inf strictly above the diagonal, 0 elsewhere.
        mask = torch.empty(n_text_ctx, n_text_ctx).fill_(-np.inf).triu_(1)
        self.register_buffer("mask", mask, persistent=False)

        self.kvcache_shape1 = (n_layer, n_batch, n_text_ctx, n_state)
        self.kvcache_shape2 = (n_layer, n_batch, n_audio_ctx, n_state)
        self.register_buffer("k_cache1", torch.zeros(self.kvcache_shape1))
        self.register_buffer("v_cache1", torch.zeros(self.kvcache_shape1))
        self.register_buffer("k_cache2", torch.zeros(self.kvcache_shape2))
        self.register_buffer("v_cache2", torch.zeros(self.kvcache_shape2))

    def forward(self, x: Tensor, offset_mask: Tensor):
        """
        x : torch.LongTensor, shape = (batch_size, <= n_ctx)
            the text tokens
        offset_mask : torch.Tensor, shape = (batch_size, end_step)
            only the size of its last axis is read; it is taken as the total
            number of positions (already cached plus the tokens in ``x``)

        Returns float32 logits computed against the token embedding matrix.
        """
        end_step = offset_mask.shape[-1]
        offset = end_step - x.shape[-1]
        x = (
            self.token_embedding(x)
            + self.positional_embedding[offset:end_step]
        )

        for i, block in enumerate(self.blocks):
            x = block(
                x,
                offset=offset,
                mask=self.mask,
                k_cache1=self.k_cache1[i],
                v_cache1=self.v_cache1[i],
                k_cache2=self.k_cache2[i],
                v_cache2=self.v_cache2[i],
            )

        x = self.ln(x)
        logits = (
            x @ torch.transpose(self.token_embedding.weight.to(x.dtype), 0, 1)
        ).float()

        return logits


def converter_encoder(model: whisper.Whisper, split: bool = False):
    """Trace the encoder and save it as ``encoder.mlpackage``.

    When ``split`` is true the converted model is additionally bisected into
    ``./encoder/`` with the chunks merged into a pipeline.
    """
    model.eval()

    encoder = model.encoder
    hparams = model.dims

    input_shape = (1, hparams.n_mels, 3000)
    input_data = torch.randn(input_shape)
    traced_model = torch.jit.trace(encoder, input_data)

    coreml_model = ct.convert(
        traced_model,
        inputs=[ct.TensorType(name="logmel_data", shape=input_shape)],
        outputs=[ct.TensorType(name="output")],
        minimum_deployment_target=ct.target.iOS18,
    )
    coreml_model.save("encoder.mlpackage")

    if split:
        ct.models.utils.bisect_model(
            coreml_model,
            "./encoder/",
            merge_chunks_to_pipeline=True,
        )

    del coreml_model


def _kv_cache_states(self_attn_shape, cross_attn_shape):
    """Describe the four KV-cache buffers as float16 Core ML states.

    The names match the buffers registered by both decoder stages.
    """
    named_shapes = (
        ("k_cache1", self_attn_shape),
        ("v_cache1", self_attn_shape),
        ("k_cache2", cross_attn_shape),
        ("v_cache2", cross_attn_shape),
    )
    return [
        ct.StateType(
            wrapped_type=ct.TensorType(shape=shape, dtype=np.float16),
            name=name,
        )
        for name, shape in named_shapes
    ]


def converter_decoder(model: whisper.Whisper):
    """Export the decoder as two stateful Core ML packages.

    Writes ``decoder_first.mlpackage`` and ``decoder_second.mlpackage``. Both
    are converted with the same state declarations.
    """
    model.eval()
    org_decoder = model.decoder
    hparams = model.dims
    batch_size = 1

    # ---- stage 1: audio features -> cross-attention caches ----
    decoder1 = TextDecoder_first(
        batch_size,
        hparams.n_vocab,
        hparams.n_text_ctx,
        hparams.n_audio_ctx,
        hparams.n_text_state,
        hparams.n_text_head,
        hparams.n_text_layer,
    )
    # strict=False: this stage holds only a subset of the decoder weights.
    decoder1.load_state_dict(org_decoder.state_dict(), strict=False)
    decoder1.eval()

    tokens_shape = (batch_size, 1)
    audio_shape = (batch_size, hparams.n_audio_ctx, hparams.n_audio_state)

    audio_data = torch.randn(audio_shape)
    traced_model1 = torch.jit.trace(decoder1, [audio_data])

    audio_length = ct.RangeDim(
        lower_bound=1, upper_bound=hparams.n_audio_ctx, default=1
    )
    inputs = [
        ct.TensorType(
            shape=(batch_size, audio_length, hparams.n_audio_state),
            dtype=np.float16,
            name="audio_data",
        ),
    ]
    outputs = [ct.TensorType(dtype=np.float16, name="dummy")]
    states = _kv_cache_states(
        decoder1.kvcache_shape1, decoder1.kvcache_shape2
    )

    converted_model = ct.convert(
        traced_model1,
        inputs=inputs,
        outputs=outputs,
        states=states,
        minimum_deployment_target=ct.target.iOS18,
    )
    converted_model.save("decoder_first.mlpackage")

    del traced_model1
    del converted_model

    # ---- stage 2: tokens -> logits ----
    decoder2 = TextDecoder_second(
        batch_size,
        hparams.n_vocab,
        hparams.n_text_ctx,
        hparams.n_audio_ctx,
        hparams.n_text_state,
        hparams.n_text_head,
        hparams.n_text_layer,
    )
    decoder2.load_state_dict(org_decoder.state_dict(), strict=False)
    decoder2.eval()

    token_data = torch.randint(hparams.n_vocab, tokens_shape).long()
    offset_mask = torch.zeros(tokens_shape)
    traced_model2 = torch.jit.trace(decoder2, [token_data, offset_mask])

    query_length = ct.RangeDim(
        lower_bound=1, upper_bound=hparams.n_text_ctx, default=1
    )
    end_step_dim = ct.RangeDim(
        lower_bound=1, upper_bound=hparams.n_text_ctx, default=1
    )
    inputs = [
        ct.TensorType(
            shape=(batch_size, query_length),
            dtype=np.int32,
            name="token_data",
        ),
        ct.TensorType(
            shape=(batch_size, end_step_dim),
            dtype=np.float16,
            name="offset_mask",
        ),
    ]
    outputs = [ct.TensorType(dtype=np.float16, name="logits")]

    converted_model = ct.convert(
        traced_model2,
        inputs=inputs,
        outputs=outputs,
        states=states,
        minimum_deployment_target=ct.target.iOS18,
    )
    converted_model.save("decoder_second.mlpackage")

    del traced_model2
    del converted_model


def test_model(hparams: ModelDimensions):
    """Smoke-test the saved packages with random inputs.

    Loads the three ``.mlpackage`` files from the current directory, runs the
    encoder, fills the decoder state, feeds a 5-token prompt and then single
    tokens until the text context is nearly full, printing every decoder
    output.
    """
    logmel_shape = (1, hparams.n_mels, 3000)
    encoder = ct.models.MLModel("encoder.mlpackage")
    encoder_output = encoder.predict(
        {'logmel_data': np.random.rand(*logmel_shape)}
    )
    audio_data = encoder_output['output']

    decoder1 = ct.models.MLModel("decoder_first.mlpackage")
    decoder2 = ct.models.MLModel("decoder_second.mlpackage")

    # The state is created from the first stage and handed to both stages.
    decoder_state = decoder1.make_state()

    decoder_input = {
        'audio_data': audio_data,
    }
    decoder_output = decoder1.predict(decoder_input, decoder_state)

    past_kv_len = 0
    token_data = np.random.randint(
        hparams.n_vocab, size=(1, 5), dtype=np.int32
    )
    offset_mask = np.zeros((1, past_kv_len + 5))
    decoder_input = {
        'token_data': token_data,
        'offset_mask': offset_mask,
    }
    decoder_output = decoder2.predict(decoder_input, decoder_state)
    print(decoder_output)
    past_kv_len += 5

    while past_kv_len + 1 < hparams.n_text_ctx:
        token_data = np.random.randint(
            hparams.n_vocab, size=(1, 1), dtype=np.int32
        )
        offset_mask = np.zeros((1, past_kv_len + 1))
        decoder_input = {
            'token_data': token_data,
            'offset_mask': offset_mask,
        }
        decoder_output = decoder2.predict(decoder_input, decoder_state)
        print(decoder_output)
        past_kv_len += 1


def print_dims(model: whisper.Whisper):
    """Write the model's dimensions to ``model_dims.json``."""
    with open('model_dims.json', 'w') as f:
        json.dump(model.dims.__dict__, f, indent=2)


if __name__ == '__main__':
    import os

    # Each model size is converted inside its own work/<size>/ directory,
    # because the converters write to fixed relative file names.
    os.makedirs("work", exist_ok=True)
    os.chdir("work")
    for model_size in ['tiny', 'base', 'small', 'medium', 'large-v2', 'large-v3']:
        print(model_size)
        os.makedirs(model_size, exist_ok=True)
        os.chdir(model_size)

        model = whisper.load_model(model_size)
        print_dims(model)
        converter_encoder(model, split=model_size.startswith('large'))
        converter_decoder(model)
        # Optional smoke test of the converted packages:
        # test_model(model.dims)
        del model

        os.chdir("..")
    os.chdir("..")