from dataclasses import dataclass
from typing import Optional

import torch
from torch import nn, Tensor

from transformers import PretrainedConfig, PreTrainedModel, AutoConfig, AutoModel
from transformers.utils import ModelOutput
|
@dataclass
class AutoencoderModelOutput(ModelOutput):
    """
    Represents the output of an autoencoder model. This class holds the
    tensors produced by passing data through the autoencoder.

    Attributes:
        logits (torch.FloatTensor, optional): The reconstructed output of the
            autoencoder, i.e. the direct output of the decoder.
        labels (torch.FloatTensor, optional): The targets associated with the
            input data, when available. Used for training and evaluation.
        hidden_state (torch.FloatTensor, optional): The encoded representation
            of the input data, i.e. the output of the encoder, serving as a
            compressed view of the input.
        loss (torch.FloatTensor, optional): The loss computed by comparing the
            reconstruction to the targets. Essential for training and for
            evaluating model performance.
    """
    logits: Optional[torch.FloatTensor] = None
    labels: Optional[torch.FloatTensor] = None
    hidden_state: Optional[torch.FloatTensor] = None
    loss: Optional[torch.FloatTensor] = None
|
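# A minimal usage sketch (illustrative only, not part of the module's API):
# like any `ModelOutput`, instances support both attribute and dict-style access.
#
#     out = AutoencoderModelOutput(logits=torch.zeros(2, 8), loss=torch.tensor(0.5))
#     out.loss        # tensor(0.5000)
#     out["logits"]   # same tensor as out.logits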
class AutoEncoderConfig(PretrainedConfig):
    """
    Configuration class for AutoEncoder. This class stores the parameters of the autoencoder model.

    Attributes:
        input_dim (int): The dimensionality of the input data. Default is 128.
        latent_dim (int): The dimensionality of the latent representation. Default is 64.
        layer_types (str): The type of layers used: 'linear', 'lstm', 'gru', or 'rnn'. Default is 'linear'.
        dropout_rate (float): The dropout rate applied after each layer (except the last). Default is 0.1.
        num_layers (int): The number of layers in the encoder/decoder. Default is 3.
        compression_rate (float): Factor by which the dimensions shrink from one layer to the next. Default is 0.5.
        bidirectional (bool): Whether the recurrent layers should be bidirectional. Default is False.
        embed (bool): Whether to embed the input data. If True, `vocab_size` and `max_position` must be specified. Default is False.
        vocab_size (int | bool): The size of the vocabulary. Required (as an int) if `embed` is True. Default is False.
        max_position (int | bool): The maximum sequence length for positional embeddings. Required (as an int) if `embed` is True. Default is False.
        pad_token_id (int): Id of the padding token. Default is 0.
        bos_token_id (int): Id of the beginning-of-sequence token. Default is 1.
        eos_token_id (int): Id of the end-of-sequence token. Default is 2.

    Raises:
        ValueError: If `embed` is True and either `vocab_size` or `max_position` is not an integer.
    """
    model_type = "autoencoder"

    def __init__(
        self,
        input_dim: int = 128,
        latent_dim: int = 64,
        layer_types: str = 'linear',
        dropout_rate: float = 0.1,
        num_layers: int = 3,
        compression_rate: float = 0.5,
        bidirectional: bool = False,
        embed: bool = False,
        vocab_size: int | bool = False,
        max_position: int | bool = False,
        pad_token_id: int = 0,
        bos_token_id: int = 1,
        eos_token_id: int = 2,
        **kwargs
    ):
        super().__init__(**kwargs)
        self.input_dim = input_dim
        self.latent_dim = latent_dim
        self.layer_types = layer_types
        self.dropout_rate = dropout_rate
        self.num_layers = num_layers
        self.compression_rate = compression_rate
        self.bidirectional = bidirectional
        self.embed = embed
        self.vocab_size = vocab_size
        self.max_position = max_position
        self.pad_token_id = pad_token_id
        self.bos_token_id = bos_token_id
        self.eos_token_id = eos_token_id

        if self.embed:
            # bool is a subclass of int, so the False sentinel must be excluded explicitly.
            if not isinstance(self.vocab_size, int) or isinstance(self.vocab_size, bool):
                raise ValueError("vocab_size needs to be defined when embed is True - AutoEncoderConfig(embed=True, vocab_size=10_000, max_position=512)")
            if not isinstance(self.max_position, int) or isinstance(self.max_position, bool):
                raise ValueError("max_position needs to be defined when embed is True - AutoEncoderConfig(embed=True, vocab_size=10_000, max_position=512)")
|
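# A hedged construction sketch (the values are arbitrary, chosen for illustration):
# a continuous-feature config, and a token config where `embed=True` requires
# `vocab_size` and `max_position` to be integers.
#
#     dense_cfg = AutoEncoderConfig(input_dim=128, latent_dim=32, layer_types='linear')
#     token_cfg = AutoEncoderConfig(input_dim=256, latent_dim=64, layer_types='lstm',
#                                   embed=True, vocab_size=10_000, max_position=512)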
def create_layers(
    model_section: str,
    layer_types: str,
    input_dim: int,
    latent_dim: int,
    num_layers: int,
    dropout_rate: float,
    compression_rate: float,
    bidirectional: bool,
    classes: bool | int = False
) -> nn.Sequential:
    """
    Creates the sequence of layers for the encoder or decoder part of the autoencoder.

    Args:
        model_section (str): Indicates whether this is for the 'encoder' or the 'decoder'.
        layer_types (str): The type of layers to include in the sequence.
        input_dim (int): The dimensionality of the model input (the first encoder layer's
            input and the last decoder layer's output).
        latent_dim (int): The target dimension of the latent representation.
        num_layers (int): The number of layers to create.
        dropout_rate (float): The dropout rate to apply between layers.
        compression_rate (float): The factor by which dimensions shrink from layer to layer.
        bidirectional (bool): Whether the recurrent layers should be bidirectional.
        classes (bool | int): If an integer is provided, it defines the output dimension of
            the last decoder layer. It is ignored for the encoder or when the value is False.

    Returns:
        An nn.Sequential module containing the created layers, configured according to the
        arguments provided.
    """
    layers = []
    current_dim = input_dim

    # Compute the per-layer input/output dimensions for the encoder, compressing
    # by `compression_rate` at each step but never going below `latent_dim`.
    input_dimensions = []
    output_dimensions = []
    for _ in range(num_layers):
        input_dimensions.append(current_dim)
        current_dim = max(int(current_dim * compression_rate), latent_dim)
        output_dimensions.append(current_dim)

    # Force the last encoder layer to emit exactly `latent_dim` features.
    output_dimensions[num_layers - 1] = latent_dim

    if model_section == "decoder":
        # The decoder mirrors the encoder: swap the roles of the dimension
        # lists and reverse them so the layers expand back to `input_dim`.
        input_dimensions, output_dimensions = output_dimensions, input_dimensions
        input_dimensions.reverse()
        output_dimensions.reverse()

        if isinstance(classes, int) and not isinstance(classes, bool):
            # Halve here so the bidirectional doubling below restores
            # (approximately) the requested number of classes.
            output_dimensions[-1] = classes // 2 if bidirectional else classes

    if bidirectional and layer_types in ['lstm', 'rnn', 'gru']:
        output_dimensions = [2 * value for value in output_dimensions]

    for idx, (in_dim, out_dim) in enumerate(zip(input_dimensions, output_dimensions)):
        if layer_types == 'linear':
            layers.append(nn.Linear(in_dim, out_dim))
        elif layer_types in ['lstm', 'rnn', 'gru']:
            rnn_layer = getattr(nn, layer_types.upper())
            # `out_dim` was doubled above for bidirectional layers, so the
            # hidden size passed to the module is half of it.
            hidden_dim = out_dim // (2 if bidirectional else 1)
            # Any layer fed by a previous bidirectional layer receives twice as
            # many features: in the encoder that is every layer but the first;
            # in the decoder it is every layer, because the encoder's
            # bidirectional output feeds the first decoder layer as well.
            if bidirectional and (idx > 0 or model_section == "decoder"):
                in_dim = in_dim * 2
            layers.append(rnn_layer(in_dim, hidden_dim, batch_first=True, bidirectional=bidirectional))

        if idx != num_layers - 1 and dropout_rate is not None:
            layers.append(nn.Dropout(dropout_rate))

    return nn.Sequential(*layers)
|
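# A quick shape sketch of `create_layers` (illustrative, with arbitrary sizes):
# with input_dim=128, latent_dim=16, num_layers=3 and compression_rate=0.5, the
# encoder maps 128 -> 64 -> 32 -> 16 and the mirrored decoder maps it back,
# 16 -> 32 -> 64 -> 128.
#
#     enc = create_layers("encoder", "linear", 128, 16, 3, 0.1, 0.5, False)
#     dec = create_layers("decoder", "linear", 128, 16, 3, 0.1, 0.5, False)
#     z = enc(torch.randn(4, 128))    # -> shape (4, 16)
#     x_hat = dec(z)                  # -> shape (4, 128)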
class AutoEncoder(PreTrainedModel):
    """
    AutoEncoder model implementing an encoder-decoder architecture.

    Inherits from PreTrainedModel to make use of the Hugging Face pretrained-model
    machinery (configuration handling, saving, and loading).

    Args:
        config (AutoEncoderConfig): The configuration instance with all model parameters.
    """
    config_class = AutoEncoderConfig

    def __init__(self, config: AutoEncoderConfig):
        super().__init__(config)

        if config.embed:
            self.word_embeddings = nn.Embedding(config.vocab_size,
                                                config.input_dim,
                                                config.pad_token_id)
            self.position_embeddings = nn.Embedding(config.max_position,
                                                    config.input_dim)

        self.encoder = create_layers("encoder",
                                     config.layer_types,
                                     config.input_dim,
                                     config.latent_dim,
                                     config.num_layers,
                                     config.dropout_rate,
                                     config.compression_rate,
                                     config.bidirectional)

        # When the inputs are embedded token ids, the decoder projects back to
        # vocabulary logits; otherwise it reconstructs the input features.
        self.decoder = create_layers("decoder",
                                     config.layer_types,
                                     config.input_dim,
                                     config.latent_dim,
                                     config.num_layers,
                                     config.dropout_rate,
                                     config.compression_rate,
                                     config.bidirectional,
                                     config.vocab_size if config.embed else False)
|
    def forward(self, input_ids: Tensor, position_ids: Optional[Tensor] = None, labels: Optional[Tensor] = None) -> AutoencoderModelOutput:
        outputs = AutoencoderModelOutput()
        outputs.labels = labels if labels is not None else input_ids

        hidden_states = input_ids
        if self.config.embed:
            # Sum word and learned position embeddings before encoding.
            input_embeddings = self.word_embeddings(input_ids)
            if position_ids is None:
                seq_length = input_ids.size(1)
                position_ids = torch.arange(seq_length, dtype=torch.long, device=input_ids.device)
                position_ids = position_ids.unsqueeze(0).expand_as(input_ids)
            position_embeddings = self.position_embeddings(position_ids)
            hidden_states = input_embeddings + position_embeddings

        if self.config.layer_types in ['lstm', 'rnn', 'gru']:
            # Recurrent modules return (output, state) tuples, so the layers
            # are applied one by one and the hidden states are discarded.
            for layer in self.encoder:
                if isinstance(layer, (nn.LSTM, nn.RNN, nn.GRU)):
                    hidden_states, _ = layer(hidden_states)
                else:
                    hidden_states = layer(hidden_states)

            outputs.hidden_state = hidden_states

            for layer in self.decoder:
                if isinstance(layer, (nn.LSTM, nn.RNN, nn.GRU)):
                    hidden_states, _ = layer(hidden_states)
                else:
                    hidden_states = layer(hidden_states)
        else:
            hidden_states = self.encoder(hidden_states)
            outputs.hidden_state = hidden_states
            hidden_states = self.decoder(hidden_states)

        outputs.logits = hidden_states

        # Floating-point targets are treated as regression and use MSE; integer
        # targets are treated as token ids and use cross-entropy over the vocabulary.
        if torch.is_floating_point(outputs.labels):
            loss_fn = nn.MSELoss()
            outputs.loss = loss_fn(outputs.logits.reshape(-1), outputs.labels.reshape(-1))
        elif not torch.is_complex(outputs.labels):
            loss_fn = nn.CrossEntropyLoss()
            outputs.loss = loss_fn(outputs.logits.reshape(-1, self.config.vocab_size), outputs.labels.reshape(-1))
        else:
            raise ValueError("Unsupported tensor dtype for these loss functions")

        return outputs
|
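# The AutoConfig/AutoModel imports above suggest registering the custom classes
# with the Auto factories; a hedged sketch of that wiring plus a smoke test.
# The two `register` calls are standard `transformers` API; the demo values
# below are arbitrary and purely illustrative.

# AutoConfig.register("autoencoder", AutoEncoderConfig)
# AutoModel.register(AutoEncoderConfig, AutoEncoder)

if __name__ == "__main__":
    # Continuous features: reconstruct a batch of random vectors with MSE loss.
    config = AutoEncoderConfig(input_dim=128, latent_dim=32, layer_types='linear')
    model = AutoEncoder(config)
    out = model(torch.randn(8, 128))
    print(out.hidden_state.shape, out.logits.shape, out.loss.item())

    # Token ids: embed, encode, and decode back to vocabulary logits with cross-entropy.
    token_config = AutoEncoderConfig(input_dim=64, latent_dim=16, layer_types='lstm',
                                     embed=True, vocab_size=1_000, max_position=128)
    token_model = AutoEncoder(token_config)
    token_out = token_model(torch.randint(0, 1_000, (4, 32)))
    print(token_out.logits.shape, token_out.loss.item())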