Spaces:
Build error
Build error
| import torch | |
| from fastapi import FastAPI, Depends, status | |
| from fastapi.responses import PlainTextResponse | |
| from transformers import AutoTokenizer, AutoModel, DPRQuestionEncoder | |
| from datasets import load_from_disk | |
| import time | |
| from typing import Dict | |
| import jwt | |
| from decouple import config | |
| from fastapi import Request, HTTPException | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| JWT_SECRET = config("secret") | |
| JWT_ALGORITHM = config("algorithm") | |
| app = FastAPI() | |
| app.ready = False | |
| columns = ['kilt_id', 'wikipedia_id', 'wikipedia_title', 'text', 'anchors', 'categories', | |
| 'wikidata_info', 'history'] | |
| min_snippet_length = 20 | |
| topk = 21 | |
| device = ("cuda" if torch.cuda.is_available() else "cpu") | |
| model = DPRQuestionEncoder.from_pretrained("vblagoje/dpr-question_encoder-single-lfqa-wiki").to(device) | |
| tokenizer = AutoTokenizer.from_pretrained("vblagoje/dpr-question_encoder-single-lfqa-wiki") | |
| _ = model.eval() | |
| index_file_name = "./data/kilt_wikipedia.faiss" | |
| kilt_wikipedia_paragraphs = load_from_disk("./data/kilt_wiki_prepared") | |
| # use paragraphs that are not simple fragments or very short sentences | |
| kilt_wikipedia_paragraphs = kilt_wikipedia_paragraphs.filter(lambda x: x["end_character"] > 200) | |
| class JWTBearer(HTTPBearer): | |
| def __init__(self, auto_error: bool = True): | |
| super(JWTBearer, self).__init__(auto_error=auto_error) | |
| async def __call__(self, request: Request): | |
| credentials: HTTPAuthorizationCredentials = await super(JWTBearer, self).__call__(request) | |
| if credentials: | |
| if not credentials.scheme == "Bearer": | |
| raise HTTPException(status_code=403, detail="Invalid authentication scheme.") | |
| if not self.verify_jwt(credentials.credentials): | |
| raise HTTPException(status_code=403, detail="Invalid token or expired token.") | |
| return credentials.credentials | |
| else: | |
| raise HTTPException(status_code=403, detail="Invalid authorization code.") | |
| def verify_jwt(self, jwtoken: str) -> bool: | |
| isTokenValid: bool = False | |
| try: | |
| payload = decodeJWT(jwtoken) | |
| except: | |
| payload = None | |
| if payload: | |
| isTokenValid = True | |
| return isTokenValid | |
| def token_response(token: str): | |
| return { | |
| "access_token": token | |
| } | |
| def signJWT(user_id: str) -> Dict[str, str]: | |
| payload = { | |
| "user_id": user_id, | |
| "expires": time.time() + 6000 | |
| } | |
| token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM) | |
| return token_response(token) | |
| def decodeJWT(token: str) -> dict: | |
| try: | |
| decoded_token = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM]) | |
| return decoded_token if decoded_token["expires"] >= time.time() else None | |
| except: | |
| return {} | |
| def embed_questions_for_retrieval(questions): | |
| query = tokenizer(questions, max_length=128, padding=True, truncation=True, return_tensors="pt") | |
| with torch.no_grad(): | |
| q_reps = model(query["input_ids"].to(device), query["attention_mask"].to(device)).pooler_output | |
| return q_reps.cpu().numpy() | |
| def query_index(question): | |
| question_embedding = embed_questions_for_retrieval([question]) | |
| scores, wiki_passages = kilt_wikipedia_paragraphs.get_nearest_examples("embeddings", question_embedding, k=topk) | |
| columns = ['wikipedia_id', 'title', 'text', 'section', 'start_paragraph_id', 'end_paragraph_id', | |
| 'start_character', 'end_character'] | |
| retrieved_examples = [] | |
| r = list(zip(wiki_passages[k] for k in columns)) | |
| for i in range(topk): | |
| retrieved_examples.append({k: v for k, v in zip(columns, [r[j][0][i] for j in range(len(columns))])}) | |
| return retrieved_examples | |
| def startup(): | |
| kilt_wikipedia_paragraphs.load_faiss_index("embeddings", index_file_name, device=0) | |
| app.ready = True | |
| def healthz(): | |
| if app.ready: | |
| return PlainTextResponse("ok") | |
| return PlainTextResponse("service unavailable", status_code=status.HTTP_503_SERVICE_UNAVAILABLE) | |
| def find_context(question: str = None): | |
| return [res for res in query_index(question) if len(res["text"].split()) > min_snippet_length][:int(topk / 3)] | |