ahmedsalman82 commited on
Commit
f6a0104
·
verified ·
1 Parent(s): f95e5b0

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +234 -0
app.py ADDED
@@ -0,0 +1,234 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import io
3
+ import re
4
+ import json
5
+ import PyPDF2
6
+ import gradio as gr
7
+ import numpy as np
8
+ from datetime import datetime
9
+ from typing import Optional, Dict, List
10
+ from dotenv import load_dotenv
11
+ import tiktoken
12
+ from langchain_groq import ChatGroq
13
+ from langchain.text_splitter import RecursiveCharacterTextSplitter
14
+ from langchain.memory import ConversationSummaryBufferMemory
15
+ from langchain.chains import RetrievalQA
16
+ from langchain.schema import Document
17
+ from langchain_astradb import AstraDBVectorStore
18
+ from langchain_huggingface import HuggingFaceEmbeddings
19
+
20
# Load environment variables from a local .env file, if one is present,
# so the credential lookups below can succeed in local development.
load_dotenv()
22
+
23
# System constants
DEBUG_MODE = False          # gate for log_debug() output
MAX_RETRIES = 3             # retry budget for transient failures
MODEL_TOKEN_LIMIT = 6000    # overall context budget for the LLM
DOC_TOKENS = 2500           # share of the budget for document text
REG_TOKENS = 1500           # share of the budget for regulatory context
MEMORY_TOKENS = 1000        # share of the budget for conversation memory
30
+
31
def log_debug(message: str) -> None:
    """Print *message* with a timestamp, but only when DEBUG_MODE is enabled."""
    if not DEBUG_MODE:
        return
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f"[DEBUG {stamp}] {message}")
34
+
35
# Load API keys
try:
    GROQ_API_KEY = os.getenv("GROQ_API_KEY")
    ASTRA_DB_API_ENDPOINT = os.getenv("ASTRA_DB_API_ENDPOINT")
    ASTRA_DB_APPLICATION_TOKEN = os.getenv("ASTRA_DB_APPLICATION_TOKEN")
    # All three credentials are required; fail fast at startup if any is absent.
    credentials = [GROQ_API_KEY, ASTRA_DB_API_ENDPOINT, ASTRA_DB_APPLICATION_TOKEN]
    if not all(credentials):
        raise ValueError("Missing API keys")
    log_debug("API keys loaded")
except Exception as e:
    raise ValueError(f"Failed to load API keys: {str(e)}")
45
+
46
# Initialize embedding model
try:
    # Small sentence-transformers model used to embed both KB documents and queries.
    embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
    log_debug("Embedding model initialized")
except Exception as e:
    raise ValueError(f"Failed to initialize embedding model: {str(e)}")
52
+
53
# Initialize vector store
try:
    astra_vectorstore = AstraDBVectorStore(
        embedding=embedding_model,
        collection_name="trustguardian_kb",
        api_endpoint=ASTRA_DB_API_ENDPOINT,
        token=ASTRA_DB_APPLICATION_TOKEN,
    )
    # MMR retrieval: return k results chosen from fetch_k candidates,
    # with lambda_mult trading off relevance against diversity.
    mmr_search_kwargs = {"k": 6, "fetch_k": 12, "lambda_mult": 0.6}
    retriever = astra_vectorstore.as_retriever(
        search_type="mmr",
        search_kwargs=mmr_search_kwargs,
    )
    log_debug("Vector store initialized")
except Exception as e:
    raise ValueError(f"Failed to initialize vector store: {str(e)}")
68
+
69
# Initialize LLM
try:
    # Groq-hosted chat model used for all response generation.
    llm = ChatGroq(groq_api_key=GROQ_API_KEY, model_name="mistral-saba-24b")
    log_debug("LLM initialized")
except Exception as e:
    raise ValueError(f"Failed to initialize LLM: {str(e)}")
75
+
76
# Initialize memory
try:
    # Summary-buffer memory: older turns are summarized once the buffer
    # would exceed 8000 tokens.
    memory = ConversationSummaryBufferMemory(llm=llm, max_token_limit=8000, return_messages=True)
    # Holds the text of the most recently uploaded document.
    doc_memory = {"latest_doc": ""}
    log_debug("Memory initialized")
except Exception as e:
    raise ValueError(f"Failed to initialize memory: {str(e)}")
