File size: 6,431 Bytes
7bf4b88 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
from pathlib import Path
import re
import torch
import openai
from functools import partial
import time
import multiprocessing
from sentence_transformers import SentenceTransformer
from concurrent.futures import ProcessPoolExecutor
from torch.multiprocessing import set_start_method
import os
from transformers import AutoModel, AutoTokenizer, AutoConfig
from Reranking.rerankers.node import Contriever
import torch.nn as nn
# Global cache to store loaded models
MODEL_CACHE = {}
def get_openai_embedding(text: str,
model: str = "text-embedding-ada-002",
max_retry: int = 10,
sleep_time: int = 0) -> torch.FloatTensor:
"""
Get the OpenAI embedding for a given text.
Args:
text (str): The input text to be embedded.
model (str): The model to use for embedding. Default is "text-embedding-ada-002".
max_retry (int): Maximum number of retries in case of an error. Default is 1.
sleep_time (int): Sleep time between retries in seconds. Default is 0.
Returns:
torch.FloatTensor: The embedding of the input text.
"""
assert isinstance(text, str), f'text must be str, but got {type(text)}'
assert len(text) > 0, 'text to be embedded should be non-empty'
client = openai.OpenAI()
for _ in range(max_retry):
try:
emb = client.embeddings.create(input=[text], model=model)
return torch.FloatTensor(emb.data[0].embedding).view(1, -1)
except openai.BadRequestError as e:
print(f'{e}')
e = str(e)
ori_length = len(text.split(' '))
match = re.search(r'maximum context length is (\d+) tokens, however you requested (\d+) tokens', e)
if match is not None:
max_length = int(match.group(1))
cur_length = int(match.group(2))
ratio = float(max_length) / cur_length
for reduce_rate in range(9, 0, -1):
shorten_text = text.split(' ')
length = int(ratio * ori_length * (reduce_rate * 0.1))
shorten_text = ' '.join(shorten_text[:length])
try:
emb = client.embeddings.create(input=[shorten_text], model=model)
print(f'length={length} works! reduce_rate={0.1 * reduce_rate}.')
return torch.FloatTensor(emb.data[0].embedding).view(1, -1)
except:
continue
except (openai.RateLimitError, openai.APITimeoutError) as e:
print(f'{e}, sleep for {sleep_time} seconds')
time.sleep(sleep_time)
raise RuntimeError("Failed to get embedding after maximum retries")
def get_openai_embeddings(texts: list,
model: str = "text-embedding-ada-002",
n_max_nodes: int = 5) -> torch.FloatTensor:
"""
Get embeddings for a list of texts using OpenAI's embedding model.
Args:
texts (list): List of input texts to be embedded.
n_max_nodes (int): Maximum number of parallel processes. Default is 5.
model (str): The model to use for embedding. Default is "text-embedding-ada-002".
Returns:
torch.FloatTensor: A tensor containing embeddings for all input texts.
"""
assert isinstance(texts, list), f'texts must be list, but got {type(texts)}'
assert all([len(s) > 0 for s in texts]), 'every string in the `texts` list to be embedded should be non-empty'
processes = min(len(texts), n_max_nodes)
ada_encoder = partial(get_openai_embedding, model=model)
with multiprocessing.Pool(processes=processes) as pool:
results = pool.map(ada_encoder, texts)
results = torch.cat(results, dim=0)
return results
# Define the batch processing function at global scope
def process(text, encoder):
embeddings = encoder.encode(text, convert_to_tensor=True)
return embeddings
def get_sentence_transformer_embeddings(texts: list,
model_name: str = "all-mpnet-base-v2") -> torch.FloatTensor:
assert isinstance(texts, list), f'texts must be list, but got {type(texts)}'
assert all([len(s) > 0 for s in texts]), 'every string in the `texts` list to be embedded should be non-empty'
# only load the model once
if model_name not in MODEL_CACHE:
# Use partial to pass the model name to the get_embedding function
encoder = SentenceTransformer(model_name)
MODEL_CACHE[model_name] = encoder
else:
# print(f"Using cached model: {model_name}")
encoder = MODEL_CACHE[model_name]
# if os.name == 'posix':
# set_start_method('spawn', force=True)
results = process(texts, encoder=encoder)
results = results.cpu()
# Free up CUDA memory
torch.cuda.empty_cache()
return results
def get_contriever(dataset_name) -> torch.FloatTensor:
# Use partial to pass the model name to the get_embedding function
contriever_name = 'facebook/contriever'
encoder = AutoModel.from_pretrained(contriever_name)
tokenizer = AutoTokenizer.from_pretrained(contriever_name)
emb_dim = 768
encoder = Contriever(encoder, emb_dim=emb_dim)
current_file = Path(__file__).resolve()
project_root = current_file.parents[3]
checkpoint_path = f"{project_root}/Reasoning/text_retrievers/model_checkpoint/contriever/{dataset_name}/best_20250204.pth"
checkpoint = torch.load(checkpoint_path)
encoder.load_state_dict(checkpoint)
encoder.to('cuda')
encoder.eval()
return encoder, tokenizer
@torch.no_grad()
def get_contriever_embeddings(texts, encoder, tokenizer, device) -> torch.FloatTensor:
if isinstance(texts, str):
texts = [texts]
text_enc = tokenizer(texts, padding='max_length', truncation=True, return_tensors='pt', max_length=512)
input_ids = text_enc['input_ids'].to(device)
attention_mask = text_enc['attention_mask'].to(device)
token_type_ids = text_enc.get('token_type_ids', None)
if token_type_ids is not None:
token_type_ids = token_type_ids.to(device)
embeddings = encoder.get_text_emb(input_ids, attention_mask, token_type_ids)
embeddings = embeddings.cpu()
# Free up CUDA memory
torch.cuda.empty_cache()
return embeddings |