import math
from typing import Optional, Tuple, Union

import torch
from torch import nn
from transformers.modeling_outputs import BaseModelOutput
from transformers import Wav2Vec2BertModel, Wav2Vec2BertConfig, Wav2Vec2BertPreTrainedModel
from transformers.models.mllama.configuration_mllama import MllamaTextConfig
from transformers.models.wav2vec2_bert.modeling_wav2vec2_bert import (
    Wav2Vec2BertFeatureProjection,
    Wav2Vec2BertSelfAttention,
)


class Llama3Embedding(Wav2Vec2BertPreTrainedModel):
    """Embedding layer that splices Wav2Vec2-BERT audio embeddings into Llama text embeddings."""

    base_model_prefix = "audio_model"

    def __init__(self, config: Wav2Vec2BertConfig, text_config: MllamaTextConfig):
        super().__init__(config)
        assert config.add_adapter is True, f"{type(self).__name__} requires config.add_adapter to be True."
        assert config.output_hidden_size == text_config.hidden_size, (
            f"config.output_hidden_size ({config.output_hidden_size}) must equal "
            f"text_config.hidden_size ({text_config.hidden_size})."
        )
        self.text_embeddings = nn.Embedding(text_config.vocab_size, text_config.hidden_size, text_config.pad_token_id)
        self.audio_embedding = Wav2Vec2BertModel(config)
        # Learned boundary embeddings placed at the first and last slot of every audio span.
        self.start_of_audio = nn.Parameter(data=torch.zeros((1, config.output_hidden_size)), requires_grad=True)
        self.end_of_audio = nn.Parameter(data=torch.zeros((1, config.output_hidden_size)), requires_grad=True)
        self.text_config = text_config

    def _init_weights(self, module):
        """Initialize the weights."""
        std = self.text_config.initializer_range
        if isinstance(module, Wav2Vec2BertSelfAttention):
            if hasattr(module, "pos_bias_u"):
                nn.init.xavier_uniform_(module.pos_bias_u)
            if hasattr(module, "pos_bias_v"):
                nn.init.xavier_uniform_(module.pos_bias_v)
        elif isinstance(module, Wav2Vec2BertFeatureProjection):
            k = math.sqrt(1 / module.projection.in_features)
            nn.init.uniform_(module.projection.weight, a=-k, b=k)
            nn.init.uniform_(module.projection.bias, a=-k, b=k)
        elif isinstance(module, nn.Linear):
            module.weight.data.normal_(mean=0.0, std=self.config.initializer_range)

            if module.bias is not None:
                module.bias.data.zero_()
        elif isinstance(module, (nn.LayerNorm, nn.GroupNorm)):
            module.bias.data.zero_()
            module.weight.data.fill_(1.0)
        elif isinstance(module, nn.Conv1d):
            nn.init.kaiming_normal_(module.weight)

            if module.bias is not None:
                k = math.sqrt(module.groups / (module.in_channels * module.kernel_size[0]))
                nn.init.uniform_(module.bias, a=-k, b=k)
        elif isinstance(module, nn.Embedding):
            module.weight.data.normal_(mean=0.0, std=std)
            if module.padding_idx is not None:
                module.weight.data[module.padding_idx].zero_()
        elif isinstance(module, nn.Parameter):
            # Intended for bare parameters such as start_of_audio / end_of_audio. Note that
            # _init_weights is usually applied per nn.Module, so this branch may not be hit
            # for parameters that are not wrapped in a module of their own.
            module.data.normal_(mean=0.0, std=std)
            
    def forward(
        self,
        input_ids: Optional[torch.LongTensor] = None,
        audio_features: Optional[torch.Tensor] = None,
    ) -> torch.Tensor:
        # Negative input_ids mark audio placeholder positions; clamp them to 0 so the text
        # embedding lookup stays in range (those slots are overwritten below).
        input_embeddings = self.text_embeddings(torch.clamp(input_ids, min=0))
        if audio_features is None:
            return input_embeddings

        # Encode all audio clips of the batch in a single pass, then restore the
        # (batch, clip, frame, feature) layout.
        bs, max_num_audio, l, d = audio_features.shape
        audio_embeddings = self.audio_embedding(
            input_features=audio_features.view((bs * max_num_audio, l, d))
        )["last_hidden_state"]
        audio_embeddings = audio_embeddings.view((bs, max_num_audio, -1, self.start_of_audio.shape[-1]))

        # Splice each encoded clip into its placeholder span: token id -1 marks the first
        # clip, -2 the second, and so on. The first and last slot of every span receive the
        # learned start-of-audio / end-of-audio embeddings.
        for i in range(bs):
            for j in range(max_num_audio):
                audio_id = -1 - j
                if torch.any(input_ids[i] == audio_id):
                    positions = torch.nonzero(input_ids[i] == audio_id, as_tuple=True)
                    seq_len = input_embeddings[i][positions].shape[0] - 2
                    input_embeddings[i] = input_embeddings[i].index_put(
                        positions,
                        torch.concat([self.start_of_audio, audio_embeddings[i, j, :seq_len, :], self.end_of_audio]),
                        accumulate=False,
                    )
        return input_embeddings
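

# --- Usage sketch (illustrative only) ---
# A minimal sketch of how this module might be wired up end to end. The tiny config
# values below are assumptions chosen so the example runs quickly on CPU; they are not
# the hyperparameters of any released checkpoint. Audio placeholder positions in
# input_ids use negative ids (-1 for the first clip), matching forward() above.
if __name__ == "__main__":
    audio_config = Wav2Vec2BertConfig(
        hidden_size=64,            # assumed toy size, not a real checkpoint value
        num_hidden_layers=2,
        num_attention_heads=4,
        intermediate_size=128,
        add_adapter=True,          # required by Llama3Embedding's assert
        output_hidden_size=64,     # must equal text_config.hidden_size
    )
    text_config = MllamaTextConfig(vocab_size=1000, hidden_size=64, pad_token_id=0)
    model = Llama3Embedding(audio_config, text_config).eval()

    # One sample with one audio clip occupying four placeholder slots (token id -1).
    input_ids = torch.tensor([[5, -1, -1, -1, -1, 7, 9]])
    audio_features = torch.randn(1, 1, 200, audio_config.feature_projection_input_dim)

    with torch.no_grad():
        embeddings = model(input_ids=input_ids, audio_features=audio_features)
    print(embeddings.shape)  # -> torch.Size([1, 7, 64])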