83
+
84
# Document processing
class DocumentProcessor:
    """Static helpers for turning an uploaded PDF into clean, analyzable text."""

    @staticmethod
    def clean_text(text: str) -> str:
        """Strip PDF structural tokens, non-printable characters, and excess whitespace."""
        # Remove raw PDF syntax markers that sometimes leak into extracted text.
        text = re.sub(r'%PDF-\d+\.\d+|obj|endobj|stream|endstream|xref|trailer|startxref', '', text)
        # Keep only printable ASCII plus newlines.
        text = re.sub(r'[^\x20-\x7E\n]', '', text)
        # Collapse every whitespace run (including newlines) to one space.
        text = re.sub(r'\s+', ' ', text)
        # Turn literal backslash-n sequences back into real newlines.
        text = re.sub(r'\\n', '\n', text)
        return text.strip()

    @staticmethod
    def test_text_quality(text: str) -> tuple:
        """Return (ok, message) judging whether *text* is substantial enough to analyze."""
        if not text.strip():
            return False, "Empty text"
        words = text.split()
        unique_words = set(words)
        if len(words) < 10:
            return False, f"Too few words: {len(words)}"
        if len(unique_words) < 5:
            return False, f"Too little variety: {len(unique_words)} unique words"
        return True, f"Text quality good: {len(words)} words"

    @staticmethod
    def extract_text_from_pdf(file_data: bytes) -> str:
        """Extract text from raw PDF bytes, skipping pages with no extractable text.

        Raises:
            ValueError: if the bytes cannot be parsed as a PDF.
        """
        try:
            reader = PyPDF2.PdfReader(io.BytesIO(file_data))
            text_parts = []
            for page in reader.pages:
                # Call extract_text() once per page: it is expensive, and the
                # original double call also crashed on pages where it yields
                # None (image-only pages on some PyPDF2 versions).
                page_text = page.extract_text() or ""
                if page_text.strip():
                    text_parts.append(page_text)
            return "\n".join(text_parts)
        except Exception as e:
            raise ValueError(f"PDF extraction failed: {str(e)}")
114
+
115
def extract_text_from_uploaded_file(uploaded_file) -> str:
    """Read, extract, clean, and quality-check text from an uploaded PDF.

    Accepts either a file-like object or raw bytes (Gradio's binary upload).
    Raises ValueError with a user-facing hint when anything goes wrong.
    """
    try:
        if hasattr(uploaded_file, 'read'):
            file_data = uploaded_file.read()
        else:
            file_data = uploaded_file
        raw_text = DocumentProcessor.extract_text_from_pdf(file_data)
        cleaned_text = DocumentProcessor.clean_text(raw_text)
        is_ok, msg = DocumentProcessor.test_text_quality(cleaned_text)
        if not is_ok:
            raise ValueError(f"Poor text quality: {msg}")
        return cleaned_text
    except Exception as e:
        raise ValueError(f"Document processing failed: {str(e)}\nEnsure valid PDF with text content.")
126
+
127
# Token management
class TokenManager:
    """Thin wrapper around tiktoken for counting and truncating tokens."""

    def __init__(self):
        # cl100k_base is the tokenizer used here as a reasonable proxy for
        # the model's token accounting.
        self.encoding = tiktoken.get_encoding("cl100k_base")

    def count_tokens(self, text: str) -> int:
        """Number of tokens *text* encodes to."""
        return len(self.encoding.encode(text))

    def truncate_to_limit(self, text: str, max_tokens: int) -> str:
        """Return *text* cut down to at most *max_tokens* tokens."""
        token_ids = self.encoding.encode(text)
        # Slicing is a no-op when the text is already within the limit.
        return self.encoding.decode(token_ids[:max_tokens])

token_manager = TokenManager()
142
+
143
# Text analysis helpers
def analyze_document_structure(text: str) -> Dict:
    """Return simple size statistics for *text*: chars, words, lines, unique words."""
    word_list = text.split()
    stats = {
        'total_chars': len(text),
        'total_words': len(word_list),
        'total_lines': len(text.split('\n')),
        'unique_words': len(set(word_list)),
    }
    return stats
153
+
154
def extract_key_sections(text: str) -> List[str]:
    """Collect lines that look like section headings (ALL-CAPS, numbered, or Roman-numeral)."""
    heading_patterns = [
        r'^[A-Z][^a-z\n]{2,}[:\-]',   # e.g. "SECTION ONE:" / "APPENDIX -"
        r'^\d+\.\s+[A-Z][^a-z]{2,}',  # e.g. "1. INTRODUCTION"
        r'^[IVX]+\.\s+[A-Z]',         # e.g. "IV. Scope"
    ]
    headings = []
    for raw_line in text.split('\n'):
        candidate = raw_line.strip()  # strip once, reuse for match and result
        if any(re.match(pattern, candidate) for pattern in heading_patterns):
            headings.append(candidate)
    return headings
