import json
import os
import re
import glob
import time
from collections import defaultdict

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
from torch import optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
class ImageEncoder(nn.Module):
    def __init__(self, embed_dim):
        super(ImageEncoder, self).__init__()
        # Load a pretrained VGG19 model
        self.model = models.vgg19(pretrained=True)
        # Number of input features of VGG19's final fully connected layer
        in_features = self.model.classifier[-1].in_features
        # Remove the last layer of VGG19's classifier (the final classification layer)
        self.model.classifier = nn.Sequential(*list(self.model.classifier.children())[:-1])
        # New fully connected layer mapping the VGG features to the desired embedding dimension
        self.fc = nn.Linear(in_features, embed_dim)

    def forward(self, image):
        # Extract image features with the truncated VGG19 model
        with torch.no_grad():  # keep the pretrained backbone frozen during this pass
            img_feature = self.model(image)  # Shape: (batch_size, in_features)
        # Map the extracted features to the embedding dimension
        img_feature = self.fc(img_feature)  # Shape: (batch_size, embed_dim)
        # L2-normalize the features along the feature dimension for better similarity comparisons.
        # Note: no .detach() here, otherwise self.fc would receive no gradient and never train.
        img_feature = F.normalize(img_feature, p=2, dim=1)
        return img_feature
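
# A minimal, hedged usage sketch (not part of the original model code): run a dummy
# batch through ImageEncoder to verify the output shape. The embed_dim of 1024 and
# the 224x224 input size are illustrative assumptions, not values fixed by this file.
def _demo_image_encoder():
    encoder = ImageEncoder(embed_dim=1024)          # assumed embedding size
    dummy_images = torch.randn(2, 3, 224, 224)      # batch of 2 RGB images, VGG-sized
    features = encoder(dummy_images)
    print(features.shape)                           # expected: torch.Size([2, 1024])
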
class ImageEncoder_attn(nn.Module):
    def __init__(self, embed_dim):
        super(ImageEncoder_attn, self).__init__()
        # Convolutional backbone of a pretrained VGG19 (no classifier head)
        self.model = models.vgg19(pretrained=True).features
        # 1x1 convolution mapping the 512 VGG channels to the desired embedding dimension
        self.conv = nn.Conv2d(512, embed_dim, kernel_size=1)

    def forward(self, image):
        # Extract spatial feature maps with the VGG19 convolutional backbone
        with torch.no_grad():  # keep the pretrained backbone frozen during this pass
            img_features = self.model(image)  # Shape: (batch_size, 512, H, W)
        # Map the features to the embedding dimension
        img_features = self.conv(img_features)  # Shape: (batch_size, embed_dim, H, W)
        # Flatten the spatial dimensions to obtain per-region features
        img_features = img_features.flatten(2).permute(0, 2, 1)  # Shape: (batch_size, num_regions, embed_dim)
        return img_features
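
# A small sanity-check sketch, assuming a 224x224 input: VGG19's feature extractor
# reduces it to a 7x7 grid, so ImageEncoder_attn should return 49 region vectors.
# The embed_dim of 1024 is again an illustrative choice.
def _demo_image_encoder_attn():
    encoder = ImageEncoder_attn(embed_dim=1024)
    dummy_images = torch.randn(2, 3, 224, 224)
    region_features = encoder(dummy_images)
    print(region_features.shape)                    # expected: torch.Size([2, 49, 1024])
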
class QuesEncoder(nn.Module):
    def __init__(self, ques_vocab_size, word_embed, hidden_size, num_hidden, qu_feature_size):
        super(QuesEncoder, self).__init__()
        # Embedding layer mapping question word indices to word embeddings
        self.word_embedding = nn.Embedding(ques_vocab_size, word_embed)
        # Tanh activation to add non-linearity to the embeddings
        self.tanh = nn.Tanh()
        # LSTM for sequential processing of the question embeddings
        self.lstm = nn.LSTM(word_embed, hidden_size, num_hidden)  # (input_dim, hidden_dim, num_layers)
        # Fully connected layer mapping the concatenated LSTM states to the desired feature size
        self.fc = nn.Linear(2 * num_hidden * hidden_size, qu_feature_size)

    def forward(self, question):
        # Map question word indices to embeddings
        # Shape: (batch_size, question_length, word_embed)
        ques_embedding = self.word_embedding(question)
        # Apply Tanh activation to the embeddings
        ques_embedding = self.tanh(ques_embedding)
        # Transpose to the LSTM's expected input layout: (question_length, batch_size, word_embed)
        ques_embedding = ques_embedding.transpose(0, 1)
        # Run the LSTM and keep only the final hidden and cell states
        # hidden and cell shapes: (num_layers, batch_size, hidden_size)
        _, (hidden, cell) = self.lstm(ques_embedding)
        # Concatenate the hidden and cell states along the feature dimension
        # Shape: (num_layers, batch_size, 2 * hidden_size)
        ques_feature = torch.cat((hidden, cell), dim=2)
        # Transpose to batch-first layout: (batch_size, num_layers, 2 * hidden_size)
        ques_feature = ques_feature.transpose(0, 1)
        # Flatten the feature tensor: (batch_size, num_layers * 2 * hidden_size)
        ques_feature = ques_feature.reshape(ques_feature.size(0), -1)
        # Apply Tanh activation to the flattened features
        ques_feature = self.tanh(ques_feature)
        # Project to the desired output size: (batch_size, qu_feature_size)
        ques_feature = self.fc(ques_feature)
        return ques_feature
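
# A minimal sketch of QuesEncoder on a dummy batch of tokenized questions. The vocabulary
# size, embedding size, hidden size, layer count, and question length below are
# illustrative assumptions; the encoder itself only requires integer word indices.
def _demo_ques_encoder():
    encoder = QuesEncoder(ques_vocab_size=1000, word_embed=300,
                          hidden_size=512, num_hidden=2, qu_feature_size=1024)
    dummy_questions = torch.randint(0, 1000, (2, 14))   # batch of 2 questions, 14 tokens each
    features = encoder(dummy_questions)
    print(features.shape)                                # expected: torch.Size([2, 1024])
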
class VQAModel(nn.Module):
    def __init__(self, feature_size, ques_vocab_size, ans_vocab_size, word_embed, hidden_size, num_hidden):
        super(VQAModel, self).__init__()
        # Encoder extracting image features
        self.img_encoder = ImageEncoder(feature_size)
        # Encoder extracting question features
        self.ques_encoder = QuesEncoder(ques_vocab_size, word_embed, hidden_size, num_hidden, feature_size)
        # Dropout layer to prevent overfitting
        self.dropout = nn.Dropout(0.5)
        # Tanh activation for non-linearity
        self.tanh = nn.Tanh()
        # Fully connected layer mapping the combined features to the answer space
        self.fc1 = nn.Linear(feature_size, ans_vocab_size)
        # Second fully connected layer refining the logits in the answer space
        self.fc2 = nn.Linear(ans_vocab_size, ans_vocab_size)

    def forward(self, image, question):
        # Image features, shape: (batch_size, feature_size)
        img_feature = self.img_encoder(image)
        # Question features, shape: (batch_size, feature_size)
        qst_feature = self.ques_encoder(question)
        # Fuse image and question features element-wise (Hadamard product)
        # Shape: (batch_size, feature_size)
        combined_feature = img_feature * qst_feature
        # Dropout for regularization, then Tanh for non-linearity
        combined_feature = self.dropout(combined_feature)
        combined_feature = self.tanh(combined_feature)
        # Map the combined features to the answer space
        # Shape: (batch_size, ans_vocab_size)
        combined_feature = self.fc1(combined_feature)
        # Second round of dropout and Tanh
        combined_feature = self.dropout(combined_feature)
        combined_feature = self.tanh(combined_feature)
        # Refine the logits with the second fully connected layer
        # Shape: (batch_size, ans_vocab_size)
        logits = self.fc2(combined_feature)
        return logits
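
# A hedged end-to-end sketch: one forward pass and one cross-entropy loss computation
# for VQAModel on dummy data. All hyperparameters below (feature size, vocabulary sizes,
# LSTM settings) are illustrative assumptions, not values prescribed by this file.
def _demo_vqa_model():
    model = VQAModel(feature_size=1024, ques_vocab_size=1000, ans_vocab_size=10,
                     word_embed=300, hidden_size=512, num_hidden=2)
    dummy_images = torch.randn(2, 3, 224, 224)
    dummy_questions = torch.randint(0, 1000, (2, 14))
    dummy_answers = torch.randint(0, 10, (2,))            # one answer class per example
    logits = model(dummy_images, dummy_questions)          # Shape: (2, 10)
    loss = F.cross_entropy(logits, dummy_answers)
    print(logits.shape, loss.item())
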
class VQAModel_attn(nn.Module):
    def __init__(self, feature_size, ques_vocab_size, ans_vocab_size, word_embed, hidden_size, num_hidden):
        super(VQAModel_attn, self).__init__()
        # Encoder extracting per-region image features
        self.img_encoder = ImageEncoder_attn(feature_size)
        # Encoder extracting question features
        self.ques_encoder = QuesEncoder(ques_vocab_size, word_embed, hidden_size, num_hidden, feature_size)
        # Attention layer scoring the compatibility of each image region with the question
        self.attention_fc = nn.Linear(2 * feature_size, 1)
        # Dropout layer
        self.dropout = nn.Dropout(0.5)
        # Fully connected layers for answer prediction
        self.fc1 = nn.Linear(feature_size, ans_vocab_size)
        self.fc2 = nn.Linear(ans_vocab_size, ans_vocab_size)

    def forward(self, image, question):
        # Per-region image features, shape: (batch_size, num_regions, feature_size)
        img_features = self.img_encoder(image)
        # Question features, shape: (batch_size, feature_size)
        qst_feature = self.ques_encoder(question)
        # Broadcast the question features over the regions:
        # (batch_size, feature_size) -> (batch_size, num_regions, feature_size)
        qst_feature_exp = qst_feature.unsqueeze(1).expand(-1, img_features.size(1), -1)
        # Concatenate image and question features along the last dimension
        # Shape: (batch_size, num_regions, 2 * feature_size)
        combined_features = torch.cat([img_features, qst_feature_exp], dim=-1)
        # Compute an attention score for each region
        # Shape: (batch_size, num_regions, 1)
        attention_scores = self.attention_fc(combined_features)
        # Softmax over the regions to obtain attention weights
        # Shape: (batch_size, num_regions)
        attention_weights = F.softmax(attention_scores.squeeze(-1), dim=1)
        # Weighted sum of the region features
        # Shape: (batch_size, feature_size)
        attended_img_feature = torch.sum(img_features * attention_weights.unsqueeze(-1), dim=1)
        # Combine the attended image features with the question features
        combined_feature = attended_img_feature + qst_feature
        # Dropout and fully connected layers for answer prediction
        combined_feature = self.dropout(combined_feature)
        combined_feature = F.relu(self.fc1(combined_feature))
        logits = self.fc2(combined_feature)
        return logits
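
# A hedged sketch for the attention variant: the same kind of dummy inputs as above,
# run through VQAModel_attn. Hyperparameters are again illustrative assumptions.
def _demo_vqa_model_attn():
    model = VQAModel_attn(feature_size=1024, ques_vocab_size=1000, ans_vocab_size=10,
                          word_embed=300, hidden_size=512, num_hidden=2)
    dummy_images = torch.randn(2, 3, 224, 224)
    dummy_questions = torch.randint(0, 1000, (2, 14))
    logits = model(dummy_images, dummy_questions)          # Shape: (2, 10)
    print(logits.shape)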