Spaces:
Sleeping
Sleeping
import streamlit as st | |
from docx import Document | |
import os | |
from langchain_core.prompts import PromptTemplate | |
from transformers import AutoTokenizer, AutoModelForCausalLM | |
import torch | |
import time | |
from sentence_transformers import SentenceTransformer | |
from langchain.vectorstores import Chroma | |
from langchain.docstore.document import Document as Document2 | |
from langchain_community.embeddings import HuggingFaceEmbeddings | |
import cohere | |
from langchain_core.prompts import PromptTemplate | |
# Load token from environment variable | |
token = os.getenv("HF_TOKEN") | |
print("my token is ", token) | |
# Save the token to Hugging Face's system directory | |
docs_folder = "./converted_docs" | |
# Function to load .docx files from Google Drive folder | |
def load_docx_files_from_drive(drive_folder): | |
docx_files = [f for f in os.listdir(drive_folder) if f.endswith(".docx")] | |
documents = [] | |
for file_name in docx_files: | |
file_path = os.path.join(drive_folder, file_name) | |
doc = Document(file_path) | |
content = "\n".join([p.text for p in doc.paragraphs if p.text.strip()]) | |
documents.append(content) | |
return documents | |
# Load .docx files from Google Drive folder | |
documents = load_docx_files_from_drive(docs_folder) | |
def split_extracted_text_into_chunks(documents): | |
print("Splitting text into chunks") | |
# List to hold all chunks | |
chunks = [] | |
for doc_text in documents: | |
# Split the document text into lines | |
lines = doc_text.splitlines() | |
# Initialize variables for splitting | |
current_chunk = [] | |
for line in lines: | |
# Check if the line starts with "File Name:" | |
if line.startswith("File Name:"): | |
# If there's a current chunk, save it before starting a new one | |
if current_chunk: | |
chunks.append("\n".join(current_chunk)) | |
current_chunk = [] # Reset the current chunk | |
# Add the line to the current chunk | |
current_chunk.append(line) | |
# Add the last chunk for the current document | |
if current_chunk: | |
chunks.append("\n".join(current_chunk)) | |
return chunks | |
# Split the extracted documents into chunks | |
chunks = split_extracted_text_into_chunks(documents) | |
def save_chunks_to_file(chunks, output_file_path): | |
print("Saving chunks to file") | |
# Open the file in write mode | |
with open(output_file_path, "w", encoding="utf-8") as file: | |
for i, chunk in enumerate(chunks, start=1): | |
# Write each chunk with a header for easy identification | |
file.write(f"Chunk {i}:\n") | |
file.write(chunk) | |
file.write("\n" + "=" * 50 + "\n") | |
# Path to save the chunks file | |
output_file_path = "./chunks_output.txt" | |
# Split the extracted documents into chunks | |
chunks = split_extracted_text_into_chunks(documents) | |
# Save the chunks to the file | |
save_chunks_to_file(chunks, output_file_path) | |
# Step 1: Load the model through LangChain's wrapper | |
embedding_model = HuggingFaceEmbeddings( | |
model_name="Omartificial-Intelligence-Space/Arabic-Triplet-Matryoshka-V2" | |
) | |
print("#0") | |
# Step 2: Embed the chunks (now simplified) | |
def embed_chunks(chunks): | |
status_text = st.empty() | |
progress_bar = st.progress(0) | |
results = [] | |
total_chunks = len(chunks) | |
for i, chunk in enumerate(chunks): | |
result = { | |
"chunk": chunk, | |
"embedding": embedding_model.embed_query(chunk) | |
} | |
results.append(result) | |
progress = (i + 1) / total_chunks | |
progress_bar.progress(progress) | |
status_text.text(f"Processed {i+1}/{total_chunks} chunks ({progress:.0%})") | |
progress_bar.progress(1.0) | |
status_text.text("Embedding complete!") | |
return results | |
embeddings = embed_chunks(chunks) | |
print("#1") | |
# Step 3: Prepare documents (unchanged) | |
def prepare_documents_for_chroma(embeddings): | |
status_text = st.empty() | |
progress_bar = st.progress(0) | |
documents = [] | |
total_entries = len(embeddings) | |
for i, entry in enumerate(embeddings, start=1): | |
doc = Document2( | |
page_content=entry["chunk"], | |
metadata={"chunk_index": i} | |
) | |
documents.append(doc) | |
progress = i / total_entries | |
progress_bar.progress(progress) | |
status_text.text(f"Processing document {i}/{total_entries} ({progress:.0%})") | |
progress_bar.progress(1.0) | |
status_text.text(f"✅ Successfully prepared {total_entries} documents") | |
return documents | |
print("#2") | |
documents = prepare_documents_for_chroma(embeddings) | |
print("Creating the vectore store") | |
# Step 4: Create Chroma store (fixed) | |
vectorstore = Chroma.from_documents( | |
documents=documents, | |
embedding=embedding_model, # Proper embedding object | |
persist_directory="./chroma_db", # Optional persistence | |
) | |
class RAGPipeline: | |
def __init__(self, vectorstore, api_key, model_name="c4ai-aya-expanse-8b", k=3): | |
print("Initializing RAG Pipeline") | |
self.vectorstore = vectorstore | |
self.model_name = model_name | |
self.k = k | |
self.api_key = api_key | |
self.client = cohere.Client(api_key) # Initialize the Cohere client | |
self.retriever = self.vectorstore.as_retriever( | |
search_type="mmr", search_kwargs={"k": 3} | |
) | |
self.prompt_template = PromptTemplate.from_template(self._get_template()) | |
def _get_template(self): | |
return """<s>[INST] <<SYS>> | |
أنت مساعد مفيد يقدم إجابات باللغة العربية بناءً على السياق المقدم. | |
- أجب فقط باللغة العربية | |
- إذا لم تجد إجابة في السياق، قل أنك لا تعرف | |
- كن دقيقاً وواضحاً في إجاباتك | |
-جاوب من السياق حصريا | |
<</SYS>> | |
السياق: {context} | |
السؤال: {question} | |
الإجابة: [/INST]\ | |
""" | |
def generate_response(self, question): | |
retrieved_docs = self._retrieve_documents(question) | |
prompt = self._create_prompt(retrieved_docs, question) | |
response = self._generate_response_cohere(prompt) | |
return response | |
def _retrieve_documents(self, question): | |
retrieved_docs = self.retriever.invoke(question) | |
# print("\n=== المستندات المسترجعة ===") | |
# for i, doc in enumerate(retrieved_docs): | |
# print(f"المستند {i+1}: {doc.page_content}") | |
# print("==========================\n") | |
# دمج النصوص المسترجعة في سياق واحد | |
return " ".join([doc.page_content for doc in retrieved_docs]) | |
def _create_prompt(self, docs, question): | |
return self.prompt_template.format(context=docs, question=question) | |
def _generate_response_cohere(self, prompt): | |
# Call Cohere's generate API | |
response = self.client.generate( | |
model=self.model_name, | |
prompt=prompt, | |
max_tokens=2000, # Adjust token limit based on requirements | |
temperature=0.3, # Control creativity | |
stop_sequences=None, | |
) | |
if response.generations: | |
return response.generations[0].text.strip() | |
else: | |
raise Exception("No response generated by Cohere API.") | |
st.title("Simple Text Generator") | |
api_key = os.getenv("API_KEY") | |
s = api_key[:5] | |
print("KEY: ", s) | |
rag_pipeline = RAGPipeline(vectorstore=vectorstore, api_key=api_key) | |
print("Enter your question Here: ") | |
question = st.text_input("أدخل سؤالك هنا") | |
if st.button("Generate Answer"): | |
response = rag_pipeline.generate_response(question) | |
st.write(response) | |
print("Question: ", question) | |
print("Response: ", response) | |