File size: 9,833 Bytes
be4c742
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2b65df9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
be4c742
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2b65df9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
import json
import os
import re
from collections import defaultdict
import glob
import numpy as np
import time

import torch
import torch.nn as nn
import torchvision.models as models
import torch.nn.functional as F
from torch import optim
from torch.utils.data import Dataset
from torchvision import transforms
from torch.utils.data import DataLoader

from PIL import Image

class ImageEncoder(nn.Module):
    def __init__(self, embed_dim):
        super(ImageEncoder, self).__init__()
        # Load a pretrained VGG19 model
        self.model = models.vgg19(pretrained=True)
        # Get the number of input features for the last fully connected layer
        in_features = self.model.classifier[-1].in_features
        # Removing the last layer of VGG19's classifier (the final fully connected layer for classification)
        self.model.classifier = nn.Sequential(*list(self.model.classifier.children())[:-1])
        # Adding a new fully connected layer to map features to the desired embedding dimension
        self.fc = nn.Linear(in_features, embed_dim)

    def forward(self, image):
        # Extracting features of the image using the modified VGG19 model
        with torch.no_grad():  # Freezing the weights of the pretrained model during this pass
            img_feature = self.model(image)  # Output shape: (batch_size, feature_dim)

        #features to the embedding dimension
        img_feature = self.fc(img_feature)  # Output shape: (batch_size, embed_dim)

        # Applying L2 normalization to the features for better similarity comparisons
        l2_norm = F.normalize(img_feature, p=2, dim=1).detach()  # Normalize along the feature dimension

        return l2_norm

class ImageEncoder_attn(nn.Module):
    def __init__(self, embed_dim):
        super(ImageEncoder_attn, self).__init__()
        # Load a pretrained VGG19 model
        self.model = models.vgg19(pretrained=True).features
        # Adding a 1x1 convolutional layer to map features to the desired embedding dimension
        self.conv = nn.Conv2d(512, embed_dim, kernel_size=1)

    def forward(self, image):
        # Extracting spatial features of the image using the modified VGG19 model
        with torch.no_grad():  # Freezing the weights of the pretrained model during this pass
            img_features = self.model(image)  # Shape: (batch_size, 512, H, W)

        # Map features to the desired embedding dimension
        img_features = self.conv(img_features)  # Shape: (batch_size, embed_dim, H, W)

        # Flatten spatial dimensions to get per-region features
        img_features = img_features.flatten(2).permute(0, 2, 1)  # Shape: (batch_size, num_regions, embed_dim)

        return img_features

class QuesEncoder(nn.Module):
    def __init__(self, ques_vocab_size, word_embed, hidden_size, num_hidden, qu_feature_size):
        super(QuesEncoder, self).__init__()
        # Embedding layer to map question words to word embeddings
        self.word_embedding = nn.Embedding(ques_vocab_size, word_embed)
        # Activation function to add non-linearity to embeddings
        self.tanh = nn.Tanh()
        # LSTM layer for sequential processing of question embeddings
        # Takes word embeddings as input and outputs hidden states
        self.lstm = nn.LSTM(word_embed, hidden_size, num_hidden)  # (input_dim, hidden_dim, num_layers)
        # Fully connected layer to transform the concatenated LSTM states to the desired feature size
        self.fc = nn.Linear(2 * num_hidden * hidden_size, qu_feature_size)

    def forward(self, question):
        # Map question words to embeddings
        # Shape: (batch_size, question_length, word_embed)
        ques_embedding = self.word_embedding(question)
        # Applying Tanh activation to the embeddings
        ques_embedding = self.tanh(ques_embedding)
        # Transpose for LSTM input: (question_length, batch_size, word_embed)
        ques_embedding = ques_embedding.transpose(0, 1)
        # Passing embeddings through the LSTM
        # Outputs: LSTM outputs (_) and final hidden states (hidden, cell)
        # hidden and cell shapes: (num_layers, batch_size, hidden_size)
        _, (hidden, cell) = self.lstm(ques_embedding)
        # Concatenating the hidden and cell states along the feature dimension
        # Shape: (num_layers, batch_size, 2 * hidden_size)
        ques_feature = torch.cat((hidden, cell), dim=2)
        # Transpose for batch-first format: (batch_size, num_layers, 2 * hidden_size)
        ques_feature = ques_feature.transpose(0, 1)
        # Flattening the feature tensor: (batch_size, num_layers * 2 * hidden_size)
        ques_feature = ques_feature.reshape(ques_feature.size(0), -1)
        # Applying Tanh activation to the flattened features
        ques_feature = self.tanh(ques_feature)
        # Transforming the features to the desired output size: (batch_size, qu_feature_size)
        ques_feature = self.fc(ques_feature)

        return ques_feature

