# Mayyar-RAG / app.py
# Mojo3's picture
# Update app.py
# c2defcc verified
import streamlit as st
from docx import Document
import os
from langchain_core.prompts import PromptTemplate
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
import time
from sentence_transformers import SentenceTransformer
from langchain.vectorstores import Chroma
from langchain.docstore.document import Document as Document2
from langchain_community.embeddings import HuggingFaceEmbeddings
import cohere
from langchain_core.prompts import PromptTemplate
# Read the Hugging Face access token from the environment.
token = os.getenv("HF_TOKEN")
# Report only whether a token is present: the secret itself must never be
# written to stdout, where it would end up in the application logs.
print("HF token configured:", token is not None)
# Folder that holds the .docx source documents.
docs_folder = "./converted_docs"
# Load the text of every .docx file found directly inside a folder.
def load_docx_files_from_drive(drive_folder):
    """Return the text content of each ``.docx`` file in *drive_folder*.

    Only the folder itself is scanned (no recursion), and only names ending
    in ``.docx`` are considered. Files are read in sorted name order so the
    result is reproducible from run to run.

    Args:
        drive_folder: Path of the directory to scan.

    Returns:
        A list with one string per file: the file's non-blank paragraphs
        joined with newlines.

    Raises:
        OSError: Propagated from ``os.listdir`` when the folder cannot be
            listed (for example, it does not exist).
    """
    # os.listdir yields names in arbitrary order; sorting keeps the document
    # order (and anything numbered from it later) stable between runs.
    docx_files = sorted(
        name for name in os.listdir(drive_folder) if name.endswith(".docx")
    )
    documents = []
    for file_name in docx_files:
        doc = Document(os.path.join(drive_folder, file_name))
        # Drop paragraphs that are empty or whitespace-only.
        documents.append("\n".join(p.text for p in doc.paragraphs if p.text.strip()))
    return documents
# Read the text of every .docx file in docs_folder (one string per file).
documents = load_docx_files_from_drive(docs_folder)
def split_extracted_text_into_chunks(documents, marker="File Name:"):
    """Split each document into chunks that begin at a *marker* line.

    A new chunk starts at every line that begins with *marker*. Lines that
    precede the first marker in a document form their own leading chunk.
    Chunks never span two documents, and a document with no lines
    contributes nothing.

    Args:
        documents: Iterable of document texts (strings).
        marker: Prefix that identifies the first line of a chunk. Defaults
            to ``"File Name:"``.

    Returns:
        A flat list of chunk strings, in document order, each made of the
        chunk's lines joined with ``"\\n"``.
    """
    print("Splitting text into chunks")
    chunks = []
    for doc_text in documents:
        current_chunk = []
        for line in doc_text.splitlines():
            # A marker line closes the chunk collected so far (if any) ...
            if line.startswith(marker) and current_chunk:
                chunks.append("\n".join(current_chunk))
                current_chunk = []
            # ... and every line, marker or not, belongs to the open chunk.
            current_chunk.append(line)
        # Flush whatever is left once the document ends.
        if current_chunk:
            chunks.append("\n".join(current_chunk))
    return chunks
# Break the loaded document texts into the chunks that will be embedded and indexed.
chunks = split_extracted_text_into_chunks(documents)
def save_chunks_to_file(chunks, output_file_path):
    """Dump all chunks to a UTF-8 text file, one numbered section per chunk.

    Each section is a ``Chunk <n>:`` header line (numbering starts at 1),
    the chunk text, and a line of fifty ``=`` characters. An existing file
    at the path is overwritten.

    Args:
        chunks: Iterable of chunk strings.
        output_file_path: Destination path of the text file.
    """
    print("Saving chunks to file")
    separator = "\n" + "=" * 50 + "\n"
    with open(output_file_path, "w", encoding="utf-8") as out:
        for number, text in enumerate(chunks, start=1):
            out.writelines((f"Chunk {number}:\n", text, separator))
