import subprocess
import os
import torch
from dotenv import load_dotenv
from langchain_community.vectorstores import Qdrant
from langchain_huggingface import HuggingFaceEmbeddings
from langchain.prompts import ChatPromptTemplate
from langchain.schema.runnable import RunnablePassthrough
from langchain.schema.output_parser import StrOutputParser
from qdrant_client import QdrantClient, models
from langchain_openai import ChatOpenAI
import gradio as gr
import logging
from typing import List, Tuple, Generator
from dataclasses import dataclass
from datetime import datetime
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from langchain_huggingface.llms import HuggingFacePipeline
from langchain_cerebras import ChatCerebras
from queue import Queue
from threading import Thread
from langchain.chains import LLMChain
from langchain_core.prompts import PromptTemplate
from langchain_huggingface import HuggingFaceEndpoint
from langchain_google_genai import ChatGoogleGenerativeAI
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
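# Simple data structures for tracking the running conversation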
@dataclass
class Message:
    role: str
    content: str
    timestamp: str

class ChatHistory:
    def __init__(self):
        self.messages: List[Message] = []

    def add_message(self, role: str, content: str):
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.messages.append(Message(role=role, content=content, timestamp=timestamp))

    def get_formatted_history(self, max_messages: int = 10) -> str:
        recent_messages = self.messages[-max_messages:] if len(self.messages) > max_messages else self.messages
        formatted_history = "\n".join([
            f"{msg.role}: {msg.content}" for msg in recent_messages
        ])
        return formatted_history

    def clear(self):
        self.messages = []
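# Environment variables: HF_TOKEN is required; QDRANT_URL, QDRANT_API_KEY and GEMINI
# are used below; C_apikey and OPENAPI_KEY are only loaded for the alternative backends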
# Load environment variables and setup
load_dotenv()

HF_TOKEN = os.getenv("HF_TOKEN")
C_apikey = os.getenv("C_apikey")
OPENAPI_KEY = os.getenv("OPENAPI_KEY")
GEMINI = os.getenv("GEMINI")

if not HF_TOKEN:
    logger.error("HF_TOKEN is not set in the environment variables.")
    exit(1)
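# Embedding model for semantic search; all-MiniLM-L6-v2 produces 384-dimensional
# vectors, matching the vector size configured for the Qdrant collection below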
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
try:
    client = QdrantClient(
        url=os.getenv("QDRANT_URL"),
        api_key=os.getenv("QDRANT_API_KEY"),
        prefer_grpc=True
    )
except Exception as e:
    logger.error(f"Failed to connect to Qdrant: {e}")
    exit(1)
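# Ensure the target collection exists; an "already exists" error is treated as success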
collection_name = "mawared"

try:
    client.create_collection(
        collection_name=collection_name,
        vectors_config=models.VectorParams(
            size=384,
            distance=models.Distance.COSINE
        )
    )
except Exception as e:
    if "already exists" not in str(e):
        logger.error(f"Error creating collection: {e}")
        exit(1)
db = Qdrant(
    client=client,
    collection_name=collection_name,
    embeddings=embeddings,
)
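# Retriever that returns the 5 most similar chunks for each query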
retriever = db.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 5}
)
#llm = ChatCerebras(
#    model="llama-3.3-70b",
#    api_key=C_apikey,
#    streaming=True
#)

# llm = ChatOpenAI(
#     model="meta-llama/Llama-3.3-70B-Instruct",
#     temperature=0,
#     max_tokens=None,
#     timeout=None,
#     max_retries=2,
#     api_key=HF_TOKEN,  # if you prefer to pass the API key directly instead of using env vars
#     base_url="https://api-inference.huggingface.co/v1/",
#     stream=True,
# )
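# Active LLM: Gemini (the commented-out Cerebras / HF Inference options above are kept for reference)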
llm = ChatGoogleGenerativeAI(
    model="gemini-2.0-flash-thinking-exp-01-21",
    temperature=0,
    max_tokens=None,
    timeout=None,
    max_retries=2,
    api_key=GEMINI,
    stream=True,
)
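# System prompt: restricts the assistant to the retrieved context and chat history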
| template = """ | |
| You are a specialized friendly AI assistant for the Mawared HR System, designed to provide accurate and contextually relevant support based solely on the provided context and chat history. | |
| Core Principles | |
| Source of Truth: Use only the information available in the retrieved context and chat history. Do not fabricate details or access external knowledge. | |
| Clarity and Precision: Communicate clearly, concisely, and professionally, using straightforward language for easy comprehension. | |
| Actionable Guidance: Deliver practical solutions, step-by-step workflows, and troubleshooting advice directly related to Mawared HR queries. | |
| Structured Instructions: Provide numbered, easy-to-follow instructions when explaining complex processes. | |
| Targeted Clarification: If a query lacks detail, ask specific questions to obtain the necessary information, explicitly stating what is missing. | |
| Exclusive Focus: Address only Mawared HR-related topics and avoid unrelated discussions. | |
| Professional Tone: Maintain a friendly, approachable, and professional demeanor. | |
| Response Guidelines | |
| Analyze the Query Thoughtfully: | |
| Start by thoroughly examining the user's question and reviewing the chat history. | |
| Consider what the user explicitly asked and infer their intent from the context provided. | |
| Mentally identify potential gaps in information before proceeding. | |
| Break Down Context Relevance: | |
| Isolate and interpret relevant pieces of context that apply directly to the query. | |
| Match the user's needs with the most relevant data from the context or chat history. | |
| Develop the Response in a Stepwise Manner: | |
| Construct a logical chain of thoughts: | |
| What does the user want to achieve? | |
| Which parts of the context can address this? | |
| What steps or details are needed for clarity? | |
| Provide responses in a structured, easy-to-follow format (e.g., numbered lists, bullet points). | |
| Ask for Clarifications Strategically: | |
| If the query lacks sufficient detail, identify the precise information missing. | |
| Frame your clarification politely and explicitly (e.g., βCould you confirm [specific detail] to proceed with [action/task]?β). | |
| Ensure Directness and Professionalism: | |
| Avoid unnecessary elaborations or irrelevant information. | |
| Maintain a friendly, professional tone throughout the response. | |
| Double-Check for Exclusivity: | |
| Ensure all guidance is strictly based on the retrieved context and chat history. | |
| Avoid speculating or introducing external knowledge about Mawared HR. | |
| Handling Information Gaps | |
| If the provided context is insufficient to answer the query: | |
| State explicitly that additional information is required to proceed. | |
| Clearly outline what details are missing. | |
| Avoid fabricating details or making assumptions. | |
| Critical Constraint | |
| STRICTLY rely on the provided context and chat history for all responses. Do not generate information about Mawared HR beyond these sources. | |
| Note: Do not mention a human support contact unless explicitly asked. | |
| Refuse to answer any questions thats not related to mawared Hr. | |
| You should think step by step to figure out the answer. | |
| Previous Conversation: {chat_history} | |
| Retrieved Context: {context} | |
| Current Question: {question} | |
| Answer:{{answer}} | |
| """ | |
prompt = ChatPromptTemplate.from_template(template)
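# Build the LCEL pipeline: retrieve context for the question, inject the formatted
# chat history, fill the prompt, call the LLM, and parse the output to a plain string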
def create_rag_chain(chat_history: str):
    chain = (
        {
            "context": retriever,
            "question": RunnablePassthrough(),
            "chat_history": lambda x: chat_history
        }
        | prompt
        | llm
        | StrOutputParser()
    )
    return chain
chat_history = ChatHistory()
def process_stream(stream_queue: Queue, history: List[List[str]]) -> Generator[List[List[str]], None, None]:
    """Process the streaming response and update the chat interface"""
    current_response = ""

    while True:
        chunk = stream_queue.get()
        if chunk is None:  # Signal that streaming is complete
            break

        current_response += chunk
        new_history = history.copy()
        new_history[-1][1] = current_response  # Update the assistant's message
        yield new_history
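# Gradio handler: runs the chain in a background thread and streams partial answers
# back to the chatbot through a queue, yielding an updated history on every chunk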
def ask_question_gradio(question: str, history: List[List[str]]) -> Generator[tuple, None, None]:
    try:
        if history is None:
            history = []

        chat_history.add_message("user", question)
        formatted_history = chat_history.get_formatted_history()
        rag_chain = create_rag_chain(formatted_history)

        # Update history with user message and empty assistant message
        history.append([question, ""])  # User message

        # Create a queue for streaming responses
        stream_queue = Queue()

        # Function to process the stream in a separate thread
        def stream_processor():
            try:
                for chunk in rag_chain.stream(question):
                    stream_queue.put(chunk)
                stream_queue.put(None)  # Signal completion
            except Exception as e:
                logger.error(f"Streaming error: {e}")
                stream_queue.put(None)

        # Start streaming in a separate thread
        Thread(target=stream_processor).start()

        # Yield updates to the chat interface
        response = ""
        for updated_history in process_stream(stream_queue, history):
            response = updated_history[-1][1]
            yield "", updated_history

        # Add final response to chat history
        chat_history.add_message("assistant", response)

    except Exception as e:
        logger.error(f"Error during question processing: {e}")
        if not history:
            history = []
        history.append([question, "An error occurred. Please try again later."])
        yield "", history
def clear_chat():
    chat_history.clear()
    return [], ""
# Gradio Interface
with gr.Blocks() as iface:
    gr.Image("Image.jpg", width=750, height=300, show_label=False, show_download_button=False)

    gr.Markdown("# Mawared HR Assistant 3.0.0")
    gr.Markdown('### Instructions')
    gr.Markdown("Ask a question about MawaredHR and get a detailed answer")

    chatbot = gr.Chatbot(
        height=750,
        show_label=False,
        bubble_full_width=False,
    )

    with gr.Row():
        with gr.Column(scale=20):
            question_input = gr.Textbox(
                label="Ask a question:",
                placeholder="Type your question here...",
                show_label=False
            )
        with gr.Column(scale=4):
            with gr.Row():
                with gr.Column():
                    send_button = gr.Button("Send", variant="primary", size="sm")
                    clear_button = gr.Button("Clear Chat", size="sm")

    # Handle both submit events (Enter key and Send button)
    submit_events = [question_input.submit, send_button.click]
    for submit_event in submit_events:
        submit_event(
            ask_question_gradio,
            inputs=[question_input, chatbot],
            outputs=[question_input, chatbot]
        )

    clear_button.click(
        clear_chat,
        outputs=[chatbot, question_input]
    )
if __name__ == "__main__":
    iface.launch()