161
+
162
# Main processing logic
class TrustGuardian:
    """Orchestrates document analysis, regulatory retrieval, and LLM responses."""

    def __init__(self):
        self.token_manager = TokenManager()
        # Each entry: {"user": ..., "assistant": ..., "timestamp": ...}
        self.conversation_history = []

    def generate_response_prompt(self, doc_text: str, user_query: str, reg_context: str = "") -> str:
        """Build the analyst prompt embedding the query, a document excerpt, and regulatory context."""
        # Only the first 2500 characters of the document are embedded, keeping
        # the prompt within the model's context budget (see DOC_TOKENS).
        doc_section = 'DOCUMENT CONTENT: ' + doc_text[:2500] if doc_text else 'NO DOCUMENT'
        reg_section = 'REGULATORY CONTEXT: ' + reg_context if reg_context else ''
        # The template is intentionally flush-left so the model sees clean text.
        return f"""
You are TrustGuardian, an expert compliance analyst. Provide precise, clear responses with exact references (e.g., "GDPR Article 32(1)(b)") where applicable.

TASK: {user_query}
{doc_section}
{reg_section}

INSTRUCTIONS:
- For documents: Analyze relevant sections, cite document parts (e.g., "Section 3.2") and standards (e.g., "SOC 2 TSC CC6.1").
- For regulations: Cite specific sections (e.g., "HIPAA §164.308"), explain clearly, provide examples.
- For general queries: Explain compliance aspects, suggest best practices, note sources.
- If no reference exists, state "No specific reference available" and use general knowledge.
- Format with headings, bullets, and citations.
- Suggest next steps if relevant.
"""

    def process_regulatory_context(self, query: str) -> tuple:
        """Run RAG over the regulatory knowledge base; return (context, citations).

        On any failure the error is logged and ("", []) is returned so the
        caller can still answer from general knowledge.
        """
        try:
            rag_chain = RetrievalQA.from_chain_type(llm=llm, retriever=retriever, return_source_documents=True)
            result = rag_chain.invoke({"query": query})
            context = result["result"]
            sources = result.get("source_documents", [])
            citations = []
            for doc in sources:
                # Build the snippet outside the f-string: a backslash inside an
                # f-string expression is a SyntaxError before Python 3.12.
                snippet = doc.page_content[:150].replace('\n', ' ').strip()
                citations.append(f"{doc.metadata.get('source', 'Unknown')}: \"{snippet}...\"")
            return context, citations
        except Exception as e:
            log_debug(f"Regulatory context error: {str(e)}")
            return "", []

    def handle_user_input(self, upload, user_query: str) -> str:
        """Top-level Gradio handler: route greetings, documents, and regulatory queries."""
        try:
            normalized_query = user_query.lower().strip()
            if normalized_query in ["hi", "hello", "hey", "salaam", "salam", "hola"]:
                return "👋 Hello! I'm TrustGuardian. Upload a PDF or ask about compliance (e.g., 'HIPAA requirements')."

            doc_text = ""
            if upload:
                doc_text = extract_text_from_uploaded_file(upload)
                # Results are currently discarded; kept as hooks for future
                # structural reporting without changing behavior.
                analyze_document_structure(doc_text)
                extract_key_sections(doc_text)

            # Only hit the vector store when the query sounds regulatory.
            regulatory_terms = ('compliance', 'regulation', 'requirement', 'law', 'standard')
            if any(term in normalized_query for term in regulatory_terms):
                reg_context, citations = self.process_regulatory_context(user_query)
            else:
                reg_context, citations = "", []

            prompt = self.generate_response_prompt(doc_text, user_query, reg_context)
            response = llm.invoke(prompt).content.strip()
            final_response = response + ("\n\nSources:\n" + "\n".join(citations) if citations else "")
            self.conversation_history.append({
                "user": user_query,
                "assistant": final_response,
                "timestamp": datetime.now().isoformat(),
            })
            return final_response
        except Exception as e:
            return f"⚠️ Error: {str(e)}\nTry rephrasing or check file format."
217
+
218
# Initialize and run
guardian = TrustGuardian()
# NOTE: user-facing emoji/labels were mojibake in the original (mis-decoded
# UTF-8); restored to the intended characters here.
ui = gr.Interface(
    fn=guardian.handle_user_input,
    inputs=[
        # type="binary" hands the handler raw bytes rather than a temp-file path.
        gr.File(label="📄 Upload PDF", type="binary", file_types=[".pdf"]),
        gr.Textbox(label="💭 Ask a Question", placeholder="E.g., 'Summarize document' or 'GDPR requirements'", lines=2)
    ],
    outputs=gr.Markdown(label="📝 Analysis"),
    title="🛡️ TrustGuardian – Compliance Assistant",
    description="Upload a PDF or ask about compliance regulations. Get precise answers with exact references.",
    examples=[[None, "What are HIPAA requirements?"], [None, "Explain GDPR basics"]],
    theme=gr.themes.Soft()
)

if __name__ == "__main__":
    # Bind to all interfaces on port 7860 (the conventional Hugging Face Spaces port).
    ui.launch(server_name="0.0.0.0", server_port=7860)