Spaces:
Sleeping
Sleeping
| import requests | |
| import io | |
| import re | |
| import numpy as np | |
| import faiss | |
| import torch | |
| from pypdf import PdfReader | |
| from rank_bm25 import BM25Okapi | |
| from sentence_transformers import SentenceTransformer | |
| from accelerate import Accelerator | |
| from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline | |
| from bert_score import score | |
| import gradio as gr | |
# --- Preload Data ---
# PDF documents that are downloaded and indexed when the module is loaded.
DEFAULT_PDF_URLS = [
    "https://www.latentview.com/wp-content/uploads/2023/07/LatentView-Annual-Report-2022-23.pdf",
    "https://www.latentview.com/wp-content/uploads/2024/08/LatentView-Annual-Report-2023-24.pdf",
]

# Single encoder instance shared by indexing, retrieval and re-ranking.
_EMBEDDING_MODEL_ID = "sentence-transformers/all-MiniLM-L6-v2"
embedding_model = SentenceTransformer(_EMBEDDING_MODEL_ID)
# Seconds to wait for the PDF host before giving up instead of hanging.
_PDF_DOWNLOAD_TIMEOUT = 60

# Characters outside this class are stripped from the extracted text.
_FINANCIAL_SYMBOLS = r"\$\€\₹\£\¥\₩\₽\₮\₦\₲"
_DISALLOWED_CHARS = re.compile(fr"[^\w\s{_FINANCIAL_SYMBOLS}.,%/₹$€¥£-]")
_WHITESPACE_RUN = re.compile(r'\s+')


def _download_pdf(url):
    """Fetch *url* and return the response body as bytes (raises on HTTP errors)."""
    response = requests.get(url, timeout=_PDF_DOWNLOAD_TIMEOUT)
    response.raise_for_status()
    return response.content


def _extract_text_from_pdf(pdf_bytes):
    """Concatenate the extracted text of every page of an in-memory PDF."""
    reader = PdfReader(io.BytesIO(pdf_bytes))
    # extract_text() may yield nothing for a page; treat that as empty text.
    return "".join(page.extract_text() or "" for page in reader.pages)


def _preprocess_text(text):
    """Drop disallowed characters and collapse whitespace runs to single spaces."""
    text = _DISALLOWED_CHARS.sub("", text)
    return _WHITESPACE_RUN.sub(" ", text).strip()