class VQAModel(nn.Module):
    def __init__(self, feature_size, ques_vocab_size, ans_vocab_size, word_embed, hidden_size, num_hidden):
        super(VQAModel, self).__init__()

        # Encoder to extract image features
        self.img_encoder = ImageEncoder(feature_size)

        # Encoder to extract question features
        self.ques_encoder = QuesEncoder(ques_vocab_size, word_embed, hidden_size, num_hidden, feature_size)

        # Dropout layer to prevent overfitting
        self.dropout = nn.Dropout(0.5)

        # Tanh activation function for non-linearity
        self.tanh = nn.Tanh()

        # Fully connected layer to map combined features to answer space
        self.fc1 = nn.Linear(feature_size, ans_vocab_size)

        # Second fully connected layer to refine logits in the answer space
        self.fc2 = nn.Linear(ans_vocab_size, ans_vocab_size)

    def forward(self, image, question):
        # Extract image features using the image encoder
        # Output shape: (batch_size, feature_size)
        img_feature = self.img_encoder(image)

        # Extract question features using the question encoder
        # Output shape: (batch_size, feature_size)
        qst_feature = self.ques_encoder(question)

        # Combine image and question features element-wise (Hadamard product)
        # Output shape: (batch_size, feature_size)
        combined_feature = img_feature * qst_feature

        # Apply dropout for regularization
        combined_feature = self.dropout(combined_feature)

        # Apply Tanh activation for non-linearity
        combined_feature = self.tanh(combined_feature)

        # Map combined features to the answer space using the first fully connected layer
        # Output shape: (batch_size, ans_vocab_size)
        combined_feature = self.fc1(combined_feature)

        # Apply another round of dropout for regularization
        combined_feature = self.dropout(combined_feature)

        # Apply Tanh activation again for non-linearity
        combined_feature = self.tanh(combined_feature)

        # Refine logits using the second fully connected layer
        # Output shape: (batch_size, ans_vocab_size)
        logits = self.fc2(combined_feature)

        return logits

class VQAModel_attn(nn.Module):
    def __init__(self, feature_size, ques_vocab_size, ans_vocab_size, word_embed, hidden_size, num_hidden):
        super(VQAModel_attn, self).__init__()

        # Encoder to extract image features
        self.img_encoder = ImageEncoder_attn(feature_size)

        # Encoder to extract question features
        self.ques_encoder = QuesEncoder(ques_vocab_size, word_embed, hidden_size, num_hidden, feature_size)

        # Attention mechanism layers
        self.attention_fc = nn.Linear(2 * feature_size, 1)  # For compatibility scoring

        # Dropout layer
        self.dropout = nn.Dropout(0.5)

        # Fully connected layers for answer prediction
        self.fc1 = nn.Linear(feature_size, ans_vocab_size)
        self.fc2 = nn.Linear(ans_vocab_size, ans_vocab_size)

    def forward(self, image, question):
        # Extract image features (batch_size, num_regions, feature_size)
        img_features = self.img_encoder(image)

        # Extract question features (batch_size, feature_size)
        qst_feature = self.ques_encoder(question)

        # Ensure qst_feature has the correct dimensions
        # Expand to (batch_size, 1, feature_size), then repeat to match num_regions
        qst_feature_exp = qst_feature.unsqueeze(1).expand(-1, img_features.size(1), -1)

        #print(f"img_features shape: {img_features.shape}")
        #print(f"qst_feature shape: {qst_feature.shape}")
        #print(f"qst_feature_exp shape: {qst_feature_exp.shape}")
        # Concatenate image and question features along the last dimension
        # Shape: (batch_size, num_regions, 2 * feature_size)
        combined_features = torch.cat([img_features, qst_feature_exp], dim=-1)

        # Compute attention scores for each region
        # Shape: (batch_size, num_regions, 1)
        attention_scores = self.attention_fc(combined_features)

        # Apply softmax to get attention weights
        # Shape: (batch_size, num_regions)
        attention_weights = F.softmax(attention_scores.squeeze(-1), dim=1)

        # Compute the weighted sum of image features
        # Shape: (batch_size, feature_size)
        attended_img_feature = torch.sum(img_features * attention_weights.unsqueeze(-1), dim=1)

        # Combine attended image features with question features
        combined_feature = attended_img_feature + qst_feature

        # Dropout and fully connected layers for answer prediction
        combined_feature = self.dropout(combined_feature)
        combined_feature = F.relu(self.fc1(combined_feature))
        logits = self.fc2(combined_feature)

        return logits