import math import torch import numpy as np import torch.nn as nn from tqdm import tqdm import scipy.sparse as sp import torch.nn.functional as F import torch.distributed as dist import transformers from transformers import RobertaTokenizer from transformers.models.roberta.modeling_roberta import RobertaPreTrainedModel, RobertaModel, RobertaLMHead from transformers.models.bert.modeling_bert import BertPreTrainedModel, BertModel, BertLMPredictionHead from transformers.activations import gelu from transformers.file_utils import ( add_code_sample_docstrings, add_start_docstrings, add_start_docstrings_to_model_forward, replace_return_docstrings, ) from transformers.modeling_outputs import SequenceClassifierOutput, BaseModelOutputWithPoolingAndCrossAttentions init = nn.init.xavier_uniform_ uniformInit = nn.init.uniform """ EasyRec """ def dot_product_scores(q_vectors, ctx_vectors): r = torch.matmul(q_vectors, torch.transpose(ctx_vectors, 0, 1)) return r class MLPLayer(nn.Module): """ Head for getting sentence representations over RoBERTa/BERT's CLS representation. """ def __init__(self, config): super().__init__() self.dense = nn.Linear(config.hidden_size, config.hidden_size) self.activation = nn.Tanh() def forward(self, features, **kwargs): x = self.dense(features) x = self.activation(x) return x class Pooler(nn.Module): """ Parameter-free poolers to get the sentence embedding 'cls': [CLS] representation with BERT/RoBERTa's MLP pooler. 'cls_before_pooler': [CLS] representation without the original MLP pooler. 'avg': average of the last layers' hidden states at each token. 'avg_top2': average of the last two layers. 'avg_first_last': average of the first and the last layers. """ def __init__(self, pooler_type): super().__init__() self.pooler_type = pooler_type assert self.pooler_type in ["cls", "cls_before_pooler", "avg", "avg_top2", "avg_first_last"], "unrecognized pooling type %s" % self.pooler_type def forward(self, attention_mask, outputs): last_hidden = outputs.last_hidden_state pooler_output = outputs.pooler_output hidden_states = outputs.hidden_states if self.pooler_type in ['cls_before_pooler', 'cls']: return last_hidden[:, 0] elif self.pooler_type == "avg": return ((last_hidden * attention_mask.unsqueeze(-1)).sum(1) / attention_mask.sum(-1).unsqueeze(-1)) elif self.pooler_type == "avg_first_last": first_hidden = hidden_states[1] last_hidden = hidden_states[-1] pooled_result = ((first_hidden + last_hidden) / 2.0 * attention_mask.unsqueeze(-1)).sum(1) / attention_mask.sum(-1).unsqueeze(-1) return pooled_result elif self.pooler_type == "avg_top2": second_last_hidden = hidden_states[-2] last_hidden = hidden_states[-1] pooled_result = ((last_hidden + second_last_hidden) / 2.0 * attention_mask.unsqueeze(-1)).sum(1) / attention_mask.sum(-1).unsqueeze(-1) return pooled_result else: raise NotImplementedError class Similarity(nn.Module): """ Dot product or cosine similarity """ def __init__(self, temp): super().__init__() self.temp = temp self.cos = nn.CosineSimilarity(dim=-1) def forward(self, x, y): return self.cos(x, y) / self.temp class Easyrec(RobertaPreTrainedModel): _keys_to_ignore_on_load_missing = [r"position_ids"] def __init__(self, config, *model_args, **model_kargs): super().__init__(config) try: self.model_args = model_kargs["model_args"] self.roberta = RobertaModel(config, add_pooling_layer=False) if self.model_args.pooler_type == "cls": self.mlp = MLPLayer(config) if self.model_args.do_mlm: self.lm_head = RobertaLMHead(config) """ Contrastive learning class init function. """ self.pooler_type = self.model_args.pooler_type self.pooler = Pooler(self.pooler_type) self.sim = Similarity(temp=self.model_args.temp) self.init_weights() except: self.roberta = RobertaModel(config, add_pooling_layer=False) self.mlp = MLPLayer(config) self.lm_head = RobertaLMHead(config) self.pooler_type = 'cls' self.pooler = Pooler(self.pooler_type) self.init_weights() def forward(self, user_input_ids=None, user_attention_mask=None, pos_item_input_ids=None, pos_item_attention_mask=None, neg_item_input_ids=None, neg_item_attention_mask=None, token_type_ids=None, position_ids=None, head_mask=None, inputs_embeds=None, labels=None, output_attentions=None, output_hidden_states=None, return_dict=None, mlm_input_ids=None, mlm_attention_mask=None, mlm_labels=None, ): """ Contrastive learning forward function. """ return_dict = return_dict if return_dict is not None else self.config.use_return_dict batch_size = user_input_ids.size(0) # Get user embeddings user_outputs = self.roberta( input_ids=user_input_ids, attention_mask=user_attention_mask, token_type_ids=None, position_ids=None, head_mask=None, inputs_embeds=None, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, ) # Get positive item embeddings pos_item_outputs = self.roberta( input_ids=pos_item_input_ids, attention_mask=pos_item_attention_mask, token_type_ids=None, position_ids=None, head_mask=None, inputs_embeds=None, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, ) # Get negative item embeddings neg_item_outputs = self.roberta( input_ids=neg_item_input_ids, attention_mask=neg_item_attention_mask, token_type_ids=None, position_ids=None, head_mask=None, inputs_embeds=None, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, ) # MLM auxiliary objective if mlm_input_ids is not None: mlm_outputs = self.roberta( input_ids=mlm_input_ids, attention_mask=mlm_attention_mask, token_type_ids=None, position_ids=None, head_mask=None, inputs_embeds=None, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, ) # Pooling user_pooler_output = self.pooler(user_attention_mask, user_outputs) pos_item_pooler_output = self.pooler(pos_item_attention_mask, pos_item_outputs) neg_item_pooler_output = self.pooler(neg_item_attention_mask, neg_item_outputs) # If using "cls", we add an extra MLP layer # (same as BERT's original implementation) over the representation. if self.pooler_type == "cls": user_pooler_output = self.mlp(user_pooler_output) pos_item_pooler_output = self.mlp(pos_item_pooler_output) neg_item_pooler_output = self.mlp(neg_item_pooler_output) # Gather all item embeddings if using distributed training if dist.is_initialized() and self.training: # Dummy vectors for allgather user_list = [torch.zeros_like(user_pooler_output) for _ in range(dist.get_world_size())] pos_item_list = [torch.zeros_like(pos_item_pooler_output) for _ in range(dist.get_world_size())] neg_item_list = [torch.zeros_like(neg_item_pooler_output) for _ in range(dist.get_world_size())] # Allgather dist.all_gather(tensor_list=user_list, tensor=user_pooler_output.contiguous()) dist.all_gather(tensor_list=pos_item_list, tensor=pos_item_pooler_output.contiguous()) dist.all_gather(tensor_list=neg_item_list, tensor=neg_item_pooler_output.contiguous()) # Since allgather results do not have gradients, we replace the # current process's corresponding embeddings with original tensors user_list[dist.get_rank()] = user_pooler_output pos_item_list[dist.get_rank()] = pos_item_pooler_output neg_item_list[dist.get_rank()] = neg_item_pooler_output # Get full batch embeddings user_pooler_output = torch.cat(user_list, dim=0) pos_item_pooler_output = torch.cat(pos_item_list, dim=0) neg_item_pooler_output = torch.cat(neg_item_list, dim=0) cos_sim = self.sim(user_pooler_output.unsqueeze(1), pos_item_pooler_output.unsqueeze(0)) neg_sim = self.sim(user_pooler_output.unsqueeze(1), neg_item_pooler_output.unsqueeze(0)) cos_sim = torch.cat([cos_sim, neg_sim], 1) labels = torch.arange(cos_sim.size(0)).long().to(self.device) loss_fct = nn.CrossEntropyLoss() loss = loss_fct(cos_sim, labels) # Calculate loss for MLM if mlm_outputs is not None and mlm_labels is not None and self.model_args.do_mlm: mlm_labels = mlm_labels.view(-1, mlm_labels.size(-1)) prediction_scores = self.lm_head(mlm_outputs.last_hidden_state) masked_lm_loss = loss_fct(prediction_scores.view(-1, self.config.vocab_size), mlm_labels.view(-1)) loss = loss + self.model_args.mlm_weight * masked_lm_loss if not return_dict: raise NotImplementedError return SequenceClassifierOutput( loss=loss, logits=cos_sim, ) def encode(self, input_ids=None, attention_mask=None, token_type_ids=None, position_ids=None, head_mask=None, inputs_embeds=None, labels=None, output_attentions=None, output_hidden_states=None, return_dict=None, ): return_dict = return_dict if return_dict is not None else self.config.use_return_dict outputs = self.roberta( input_ids=input_ids, attention_mask=attention_mask, token_type_ids=None, position_ids=None, head_mask=None, inputs_embeds=None, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, ) pooler_output = self.pooler(attention_mask, outputs) if self.pooler_type == "cls": pooler_output = self.mlp(pooler_output) if not return_dict: return (outputs[0], pooler_output) + outputs[2:] return BaseModelOutputWithPoolingAndCrossAttentions( pooler_output=pooler_output, last_hidden_state=outputs.last_hidden_state, hidden_states=outputs.hidden_states, ) def inference(self, user_profile_list, item_profile_list, dataset_name, tokenizer, infer_batch_size=128 ): n_user = len(user_profile_list) profiles = user_profile_list + item_profile_list n_batch = math.ceil(len(profiles) / infer_batch_size) text_embeds = [] for i in tqdm(range(n_batch), desc=f'Encoding Text {dataset_name}'): batch_profiles = profiles[i * infer_batch_size: (i + 1) * infer_batch_size] inputs = tokenizer(batch_profiles, padding=True, truncation=True, max_length=512, return_tensors="pt") for k in inputs: inputs[k] = inputs[k].to(self.device) with torch.inference_mode(): embeds = self.encode( input_ids=inputs.input_ids, attention_mask=inputs.attention_mask ) text_embeds.append(embeds.pooler_output.detach().cpu()) text_embeds = torch.concat(text_embeds, dim=0).cuda() user_embeds = F.normalize(text_embeds[: n_user], dim=-1) item_embeds = F.normalize(text_embeds[n_user: ], dim=-1) return user_embeds, item_embeds