# autoencoder/modeling_autoencoder.py
from dataclasses import dataclass
from typing import Optional

import torch
from torch import nn, Tensor

from transformers import PretrainedConfig, PreTrainedModel, AutoConfig, AutoModel
from transformers.utils import ModelOutput
# Example usage (run once in a notebook to register and publish the model):
# from huggingface_hub import notebook_login
# from transformers import AutoConfig, AutoModel
# from autoencoder_model.modeling_autoencoder import AutoEncoder, AutoEncoderConfig
# notebook_login()
#
# Register the model with the Hugging Face Auto classes:
# AutoEncoderConfig.register_for_auto_class()
# AutoEncoder.register_for_auto_class("AutoModel")
# AutoConfig.register("autoencoder", AutoEncoderConfig)
# AutoModel.register(AutoEncoderConfig, AutoEncoder)
#
# Create and upload the model:
# autoencoder = AutoEncoder(AutoEncoderConfig())
# autoencoder.push_to_hub("autoencoder")
#
# Download the model:
# config = AutoConfig.from_pretrained("amaye15/autoencoder", trust_remote_code=True)
# autoencoder = AutoModel.from_config(config, trust_remote_code=True)
# Structure:
#   - Example
#   - Model Outputs
#   - Model Configuration
#   - Model Layers
#   - Model
##########################################################################################
#################################### Outputs #############################################
##########################################################################################
@dataclass
class AutoencoderModelOutput(ModelOutput):
"""
Represents the output of an autoencoder model. This class holds various
important tensors that are the result of passing data through an autoencoder.
Attributes:
logits (torch.FloatTensor, optional): The reconstructed output from the autoencoder.
This is typically the direct output of the decoder part of the model.
labels (torch.FloatTensor, optional): The true labels associated with the input data,
if available. Useful for supervised training scenarios or evaluation.
hidden_state (torch.FloatTensor, optional): The encoded representation of the input data.
This is the output of the encoder part of the model and serves as a compressed
representation of the input data.
loss (torch.FloatTensor, optional): The computed loss value when comparing the reconstructed
output to the original input data. This is essential for training and evaluating the model's performance.
"""
    logits: Optional[torch.FloatTensor] = None
    labels: Optional[torch.FloatTensor] = None
    hidden_state: Optional[torch.FloatTensor] = None
    loss: Optional[torch.FloatTensor] = None
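# A minimal sketch of how the output container is consumed (assumes a constructed
# `model` and a float input batch `x`; the names are illustrative, not part of the API):
# out = model(x)
# out.loss.backward()            # reconstruction loss (MSELoss for float inputs)
# compressed = out.hidden_state  # encoder bottleneck representation
# reconstruction = out.logits    # decoder output, same shape as the input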
##########################################################################################
################################# Configuration ##########################################
##########################################################################################
class AutoEncoderConfig(PretrainedConfig):
"""
Configuration class for AutoEncoder. This class stores the parameters for the autoencoder model.
Attributes:
input_dim (int): The dimensionality of the input data. Default is 128.
latent_dim (int): The dimensionality of the latent representation. Default is 64.
layer_types (str): The type of layers used, e.g., 'linear', 'lstm', 'gru', 'rnn'. Default is 'linear'.
dropout_rate (float): The dropout rate applied after each layer (except for the last layer). Default is 0.1.
num_layers (int): The number of layers in the encoder/decoder. Default is 3.
compression_rate (float): Factor by which to compress the dimensions through layers. Default is 0.5.
bidirectional (bool): Whether the sequence layers should be bidirectional. Default is False.
        embed (bool): Whether to embed the input ids. If True, `vocab_size` and `max_position` must be specified. Default is False.
        vocab_size (Optional[int]): The size of the vocabulary. Required if `embed` is True. Default is None.
        max_position (Optional[int]): The maximum sequence length for positional embeddings. Required if `embed` is True. Default is None.
Raises:
ValueError: If `embed` is True and either `vocab_size` or `max_position` is not defined as an integer.
"""
model_type = "autoencoder"
def __init__(
self,
input_dim: int = 128,
latent_dim: int = 64,
layer_types: str = 'linear',
dropout_rate: float = 0.1,
num_layers: int = 3,
compression_rate: float = 0.5,
bidirectional: bool = False,
embed: bool = False,
        vocab_size: Optional[int] = None,
        max_position: Optional[int] = None,
pad_token_id: int = 0,
bos_token_id: int = 1,
eos_token_id: int = 2,
**kwargs
):
super().__init__(**kwargs)
self.input_dim = input_dim
self.latent_dim = latent_dim
self.layer_types = layer_types
self.dropout_rate = dropout_rate
self.num_layers = num_layers
self.compression_rate = compression_rate
self.bidirectional = bidirectional
self.embed = embed
self.vocab_size = vocab_size
self.max_position = max_position
self.pad_token_id = pad_token_id
self.bos_token_id = bos_token_id
self.eos_token_id = eos_token_id
        if self.embed:
            if not isinstance(self.vocab_size, int) or isinstance(self.vocab_size, bool):
                raise ValueError("vocab_size must be an integer when embed is True - AutoEncoderConfig(embed=True, vocab_size=10_000, max_position=512)")
            if not isinstance(self.max_position, int) or isinstance(self.max_position, bool):
                raise ValueError("max_position must be an integer when embed is True - AutoEncoderConfig(embed=True, vocab_size=10_000, max_position=512)")
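# A minimal sketch of two common configurations (values are illustrative):
# cfg_linear = AutoEncoderConfig(input_dim=128, latent_dim=64, layer_types="linear")
# cfg_lstm = AutoEncoderConfig(
#     layer_types="lstm", bidirectional=True,
#     embed=True, vocab_size=10_000, max_position=512,
# )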
##########################################################################################
############################# Block/Encoder/Decoder ######################################
##########################################################################################
def create_layers(
model_section: str,
layer_types: str,
input_dim: int,
latent_dim: int,
num_layers: int,
dropout_rate: float,
compression_rate: float,
bidirectional: bool,
    classes: Optional[int] = None
) -> nn.Sequential:
"""
Creates a sequence of layers for the encoder or decoder part of the autoencoder.
Args:
model_section (str): A string indicating whether this is for 'encoder' or 'decoder'.
layer_types (str): The type of layers to include in the sequence.
input_dim (int): The input dimension for the first layer.
latent_dim (int): The target dimension for the latent representation.
num_layers (int): The number of layers to create.
dropout_rate (float): The dropout rate to apply between layers.
compression_rate (float): The compression rate for reducing dimensions through layers.
bidirectional (bool): Whether the RNN layers should be bidirectional.
        classes (Optional[int]): If an integer is provided, it overrides the output dimension of the last decoder layer.
            It is ignored for the encoder or when None.
Returns:
A nn.Sequential module containing the created layers. The configuration of these layers is determined by the arguments provided.
Raises:
ValueError: If certain layer type conditions are not met or if required parameters for specific configurations are missing.
"""
layers = [] # Initialize an empty list to store the layers.
current_dim = input_dim # Start with the initial input dimension.
# Lists to store input and output dimensions for each layer.
input_dimensions = []
output_dimensions = []
# Calculate input and output dimensions for each layer.
for _ in range(num_layers):
input_dimensions.append(current_dim) # Store current dimension.
next_dim = max(int(current_dim * compression_rate), latent_dim) # Calculate next dimension with compression.
current_dim = next_dim # Update current dimension.
output_dimensions.append(current_dim) # Store output dimension.
# Ensure the last layer's output dimension is the latent dimension.
output_dimensions[num_layers - 1] = latent_dim
# Adjust dimensions for decoder configuration.
if model_section == "decoder":
# Swap input and output dimensions for decoder.
input_dimensions, output_dimensions = output_dimensions, input_dimensions
input_dimensions.reverse() # Reverse the order for decoder stack.
output_dimensions.reverse()
# Set the final layer's dimension to classes if specified and valid.
        if classes is not None:
            if bidirectional:
                # Halved here because bidirectional dims are doubled below; assumes `classes` is even.
                output_dimensions[-1] = classes // 2
            else:
                output_dimensions[-1] = classes
# Adjust dimensions for bidirectional RNN layers.
if bidirectional and (layer_types in ['lstm', 'rnn', 'gru']):
output_dimensions = [2 * value for value in output_dimensions]
# Construct layers based on the specified layer type.
    for idx, (in_dim, out_dim) in enumerate(zip(input_dimensions, output_dimensions)):
        # Add layers according to the specified type.
        if layer_types == 'linear':
            layers.append(nn.Linear(in_dim, out_dim))
        elif layer_types in ['lstm', 'rnn', 'gru']:
            rnn_layer = getattr(nn, layer_types.upper())  # Dynamically get the RNN layer class.
            # Bidirectional layers concatenate forward/backward states, so the
            # hidden size is halved to keep the layer's output at out_dim.
            hidden_dim = out_dim // (2 if bidirectional else 1)
            # Every layer after the raw input receives concatenated (doubled) features
            # when bidirectional; only the first encoder layer sees the raw input.
            doubled = bidirectional and not (model_section == "encoder" and idx == 0)
            layers.append(rnn_layer(in_dim * (2 if doubled else 1), hidden_dim,
                                    batch_first=True, bidirectional=bidirectional))
        # Add a dropout layer between layers, except after the last layer.
        if (idx != num_layers - 1) and (dropout_rate is not None):
            layers.append(nn.Dropout(dropout_rate))
# Return the sequence of layers as an nn.Sequential module.
return nn.Sequential(*layers)
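# A minimal sketch of what create_layers produces with the default config
# (input_dim=128, latent_dim=64, num_layers=3, compression_rate=0.5, layer_types="linear"):
# encoder: Linear(128, 64) -> Dropout -> Linear(64, 64) -> Dropout -> Linear(64, 64)
# decoder: Linear(64, 64) -> Dropout -> Linear(64, 64) -> Dropout -> Linear(64, 128)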
##########################################################################################
##################################### Model ##############################################
##########################################################################################
class AutoEncoder(PreTrainedModel):
"""
AutoEncoder model for creating an encoder-decoder architecture.
Inherits from PreTrainedModel to utilize its pretrained model features from the Hugging Face library.
Args:
config (AutoEncoderConfig): The configuration instance with all model parameters.
"""
config_class = AutoEncoderConfig
def __init__(self, config: AutoEncoderConfig):
        super().__init__(config)
# Embeddings
if config.embed:
# Word Embeddings
self.word_embeddings = nn.Embedding(config.vocab_size,
config.input_dim,
config.pad_token_id,)
            # Positional Embeddings
self.position_embeddings = nn.Embedding(config.max_position,
config.input_dim,)
# Encoder
self.encoder = create_layers("encoder",
config.layer_types,
config.input_dim,
config.latent_dim,
config.num_layers,
config.dropout_rate,
config.compression_rate,
config.bidirectional,)
        # Decoder (assumed symmetric to the encoder); when embedding is used,
        # the final layer projects back to the vocabulary size.
        self.decoder = create_layers("decoder",
                                     config.layer_types,
                                     config.input_dim,
                                     config.latent_dim,
                                     config.num_layers,
                                     config.dropout_rate,
                                     config.compression_rate,
                                     config.bidirectional,
                                     config.vocab_size if config.embed else None,)
    def forward(self, input_ids: Tensor, position_ids: Optional[Tensor] = None, labels: Optional[Tensor] = None) -> AutoencoderModelOutput:
        # Container for all model outputs.
        outputs = AutoencoderModelOutput()
        # Without explicit labels, the model reconstructs its own input.
        outputs.labels = labels if labels is not None else input_ids
# Embeddings
if self.config.embed:
# Word Embeddings
input_embeddings = self.word_embeddings(input_ids)
# Positional Embeddings
seq_length = input_ids.size(1)
            if position_ids is None:
                position_ids = torch.arange(seq_length, dtype=torch.long, device=input_ids.device)
                position_ids = position_ids.unsqueeze(0).expand_as(input_ids)
position_embeddings = self.position_embeddings(position_ids)
# Combine Embeddings
input_ids = input_embeddings + position_embeddings
# Non-Linear Encoding & Decoding
if self.config.layer_types in ['lstm', 'rnn', 'gru']:
# Encoding
            for layer in self.encoder:
                if isinstance(layer, (nn.LSTM, nn.RNN, nn.GRU)):
                    input_ids, _ = layer(input_ids)  # Discard hidden/cell states.
                else:
                    input_ids = layer(input_ids)
# Hidden Vector
outputs.hidden_state = input_ids
# Decoding
            for layer in self.decoder:
                if isinstance(layer, (nn.LSTM, nn.RNN, nn.GRU)):
                    input_ids, _ = layer(input_ids)  # Discard hidden/cell states.
                else:
                    input_ids = layer(input_ids)
# Linear Encoding & Decoding
else:
# Encoding
input_ids = self.encoder(input_ids)
# Hidden Vector
outputs.hidden_state = input_ids
# Decoding
input_ids = self.decoder(input_ids)
outputs.logits = input_ids
        # Choose the loss function based on the label dtype:
        # float labels -> reconstruction (MSE); integer labels -> token classification (cross-entropy).
        if torch.is_floating_point(outputs.labels):
            loss_fn = nn.MSELoss()
            outputs.loss = loss_fn(outputs.logits.view(-1), outputs.labels.view(-1))
        elif not torch.is_complex(outputs.labels):
            loss_fn = nn.CrossEntropyLoss()
            outputs.loss = loss_fn(outputs.logits.reshape(-1, self.config.vocab_size), outputs.labels.view(-1))
        else:
            raise ValueError("Unsupported label dtype for the available loss functions")
        return outputs
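if __name__ == "__main__":
    # A minimal smoke test, assuming the default linear configuration;
    # the shapes and values below are illustrative only.
    config = AutoEncoderConfig()  # input_dim=128, latent_dim=64, layer_types="linear"
    model = AutoEncoder(config)
    batch = torch.randn(4, config.input_dim)  # float input -> MSE reconstruction loss
    out = model(batch)
    print(out.hidden_state.shape)  # torch.Size([4, 64]) - compressed representation
    print(out.logits.shape)        # torch.Size([4, 128]) - reconstruction
    print(out.loss)                # scalar MSE loss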