def _chunk_text(text, chunk_size=1024, overlap_size=100):
    """Split *text* into overlapping chunks, preferring to cut at spaces.

    Each window is at most ``chunk_size`` characters. When the cut would land
    inside a word the window is shortened to the previous space. The next
    window starts just after the last space that lies at least
    ``overlap_size`` characters before the cut, so consecutive chunks share
    a word-aligned overlap.

    Raises:
        ValueError: if ``chunk_size`` is not positive.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")

    chunks = []
    start = 0
    text_length = len(text)
    while start < text_length:
        end = min(start + chunk_size, text_length)
        if end < text_length and text[end].isalnum():
            last_space = text.rfind(" ", start, end)
            # Only back up to a space strictly after `start`; backing up onto
            # `start` itself would produce an empty window and never advance.
            if last_space > start:
                end = last_space
        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)
        if end == text_length:
            break

        overlap_start = max(0, end - overlap_size)
        # rfind returns -1 when the range is empty or holds no space, which
        # falls through to the no-overlap branch below.
        last_overlap_space = text.rfind(" ", start, overlap_start)
        if last_overlap_space > start:
            start = last_overlap_space + 1
        else:
            # No usable word boundary for an overlap: continue from the cut.
            start = end
    return chunks


def preload_data(pdf_urls):
    """Download the PDFs, chunk their cleaned text and build a FAISS index.

    Args:
        pdf_urls: iterable of URLs pointing at PDF documents.

    Returns:
        A ``(index, chunks)`` tuple: a ``faiss.IndexFlatL2`` holding one
        embedding per chunk, and the list of chunk strings in the same order.

    Raises:
        requests.RequestException: if a download fails or times out.
        ValueError: if no text could be extracted from any document.
    """
    documents = [
        _preprocess_text(_extract_text_from_pdf(_download_pdf(url)))
        for url in pdf_urls
    ]
    chunks = [chunk for document in documents for chunk in _chunk_text(document)]
    if not chunks:
        # Without chunks there is no embedding dimension to build an index from.
        raise ValueError("No text could be extracted from the provided PDFs.")

    embeddings = embedding_model.encode(chunks, convert_to_numpy=True)
    # FAISS operates on contiguous float32 matrices.
    embeddings = np.ascontiguousarray(embeddings, dtype=np.float32)
    index = faiss.IndexFlatL2(embeddings.shape[1])
    index.add(embeddings)
    return index, chunks
# Build the retrieval index from the default documents at module load.
index, chunks = preload_data(DEFAULT_PDF_URLS)

accelerator = Accelerator()
MODEL_NAME = "microsoft/phi-2"
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
# NOTE(review): trust_remote_code=True permits code shipped with the model
# repository to run locally - confirm it is still required for this model.
model = accelerator.prepare(
    AutoModelForCausalLM.from_pretrained(
        MODEL_NAME,
        device_map="auto",
        trust_remote_code=True,
        cache_dir="./my_models",
    )
)
# Text-generation pipeline used by generate_response().
generator = pipeline("text-generation", model=model, tokenizer=tokenizer)
def bm25_retrieval(query, documents, top_k=3):
    """Return the ``top_k`` documents with the highest BM25 score for *query*.

    Documents and query are tokenised by whitespace splitting, so matching
    is case-sensitive.

    Args:
        query: the search string.
        documents: list of document strings to rank.
        top_k: maximum number of documents to return.

    Returns:
        Up to ``top_k`` documents, best score first. An empty list when
        ``documents`` is empty.
    """
    if not documents:
        # BM25Okapi divides by the corpus size while initialising, so an
        # empty corpus would raise ZeroDivisionError.
        return []

    tokenized_docs = [doc.split() for doc in documents]
    bm25 = BM25Okapi(tokenized_docs)
    scores = bm25.get_scores(query.split())
    best_first = np.argsort(scores)[::-1][:top_k]
    return [documents[i] for i in best_first]
def adaptive_retrieval(query, index, chunks, top_k=3, bm25_weight=0.5):
    """Combine dense (FAISS) and lexical (BM25) retrieval for *query*.

    Args:
        query: the user question.
        index: FAISS index whose rows correspond positionally to ``chunks``.
        chunks: list of chunk strings that were indexed.
        top_k: number of candidates requested from each retriever.
        bm25_weight: accepted for interface compatibility; currently unused
            because the two result lists are merged, not score-weighted.

    Returns:
        De-duplicated chunks, vector hits first and then BM25 hits, each in
        its retriever's ranking order.
    """
    # encode() takes no dtype argument, and FAISS searches float32 vectors.
    query_embedding = embedding_model.encode([query], convert_to_numpy=True)
    query_embedding = np.ascontiguousarray(query_embedding, dtype=np.float32)

    _, indices = index.search(query_embedding, top_k)
    # FAISS pads missing neighbours with -1, which would otherwise silently
    # select the last chunk through negative indexing.
    vector_results = [chunks[i] for i in indices[0] if i >= 0]
    bm25_results = bm25_retrieval(query, chunks, top_k)

    # dict.fromkeys de-duplicates while keeping a deterministic order
    # (set iteration order for strings varies between interpreter runs).
    return list(dict.fromkeys(vector_results + bm25_results))
def rerank(query, results):
    """Order *results* by embedding dot-product similarity to *query*.

    Returns:
        A ``(ranked_results, similarities)`` tuple. ``ranked_results`` is
        sorted from most to least similar. ``similarities`` holds the raw
        scores in the order of the *input* ``results``, not the ranked order.
    """
    query_vec = embedding_model.encode([query], convert_to_numpy=True)
    candidate_vecs = embedding_model.encode(results, convert_to_numpy=True)
    similarities = np.dot(candidate_vecs, query_vec.T).flatten()
    descending = np.argsort(similarities)[::-1]
    ranked = [results[pos] for pos in descending]
    return ranked, similarities
def merge_chunks(retrieved_chunks, overlap_size=100):
    """Stitch consecutive chunks together where they share overlapping text.

    The last ``overlap_size`` characters of the running buffer are searched
    for in the next chunk. When found, only the text after that match is
    appended to the buffer; otherwise the buffer is emitted and the chunk
    starts a new one.

    Args:
        retrieved_chunks: list of chunk strings, in the order to merge.
        overlap_size: length of the buffer tail used to detect an overlap.
            A non-positive value disables merging.

    Returns:
        List of merged strings. Empty chunks are dropped.
    """
    merged_chunks = []
    buffer = ""
    for chunk in retrieved_chunks:
        if not buffer:
            # Nothing accumulated yet (first chunk, or previous was empty).
            buffer = chunk
            continue

        tail = buffer[-overlap_size:] if overlap_size > 0 else ""
        overlap_index = chunk.find(tail) if tail else -1
        if overlap_index != -1:
            # Skip exactly the matched tail; it can be shorter than
            # overlap_size when the buffer itself is short.
            buffer += chunk[overlap_index + len(tail):]
        else:
            merged_chunks.append(buffer)
            buffer = chunk

    if buffer:
        merged_chunks.append(buffer)
    return merged_chunks
def calculate_confidence(query, answer):
    """Return the BERTScore F1 of *answer* scored against *query* as a float.

    The answer is passed as the candidate and the query as the reference.
    """
    _precision, _recall, f1 = score([answer], [query], lang="en", verbose=False)
    return f1.item()
def generate_response(query, context):
    """Generate an answer to *query* grounded in *context*.

    Args:
        query: the user question.
        context: retrieved text the answer should be based on.

    Returns:
        The generated answer text with surrounding whitespace removed,
        truncated before any further "Answer:" marker the model emits.
    """
    prompt = f"""Your task is to analyze the given Context and answer the Question concisely in plain English.
