import torch
from transformers import AutoTokenizer, pipeline
from sentence_transformers import CrossEncoder
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import DirectoryLoader, PyPDFLoader
from langchain_community.vectorstores import FAISS
from langchain_huggingface import HuggingFaceEmbeddings
from datetime import datetime
class RagWithScore:
    def __init__(self, model_name="sentence-transformers/all-MiniLM-L6-v2",
                 cross_encoder_name="cross-encoder/ms-marco-TinyBERT-L-2-v2",
                 llm_name="TinyLlama/TinyLlama-1.1B-Chat-v1.0",
                 documents_dir="financial_docs"):
        """
        Initialize the Financial RAG system
        Args:
            model_name: The embedding model name
            cross_encoder_name: The cross-encoder model for reranking
            llm_name: Small language model for generation
            documents_dir: Directory containing financial statements
        """
        # Initialize embedding model
        self.embedding_model = HuggingFaceEmbeddings(model_name=model_name)
        # Initialize cross-encoder for reranking
        self.cross_encoder = CrossEncoder(cross_encoder_name)
        # Initialize small language model
        self.tokenizer = AutoTokenizer.from_pretrained(llm_name)
        self.llm = pipeline(
            "text-generation",
            model=llm_name,
            tokenizer=self.tokenizer,
            torch_dtype=torch.bfloat16,
            device_map="auto",
            max_new_tokens=512,
            do_sample=False  # greedy decoding for deterministic outputs
            # (temperature and top_p are ignored when do_sample=False)
        )
        # Store paths
        self.documents_dir = documents_dir
        self.vector_store = None
        # Input guardrail rules - sensitive terms/patterns
        self.guardrail_patterns = [
            "insider trading",
            "stock manipulation",
            "fraud detection",
            "embezzlement",
            "money laundering",
            "tax evasion",
            "illegal activities",
        ]
        # Confidence score thresholds
        self.confidence_thresholds = {
            "high": 0.75,
            "medium": 0.5,
            "low": 0.3,
        }
    ## Load documents and create the vector index at application startup
    def load_and_process_documents(self):
        """Load, split and process financial documents"""
        print("Processing documents to create FAISS index...")
        loader = DirectoryLoader(self.documents_dir, glob="**/*.pdf",
                                 loader_cls=PyPDFLoader)
        documents = loader.load()
        # Split documents into chunks
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000, chunk_overlap=200
        )
        chunks = text_splitter.split_documents(documents)
        print(f"Created {len(chunks)} chunks")
        # Create and save FAISS vector store
        self.vector_store = FAISS.from_documents(chunks, embedding=self.embedding_model)
        self.vector_store.save_local("faiss_index")
        return self.vector_store
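    ## Optional companion helper (a sketch, not part of the original flow):
    ## reload the index saved above instead of re-embedding every document at
    ## startup. allow_dangerous_deserialization is required by recent langchain
    ## versions because the FAISS docstore is pickle-backed.
    def load_existing_index(self, index_path="faiss_index"):
        self.vector_store = FAISS.load_local(
            index_path, self.embedding_model,
            allow_dangerous_deserialization=True
        )
        return self.vector_store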
    ## Generate a response by building a prompt from the query and retrieved
    ## context, then calling the small language model
    def generate_answer(self, query, context):
        """Generate an answer from the query and the retrieved context."""
        # Format context into a single string
        context_str = "\n\n".join([doc.page_content for doc in context])
        # Define the prompt
        prompt = f"""
You are a financial analyst assistant that helps answer questions about company financial statements.
Use the provided financial information to give accurate and helpful answers.
Context:
{context_str}
Question: {query}
Instructions:
1. Be concise and avoid repetition.
2. If the question is too broad or unclear, ask for clarification or provide a general overview.
3. Only if the question requires numerical calculations, show the step-by-step logic behind the calculation before providing the final answer.
4. If the context is insufficient, say "Not enough information to answer this question."
5. Do not provide sources unless explicitly asked.
Answer:
"""
        # Generate the answer; the pipeline returns the prompt plus the
        # completion, so strip the prompt prefix
        response = self.llm(prompt)[0]['generated_text']
        answer = response[len(prompt):].strip()
        return answer
    ## For the confidence score, cosine similarity is computed between the
    ## query embedding and the answer embedding
    def calculate_confidence_score(self, query, retrieved_docs, answer):
        """
        Calculate a confidence score from query/answer embedding similarity.
        """
        # Get embeddings for query and answer
        query_embedding = self.embedding_model.embed_query(query)
        answer_embedding = self.embedding_model.embed_query(answer)
        # Calculate cosine similarity
        dot_product = sum(a * b for a, b in zip(query_embedding, answer_embedding))
        magnitude_a = sum(a * a for a in query_embedding) ** 0.5
        magnitude_b = sum(b * b for b in answer_embedding) ** 0.5
        similarity = dot_product / (magnitude_a * magnitude_b) if magnitude_a * magnitude_b > 0 else 0
        return similarity
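    ## Optional vectorized equivalent of the cosine similarity above (a sketch;
    ## numpy ships as a dependency of sentence-transformers, so nothing new is
    ## assumed). Behaves identically to the pure-Python loop for 1-D vectors.
    @staticmethod
    def _cosine_similarity_np(vec_a, vec_b):
        import numpy as np
        a, b = np.asarray(vec_a), np.asarray(vec_b)
        denom = float(np.linalg.norm(a) * np.linalg.norm(b))
        return float(a @ b) / denom if denom > 0 else 0.0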
    ## The confidence level is determined from the confidence score
    def get_confidence_level(self, confidence_score):
        """
        Convert numerical confidence score to a level
        Args:
            confidence_score: Float between 0 and 1
        Returns:
            str: Confidence level ("high", "medium", "low", or "very low")
        """
        if confidence_score >= self.confidence_thresholds["high"]:
            return "high"
        elif confidence_score >= self.confidence_thresholds["medium"]:
            return "medium"
        elif confidence_score >= self.confidence_thresholds["low"]:
            return "low"
        else:
            return "very low"
    ## A guardrail is applied to filter harmful user queries
    def apply_input_guardrail(self, query):
        """Check if query violates input guardrails"""
        query_lower = query.lower()
        for pattern in self.guardrail_patterns:
            if pattern in query_lower:
                return True, f"I cannot process queries about {pattern}. Please reformulate your question."
        return False, ""
    ## First the top 5 chunks are retrieved; then, after reranking with the
    ## cross-encoder, the top 3 are returned
    def retrieve_with_reranking(self, query, top_k=5, rerank_top_k=3):
        """Retrieve relevant chunks and rerank them with cross-encoder"""
        print(f"retrieve_with_reranking start: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        # Initial retrieval using embedding similarity
        docs_and_scores = self.vector_store.similarity_search_with_score(query, k=top_k)
        # Sort retrieved documents by FAISS score for deterministic ordering
        # (FAISS returns L2 distances, so lower means more similar)
        docs_and_scores.sort(key=lambda x: x[1])
        # Prepare pairs for cross-encoder
        pairs = [(query, doc.page_content) for doc, _ in docs_and_scores]
        print(f"Scoring {len(pairs)} candidate chunks with the cross-encoder")
        # Get scores from cross-encoder
        scores = self.cross_encoder.predict(pairs)
        # Sort by cross-encoder scores (higher is better)
        reranked_results = sorted(zip(docs_and_scores, scores), key=lambda x: x[1], reverse=True)
        print(f"retrieve_with_reranking end: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        # Return the top reranked results
        return [doc for (doc, _), _ in reranked_results[:rerank_top_k]]
    ## To handle irrelevant questions, a rule-based classifier is used to
    ## screen incoming queries
    def is_financial_question(self, query):
        # Keywords are lowercase so they can match the lowercased query
        financial_keywords = [
            "finance", "financial", "revenue", "profit", "loss", "ebitda", "cash flow",
            "balance sheet", "income statement", "stock", "bond", "investment", "risk",
            "interest rate", "inflation", "debt", "equity", "valuation", "dividend",
            "market", "economy", "gdp", "currency", "exchange rate", "tax", "audit",
            "compliance", "regulation", "sec", "earnings", "capital", "asset", "liability"
        ]
        query_lower = query.lower()
        return any(keyword in query_lower for keyword in financial_keywords)
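    ## Optional alternative classifier (a hedged sketch, not the original
    ## method): score topical relevance with the embedding model already loaded
    ## in __init__ instead of keyword matching. The prototype sentence and the
    ## 0.3 threshold are illustrative assumptions, not tuned values.
    def is_financial_question_embedding(self, query, threshold=0.3):
        prototype = "a question about company finances, accounting, or markets"
        q = self.embedding_model.embed_query(query)
        p = self.embedding_model.embed_query(prototype)
        # Reuse the same cosine-similarity pattern as calculate_confidence_score
        dot = sum(a * b for a, b in zip(q, p))
        norm = (sum(a * a for a in q) ** 0.5) * (sum(b * b for b in p) ** 0.5)
        return (dot / norm if norm > 0 else 0) >= threshold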
    ## The pipeline: generate the answer and confidence score from the query
    def answer_question(self, query):
        """End-to-end pipeline to answer a question with confidence score"""
        # Apply input guardrail
        blocked, message = self.apply_input_guardrail(query)
        if blocked:
            return {"answer": message, "source_documents": [], "blocked": True,
                    "confidence_score": 0, "confidence_level": "none"}
        if not self.is_financial_question(query):
            return {
                "answer": "This question is outside the scope of financial data. Please ask a question related to finance.",
                "source_documents": [],
                "blocked": True,
                "confidence_score": 0,
                "confidence_level": "none"
            }
        # Retrieve and rerank relevant contexts
        reranked_docs = self.retrieve_with_reranking(query)
        # Generate answer from the reranked context
        answer = self.generate_answer(query, reranked_docs)
        print(f"answer : {answer}")
        # Calculate confidence score
        confidence_score = self.calculate_confidence_score(query, reranked_docs, answer)
        print(f"confidence score : {confidence_score}")
        confidence_level = self.get_confidence_level(confidence_score)
        return {
            "answer": answer,
            "source_documents": reranked_docs,
            "blocked": False,
            "confidence_score": confidence_score,
            "confidence_level": confidence_level
        }
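## Example usage (a minimal sketch; the query string is illustrative and the
## documents directory is assumed to contain the PDFs to index):
if __name__ == "__main__":
    rag = RagWithScore(documents_dir="financial_docs")
    rag.load_and_process_documents()
    result = rag.answer_question("What was the total revenue last year?")
    print(result["answer"])
    print(f"confidence: {result['confidence_score']:.2f} ({result['confidence_level']})")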