EasyRec / model.py
Balaji S
Removed unnecessary import
2c732ed verified
import math
import torch
import numpy as np
import torch.nn as nn
from tqdm import tqdm
import scipy.sparse as sp
import torch.nn.functional as F
import torch.distributed as dist
import transformers
from transformers import RobertaTokenizer
from transformers.models.roberta.modeling_roberta import RobertaPreTrainedModel, RobertaModel, RobertaLMHead
from transformers.models.bert.modeling_bert import BertPreTrainedModel, BertModel, BertLMPredictionHead
from transformers.activations import gelu
from transformers.file_utils import (
add_code_sample_docstrings,
add_start_docstrings,
add_start_docstrings_to_model_forward,
replace_return_docstrings,
)
from transformers.modeling_outputs import SequenceClassifierOutput, BaseModelOutputWithPoolingAndCrossAttentions
init = nn.init.xavier_uniform_
uniformInit = nn.init.uniform
"""
EasyRec
"""
def dot_product_scores(q_vectors, ctx_vectors):
r = torch.matmul(q_vectors, torch.transpose(ctx_vectors, 0, 1))
return r
class MLPLayer(nn.Module):
"""
Head for getting sentence representations over RoBERTa/BERT's CLS representation.
"""
def __init__(self, config):
super().__init__()
self.dense = nn.Linear(config.hidden_size, config.hidden_size)
self.activation = nn.Tanh()
def forward(self, features, **kwargs):
x = self.dense(features)
x = self.activation(x)
return x
class Pooler(nn.Module):
"""
Parameter-free poolers to get the sentence embedding
'cls': [CLS] representation with BERT/RoBERTa's MLP pooler.
'cls_before_pooler': [CLS] representation without the original MLP pooler.
'avg': average of the last layers' hidden states at each token.
'avg_top2': average of the last two layers.
'avg_first_last': average of the first and the last layers.
"""
def __init__(self, pooler_type):
super().__init__()
self.pooler_type = pooler_type
assert self.pooler_type in ["cls", "cls_before_pooler", "avg", "avg_top2", "avg_first_last"], "unrecognized pooling type %s" % self.pooler_type
def forward(self, attention_mask, outputs):
last_hidden = outputs.last_hidden_state
pooler_output = outputs.pooler_output
hidden_states = outputs.hidden_states
if self.pooler_type in ['cls_before_pooler', 'cls']:
return last_hidden[:, 0]
elif self.pooler_type == "avg":
return ((last_hidden * attention_mask.unsqueeze(-1)).sum(1) / attention_mask.sum(-1).unsqueeze(-1))
elif self.pooler_type == "avg_first_last":
first_hidden = hidden_states[1]
last_hidden = hidden_states[-1]
pooled_result = ((first_hidden + last_hidden) / 2.0 * attention_mask.unsqueeze(-1)).sum(1) / attention_mask.sum(-1).unsqueeze(-1)
return pooled_result
elif self.pooler_type == "avg_top2":
second_last_hidden = hidden_states[-2]
last_hidden = hidden_states[-1]
pooled_result = ((last_hidden + second_last_hidden) / 2.0 * attention_mask.unsqueeze(-1)).sum(1) / attention_mask.sum(-1).unsqueeze(-1)
return pooled_result
else:
raise NotImplementedError
class Similarity(nn.Module):
"""
Dot product or cosine similarity
"""
def __init__(self, temp):
super().__init__()
self.temp = temp
self.cos = nn.CosineSimilarity(dim=-1)
def forward(self, x, y):
return self.cos(x, y) / self.temp
class Easyrec(RobertaPreTrainedModel):
_keys_to_ignore_on_load_missing = [r"position_ids"]
def __init__(self, config, *model_args, **model_kargs):
super().__init__(config)
try:
self.model_args = model_kargs["model_args"]
self.roberta = RobertaModel(config, add_pooling_layer=False)
if self.model_args.pooler_type == "cls":
self.mlp = MLPLayer(config)
if self.model_args.do_mlm:
self.lm_head = RobertaLMHead(config)
"""
Contrastive learning class init function.
"""
self.pooler_type = self.model_args.pooler_type
self.pooler = Pooler(self.pooler_type)
self.sim = Similarity(temp=self.model_args.temp)
self.init_weights()
except:
self.roberta = RobertaModel(config, add_pooling_layer=False)
self.mlp = MLPLayer(config)
self.lm_head = RobertaLMHead(config)
self.pooler_type = 'cls'
self.pooler = Pooler(self.pooler_type)
self.init_weights()
def forward(self,
user_input_ids=None,
user_attention_mask=None,
pos_item_input_ids=None,
pos_item_attention_mask=None,
neg_item_input_ids=None,
neg_item_attention_mask=None,
token_type_ids=None,
position_ids=None,
head_mask=None,
inputs_embeds=None,
labels=None,
output_attentions=None,
output_hidden_states=None,
return_dict=None,
mlm_input_ids=None,
mlm_attention_mask=None,
mlm_labels=None,
):
"""
Contrastive learning forward function.
"""
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
batch_size = user_input_ids.size(0)
# Get user embeddings
user_outputs = self.roberta(
input_ids=user_input_ids,
attention_mask=user_attention_mask,
token_type_ids=None,
position_ids=None,
head_mask=None,
inputs_embeds=None,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=return_dict,
)
# Get positive item embeddings
pos_item_outputs = self.roberta(
input_ids=pos_item_input_ids,
attention_mask=pos_item_attention_mask,
token_type_ids=None,
position_ids=None,
head_mask=None,
inputs_embeds=None,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=return_dict,
)
# Get negative item embeddings
neg_item_outputs = self.roberta(
input_ids=neg_item_input_ids,
attention_mask=neg_item_attention_mask,
token_type_ids=None,
position_ids=None,
head_mask=None,
inputs_embeds=None,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=return_dict,
)
# MLM auxiliary objective
if mlm_input_ids is not None:
mlm_outputs = self.roberta(
input_ids=mlm_input_ids,
attention_mask=mlm_attention_mask,
token_type_ids=None,
position_ids=None,
head_mask=None,
inputs_embeds=None,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=return_dict,
)
# Pooling
user_pooler_output = self.pooler(user_attention_mask, user_outputs)
pos_item_pooler_output = self.pooler(pos_item_attention_mask, pos_item_outputs)
neg_item_pooler_output = self.pooler(neg_item_attention_mask, neg_item_outputs)
# If using "cls", we add an extra MLP layer
# (same as BERT's original implementation) over the representation.
if self.pooler_type == "cls":
user_pooler_output = self.mlp(user_pooler_output)
pos_item_pooler_output = self.mlp(pos_item_pooler_output)
neg_item_pooler_output = self.mlp(neg_item_pooler_output)
# Gather all item embeddings if using distributed training
if dist.is_initialized() and self.training:
# Dummy vectors for allgather
user_list = [torch.zeros_like(user_pooler_output) for _ in range(dist.get_world_size())]
pos_item_list = [torch.zeros_like(pos_item_pooler_output) for _ in range(dist.get_world_size())]
neg_item_list = [torch.zeros_like(neg_item_pooler_output) for _ in range(dist.get_world_size())]
# Allgather
dist.all_gather(tensor_list=user_list, tensor=user_pooler_output.contiguous())
dist.all_gather(tensor_list=pos_item_list, tensor=pos_item_pooler_output.contiguous())
dist.all_gather(tensor_list=neg_item_list, tensor=neg_item_pooler_output.contiguous())
# Since allgather results do not have gradients, we replace the
# current process's corresponding embeddings with original tensors
user_list[dist.get_rank()] = user_pooler_output
pos_item_list[dist.get_rank()] = pos_item_pooler_output
neg_item_list[dist.get_rank()] = neg_item_pooler_output
# Get full batch embeddings
user_pooler_output = torch.cat(user_list, dim=0)
pos_item_pooler_output = torch.cat(pos_item_list, dim=0)
neg_item_pooler_output = torch.cat(neg_item_list, dim=0)
cos_sim = self.sim(user_pooler_output.unsqueeze(1), pos_item_pooler_output.unsqueeze(0))
neg_sim = self.sim(user_pooler_output.unsqueeze(1), neg_item_pooler_output.unsqueeze(0))
cos_sim = torch.cat([cos_sim, neg_sim], 1)
labels = torch.arange(cos_sim.size(0)).long().to(self.device)
loss_fct = nn.CrossEntropyLoss()
loss = loss_fct(cos_sim, labels)
# Calculate loss for MLM
if mlm_outputs is not None and mlm_labels is not None and self.model_args.do_mlm:
mlm_labels = mlm_labels.view(-1, mlm_labels.size(-1))
prediction_scores = self.lm_head(mlm_outputs.last_hidden_state)
masked_lm_loss = loss_fct(prediction_scores.view(-1, self.config.vocab_size), mlm_labels.view(-1))
loss = loss + self.model_args.mlm_weight * masked_lm_loss
if not return_dict:
raise NotImplementedError
return SequenceClassifierOutput(
loss=loss,
logits=cos_sim,
)
def encode(self,
input_ids=None,
attention_mask=None,
token_type_ids=None,
position_ids=None,
head_mask=None,
inputs_embeds=None,
labels=None,
output_attentions=None,
output_hidden_states=None,
return_dict=None,
):
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
outputs = self.roberta(
input_ids=input_ids,
attention_mask=attention_mask,
token_type_ids=None,
position_ids=None,
head_mask=None,
inputs_embeds=None,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=return_dict,
)
pooler_output = self.pooler(attention_mask, outputs)
if self.pooler_type == "cls":
pooler_output = self.mlp(pooler_output)
if not return_dict:
return (outputs[0], pooler_output) + outputs[2:]
return BaseModelOutputWithPoolingAndCrossAttentions(
pooler_output=pooler_output,
last_hidden_state=outputs.last_hidden_state,
hidden_states=outputs.hidden_states,
)
def inference(self,
user_profile_list,
item_profile_list,
dataset_name,
tokenizer,
infer_batch_size=128
):
n_user = len(user_profile_list)
profiles = user_profile_list + item_profile_list
n_batch = math.ceil(len(profiles) / infer_batch_size)
text_embeds = []
for i in tqdm(range(n_batch), desc=f'Encoding Text {dataset_name}'):
batch_profiles = profiles[i * infer_batch_size: (i + 1) * infer_batch_size]
inputs = tokenizer(batch_profiles, padding=True, truncation=True, max_length=512, return_tensors="pt")
for k in inputs:
inputs[k] = inputs[k].to(self.device)
with torch.inference_mode():
embeds = self.encode(
input_ids=inputs.input_ids,
attention_mask=inputs.attention_mask
)
text_embeds.append(embeds.pooler_output.detach().cpu())
text_embeds = torch.concat(text_embeds, dim=0).cuda()
user_embeds = F.normalize(text_embeds[: n_user], dim=-1)
item_embeds = F.normalize(text_embeds[n_user: ], dim=-1)
return user_embeds, item_embeds