|
|
|
|
|
print("Starting TrustGuardian Application...") |
|
|
|
|
|
print("π Importing libraries...") |
|
import os, io, re, sys, json, numpy as np, time, fitz, tiktoken, gradio as gr, traceback |
|
from datetime import datetime |
|
from typing import Optional, Dict, List, Any |
|
from langchain_groq import ChatGroq |
|
from langchain.memory import ConversationSummaryBufferMemory |
|
from langchain.schema import Document |
|
from langchain_huggingface import HuggingFaceEmbeddings |
|
from langchain_core.prompts import PromptTemplate |
|
from pinecone import Pinecone |
|
from langchain_pinecone import PineconeVectorStore |
|
from langchain_core.messages import HumanMessage, AIMessage |
|
from langchain.chains import ConversationalRetrievalChain |
|
print("β
Libraries imported.") |
|
|
|
|
|
print("\nβοΈ Configuring system settings...") |
|
MAX_RETRIES = 3

DEBUG_MODE = True             # enables log_debug output and verbose chain logging

VERSION = "2.0"               # shown in the Gradio UI title

MEMORY_TOKENS = 2000          # token budget for the conversation summary-buffer memory

MAX_HISTORY_TOKENS = 4000

MAX_DOC_TOKENS_DIRECT = 3000  # max tokens of uploaded-document text sent directly to the LLM

MAX_RAG_TOKENS = 4000
|
|
|
|
|
def log_debug(message: str) -> None: |
|
"""Debug logger function""" |
|
if DEBUG_MODE: |
|
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") |
|
print(f"[DEBUG {timestamp}] {message}") |
|
|
|
log_debug("Debug logging enabled.") |
|
|
|
|
|
print("\nβοΈ Setting up tokenizer functions...") |
|
|
|
def count_tokens_fallback(text: str) -> int: log_debug("Using estimated token count"); return len(text) // 4 |
|
def truncate_to_limit_fallback(text: str, max_tokens: int) -> str: log_debug("Using estimated truncation"); return text[:max_tokens*4] |
|
|
|
count_tokens = count_tokens_fallback |
|
truncate_to_limit = truncate_to_limit_fallback |
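
# These estimated fallbacks are replaced below by tiktoken-backed implementations if the encoder loads successfully.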
|
|
|
try: |
|
token_manager = tiktoken.get_encoding("cl100k_base") |
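    # cl100k_base is an OpenAI encoding; the Groq-hosted Llama model tokenizes differently, so these counts are approximate budgets.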
|
def count_tokens_real(text: str) -> int: |
|
try: return len(token_manager.encode(text)) |
|
except Exception as e: log_debug(f"Tiktoken count error: {e}. Falling back."); return count_tokens_fallback(text) |
|
def truncate_to_limit_real(text: str, max_tokens: int) -> str: |
|
try: tokens=token_manager.encode(text); T=tokens[:max_tokens] if len(tokens)>max_tokens else tokens; log_debug(f"Truncated tokens: {len(T)}/{len(tokens)}"); return token_manager.decode(T) |
|
except Exception as e: log_debug(f"Tiktoken truncate error: {e}. Falling back."); return truncate_to_limit_fallback(text, max_tokens) |
|
|
|
count_tokens = count_tokens_real |
|
truncate_to_limit = truncate_to_limit_real |
|
print("β
Tiktoken tokenizer functions ready.") |
|
except Exception as e: |
|
print(f"β οΈ Warning: Failed tiktoken init: {e}. Using estimated token functions.") |
|
|
|
|
|
|
|
|
|
print("\nπ Loading API keys from environment variables...") |
|
try: |
|
GROQ_API_KEY = os.environ.get("GROQ_API_KEY") |
|
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY") |
|
|
|
if not GROQ_API_KEY: raise ValueError("Secret 'GROQ_API_KEY' not found in environment variables.") |
|
if not PINECONE_API_KEY: raise ValueError("Secret 'PINECONE_API_KEY' not found in environment variables.") |
|
|
|
|
|
os.environ['PINECONE_API_KEY'] = PINECONE_API_KEY |
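    # Re-exported so that langchain_pinecone can pick the key up from the environment.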
|
log_debug("API Keys retrieved from environment variables.") |
|
print("β
API keys ready.") |
|
except Exception as e: |
|
log_debug(f"Error loading API keys: {e}") |
|
|
|
print(f"FATAL ERROR: Could not load API keys from Secrets. Please check Space settings. Error: {e}") |
|
sys.exit(1) |
|
|
|
|
|
print("\nπ§ Initializing embedding model...") |
|
try: |
|
|
|
embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2") |
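    # all-MiniLM-L6-v2 is a compact sentence-transformer producing 384-dimensional vectors; the Pinecone index dimension must match.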
|
log_debug("Embedding model initialized.") |
|
print("β
Embedding model ready.") |
|
except Exception as e: |
|
log_debug(f"Embedding init error: {e}\n{traceback.format_exc()}") |
|
print(f"FATAL ERROR: Could not initialize embedding model: {e}") |
|
sys.exit(1) |
|
|
|
|
|
print("\nπ Setting up Pinecone vector store...") |
|
PINECONE_INDEX_NAME = "trustguardian" |
|
try: |
|
|
|
    log_debug(f"Attempting connection to Pinecone index '{PINECONE_INDEX_NAME}'...")

    # from_existing_index assumes the index already exists; it does not create it.
    vectorstore = PineconeVectorStore.from_existing_index(
        index_name=PINECONE_INDEX_NAME,
        embedding=embedding_model
    )

    log_debug(f"Successfully initialized connection to Pinecone index '{PINECONE_INDEX_NAME}'.")
    print("✅ Pinecone vector store ready.")
|
except Exception as e: |
|
log_debug(f"Pinecone init error: {e}\n{traceback.format_exc()}") |
|
print(f"FATAL ERROR: Could not connect to Pinecone index '{PINECONE_INDEX_NAME}': {e}") |
|
sys.exit(1) |
|
|
|
|
|
print("\nπ€ Initializing LLM...") |
|
try: |
|
|
|
llm = ChatGroq(groq_api_key=GROQ_API_KEY, model_name="llama-3.1-8b-instant") |
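    # llama-3.1-8b-instant is a small, low-latency Groq-hosted model; no temperature or max_tokens overrides are set here.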
|
log_debug(f"LLM initialized with model: {llm.model_name}.") |
|
print(f"β
LLM ready ({llm.model_name}).") |
|
except Exception as e: |
|
log_debug(f"LLM init error: {e}\n{traceback.format_exc()}") |
|
print(f"FATAL ERROR: Could not initialize LLM: {e}") |
|
sys.exit(1) |
|
|
|
|
|
print("\nπ Setting up conversation memory...") |
|
try: |
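    # Summary-buffer memory: recent turns are kept verbatim and older turns are condensed into an LLM-written summary once MEMORY_TOKENS is exceeded.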
|
memory = ConversationSummaryBufferMemory( |
|
llm=llm, |
|
max_token_limit=MEMORY_TOKENS, |
|
return_messages=True, |
|
memory_key="chat_history", |
|
output_key='answer' |
|
) |
|
log_debug("Memory system initialized.") |
|
print("β
Memory systems ready.") |
|
except Exception as e: |
|
log_debug(f"Memory init error: {e}\n{traceback.format_exc()}") |
|
print(f"FATAL ERROR: Could not initialize memory: {e}") |
|
sys.exit(1) |
|
|
|
|
|
print("\nπ Initializing ConversationalRetrievalChain...") |
|
try: |
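    # MMR retrieval: fetch 8 candidates, return the 4 that best balance relevance and diversity (lambda_mult=0.5).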
|
retriever = vectorstore.as_retriever(search_type="mmr", search_kwargs={"k": 4, "fetch_k": 8, "lambda_mult": 0.5}) |
|
qa_chain = ConversationalRetrievalChain.from_llm( |
|
llm=llm, |
|
retriever=retriever, |
|
memory=memory, |
|
return_source_documents=True, |
|
verbose=DEBUG_MODE |
|
) |
|
print("β
ConversationalRetrievalChain ready.") |
|
except Exception as e: |
|
log_debug(f"Chain init error: {e}\n{traceback.format_exc()}") |
|
print(f"FATAL ERROR: Could not initialize qa_chain: {e}") |
|
sys.exit(1) |
|
|
|
|
|
print("\nπ Setting up document processing functions...") |
|
class DocumentProcessor:
    @staticmethod
    def clean_text(t):
        """Strip common PDF structural keywords and collapse whitespace."""
        log_debug("Cleaning(simplified)...")
        t = re.sub(r'\b(obj|endobj|stream|endstream|xref|trailer|startxref)\b', '', t, flags=re.IGNORECASE)
        t = re.sub(r'\s+', ' ', t).strip()
        return t

    @staticmethod
    def test_text_quality(t):
        """Basic sanity checks: non-empty text, enough words, enough variety."""
        log_debug(f"Testing quality(len:{len(t)})...")
        if not t or not t.strip():
            log_debug("Fail:Empty")
            return False, "Empty text"
        w = t.split()
        wc = len(w)
        uc = len(set(w))
        log_debug(f"W:{wc},U:{uc}")
        if wc < 10:
            log_debug("Fail:W<10")
            return False, f"Too few words:{wc}"
        if uc < 5:
            log_debug("Fail:U<5")
            return False, f"Too little variety:{uc}"
        log_debug("Pass.")
        return True, f"Quality OK:{wc} words"

    @staticmethod
    def extract_text_from_pdf(d):
        """Extract text from PDF bytes page by page using PyMuPDF."""
        log_debug("Extracting(PyMuPDF)...")
        tp = []
        doc = None
        try:
            doc = fitz.open(stream=d, filetype="pdf")
            for i in range(len(doc)):
                page_text = doc.load_page(i).get_text("text", sort=True)
                if page_text:
                    tp.append(page_text)
            full_text = "\n".join(tp)
            log_debug(f"Extracted len:{len(full_text)}")
            if not full_text:
                log_debug("Warning: PyMuPDF extracted no text.")
            return full_text
        except Exception as e:
            log_debug(f"PyMuPDF error:{e}")
            raise ValueError(f"PyMuPDF failed:{e}")
        finally:
            if doc is not None:
                doc.close()


def extract_text_from_uploaded_file(b):
    """Extract, clean, and quality-check text from uploaded PDF bytes."""
    log_debug("\n📄 Processing upload...")
    try:
        if not isinstance(b, bytes):
            raise ValueError("Expected bytes.")
        t = DocumentProcessor.extract_text_from_pdf(b)
        ct = DocumentProcessor.clean_text(t)
        log_debug(f"Cleaned length:{len(ct)}")
        quality, msg = DocumentProcessor.test_text_quality(ct)
        log_debug(f"Quality check:{msg}")
        if not quality:
            raise ValueError(f"Poor quality:{msg}")
        return ct
    except Exception as e:
        err = f"Doc processing fail:{e}"
        log_debug(err)
        raise ValueError(err)

print("✅ Document processing functions ready.")
|
|
|
|
|
|
|
def analyze_document_structure(t): log_debug("Analyzing doc structure (optional)..."); return {}  # placeholder: no structural analysis implemented yet

def extract_key_sections(t): log_debug("Extracting key sections (optional)..."); return []  # placeholder: no section extraction implemented yet
|
print("β
Text analysis helpers ready.") |
|
|
|
|
|
|
|
def query_seems_doc_specific(query: str) -> bool:
    """Heuristic: does the query refer to the uploaded document itself?"""
    query_lower = query.lower()
    dk = ["this document", "this file", "uploaded document", "uploaded file", "summarize", "summarise", "analyze this", "analyse this", "extract from"]
    is_s = any(k in query_lower for k in dk)
    log_debug(f"Query doc-specific check: {is_s}")
    return is_s
|
|
|
print("\nπ Setting up main application logic...") |
|
class TrustGuardian: |
|
def __init__(self): log_debug("TrustGuardian initialized (uses global components)") |
|
    def handle_user_input(self, upload_data: Optional[bytes], user_query: str) -> str:
        log_debug(f"\n🔄 Processing Request: '{user_query[:100]}...'")
        text_to_return = ""
        try:
            normalized_query = user_query.lower().strip()
            if normalized_query in ["hi", "hello", "hey", "salaam", "salam", "hola"]:
                return "👋 Hello! ..."

            doc_is_uploaded = upload_data is not None
            is_doc_query = doc_is_uploaded and query_seems_doc_specific(user_query)

            if is_doc_query:
                # Document-specific query: answer directly from the (truncated) upload, bypassing retrieval.
                log_debug("Mode: Doc Query - Direct LLM Call")
                try:
                    doc_text = extract_text_from_uploaded_file(upload_data)
                    truncated_doc = truncate_to_limit(doc_text, MAX_DOC_TOKENS_DIRECT)
                    prompt = f"User Query:{user_query}\n\nDocument Content(Truncated):\n{truncated_doc}\n\nInstructions:Answer based ONLY on doc."
                    log_debug(f"Doc-only prompt (~{count_tokens(prompt)} tokens)")

                    response_message = llm.invoke(prompt)
                    text_to_return = response_message.content.strip()
                    log_debug("Generated doc-specific response.")
                    log_debug("Skipping memory update for doc-specific query.")
                except Exception as e:
                    log_debug(f"Error during doc processing/query: {e}")
                    text_to_return = f"⚠️ Doc Error: {e}"
            else:
                # Knowledge-base / chat query: route through the retrieval chain with conversation memory.
                log_debug("Mode: KB/Chat Query - Using ConversationalRetrievalChain")

                chat_history_messages = memory.chat_memory.messages
                log_debug(f"Passing {len(chat_history_messages)} history messages to chain.")
                chain_input = {"question": user_query, "chat_history": chat_history_messages}
                result = qa_chain.invoke(chain_input)
                log_debug("qa_chain completed.")
                text_to_return = result.get("answer", "Sorry, I couldn't generate a response.")
                if result.get("source_documents"):
                    citations = [f"📄 {doc.metadata.get('source', f'Src{i+1}')}" for i, doc in enumerate(result["source_documents"])]
                    if citations:
                        text_to_return += "\n\n---\n📚 Sources Consulted:\n" + "\n".join(list(set(citations)))
        except Exception as e:
            error_msg = f"Request error: {e}"
            log_debug(f"Error: {error_msg}\n{traceback.format_exc()}")
            text_to_return = f"⚠️ Error: {error_msg}"
        return text_to_return if text_to_return else "Unexpected issue."
|
|
|
|
|
guardian = TrustGuardian() |
|
print("β
Main application logic ready.") |
|
|
|
|
|
|
|
print("\nπ¨ Setting up Gradio user interface...") |
|
def ui_handler(upload_file_input, query): |
|
"""Wrapper function for Gradio interface.""" |
|
try: |
|
upload_bytes=None |
|
if upload_file_input is not None: |
|
if isinstance(upload_file_input, bytes): upload_bytes = upload_file_input; log_debug(f"Received {len(upload_bytes)} bytes.") |
|
else: log_debug(f"Warning: Received unexpected type: {type(upload_file_input)}"); raise ValueError("Unexpected file data type.") |
|
else: log_debug("No file uploaded.") |
|
if not isinstance(query,str): query=str(query) if query is not None else "" |
|
|
|
response_markdown = guardian.handle_user_input(upload_bytes, query) |
|
return response_markdown |
|
    except Exception as e:
        log_debug(f"Gradio Handler Error: {e}\n{traceback.format_exc()}")
        return f"⚠️ System Error in UI Handler: {str(e)}"
|
|
|
|
|
file_input = gr.File(label="📄 Upload Document (PDF)", type="binary", file_types=[".pdf"])
|
text_input = gr.Textbox(label="💬 Ask a Question", placeholder="E.g., 'Summarize doc' or 'HIPAA requirements?'", lines=3)
|
markdown_output = gr.Markdown(label="📊 Analysis & Response")
|
|
|
|
|
ui = gr.Interface( |
|
fn=ui_handler, |
|
inputs=[file_input, text_input], |
|
outputs=[markdown_output], |
|
title="π‘οΈ TrustGuardian β Compliance Analysis Assistant (v" + VERSION + ")", |
|
description="Upload a PDF document for analysis (summary/Q&A based on first ~3000 tokens) or ask a general compliance question about standards like GDPR, HIPAA, NIST, ISO 27001, SOC 2, PCI DSS.", |
|
allow_flagging="never" |
|
) |
|
print("β
User interface defined.") |
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
print("\nπ Launching Gradio UI...") |
|
|
|
ui.launch(server_name="0.0.0.0", server_port=7860, debug=DEBUG_MODE) |
|
print(" Gradio launch initiated. App should be running.") |