Midterm-streamlit / utils /_admin_util.py
deepali1021's picture
Added files
b092604
import os
import tiktoken
import PyPDF2
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai.embeddings import OpenAIEmbeddings
from langchain_qdrant import QdrantVectorStore
from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
import streamlit as st
from langchain.prompts import ChatPromptTemplate
from dotenv import load_dotenv
HUMAN_TEMPLATE = """
You are a helpful assistant who answers questions based on provided context.
You must only use the provided context, and cannot use your own knowledge.
If you do not know the answer, or it's not contained in the provided context response with "I don't know"
#Question:
{query}
#CONTEXT:
{context}
"""
def check_api_key():
load_dotenv()
"""Verify that the API key is set and valid"""
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise ValueError("OpenAI API key not found in environment variables")
return api_key
#Read PDF data
def read_pdf_data(pdf_file):
try:
pdf_reader = PyPDF2.PdfReader(pdf_file)
text = ""
for page in pdf_reader.pages:
text += page.extract_text()
if not text.strip():
raise ValueError("No text extracted from PDF")
return text
except Exception as e:
raise Exception(f"Error reading PDF: {str(e)}")
def tiktoken_len(text):
try:
tokens = tiktoken.encoding_for_model("gpt-4").encode(text)
return len(tokens)
except Exception as e:
raise Exception(f"Error in token calculation: {str(e)}")
#Split data into chunks
def split_data(text):
try:
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000, # Increased for better context
chunk_overlap=200, # Added overlap for better continuity
)
chunks = text_splitter.split_text(text)
if not chunks:
raise ValueError("Text splitting produced no chunks")
return chunks
except Exception as e:
raise Exception(f"Error splitting text: {str(e)}")
#Create embeddings instance
def create_embeddings():
try:
api_key = check_api_key()
embedding_model = OpenAIEmbeddings(
model="text-embedding-3-small",
openai_api_key=api_key
)
return embedding_model
except Exception as e:
raise Exception(f"Error creating embeddings model: {str(e)}")
# Create a vector database using Qdrant
def create_vector_store(embedding_model, chunks):
try:
embedding_dim = 1536
client = QdrantClient(":memory:") # Consider using persistent storage for production
# Create collection with error handling
try:
client.create_collection(
collection_name="lcel_doc_v2",
vectors_config=VectorParams(size=embedding_dim, distance=Distance.COSINE),
)
except Exception as e:
raise Exception(f"Error creating Qdrant collection: {str(e)}")
vector_store = QdrantVectorStore(
client=client,
collection_name="lcel_doc_v2",
embedding=embedding_model,
)
# Add texts with progress tracking
try:
_ = vector_store.add_texts(texts=chunks)
except Exception as e:
raise Exception(f"Error adding texts to vector store: {str(e)}")
return vector_store
except Exception as e:
raise Exception(f"Error in vector store creation: {str(e)}")
# create RAG
def create_rag():
try:
api_key = check_api_key()
openai_chat_model = ChatOpenAI(
model="gpt-4o-mini",
openai_api_key=api_key
)
chat_prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant that answers questions based on the provided context."),
("human", HUMAN_TEMPLATE)
])
if 'vector_store' in st.session_state:
vector_store = st.session_state.vector_store
else:
raise ValueError("Vector store not found in session state")
retriever = vector_store.as_retriever(search_kwargs={"k": 5})
simple_rag = (
{"context": retriever, "query": RunnablePassthrough()}
| chat_prompt
| openai_chat_model
| StrOutputParser()
)
return simple_rag
except Exception as e:
raise Exception(f"Error creating RAG chain: {str(e)}")
# Invoke RAG
def invoke_rag(query):
try:
rag_chain = create_rag()
response = rag_chain.invoke(query)
return response
except Exception as e:
raise Exception(f"Error invoking RAG chain: {str(e)}")