karthikvarunn's picture
Update app.py
8c5f5ba verified
import os
from dotenv import load_dotenv
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import HumanMessage
from langchain_openai import OpenAIEmbeddings
from langchain_voyageai import VoyageAIEmbeddings
from langchain_pinecone import PineconeVectorStore
from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from typing import List, Tuple
from langchain.schema import BaseRetriever
from langchain_core.documents import Document
from langchain_core.runnables import chain
from pinecone import Pinecone, ServerlessSpec
import openai
import numpy as np
from pinecone.grpc import PineconeGRPC as Pinecone
import gradio as gr
import asyncio
load_dotenv()
# Initialize OpenAI and Pinecone credentials
openai.api_key = os.environ.get("OPENAI_API_KEY")
pinecone_api_key = os.environ.get("PINECONE_API_KEY")
pinecone_environment = os.environ.get("PINECONE_ENV")
voyage_api_key = os.environ.get("VOYAGE_API_KEY")
pinecone_index_name = "briefmeta"
# Initialize Pinecone
try:
pc = Pinecone(api_key=pinecone_api_key)
except Exception as e:
print(f"Error connecting to Pinecone: {str(e)}")
embeddings = VoyageAIEmbeddings(
voyage_api_key=voyage_api_key, model="voyage-law-2"
)
def search_documents(query):
try:
vector_store = PineconeVectorStore(index_name=pinecone_index_name, embedding=embeddings)
results = vector_store.max_marginal_relevance_search(query, k=10, fetch_k=30) # Adjust fetch_k for more diverse results
# Filter results to ensure uniqueness based on metadata.id
seen_ids = set()
unique_results = []
for result in results:
unique_id = result.metadata.get("id")
if unique_id not in seen_ids:
seen_ids.add(unique_id)
unique_results.append(result)
# Collect relevant context from unique results
context = []
for result in unique_results:
context.append({
"doc_id": result.metadata.get("doc_id", "N/A"),
"chunk_id": result.metadata.get("id", "N/A"),
"title": result.metadata.get("source", "N/A"),
"text": result.page_content,
"page_number": str(result.metadata.get("page", "N/A")),
"score": str(result.metadata.get("score", "N/A")),
})
return context
except Exception as e:
return [], f"Error searching documents: {str(e)}"
# Reranker
def rerank(query, context):
result = pc.inference.rerank(
model="bge-reranker-v2-m3",
query=query,
documents=context,
top_n=5,
return_documents=True,
# parameters={
# "truncate": "END"
# }
)
return result
def generate_output(context, query):
try:
llm = ChatOpenAI(model="gpt-4", openai_api_key=openai.api_key, temperature=0.7)
prompt_template = PromptTemplate(
template="""
Use the following context to answer the question as accurately as possible:
Context: {context}
Question: {question}
Answer:""",
input_variables=["context", "question"]
)
prompt = prompt_template.format(context=context, question=query)
response = llm([HumanMessage(content=prompt)])
return response.content
except Exception as e:
return f"Error generating output: {str(e)}"
def complete_workflow(query):
try:
context_data = search_documents(query)
reranked = rerank(query, context_data)
context_data= []
for i, entry in enumerate(reranked.data): # Access the 'data' attribute
context_data.append({
'chunk_id': entry['document']['chunk_id'],
'doc_id': entry['document']['doc_id'],
'title': entry['document']['title'],
'text': entry['document']['text'],
'page_number': str(entry['document']['page_number']),
'score': str(entry['score'])
})
document_titles = list({os.path.basename(doc["title"]) for doc in context_data}) # Get only file names
formatted_titles = " " + "\n".join(document_titles)
total_results = len(context_data) # Count the total number of results
results = {
"results": [
{
"natural_language_output": generate_output(doc["text"], query),
"chunk_id": doc["chunk_id"],
"document_id": doc["doc_id"], # Assuming doc_id is the UUID
"title": doc["title"],
"text": doc["text"],
"page_number": doc["page_number"],
"score": doc["score"],
}
for doc in context_data
],
"total_results": total_results # Added total_results field
}
return results, formatted_titles # Return results and formatted document titles
except Exception as e:
return {"results": [], "total_results": 0}, f"Error in workflow: {str(e)}"
def gradio_app():
with gr.Blocks(css=".result-output {width: 150%; font-size: 16px; padding: 10px;}") as app:
gr.Markdown("### Intelligent Document Search Prototype-v0.1.2 ")
with gr.Row():
user_query = gr.Textbox(label="Enter Your Search Query")
search_btn = gr.Button("Search")
with gr.Row():
result_output = gr.JSON(label="Search Results", elem_id="result-output")
with gr.Row():
titles_output = gr.Textbox(label="Document Titles", interactive=False) # New Textbox for Titles
search_btn.click(
complete_workflow,
inputs=user_query,
outputs=[result_output, titles_output],
)
return app
# Launch the app
gradio_app().launch()