|
|
|
""" Model definition functions and weight loading. |
|
""" |
|
|
|
from __future__ import print_function, division, unicode_literals |
|
|
|
from os.path import exists |
|
|
|
import torch |
|
import torch.nn as nn |
|
from torch.autograd import Variable |
|
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence, PackedSequence |
|
|
|
from torchmoji.lstm import LSTMHardSigmoid |
|
from torchmoji.attlayer import Attention |
|
from torchmoji.global_variables import NB_TOKENS, NB_EMOJI_CLASSES |
|
|
|
|
|
def torchmoji_feature_encoding(weight_path, return_attention=False): |
|
""" Loads the pretrained torchMoji model for extracting features |
|
from the penultimate feature layer. In this way, it transforms |
|
the text into its emotional encoding. |
|
|
|
# Arguments: |
|
weight_path: Path to model weights to be loaded. |
|
        return_attention: If True, the output will also include the attention
            weight of each input token used for the prediction.
|
|
|
# Returns: |
|
Pretrained model for encoding text into feature vectors. |
|
""" |
|
|
|
model = TorchMoji(nb_classes=None, |
|
nb_tokens=NB_TOKENS, |
|
feature_output=True, |
|
return_attention=return_attention) |
|
load_specific_weights(model, weight_path, exclude_names=['output_layer']) |
|
return model |
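
# A minimal usage sketch for torchmoji_feature_encoding. The tokenizer import
# and the VOCAB_PATH / PRETRAINED_PATH constants are assumptions based on the
# rest of the package, not guaranteed by this module:
#
#   import json
#   from torchmoji.sentence_tokenizer import SentenceTokenizer
#   from torchmoji.global_variables import PRETRAINED_PATH, VOCAB_PATH
#
#   with open(VOCAB_PATH, 'r') as f:
#       vocabulary = json.load(f)
#   st = SentenceTokenizer(vocabulary, 30)
#   tokenized, _, _ = st.tokenize_sentences(['Today is an awesome day!'])
#   model = torchmoji_feature_encoding(PRETRAINED_PATH)
#   encoding = model(tokenized)  # one 2304-dimensional feature vector per sentence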
|
|
|
|
|
def torchmoji_emojis(weight_path, return_attention=False): |
|
""" Loads the pretrained torchMoji model for extracting features |
|
from the penultimate feature layer. In this way, it transforms |
|
the text into its emotional encoding. |
|
|
|
# Arguments: |
|
weight_path: Path to model weights to be loaded. |
|
return_attention: If true, output will include weight of each input token |
|
used for the prediction |
|
|
|
# Returns: |
|
Pretrained model for encoding text into feature vectors. |
|
""" |
|
|
|
model = TorchMoji(nb_classes=NB_EMOJI_CLASSES, |
|
nb_tokens=NB_TOKENS, |
|
return_attention=return_attention) |
|
model.load_state_dict(torch.load(weight_path)) |
|
return model |
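
# A minimal usage sketch for torchmoji_emojis (tokenizer setup as in the sketch
# above; the constant names are assumptions from the rest of the package):
#
#   model = torchmoji_emojis(PRETRAINED_PATH)
#   prob = model(tokenized)               # shape (n_sentences, 64), rows sum to 1
#   top5 = prob[0].argsort()[-5:][::-1]   # indices of the five most likely emojis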
|
|
|
|
|
def torchmoji_transfer(nb_classes, weight_path=None, extend_embedding=0, |
|
embed_dropout_rate=0.1, final_dropout_rate=0.5): |
|
""" Loads the pretrained torchMoji model for finetuning/transfer learning. |
|
Does not load weights for the softmax layer. |
|
|
|
Note that if you are planning to use class average F1 for evaluation, |
|
nb_classes should be set to 2 instead of the actual number of classes |
|
in the dataset, since binary classification will be performed on each |
|
class individually. |
|
|
|
    Note that when using the 'new' training method, weight_path should be
    left as None.
|
|
|
# Arguments: |
|
nb_classes: Number of classes in the dataset. |
|
weight_path: Path to model weights to be loaded. |
|
extend_embedding: Number of tokens that have been added to the |
|
vocabulary on top of NB_TOKENS. If this number is larger than 0, |
|
the embedding layer's dimensions are adjusted accordingly, with the |
|
additional weights being set to random values. |
|
embed_dropout_rate: Dropout rate for the embedding layer. |
|
final_dropout_rate: Dropout rate for the final Softmax layer. |
|
|
|
# Returns: |
|
Model with the given parameters. |
|
""" |
|
|
|
model = TorchMoji(nb_classes=nb_classes, |
|
nb_tokens=NB_TOKENS + extend_embedding, |
|
embed_dropout_rate=embed_dropout_rate, |
|
final_dropout_rate=final_dropout_rate, |
|
output_logits=True) |
|
if weight_path is not None: |
|
load_specific_weights(model, weight_path, |
|
exclude_names=['output_layer'], |
|
extend_embedding=extend_embedding) |
|
return model |
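
# A minimal transfer-learning sketch. The finetune helper, its exact signature
# and PRETRAINED_PATH are assumptions based on torchmoji.finetuning and
# torchmoji.global_variables, not defined in this module:
#
#   from torchmoji.finetuning import finetune
#
#   model = torchmoji_transfer(nb_classes=2, weight_path=PRETRAINED_PATH)
#   # texts / labels are your prepared train/val/test splits
#   model, acc = finetune(model, texts, labels, nb_classes=2,
#                         batch_size=32, method='last')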
|
|
|
|
|
class TorchMoji(nn.Module): |
|
def __init__(self, nb_classes, nb_tokens, feature_output=False, output_logits=False, |
|
embed_dropout_rate=0, final_dropout_rate=0, return_attention=False): |
|
""" |
|
torchMoji model. |
|
IMPORTANT: The model is loaded in evaluation mode by default (self.eval()) |
|
|
|
# Arguments: |
|
nb_classes: Number of classes in the dataset. |
|
nb_tokens: Number of tokens in the dataset (i.e. vocabulary size). |
|
feature_output: If True the model returns the penultimate |
|
feature vector rather than Softmax probabilities |
|
(defaults to False). |
|
output_logits: If True the model returns logits rather than probabilities |
|
(defaults to False). |
|
embed_dropout_rate: Dropout rate for the embedding layer. |
|
final_dropout_rate: Dropout rate for the final Softmax layer. |
|
return_attention: If True the model also returns attention weights over the sentence |
|
(defaults to False). |
|
""" |
|
super(TorchMoji, self).__init__() |
|
|
|
embedding_dim = 256 |
|
hidden_size = 512 |
|
        # The attention layer sees the concatenation of both bidirectional
        # LSTM layers' outputs (2 * 2 * hidden_size) and the embeddings
        attention_size = 4 * hidden_size + embedding_dim
|
|
|
self.feature_output = feature_output |
|
self.embed_dropout_rate = embed_dropout_rate |
|
self.final_dropout_rate = final_dropout_rate |
|
self.return_attention = return_attention |
|
self.hidden_size = hidden_size |
|
self.output_logits = output_logits |
|
self.nb_classes = nb_classes |
|
|
|
self.add_module('embed', nn.Embedding(nb_tokens, embedding_dim)) |
|
|
|
|
|
        # Embedding dropout: Dropout2d is used with the intent of dropping
        # whole embedding channels rather than individual activations
        self.add_module('embed_dropout', nn.Dropout2d(embed_dropout_rate))
|
self.add_module('lstm_0', LSTMHardSigmoid(embedding_dim, hidden_size, batch_first=True, bidirectional=True)) |
|
self.add_module('lstm_1', LSTMHardSigmoid(hidden_size*2, hidden_size, batch_first=True, bidirectional=True)) |
|
self.add_module('attention_layer', Attention(attention_size=attention_size, return_attention=return_attention)) |
|
if not feature_output: |
|
self.add_module('final_dropout', nn.Dropout(final_dropout_rate)) |
|
            # Binary tasks get a single output unit (sigmoid / single logit);
            # multi-class tasks get one unit per class (softmax / logits)
            if output_logits:
                self.add_module('output_layer', nn.Sequential(nn.Linear(attention_size, nb_classes if self.nb_classes > 2 else 1)))
            else:
                self.add_module('output_layer', nn.Sequential(nn.Linear(attention_size, nb_classes if self.nb_classes > 2 else 1),
                                                              nn.Softmax() if self.nb_classes > 2 else nn.Sigmoid()))
|
self.init_weights() |
|
|
|
self.eval() |
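
    # Construction sketch (illustrative): TorchMoji(nb_classes=None,
    # nb_tokens=NB_TOKENS, feature_output=True) builds the feature-encoding
    # variant returned by torchmoji_feature_encoding() before weight loading.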
|
|
|
def init_weights(self): |
|
""" |
|
Here we reproduce Keras default initialization weights for consistency with Keras version |
|
""" |
|
ih = (param.data for name, param in self.named_parameters() if 'weight_ih' in name) |
|
hh = (param.data for name, param in self.named_parameters() if 'weight_hh' in name) |
|
b = (param.data for name, param in self.named_parameters() if 'bias' in name) |
|
nn.init.uniform(self.embed.weight.data, a=-0.5, b=0.5) |
|
for t in ih: |
|
nn.init.xavier_uniform(t) |
|
for t in hh: |
|
nn.init.orthogonal(t) |
|
for t in b: |
|
nn.init.constant(t, 0) |
|
if not self.feature_output: |
|
nn.init.xavier_uniform(self.output_layer[0].weight.data) |
|
|
|
def forward(self, input_seqs): |
|
""" Forward pass. |
|
|
|
        # Arguments:
            input_seqs: Can be a NumPy array, a torch.LongTensor, a
                torch.autograd.Variable or a torch.nn.utils.rnn.PackedSequence.

        # Returns:
            Output in the same format as the input (except that a
            PackedSequence input is returned as a Variable).
|
""" |
|
|
|
        # Remember the input format so the output can be returned in the same
        # format, and wrap raw tensors / NumPy arrays in Variables
        return_numpy = False
        return_tensor = False
        if isinstance(input_seqs, (torch.LongTensor, torch.cuda.LongTensor)):
            input_seqs = Variable(input_seqs)
            return_tensor = True
        elif not isinstance(input_seqs, Variable):
            input_seqs = Variable(torch.from_numpy(input_seqs.astype('int64')).long())
            return_numpy = True
|
|
|
|
|
        # If the input is not already packed: build zero-initialized hidden
        # states, sort the batch by sequence length (longest first, as required
        # by pack_padded_sequence) and pack it, remembering to restore the
        # original ordering of the batch afterwards
        reorder_output = False
        if not isinstance(input_seqs, PackedSequence):
            ho = self.lstm_0.weight_hh_l0.data.new(2, input_seqs.size()[0], self.hidden_size).zero_()
            co = self.lstm_0.weight_hh_l0.data.new(2, input_seqs.size()[0], self.hidden_size).zero_()

            # Effective length of each zero-padded sequence is the index of its
            # last non-zero token plus one
            input_lengths = torch.LongTensor([torch.max(input_seqs[i, :].data.nonzero()) + 1 for i in range(input_seqs.size()[0])])
            input_lengths, perm_idx = input_lengths.sort(0, descending=True)
            input_seqs = input_seqs[perm_idx][:, :input_lengths.max()]

            # Pack the sorted sequences so the LSTMs skip the padding
            packed_input = pack_padded_sequence(input_seqs, input_lengths.cpu().numpy(), batch_first=True)
            reorder_output = True
|
        else:
            # The input is already packed; batch_sizes[0] is the batch size
            batch_size = int(input_seqs.batch_sizes[0])
            ho = self.lstm_0.weight_hh_l0.data.new(2, batch_size, self.hidden_size).zero_()
            co = self.lstm_0.weight_hh_l0.data.new(2, batch_size, self.hidden_size).zero_()
|
input_lengths = input_seqs.batch_sizes |
|
packed_input = input_seqs |
|
|
|
hidden = (Variable(ho, requires_grad=False), Variable(co, requires_grad=False)) |
|
|
|
|
|
        # Embed the packed token data and bound the embedding values with tanh
        x = self.embed(packed_input.data)
        x = nn.Tanh()(x)

        # Embedding dropout, applied directly on the packed data tensor
        x = self.embed_dropout(x)

        # Re-wrap the transformed data as a PackedSequence for the LSTMs
        packed_input = PackedSequence(data=x, batch_sizes=packed_input.batch_sizes)
|
|
|
|
|
|
|
        # Two stacked bidirectional LSTMs
        lstm_0_output, _ = self.lstm_0(packed_input, hidden)
        lstm_1_output, _ = self.lstm_1(lstm_0_output, hidden)

        # Concatenate both LSTM outputs with the embeddings (skip connections);
        # the ordering matters for compatibility with the pretrained weights
        packed_input = PackedSequence(data=torch.cat((lstm_1_output.data,
                                                      lstm_0_output.data,
                                                      packed_input.data), dim=1),
                                      batch_sizes=packed_input.batch_sizes)
|
|
|
        # Unpack to a padded batch and apply attention over the time dimension
        input_seqs, _ = pad_packed_sequence(packed_input, batch_first=True)
        x, att_weights = self.attention_layer(input_seqs, input_lengths)
|
|
|
|
|
        # Return class probabilities/logits, or the penultimate feature vector
        # when feature_output is set
        if not self.feature_output:
            x = self.final_dropout(x)
            outputs = self.output_layer(x)
        else:
            outputs = x
|
|
|
|
|
        # Restore the original ordering of the batch (it was sorted by length above)
        if reorder_output:
            reordered = Variable(outputs.data.new(outputs.size()))
            reordered[perm_idx] = outputs
            outputs = reordered
|
|
|
|
|
        # Convert the output back to the caller's input format
        if return_tensor:
            outputs = outputs.data
        if return_numpy:
            outputs = outputs.data.numpy()
|
|
|
if self.return_attention: |
|
return outputs, att_weights |
|
else: |
|
return outputs |
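
# Forward-pass format sketch (illustrative; `np` is assumed to be numpy and
# `model` an instance of TorchMoji):
#
#   arr = np.array([[1, 4, 7, 0, 0]], dtype='int64')   # zero-padded token ids
#   feats = model(arr)                                  # NumPy in  -> NumPy out
#   feats = model(torch.LongTensor(arr))                # tensor in -> tensor out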
|
|
|
|
|
def load_specific_weights(model, weight_path, exclude_names=[], extend_embedding=0, verbose=True): |
|
""" Loads model weights from the given file path, excluding any |
|
given layers. |
|
|
|
# Arguments: |
|
model: Model whose weights should be loaded. |
|
weight_path: Path to file containing model weights. |
|
exclude_names: List of layer names whose weights should not be loaded. |
|
extend_embedding: Number of new words being added to vocabulary. |
|
verbose: Verbosity flag. |
|
|
|
# Raises: |
|
ValueError if the file at weight_path does not exist. |
|
""" |
|
if not exists(weight_path): |
|
raise ValueError('ERROR (load_weights): The weights file at {} does ' |
|
'not exist. Refer to the README for instructions.' |
|
.format(weight_path)) |
|
|
|
if extend_embedding and 'embed' in exclude_names: |
|
raise ValueError('ERROR (load_weights): Cannot extend a vocabulary ' |
|
'without loading the embedding weights.') |
|
|
|
|
|
|
|
    # Copy weights parameter by parameter so that layers listed in
    # exclude_names keep their freshly initialized values
    weights = torch.load(weight_path)
    for key, weight in weights.items():
        if any(excluded in key for excluded in exclude_names):
            if verbose:
                print('Ignoring weights for {}'.format(key))
            continue
|
|
|
try: |
|
model_w = model.state_dict()[key] |
|
except KeyError: |
|
raise KeyError("Weights had parameters {},".format(key) |
|
+ " but could not find this parameters in model.") |
|
|
|
if verbose: |
|
print('Loading weights for {}'.format(key)) |
|
|
|
|
|
|
|
        # For an extended vocabulary, keep the randomly initialized rows for
        # the new tokens and prepend the pretrained embedding rows
        if 'embed' in key and extend_embedding > 0:
            weight = torch.cat((weight, model_w[NB_TOKENS:, :]), dim=0)
|
if verbose: |
|
print('Extended vocabulary for embedding layer ' + |
|
'from {} to {} tokens.'.format( |
|
NB_TOKENS, NB_TOKENS + extend_embedding)) |
|
try: |
|
model_w.copy_(weight) |
|
except: |
|
            print('While copying the weights named {}, whose dimensions in the model are'
|
' {} and whose dimensions in the saved file are {}, ...'.format( |
|
key, model_w.size(), weight.size())) |
|
raise |
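
# A usage sketch (illustrative; PRETRAINED_PATH is assumed to come from
# torchmoji.global_variables): load all pretrained weights except the output
# layer, keeping random rows for 100 newly added vocabulary tokens:
#
#   model = TorchMoji(nb_classes=5, nb_tokens=NB_TOKENS + 100, output_logits=True)
#   load_specific_weights(model, PRETRAINED_PATH,
#                         exclude_names=['output_layer'], extend_embedding=100)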
|
|