Spaces:
Sleeping
Sleeping
import math | |
import torch | |
import numpy as np | |
import torch.nn as nn | |
from tqdm import tqdm | |
import scipy.sparse as sp | |
import torch.nn.functional as F | |
import torch.distributed as dist | |
import transformers | |
from transformers import RobertaTokenizer | |
from transformers.models.roberta.modeling_roberta import RobertaPreTrainedModel, RobertaModel, RobertaLMHead | |
from transformers.models.bert.modeling_bert import BertPreTrainedModel, BertModel, BertLMPredictionHead | |
from transformers.activations import gelu | |
from transformers.file_utils import ( | |
add_code_sample_docstrings, | |
add_start_docstrings, | |
add_start_docstrings_to_model_forward, | |
replace_return_docstrings, | |
) | |
from transformers.modeling_outputs import SequenceClassifierOutput, BaseModelOutputWithPoolingAndCrossAttentions | |
init = nn.init.xavier_uniform_ | |
uniformInit = nn.init.uniform | |
""" | |
EasyRec | |
""" | |
def dot_product_scores(q_vectors, ctx_vectors): | |
r = torch.matmul(q_vectors, torch.transpose(ctx_vectors, 0, 1)) | |
return r | |
class MLPLayer(nn.Module): | |
""" | |
Head for getting sentence representations over RoBERTa/BERT's CLS representation. | |
""" | |
def __init__(self, config): | |
super().__init__() | |
self.dense = nn.Linear(config.hidden_size, config.hidden_size) | |
self.activation = nn.Tanh() | |
def forward(self, features, **kwargs): | |
x = self.dense(features) | |
x = self.activation(x) | |
return x | |
class Pooler(nn.Module): | |
""" | |
Parameter-free poolers to get the sentence embedding | |
'cls': [CLS] representation with BERT/RoBERTa's MLP pooler. | |
'cls_before_pooler': [CLS] representation without the original MLP pooler. | |
'avg': average of the last layers' hidden states at each token. | |
'avg_top2': average of the last two layers. | |
'avg_first_last': average of the first and the last layers. | |
""" | |
def __init__(self, pooler_type): | |
super().__init__() | |
self.pooler_type = pooler_type | |
assert self.pooler_type in ["cls", "cls_before_pooler", "avg", "avg_top2", "avg_first_last"], "unrecognized pooling type %s" % self.pooler_type | |
def forward(self, attention_mask, outputs): | |
last_hidden = outputs.last_hidden_state | |
pooler_output = outputs.pooler_output | |
hidden_states = outputs.hidden_states | |
if self.pooler_type in ['cls_before_pooler', 'cls']: | |
return last_hidden[:, 0] | |
elif self.pooler_type == "avg": | |
return ((last_hidden * attention_mask.unsqueeze(-1)).sum(1) / attention_mask.sum(-1).unsqueeze(-1)) | |
elif self.pooler_type == "avg_first_last": | |
first_hidden = hidden_states[1] | |
last_hidden = hidden_states[-1] | |
pooled_result = ((first_hidden + last_hidden) / 2.0 * attention_mask.unsqueeze(-1)).sum(1) / attention_mask.sum(-1).unsqueeze(-1) | |
return pooled_result | |
elif self.pooler_type == "avg_top2": | |
second_last_hidden = hidden_states[-2] | |
last_hidden = hidden_states[-1] | |
pooled_result = ((last_hidden + second_last_hidden) / 2.0 * attention_mask.unsqueeze(-1)).sum(1) / attention_mask.sum(-1).unsqueeze(-1) | |
return pooled_result | |
else: | |
raise NotImplementedError | |
class Similarity(nn.Module): | |
""" | |
Dot product or cosine similarity | |
""" | |
def __init__(self, temp): | |
super().__init__() | |
self.temp = temp | |
self.cos = nn.CosineSimilarity(dim=-1) | |
def forward(self, x, y): | |
return self.cos(x, y) / self.temp | |
class Easyrec(RobertaPreTrainedModel): | |
_keys_to_ignore_on_load_missing = [r"position_ids"] | |
def __init__(self, config, *model_args, **model_kargs): | |
super().__init__(config) | |
try: | |
self.model_args = model_kargs["model_args"] | |
self.roberta = RobertaModel(config, add_pooling_layer=False) | |
if self.model_args.pooler_type == "cls": | |
self.mlp = MLPLayer(config) | |
if self.model_args.do_mlm: | |
self.lm_head = RobertaLMHead(config) | |
""" | |
Contrastive learning class init function. | |
""" | |
self.pooler_type = self.model_args.pooler_type | |
self.pooler = Pooler(self.pooler_type) | |
self.sim = Similarity(temp=self.model_args.temp) | |
self.init_weights() | |
except: | |
self.roberta = RobertaModel(config, add_pooling_layer=False) | |
self.mlp = MLPLayer(config) | |
self.lm_head = RobertaLMHead(config) | |
self.pooler_type = 'cls' | |
self.pooler = Pooler(self.pooler_type) | |
self.init_weights() | |
def forward(self, | |
user_input_ids=None, | |
user_attention_mask=None, | |
pos_item_input_ids=None, | |
pos_item_attention_mask=None, | |
neg_item_input_ids=None, | |
neg_item_attention_mask=None, | |
token_type_ids=None, | |
position_ids=None, | |
head_mask=None, | |
inputs_embeds=None, | |
labels=None, | |
output_attentions=None, | |
output_hidden_states=None, | |
return_dict=None, | |
mlm_input_ids=None, | |
mlm_attention_mask=None, | |
mlm_labels=None, | |
): | |
""" | |
Contrastive learning forward function. | |
""" | |
return_dict = return_dict if return_dict is not None else self.config.use_return_dict | |
batch_size = user_input_ids.size(0) | |
# Get user embeddings | |
user_outputs = self.roberta( | |
input_ids=user_input_ids, | |
attention_mask=user_attention_mask, | |
token_type_ids=None, | |
position_ids=None, | |
head_mask=None, | |
inputs_embeds=None, | |
output_attentions=output_attentions, | |
output_hidden_states=output_hidden_states, | |
return_dict=return_dict, | |
) | |
# Get positive item embeddings | |
pos_item_outputs = self.roberta( | |
input_ids=pos_item_input_ids, | |
attention_mask=pos_item_attention_mask, | |
token_type_ids=None, | |
position_ids=None, | |
head_mask=None, | |
inputs_embeds=None, | |
output_attentions=output_attentions, | |
output_hidden_states=output_hidden_states, | |
return_dict=return_dict, | |
) | |
# Get negative item embeddings | |
neg_item_outputs = self.roberta( | |
input_ids=neg_item_input_ids, | |
attention_mask=neg_item_attention_mask, | |
token_type_ids=None, | |
position_ids=None, | |
head_mask=None, | |
inputs_embeds=None, | |
output_attentions=output_attentions, | |
output_hidden_states=output_hidden_states, | |
return_dict=return_dict, | |
) | |
# MLM auxiliary objective | |
if mlm_input_ids is not None: | |
mlm_outputs = self.roberta( | |
input_ids=mlm_input_ids, | |
attention_mask=mlm_attention_mask, | |
token_type_ids=None, | |
position_ids=None, | |
head_mask=None, | |
inputs_embeds=None, | |
output_attentions=output_attentions, | |
output_hidden_states=output_hidden_states, | |
return_dict=return_dict, | |
) | |
# Pooling | |
user_pooler_output = self.pooler(user_attention_mask, user_outputs) | |
pos_item_pooler_output = self.pooler(pos_item_attention_mask, pos_item_outputs) | |
neg_item_pooler_output = self.pooler(neg_item_attention_mask, neg_item_outputs) | |
# If using "cls", we add an extra MLP layer | |
# (same as BERT's original implementation) over the representation. | |
if self.pooler_type == "cls": | |
user_pooler_output = self.mlp(user_pooler_output) | |
pos_item_pooler_output = self.mlp(pos_item_pooler_output) | |
neg_item_pooler_output = self.mlp(neg_item_pooler_output) | |
# Gather all item embeddings if using distributed training | |
if dist.is_initialized() and self.training: | |
# Dummy vectors for allgather | |
user_list = [torch.zeros_like(user_pooler_output) for _ in range(dist.get_world_size())] | |
pos_item_list = [torch.zeros_like(pos_item_pooler_output) for _ in range(dist.get_world_size())] | |
neg_item_list = [torch.zeros_like(neg_item_pooler_output) for _ in range(dist.get_world_size())] | |
# Allgather | |
dist.all_gather(tensor_list=user_list, tensor=user_pooler_output.contiguous()) | |
dist.all_gather(tensor_list=pos_item_list, tensor=pos_item_pooler_output.contiguous()) | |
dist.all_gather(tensor_list=neg_item_list, tensor=neg_item_pooler_output.contiguous()) | |
# Since allgather results do not have gradients, we replace the | |
# current process's corresponding embeddings with original tensors | |
user_list[dist.get_rank()] = user_pooler_output | |
pos_item_list[dist.get_rank()] = pos_item_pooler_output | |
neg_item_list[dist.get_rank()] = neg_item_pooler_output | |
# Get full batch embeddings | |
user_pooler_output = torch.cat(user_list, dim=0) | |
pos_item_pooler_output = torch.cat(pos_item_list, dim=0) | |
neg_item_pooler_output = torch.cat(neg_item_list, dim=0) | |
cos_sim = self.sim(user_pooler_output.unsqueeze(1), pos_item_pooler_output.unsqueeze(0)) | |
neg_sim = self.sim(user_pooler_output.unsqueeze(1), neg_item_pooler_output.unsqueeze(0)) | |
cos_sim = torch.cat([cos_sim, neg_sim], 1) | |
labels = torch.arange(cos_sim.size(0)).long().to(self.device) | |
loss_fct = nn.CrossEntropyLoss() | |
loss = loss_fct(cos_sim, labels) | |
# Calculate loss for MLM | |
if mlm_outputs is not None and mlm_labels is not None and self.model_args.do_mlm: | |
mlm_labels = mlm_labels.view(-1, mlm_labels.size(-1)) | |
prediction_scores = self.lm_head(mlm_outputs.last_hidden_state) | |
masked_lm_loss = loss_fct(prediction_scores.view(-1, self.config.vocab_size), mlm_labels.view(-1)) | |
loss = loss + self.model_args.mlm_weight * masked_lm_loss | |
if not return_dict: | |
raise NotImplementedError | |
return SequenceClassifierOutput( | |
loss=loss, | |
logits=cos_sim, | |
) | |
def encode(self, | |
input_ids=None, | |
attention_mask=None, | |
token_type_ids=None, | |
position_ids=None, | |
head_mask=None, | |
inputs_embeds=None, | |
labels=None, | |
output_attentions=None, | |
output_hidden_states=None, | |
return_dict=None, | |
): | |
return_dict = return_dict if return_dict is not None else self.config.use_return_dict | |
outputs = self.roberta( | |
input_ids=input_ids, | |
attention_mask=attention_mask, | |
token_type_ids=None, | |
position_ids=None, | |
head_mask=None, | |
inputs_embeds=None, | |
output_attentions=output_attentions, | |
output_hidden_states=output_hidden_states, | |
return_dict=return_dict, | |
) | |
pooler_output = self.pooler(attention_mask, outputs) | |
if self.pooler_type == "cls": | |
pooler_output = self.mlp(pooler_output) | |
if not return_dict: | |
return (outputs[0], pooler_output) + outputs[2:] | |
return BaseModelOutputWithPoolingAndCrossAttentions( | |
pooler_output=pooler_output, | |
last_hidden_state=outputs.last_hidden_state, | |
hidden_states=outputs.hidden_states, | |
) | |
def inference(self, | |
user_profile_list, | |
item_profile_list, | |
dataset_name, | |
tokenizer, | |
infer_batch_size=128 | |
): | |
n_user = len(user_profile_list) | |
profiles = user_profile_list + item_profile_list | |
n_batch = math.ceil(len(profiles) / infer_batch_size) | |
text_embeds = [] | |
for i in tqdm(range(n_batch), desc=f'Encoding Text {dataset_name}'): | |
batch_profiles = profiles[i * infer_batch_size: (i + 1) * infer_batch_size] | |
inputs = tokenizer(batch_profiles, padding=True, truncation=True, max_length=512, return_tensors="pt") | |
for k in inputs: | |
inputs[k] = inputs[k].to(self.device) | |
with torch.inference_mode(): | |
embeds = self.encode( | |
input_ids=inputs.input_ids, | |
attention_mask=inputs.attention_mask | |
) | |
text_embeds.append(embeds.pooler_output.detach().cpu()) | |
text_embeds = torch.concat(text_embeds, dim=0).cuda() | |
user_embeds = F.normalize(text_embeds[: n_user], dim=-1) | |
item_embeds = F.normalize(text_embeds[n_user: ], dim=-1) | |
return user_embeds, item_embeds | |