# app.py for TrustGuardian Hugging Face Space
print("Starting TrustGuardian Application...")
# --- 🔥 Import Libraries ---
print("📚 Importing libraries...")
import os, io, re, sys, json, numpy as np, time, fitz, tiktoken, gradio as gr, traceback
from datetime import datetime
from typing import Optional, Dict, List, Any
from langchain_groq import ChatGroq
from langchain.memory import ConversationSummaryBufferMemory
from langchain.schema import Document
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_core.prompts import PromptTemplate
from pinecone import Pinecone
from langchain_pinecone import PineconeVectorStore
from langchain_core.messages import HumanMessage, AIMessage
from langchain.chains import ConversationalRetrievalChain
print("βœ… Libraries imported.")
# --- βš™οΈ System Configuration & Globals ---
print("\nβš™οΈ Configuring system settings...")
MAX_RETRIES = 3
DEBUG_MODE = True # Kept True as requested
VERSION = "2.0"
MEMORY_TOKENS = 2000
MAX_HISTORY_TOKENS = 4000
MAX_DOC_TOKENS_DIRECT = 3000 # Aggressive truncation for doc-only queries
MAX_RAG_TOKENS = 4000
# --- Logger ---
def log_debug(message: str) -> None:
"""Debug logger function"""
if DEBUG_MODE:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[DEBUG {timestamp}] {message}")
log_debug("Debug logging enabled.")
# --- Tokenizer Setup (with robust fallback) ---
print("\nβš™οΈ Setting up tokenizer functions...")
# Define Fallback Functions FIRST
def count_tokens_fallback(text: str) -> int:
    log_debug("Using estimated token count")
    return len(text) // 4
def truncate_to_limit_fallback(text: str, max_tokens: int) -> str:
    log_debug("Using estimated truncation")
    return text[:max_tokens * 4]
# Assign default functions
count_tokens = count_tokens_fallback
truncate_to_limit = truncate_to_limit_fallback
# Try to get real Tiktoken functions
try:
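    # cl100k_base is the tiktoken encoding used by OpenAI's GPT-3.5/GPT-4 models; here it only
    # provides an approximate token count for the Groq-hosted Llama model.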
token_manager = tiktoken.get_encoding("cl100k_base")
    def count_tokens_real(text: str) -> int:
        try:
            return len(token_manager.encode(text))
        except Exception as e:
            log_debug(f"Tiktoken count error: {e}. Falling back.")
            return count_tokens_fallback(text)
    def truncate_to_limit_real(text: str, max_tokens: int) -> str:
        try:
            tokens = token_manager.encode(text)
            kept = tokens[:max_tokens] if len(tokens) > max_tokens else tokens
            log_debug(f"Truncated tokens: {len(kept)}/{len(tokens)}")
            return token_manager.decode(kept)
        except Exception as e:
            log_debug(f"Tiktoken truncate error: {e}. Falling back.")
            return truncate_to_limit_fallback(text, max_tokens)
# Overwrite the globals with the real functions
count_tokens = count_tokens_real
truncate_to_limit = truncate_to_limit_real
print("βœ… Tiktoken tokenizer functions ready.")
except Exception as e:
print(f"⚠️ Warning: Failed tiktoken init: {e}. Using estimated token functions.")
# --- End Tokenizer Setup ---
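# Optional debug-only sanity check: exercising count_tokens here makes it obvious in the logs
# which tokenizer path is active (the fallback logs its own "estimated" notice when used).
if DEBUG_MODE:
    _tokenizer_sample = "TrustGuardian tokenizer self-test."
    log_debug(f"Tokenizer self-test: {count_tokens(_tokenizer_sample)} tokens in sample string.")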
# --- 🔑 Load API Keys from Environment Variables (Hugging Face Secrets) ---
print("\n🔐 Loading API keys from environment variables...")
try:
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY")
if not GROQ_API_KEY: raise ValueError("Secret 'GROQ_API_KEY' not found in environment variables.")
if not PINECONE_API_KEY: raise ValueError("Secret 'PINECONE_API_KEY' not found in environment variables.")
# IMPORTANT: Set Pinecone key in environment for Langchain wrapper if needed
os.environ['PINECONE_API_KEY'] = PINECONE_API_KEY
log_debug("API Keys retrieved from environment variables.")
print("βœ… API keys ready.")
except Exception as e:
log_debug(f"Error loading API keys: {e}")
    # Missing secrets are unrecoverable here, so report the problem clearly and exit so the failure is visible in the Space logs.
print(f"FATAL ERROR: Could not load API keys from Secrets. Please check Space settings. Error: {e}")
sys.exit(1) # Exit if keys are missing
# --- 📚 Initialize Embedding Model ---
print("\n🧠 Initializing embedding model...")
try:
# Consider adding cache_folder='./models' for HF persistence if needed
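    # Note: all-MiniLM-L6-v2 produces 384-dimensional embeddings, so the Pinecone index
    # must have been created with dimension 384 for retrieval to work.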
embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
log_debug("Embedding model initialized.")
print("βœ… Embedding model ready.")
except Exception as e:
log_debug(f"Embedding init error: {e}\n{traceback.format_exc()}")
print(f"FATAL ERROR: Could not initialize embedding model: {e}")
sys.exit(1)
# --- 🌲 Initialize Pinecone Vector Store ---
print("\nπŸ”„ Setting up Pinecone vector store...")
PINECONE_INDEX_NAME = "trustguardian" # Make sure this matches your index name
try:
# Uses PINECONE_API_KEY from environment variable set earlier
vectorstore = PineconeVectorStore.from_existing_index(
index_name=PINECONE_INDEX_NAME,
embedding=embedding_model
)
# Add a simple check to confirm connection (optional but recommended)
log_debug(f"Attempting connection to Pinecone index '{PINECONE_INDEX_NAME}'...")
# Try a dummy search or fetch stats if possible with vectorstore object
# Example: vectorstore.similarity_search("test connection", k=1)
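    # A minimal connectivity probe along the lines suggested above (a sketch: it assumes a tiny
    # similarity search is an acceptable startup cost, and it treats failure as non-fatal here,
    # logging it rather than exiting).
    try:
        _probe = vectorstore.similarity_search("connection test", k=1)
        log_debug(f"Pinecone connectivity probe returned {len(_probe)} document(s).")
    except Exception as probe_err:
        log_debug(f"Pinecone connectivity probe failed (non-fatal): {probe_err}")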
log_debug(f"Successfully initialized connection to Pinecone index '{PINECONE_INDEX_NAME}'.")
print("βœ… Pinecone vector store ready.")
except Exception as e:
log_debug(f"Pinecone init error: {e}\n{traceback.format_exc()}")
print(f"FATAL ERROR: Could not connect to Pinecone index '{PINECONE_INDEX_NAME}': {e}")
sys.exit(1)
# --- 🤖 Initialize LLM ---
print("\n🤖 Initializing LLM...")
try:
# Using llama-3.1-8b-instant
llm = ChatGroq(groq_api_key=GROQ_API_KEY, model_name="llama-3.1-8b-instant")
log_debug(f"LLM initialized with model: {llm.model_name}.")
print(f"βœ… LLM ready ({llm.model_name}).")
except Exception as e:
log_debug(f"LLM init error: {e}\n{traceback.format_exc()}")
print(f"FATAL ERROR: Could not initialize LLM: {e}")
sys.exit(1)
# --- 🧠 Initialize Memory ---
print("\nπŸ’­ Setting up conversation memory...")
try:
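    # ConversationSummaryBufferMemory keeps recent turns verbatim and has the LLM summarize
    # older ones once the buffer exceeds max_token_limit (MEMORY_TOKENS here).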
memory = ConversationSummaryBufferMemory(
llm=llm,
max_token_limit=MEMORY_TOKENS,
return_messages=True,
memory_key="chat_history",
output_key='answer' # Matches chain output key
)
log_debug("Memory system initialized.")
print("βœ… Memory systems ready.")
except Exception as e:
log_debug(f"Memory init error: {e}\n{traceback.format_exc()}")
print(f"FATAL ERROR: Could not initialize memory: {e}")
sys.exit(1)
# --- 🔗 Initialize Conversational Retrieval Chain ---
print("\n🔗 Initializing ConversationalRetrievalChain...")
try:
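    # MMR (maximal marginal relevance) retrieval: fetch_k candidates are pulled from the index,
    # then k of them are selected to trade off relevance against diversity; lambda_mult=0.5 sits
    # in the middle (0 = maximum diversity, 1 = maximum relevance).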
retriever = vectorstore.as_retriever(search_type="mmr", search_kwargs={"k": 4, "fetch_k": 8, "lambda_mult": 0.5})
qa_chain = ConversationalRetrievalChain.from_llm(
llm=llm,
retriever=retriever,
memory=memory, # Pass the memory object here
return_source_documents=True, # To display sources
verbose=DEBUG_MODE # Chain will log intermediate steps if True
)
print("βœ… ConversationalRetrievalChain ready.")
except Exception as e:
log_debug(f"Chain init error: {e}\n{traceback.format_exc()}")
print(f"FATAL ERROR: Could not initialize qa_chain: {e}")
sys.exit(1)
# --- 📄 Document Processing Functions ---
print("\n📄 Setting up document processing functions...")
class DocumentProcessor:
@staticmethod
    def clean_text(t):
        log_debug("Cleaning (simplified)...")
        # Strip stray PDF structural keywords, then collapse whitespace.
        t = re.sub(r'\b(obj|endobj|stream|endstream|xref|trailer|startxref)\b', '', t, flags=re.IGNORECASE)
        t = re.sub(r'\s+', ' ', t).strip()
        return t
@staticmethod
    def test_text_quality(t):
        log_debug(f"Testing quality (len: {len(t)})...")
        if not t or not t.strip():
            log_debug("Fail: empty"); return False, "Empty text"
        words = t.split(); wc = len(words); uc = len(set(words))
        log_debug(f"Words: {wc}, Unique: {uc}")
        if wc < 10:
            log_debug("Fail: wc < 10"); return False, f"Too few words: {wc}"
        if uc < 5:
            log_debug("Fail: uc < 5"); return False, f"Too little variety: {uc}"
        log_debug("Pass.")
        return True, f"Quality OK: {wc} words"
@staticmethod
    def extract_text_from_pdf(d):  # Using PyMuPDF
        log_debug("Extracting (PyMuPDF)...")
        tp, doc = [], None
        try:
            doc = fitz.open(stream=d, filetype="pdf")
            for i in range(len(doc)):
                page_text = doc.load_page(i).get_text("text", sort=True)
                if page_text:
                    tp.append(page_text)
            full_text = "\n".join(tp)
            log_debug(f"Extracted len: {len(full_text)}")
            if not full_text:
                log_debug("Warning: PyMuPDF extracted no text.")
            return full_text
        except Exception as e:
            log_debug(f"PyMuPDF error: {e}")
            raise ValueError(f"PyMuPDF failed: {e}")
        finally:
            if doc:
                doc.close()  # Ensure the document is closed
def extract_text_from_uploaded_file(b):
    log_debug("\n🔍 Processing upload...")
    try:
        if not isinstance(b, bytes):
            raise ValueError("Expected bytes.")
        raw_text = DocumentProcessor.extract_text_from_pdf(b)
        cleaned_text = DocumentProcessor.clean_text(raw_text)
        log_debug(f"Cleaned length: {len(cleaned_text)}")
        quality, msg = DocumentProcessor.test_text_quality(cleaned_text)
        log_debug(f"Quality check: {msg}")
        if not quality:
            raise ValueError(f"Poor quality: {msg}")
        return cleaned_text
    except Exception as e:
        err = f"Doc processing fail: {e}"
        log_debug(err)
        raise ValueError(err)
print("βœ… Document processing functions ready.")
# --- Text Analysis Helpers ---
# (Optional stubs - they can be removed if they are not called in the final logic.)
def analyze_document_structure(t): log_debug("Analyzing doc structure (optional)..."); return {} # Dummy implementation if not used
def extract_key_sections(t): log_debug("Extracting key sections (optional)..."); return [] # Dummy implementation if not used
print("βœ… Text analysis helpers ready.")
# --- Helper for Conditional Logic ---
def query_seems_doc_specific(query: str) -> bool:
    """Heuristic check: does the query refer to the uploaded document itself?"""
    query_lower = query.lower()
    doc_keywords = ["this document", "this file", "uploaded document", "uploaded file",
                    "summarize", "summarise", "analyze this", "analyse this", "extract from"]
    is_specific = any(k in query_lower for k in doc_keywords)
    log_debug(f"Query doc-specific check: {is_specific}")
    return is_specific
# --- 🧠 Main Application Class & Logic (Approach 1 - Conditional) ---
print("\nπŸ”„ Setting up main application logic...")
class TrustGuardian:
def __init__(self): log_debug("TrustGuardian initialized (uses global components)")
    def handle_user_input(self, upload_data: Optional[bytes], user_query: str) -> str:
        log_debug(f"\n🔄 Processing Request: '{user_query[:100]}...'")
        text_to_return = ""
        try:
            normalized_query = user_query.lower().strip()
            if normalized_query in ["hi", "hello", "hey", "salaam", "salam", "hola"]:
                return "👋 Hello! ..."
            doc_is_uploaded = upload_data is not None
            is_doc_query = doc_is_uploaded and query_seems_doc_specific(user_query)
            if is_doc_query:  # Mode 1: Doc-specific Query
                log_debug("Mode: Doc Query - Direct LLM Call")
                try:
                    doc_text = extract_text_from_uploaded_file(upload_data)
                    truncated_doc = truncate_to_limit(doc_text, MAX_DOC_TOKENS_DIRECT)  # Use constant
                    prompt = (f"User Query: {user_query}\n\n"
                              f"Document Content (Truncated):\n{truncated_doc}\n\n"
                              "Instructions: Answer based ONLY on the document.")
                    log_debug(f"Doc-only prompt (~{count_tokens(prompt)} tokens)")
                    # Use global llm object
                    response_message = llm.invoke(prompt)
                    text_to_return = response_message.content.strip()
                    log_debug("Generated doc-specific response.")
                    log_debug("Skipping memory update for doc-specific query.")
                except Exception as e:
                    log_debug(f"Error during doc processing/query: {e}")
                    text_to_return = f"⚠️ Doc Error: {e}"
            else:  # Mode 2: KB/Chat Query
                log_debug("Mode: KB/Chat Query - Using ConversationalRetrievalChain")
                # Use global qa_chain object (which includes memory)
                chat_history_messages = memory.chat_memory.messages  # Get history in message format
                log_debug(f"Passing {len(chat_history_messages)} history messages to chain.")
                chain_input = {"question": user_query, "chat_history": chat_history_messages}
                result = qa_chain.invoke(chain_input)  # Memory is updated by the chain
                log_debug("qa_chain completed.")
                text_to_return = result.get("answer", "Sorry, I couldn't generate a response.")
                if result.get("source_documents"):  # Append sources
                    citations = [f"📚 {doc.metadata.get('source', f'Src{i+1}')}"
                                 for i, doc in enumerate(result["source_documents"])]
                    if citations:
                        text_to_return += "\n\n---\n📚 Sources Consulted:\n" + "\n".join(list(set(citations)))
        except Exception as e:
            error_msg = f"Request error: {e}"
            log_debug(f"Error: {error_msg}\n{traceback.format_exc()}")
            text_to_return = f"⚠️ Error: {error_msg}"
        return text_to_return if text_to_return else "Unexpected issue."
# --- Initialize Guardian Instance ---
guardian = TrustGuardian()
print("βœ… Main application logic ready.")
# --- 🎨 Gradio Interface Definition ---
print("\n🎨 Setting up Gradio user interface...")
def ui_handler(upload_file_input, query):
"""Wrapper function for Gradio interface."""
try:
        upload_bytes = None
        if upload_file_input is not None:
            if isinstance(upload_file_input, bytes):
                upload_bytes = upload_file_input
                log_debug(f"Received {len(upload_bytes)} bytes.")
            else:
                log_debug(f"Warning: Received unexpected type: {type(upload_file_input)}")
                raise ValueError("Unexpected file data type.")
        else:
            log_debug("No file uploaded.")
        if not isinstance(query, str):
            query = str(query) if query is not None else ""
# Call main handler in the guardian instance
response_markdown = guardian.handle_user_input(upload_bytes, query)
return response_markdown
except Exception as e:
log_debug(f"Gradio Handler Error: {e}\n{traceback.format_exc()}")
return f"⚠️ System Error in UI Handler: {str(e)}"
# Define Gradio components
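# gr.File with type="binary" delivers the upload to ui_handler as raw bytes, which is what
# extract_text_from_uploaded_file expects downstream.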
file_input = gr.File(label="📄 Upload Document (PDF)", type="binary", file_types=[".pdf"])
text_input = gr.Textbox(label="💭 Ask a Question", placeholder="E.g., 'Summarize doc' or 'HIPAA requirements?'", lines=3)
markdown_output = gr.Markdown(label="📝 Analysis & Response")
# Define the Interface
ui = gr.Interface(
fn=ui_handler,
inputs=[file_input, text_input],
outputs=[markdown_output],
title="πŸ›‘οΈ TrustGuardian – Compliance Analysis Assistant (v" + VERSION + ")",
description="Upload a PDF document for analysis (summary/Q&A based on first ~3000 tokens) or ask a general compliance question about standards like GDPR, HIPAA, NIST, ISO 27001, SOC 2, PCI DSS.",
allow_flagging="never"
)
print("βœ… User interface defined.")
# --- Launch Gradio App ---
if __name__ == "__main__":
print("\nπŸš€ Launching Gradio UI...")
# Set server_name for HF Spaces compatibility
ui.launch(server_name="0.0.0.0", server_port=7860, debug=DEBUG_MODE) # Use port 7860 common for HF
print(" Gradio launch initiated. App should be running.")