# Path to save the chunks file
output_file_path = "./chunks_output.txt"
# `chunks` already holds the result of split_extracted_text_into_chunks(documents)
# from the statement that follows that function's definition; splitting the same
# documents a second time here only repeated the work, so it is simply reused.
save_chunks_to_file(chunks, output_file_path)
# Step 1: Load the embedding model through LangChain's wrapper.
@st.cache_resource
def _load_embedding_model():
    """Build the embedding model once and reuse it across script reruns.

    Streamlit re-executes this script on every user interaction; caching the
    model object as a resource avoids re-instantiating it each time.

    Returns:
        The ``HuggingFaceEmbeddings`` instance used for all embedding calls.
    """
    return HuggingFaceEmbeddings(
        model_name="Omartificial-Intelligence-Space/Arabic-Triplet-Matryoshka-V2"
    )


embedding_model = _load_embedding_model()
print("#0")
# Step 2: Embed the chunks, reporting progress in the Streamlit page.
def embed_chunks(chunks):
    """Embed every chunk with the module-level ``embedding_model``.

    A status line and a progress bar are created in the Streamlit page and
    updated after each chunk.

    Args:
        chunks: Sequence of chunk strings (``len()`` is taken of it).

    Returns:
        A list of dicts, one per chunk and in the same order, with keys
        ``"chunk"`` (the text) and ``"embedding"`` (the value returned by
        ``embedding_model.embed_query`` for that text).
    """
    status_text = st.empty()
    progress_bar = st.progress(0)
    total_chunks = len(chunks)
    results = []
    for done, chunk in enumerate(chunks, start=1):
        vector = embedding_model.embed_query(chunk)
        results.append({"chunk": chunk, "embedding": vector})
        progress = done / total_chunks
        progress_bar.progress(progress)
        status_text.text(f"Processed {done}/{total_chunks} chunks ({progress:.0%})")
    # Also covers the empty-input case, where the loop never runs.
    progress_bar.progress(1.0)
    status_text.text("Embedding complete!")
    return results
# Embed every chunk; this also drives the progress widgets in the page.
embeddings = embed_chunks(chunks)
# Progress marker on stdout.
print("#1")
# Step 3: Wrap the chunks in LangChain documents.
def prepare_documents_for_chroma(embeddings):
    """Turn embedded-chunk entries into LangChain documents.

    Only the ``"chunk"`` text of each entry is used; the 1-based position of
    the entry is stored in the document metadata as ``chunk_index``. A
    status line and a progress bar in the Streamlit page are updated after
    each entry.

    Args:
        embeddings: Sequence of dicts that each contain a ``"chunk"`` key.

    Returns:
        A list of ``Document2`` objects in the same order as the input.
    """
    status_text = st.empty()
    progress_bar = st.progress(0)
    total_entries = len(embeddings)
    prepared = []
    for position, entry in enumerate(embeddings, start=1):
        prepared.append(
            Document2(
                page_content=entry["chunk"],
                metadata={"chunk_index": position},
            )
        )
        fraction = position / total_entries
        progress_bar.progress(fraction)
        status_text.text(f"Processing document {position}/{total_entries} ({fraction:.0%})")
    # Also covers the empty-input case, where the loop never runs.
    progress_bar.progress(1.0)
    status_text.text(f"✅ Successfully prepared {total_entries} documents")
    return prepared