**Guidelines:**
- Do NOT include </think> tag, just provide the final answer only.
- Provide a direct, factual answer based strictly on the Context.
- Avoid generating Python code, solutions, or any irrelevant information.
Context: {context}
Question: {query}
Answer:
"""
    # Ask for the completion only. Splitting the full text on "Answer:"
    # returns the wrong segment whenever the context or the question
    # itself contains that marker.
    outputs = generator(
        prompt,
        max_new_tokens=150,
        num_return_sequences=1,
        return_full_text=False,
    )
    completion = outputs[0]['generated_text']
    # Keep the text up to the next "Answer:" marker, if the model emits one.
    answer, _, _ = completion.partition("Answer:")
    return answer.strip()
def process_query(query):
    """Answer *query* from the indexed documents and append a confidence score.

    Pipeline: retrieve candidates, merge overlapping chunks, re-rank, build
    the context from the top three chunks, generate the answer, score it.

    Returns:
        The answer followed by a blank line and ``Confidence: <score>``
        formatted to two decimals.
    """
    candidates = adaptive_retrieval(query, index, chunks)
    stitched = merge_chunks(candidates, 50)
    # Only the ranked chunks are used; the raw similarity scores are ignored.
    ranked, _ = rerank(query, stitched)
    context = " ".join(ranked[:3])
    answer = generate_response(query, context)
    confidence = calculate_confidence(query, answer)
    return f"{answer}\n\nConfidence: {confidence:.2f}"
# Gradio UI: one text box in, plain text out, served by process_query().
_question_box = gr.Textbox(placeholder="Enter your financial question")
iface = gr.Interface(
    fn=process_query,
    inputs=_question_box,
    outputs="text",
    title="Financial Document Q&A Chatbot",
    description="Ask questions about the preloaded financial documents.",
)
iface.launch()
# Runs once launch() has returned.
accelerator.free_memory()