print("#2")
documents = prepare_documents_for_chroma(embeddings)
print("Creating the vectore store")
# Step 4: Create the Chroma store.
# The collection is persisted on disk and this script is executed again on
# every Streamlit rerun and on every restart. Without explicit ids each run
# would insert a fresh copy of every document, so retrieval would gradually
# fill up with duplicates. Stable, position-based ids make repeated runs
# overwrite the same entries instead.
# NOTE(review): entries left over from an earlier, larger document set are not
# removed by this; clear ./chroma_db when the source documents shrink.
vectorstore = Chroma.from_documents(
    documents=documents,
    embedding=embedding_model,  # Embedding object used for indexing and queries
    ids=[f"chunk-{position}" for position in range(1, len(documents) + 1)],
    persist_directory="./chroma_db",  # On-disk location of the collection
)
class RAGPipeline:
    """Answer questions from retrieved context using a Cohere model.

    For each question the pipeline retrieves documents from the vector
    store (MMR search), joins their text into one context string, fills the
    Arabic prompt template with the context and the question, and sends the
    prompt to Cohere's generate endpoint.

    NOTE(review): the template uses ``[INST]`` / ``<<SYS>>`` markers while the
    request goes to a Cohere model; confirm this prompt format is intended.
    """

    def __init__(self, vectorstore, api_key, model_name="c4ai-aya-expanse-8b", k=3):
        """Set up the Cohere client, the retriever and the prompt template.

        Args:
            vectorstore: Vector store exposing ``as_retriever``.
            api_key: Cohere API key.
            model_name: Name of the Cohere model to query.
            k: Number of documents to retrieve per question.
        """
        print("Initializing RAG Pipeline")
        self.vectorstore = vectorstore
        self.model_name = model_name
        self.k = k
        self.api_key = api_key
        self.client = cohere.Client(api_key)  # Initialize the Cohere client
        # Use the caller-supplied k; it was previously stored on the instance
        # but the retriever was always built with a hard-coded 3.
        self.retriever = self.vectorstore.as_retriever(
            search_type="mmr", search_kwargs={"k": self.k}
        )
        self.prompt_template = PromptTemplate.from_template(self._get_template())

    def _get_template(self):
        """Return the prompt template text with ``{context}`` and ``{question}`` slots."""
        # The trailing backslash suppresses the final newline, so the prompt
        # ends directly after "[/INST]".
        return """<s>[INST] <<SYS>>
أنت مساعد مفيد يقدم إجابات باللغة العربية بناءً على السياق المقدم.
- أجب فقط باللغة العربية
- إذا لم تجد إجابة في السياق، قل أنك لا تعرف
- كن دقيقاً وواضحاً في إجاباتك
-جاوب من السياق حصريا
<</SYS>>
السياق: {context}
السؤال: {question}
الإجابة: [/INST]\
"""

    def generate_response(self, question):
        """Retrieve context for *question* and return the model's answer text.

        Raises:
            RuntimeError: If the Cohere API returns no generations.
        """
        retrieved_docs = self._retrieve_documents(question)
        prompt = self._create_prompt(retrieved_docs, question)
        return self._generate_response_cohere(prompt)

    def _retrieve_documents(self, question):
        """Return the retrieved documents' text joined into one context string."""
        retrieved_docs = self.retriever.invoke(question)
        # Merge the retrieved texts into a single context.
        return " ".join(doc.page_content for doc in retrieved_docs)

    def _create_prompt(self, docs, question):
        """Fill the prompt template with the context string and the question."""
        return self.prompt_template.format(context=docs, question=question)

    def _generate_response_cohere(self, prompt):
        """Send *prompt* to Cohere and return the first generation, stripped.

        Raises:
            RuntimeError: If the response contains no generations.
        """
        # Call Cohere's generate API
        response = self.client.generate(
            model=self.model_name,
            prompt=prompt,
            max_tokens=2000,  # Upper bound on the length of the answer
            temperature=0.3,  # Low temperature keeps answers close to the context
            stop_sequences=None,
        )
        if not response.generations:
            # RuntimeError is still caught by callers that catch Exception.
            raise RuntimeError("No response generated by Cohere API.")
        return response.generations[0].text.strip()
st.title("Simple Text Generator")
api_key = os.getenv("API_KEY")
if not api_key:
    # Slicing the key used to raise TypeError here when the variable was
    # unset; show a clear message in the page and halt this run instead.
    st.error("The API_KEY environment variable is not set.")
    st.stop()
# Log only that a key is present; not even a prefix of the secret should
# reach the logs.
print("KEY: configured")
rag_pipeline = RAGPipeline(vectorstore=vectorstore, api_key=api_key)
print("Enter your question Here: ")
question = st.text_input("أدخل سؤالك هنا")
if st.button("Generate Answer"):
    if not question.strip():
        # Avoid a retrieval and an API call for an empty query.
        st.warning("Please enter a question first.")
    else:
        response = rag_pipeline.generate_response(question)
        st.write(response)
        print("Question: ", question)
        print("Response: ", response)