import os
import sys
import json
import logging
import warnings
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
import hashlib
import pickle
from datetime import datetime
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor

# Suppress warnings for cleaner output
warnings.filterwarnings("ignore")

# Core dependencies
import gradio as gr
import numpy as np
import pandas as pd
from sentence_transformers import SentenceTransformer
import faiss
import torch
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    BitsAndBytesConfig,
    pipeline
)

# Document processing
from llama_index.core import Document, VectorStoreIndex, Settings, load_index_from_storage
from llama_index.core.node_parser import SentenceSplitter
from llama_index.vector_stores.faiss import FaissVectorStore
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core import StorageContext

# PDF processing
from unstructured.partition.pdf import partition_pdf
from llama_index.core.schema import Document as LlamaDocument

# Medical knowledge validation
import re

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class MedicalFactChecker:
    """Enhanced medical fact checker with faster validation"""

    def __init__(self):
        self.medical_facts = self._load_medical_facts()
        self.contraindications = self._load_contraindications()
        self.dosage_patterns = self._compile_dosage_patterns()
        self.definitive_patterns = [
            re.compile(r, re.IGNORECASE) for r in [
                r'always\s+(?:use|take|apply)',
                r'never\s+(?:use|take|apply)',
                r'will\s+(?:cure|heal|fix)',
                r'guaranteed\s+to',
                r'completely\s+(?:safe|effective)'
            ]
        ]

    def _load_medical_facts(self) -> Dict[str, Any]:
        """Pre-loaded medical facts for Gaza context"""
        return {
            "burn_treatment": {
                "cool_water": "Use clean, cool (not ice-cold) water for 10-20 minutes",
                "no_ice": "Never apply ice directly to burns",
                "clean_cloth": "Cover with clean, dry cloth if available"
            },
            "wound_care": {
                "pressure": "Apply direct pressure to control bleeding",
                "elevation": "Elevate injured limb if possible",
                "clean_hands": "Clean hands before treating wounds when possible"
            },
            "infection_signs": {
                "redness": "Increasing redness around wound",
                "warmth": "Increased warmth at wound site",
                "pus": "Yellow or green discharge",
                "fever": "Fever may indicate systemic infection"
            }
        }

    def _load_contraindications(self) -> Dict[str, List[str]]:
        """Pre-loaded contraindications for common treatments"""
        return {
            "aspirin": ["children under 16", "bleeding disorders", "stomach ulcers"],
            "ibuprofen": ["kidney disease", "heart failure", "stomach bleeding"],
            "hydrogen_peroxide": ["deep wounds", "closed wounds", "eyes"],
            "tourniquets": ["non-life-threatening bleeding", "without proper training"]
        }

    def _compile_dosage_patterns(self) -> List[re.Pattern]:
        """Pre-compiled dosage patterns"""
        patterns = [
            r'\d+\s*mg\b',                       # milligrams
            r'\d+\s*g\b',                        # grams
            r'\d+\s*ml\b',                       # milliliters
            r'\d+\s*tablets?\b',                 # tablets
            r'\d+\s*times?\s+(?:per\s+)?day\b',  # frequency
            r'every\s+\d+\s+hours?\b'            # intervals
        ]
        return [re.compile(pattern, re.IGNORECASE) for pattern in patterns]

    def check_medical_accuracy(self, response: str, context: str) -> Dict[str, Any]:
        """Enhanced medical accuracy check with Gaza-specific considerations"""
        issues = []
        warnings = []
        accuracy_score = 0.0

        # Check for contraindications (faster keyword matching)
        response_lower = response.lower()
        for medication, contra_list in self.contraindications.items():
            if medication in response_lower:
                for contra in contra_list:
                    if any(word in response_lower for word in contra.split()):
                        issues.append(f"Potential contraindication: {medication} with {contra}")
                        accuracy_score -= 0.3
                        break

        # Context alignment using Jaccard similarity
        if context:
            resp_words = set(response_lower.split())
            ctx_words = set(context.lower().split())
            context_similarity = len(resp_words & ctx_words) / len(resp_words | ctx_words) if ctx_words else 0.0
            if context_similarity < 0.5:  # Lowered threshold for Gaza context
                warnings.append(f"Low context similarity: {context_similarity:.2f}")
                accuracy_score -= 0.1
        else:
            context_similarity = 0.0

        # Gaza-specific resource checks
        gaza_resources = ["clean water", "sterile", "hospital", "ambulance", "electricity"]
        if any(resource in response_lower for resource in gaza_resources):
            warnings.append("Consider resource limitations in Gaza context")
            accuracy_score -= 0.05

        # Unsupported claims check
        for pattern in self.definitive_patterns:
            if pattern.search(response):
                issues.append("Unsupported definitive claim detected")
                accuracy_score -= 0.4
                break

        # Dosage validation
        for pattern in self.dosage_patterns:
            if pattern.search(response):
                warnings.append("Dosage detected - verify with professional")
                accuracy_score -= 0.1
                break

        confidence_score = max(0.0, min(1.0, 0.8 + accuracy_score))
        return {
            "confidence_score": confidence_score,
            "issues": issues,
            "warnings": warnings,
            "context_similarity": context_similarity,
            "is_safe": len(issues) == 0 and confidence_score > 0.5
        }


class EnhancedGazaKnowledgeBase:
    """Enhanced knowledge base with better embeddings and indexing"""

    def __init__(self, data_dir: str = "./data"):
        self.data_dir = Path(data_dir)
        self.embedding_model = None
        self.vector_store = None
        self.index = None
        self.chunk_metadata = []
        self.index_path = self.data_dir / "enhanced_vector_store"

        # Enhanced medical priorities for Gaza context
        self.medical_priorities = {
            "trauma": ["gunshot", "blast", "burns?", "fracture", "shrapnel", "explosion"],
            "infectious": ["cholera", "dysentery", "infection", "sepsis", "wound infection"],
            "chronic": ["diabetes", "hypertension", "malnutrition", "kidney", "heart"],
            "emergency": ["cardiac", "bleeding", "airway", "unconscious", "shock"],
            "gaza_specific": ["siege", "blockade", "limited supplies", "no electricity", "water shortage"]
        }

    def initialize(self):
        """Enhanced initialization with better embedding model"""
        if not self.index_path.exists():
            self.index_path.mkdir(parents=True)

        device = 'cuda' if torch.cuda.is_available() else 'cpu'

        # Try the preferred higher-dimensional embedding model, fall back to a smaller general model
        try:
            self.embedding_model = HuggingFaceEmbedding(
                model_name="sentence-transformers/all-mpnet-base-v2",  # Higher dimension (768)
                device=device,
                embed_batch_size=4
            )
            logger.info("Using all-mpnet-base-v2 (768-dim) embedding model")
        except Exception as e:
            logger.warning(f"Failed to load preferred model, using fallback: {e}")
            self.embedding_model = HuggingFaceEmbedding(
                model_name="sentence-transformers/all-MiniLM-L6-v2",
                device=device,
                embed_batch_size=4
            )
            logger.info("Using all-MiniLM-L6-v2 (384-dim) embedding model")

        # Configure global settings
        Settings.embed_model = self.embedding_model
        Settings.chunk_size = 512   # Increased chunk size for better context
        Settings.chunk_overlap = 50  # Increased overlap
"docstore.json").exists(): self._load_vector_store() else: self._create_vector_store() def _batch_embed_with_retry(self, texts, batch_size=16, max_retries=3, delay=2): """ Embed texts in batches with retry fallback and logging """ embeddings = [] for i in range(0, len(texts), batch_size): batch = texts[i:i+batch_size] for attempt in range(max_retries): try: batch_embeddings = self.embedding_model.get_text_embedding_batch(batch) embeddings.extend(batch_embeddings) break # success except Exception as e: if attempt < max_retries - 1: logger.warning(f"Batch {i}-{i+len(batch)} failed (attempt {attempt+1}): {e}. Retrying...") time.sleep(delay * (attempt + 1)) else: logger.error(f"ā Final failure embedding batch {i}-{i+len(batch)}: {e}") for text in batch: try: embeddings.append(self.embedding_model.get_text_embedding(text)) except Exception as sub_e: logger.error(f"Failed to embed single text: {sub_e} ā {text[:60]}...") return embeddings def _load_vector_store(self): """Load existing vector store with error handling""" try: # Load the FAISS index directly faiss_index = faiss.read_index(str(self.index_path / "index.faiss")) vector_store = FaissVectorStore(faiss_index=faiss_index) # Create storage context storage_context = StorageContext.from_defaults( vector_store=vector_store, persist_dir=str(self.index_path) ) # Load the index self.index = VectorStoreIndex.load( storage_context=storage_context ) # Load metadata metadata_path = self.index_path / "metadata.pkl" if metadata_path.exists(): with open(metadata_path, 'rb') as f: self.chunk_metadata = pickle.load(f) logger.info(f"Loaded existing vector store with {len(self.chunk_metadata)} chunks") except Exception as e: logger.error(f"Error loading vector store: {e}") # Fallback to creating new store if loading fails self._create_vector_store() def _create_vector_store(self): """Create enhanced vector store with IVF indexing""" documents = self._load_documents() if not documents: logger.warning("No documents found. 
Creating empty index") self.chunk_metadata = [] return # Determine embedding dimension try: test_embedding = self.embedding_model.get_text_embedding("test") dimension = len(test_embedding) logger.info(f"Embedding dimension: {dimension}") except Exception as e: logger.error(f"Failed to determine embedding dimension: {e}") dimension = 768 # Default for all-mpnet-base-v2 # Create enhanced FAISS index with IVF for better performance try: # For small datasets, use flat index; for larger ones, use IVF if len(documents) < 1000: faiss_index = faiss.IndexFlatL2(dimension) logger.info("Using IndexFlatL2 for small dataset") else: # Use IVF with reasonable number of clusters nlist = min(100, len(documents) // 10) # Adaptive cluster count quantizer = faiss.IndexFlatL2(dimension) faiss_index = faiss.IndexIVFFlat(quantizer, dimension, nlist) logger.info(f"Using IndexIVFFlat with {nlist} clusters") except Exception as e: logger.error(f"Failed to create enhanced index, using flat: {e}") faiss_index = faiss.IndexFlatL2(dimension) vector_store = FaissVectorStore(faiss_index=faiss_index) # Create storage context storage_context = StorageContext.from_defaults( vector_store=vector_store ) # Configure node parser with enhanced settings parser = SentenceSplitter( chunk_size=Settings.chunk_size, chunk_overlap=Settings.chunk_overlap, include_prev_next_rel=True # Include relationships for better context ) # Create index using global settings self.index = VectorStoreIndex.from_documents( documents, storage_context=storage_context, transformations=[parser], show_progress=True ) # Train IVF index if needed if hasattr(faiss_index, 'train') and not faiss_index.is_trained: logger.info("Training IVF index...") # Get some embeddings for training sample_texts = [doc.text[:500] for doc in documents[:100]] # Sample for training sample_embeddings = np.array(self._batch_embed_with_retry(sample_texts, batch_size=16)).astype('float32') faiss_index.train(sample_embeddings) logger.info("IVF index training completed") # Save metadata self.chunk_metadata = [ {"text": node.text, "source": node.metadata.get("source", "unknown")} for node in self.index.docstore.docs.values() ] # Persist the index self.index.storage_context.persist(persist_dir=str(self.index_path)) # Save metadata separately with open(self.index_path / "metadata.pkl", 'wb') as f: pickle.dump(self.chunk_metadata, f) logger.info(f"Created enhanced vector store with {len(self.chunk_metadata)} chunks") def _load_documents(self) -> List[Document]: """Enhanced document loading with better caching""" documents = [] doc_cache = self.index_path / "document_cache.pkl" # Try loading from cache if doc_cache.exists(): try: with open(doc_cache, 'rb') as f: cached_data = pickle.load(f) if isinstance(cached_data, dict) and 'documents' in cached_data: cached_docs = cached_data['documents'] if isinstance(cached_docs, list) and all(isinstance(d, Document) for d in cached_docs): logger.info(f"Loaded {len(cached_docs)} documents from cache") return cached_docs logger.warning("Document cache format invalid") except Exception as e: logger.warning(f"Document cache corrupted: {e}") # Process files with enhanced error handling processed_files = [] for pdf_file in self.data_dir.glob("*.pdf"): try: doc_text = self._extract_pdf_text(pdf_file) if doc_text and len(doc_text.strip()) > 100: # Minimum content check documents.append(Document( text=doc_text, metadata={ "source": str(pdf_file.name), "type": "pdf", "file_size": pdf_file.stat().st_size, "processed_date": datetime.now().isoformat() } )) 
                    processed_files.append(str(pdf_file.name))
                    logger.info(f"Processed: {pdf_file.name} ({len(doc_text)} chars)")
            except Exception as e:
                logger.error(f"Error loading {pdf_file}: {e}")

        # Process text files as well
        for txt_file in self.data_dir.glob("*.txt"):
            try:
                with open(txt_file, 'r', encoding='utf-8') as f:
                    doc_text = f.read()
                if doc_text and len(doc_text.strip()) > 100:
                    documents.append(Document(
                        text=doc_text,
                        metadata={
                            "source": str(txt_file.name),
                            "type": "txt",
                            "file_size": txt_file.stat().st_size,
                            "processed_date": datetime.now().isoformat()
                        }
                    ))
                    processed_files.append(str(txt_file.name))
                    logger.info(f"Processed: {txt_file.name} ({len(doc_text)} chars)")
            except Exception as e:
                logger.error(f"Error loading {txt_file}: {e}")

        # Save to cache if we found documents
        if documents:
            cache_data = {
                'documents': documents,
                'processed_files': processed_files,
                'cache_date': datetime.now().isoformat()
            }
            with open(doc_cache, 'wb') as f:
                pickle.dump(cache_data, f)
            logger.info(f"Cached {len(documents)} documents")

        return documents

    def _extract_pdf_text(self, pdf_path: Path) -> str:
        """Use unstructured to extract and chunk PDF text by title, and save as .txt"""
        try:
            elements = partition_pdf(filename=str(pdf_path), strategy="auto")
            if not elements:
                logger.warning(f"No elements extracted from {pdf_path}")
                return ""

            # Group by title (section-aware)
            grouped = {}
            current_title = "Untitled Section"
            for el in elements:
                if el.category == "Title" and el.text.strip():
                    current_title = el.text.strip()
                elif el.text.strip():
                    grouped.setdefault(current_title, []).append(el.text.strip())

            # Recombine into logical chunks
            sections = []
            for title, paras in grouped.items():
                section_text = f"{title}\n" + "\n".join(paras)
                sections.append(section_text.strip())

            full_text = "\n\n".join(sections)
            if len(full_text.strip()) < 100:
                logger.warning(f"Extracted text too short from {pdf_path}")
                return ""

            # Save extracted output to .txt next to original PDF
            txt_output = pdf_path.with_suffix(".extracted.txt")
            with open(txt_output, "w", encoding="utf-8") as f:
                f.write(full_text)
            logger.info(f"Saved extracted text to {txt_output.name}")

            return full_text
        except Exception as e:
            logger.error(f"Unstructured PDF parse failed for {pdf_path}: {e}")
            return ""

    def search(self, query: str, k: int = 5) -> List[Dict[str, Any]]:
        """Enhanced search with better error handling and result processing"""
        if not self.index:
            logger.warning("Index not available for search")
            return []

        try:
            retriever = self.index.as_retriever(similarity_top_k=k)
            results = retriever.retrieve(query)

            # FIX: Handle the tuple object error by properly extracting node and score
            processed_results = []
            for result in results:
                try:
                    # Handle both tuple and direct node results
                    if isinstance(result, tuple):
                        node, score = result
                    else:
                        node = result
                        score = getattr(result, 'score', 0.0)

                    # Extract text safely
                    text = getattr(node, 'text', str(node))
                    source = node.metadata.get("source", "unknown") if hasattr(node, 'metadata') else "unknown"

                    processed_results.append({
                        "text": text,
                        "source": source,
                        "score": float(score) if score is not None else 0.0,
                        "medical_priority": self._assess_priority(text)
                    })
                except Exception as e:
                    logger.error(f"Error processing search result: {e}")
                    continue

            # Sort by score (higher is better)
            processed_results.sort(key=lambda x: x['score'], reverse=True)

            logger.info(f"Search returned {len(processed_results)} results for query: {query[:50]}...")
            return processed_results
        except Exception as e:
            logger.error(f"Error during search: {e}")
            return []
    def _assess_priority(self, text: str) -> str:
        """Enhanced medical priority assessment"""
        text_lower = text.lower()

        # Check priorities in order of importance
        priority_order = ["emergency", "trauma", "gaza_specific", "infectious", "chronic"]
        for priority in priority_order:
            keywords = self.medical_priorities.get(priority, [])
            if any(re.search(keyword, text_lower) for keyword in keywords):
                return priority
        return "general"


class EnhancedGazaRAGSystem:
    """Enhanced RAG system with better performance and error handling"""

    def __init__(self):
        self.knowledge_base = EnhancedGazaKnowledgeBase()
        self.fact_checker = MedicalFactChecker()
        self.llm = None
        self.tokenizer = None
        self.system_prompt = self._create_system_prompt()
        self.generation_pipeline = None
        self.response_cache = {}  # Simple response caching
        self.executor = ThreadPoolExecutor(max_workers=2)  # For async processing

    def initialize(self):
        """Enhanced initialization with better error handling"""
        logger.info("Initializing Enhanced Gaza RAG System...")
        try:
            self.knowledge_base.initialize()
            logger.info("Knowledge base initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize knowledge base: {e}")
            raise

        # Lazy LLM loading - will load on first request
        logger.info("RAG system ready (LLM will load on first request)")

    def _initialize_llm(self):
        """Enhanced LLM initialization with better error handling"""
        if self.llm is not None:
            return

        model_name = "microsoft/Phi-3-mini-4k-instruct"
        try:
            logger.info(f"Loading LLM: {model_name}")

            # Enhanced quantization configuration
            quantization_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_use_double_quant=True,
                bnb_4bit_quant_type="nf4",
                bnb_4bit_compute_dtype=torch.float16,
            )

            self.tokenizer = AutoTokenizer.from_pretrained(
                model_name,
                trust_remote_code=True,
                padding_side="left"  # Better for generation
            )

            # Add pad token if missing
            if self.tokenizer.pad_token is None:
                self.tokenizer.pad_token = self.tokenizer.eos_token

            self.llm = AutoModelForCausalLM.from_pretrained(
                model_name,
                quantization_config=quantization_config,
                device_map="auto",
                trust_remote_code=True,
                torch_dtype=torch.float16,
                low_cpu_mem_usage=True
            )

            # Create enhanced pipeline
            self.generation_pipeline = pipeline(
                "text-generation",
                model=self.llm,
                tokenizer=self.tokenizer,
                device_map="auto",
                torch_dtype=torch.float16,
                return_full_text=False  # Only return generated text
            )

            logger.info("LLM loaded successfully")
        except Exception as e:
            logger.error(f"Error loading primary model: {e}")
            self._initialize_fallback_llm()

    def _initialize_fallback_llm(self):
        """Enhanced fallback model with better error handling"""
        try:
            logger.info("Loading fallback model...")
            fallback_model = "microsoft/DialoGPT-small"

            self.tokenizer = AutoTokenizer.from_pretrained(fallback_model)
            self.llm = AutoModelForCausalLM.from_pretrained(
                fallback_model,
                torch_dtype=torch.float32,
                low_cpu_mem_usage=True
            )

            if self.tokenizer.pad_token is None:
                self.tokenizer.pad_token = self.tokenizer.eos_token

            self.generation_pipeline = pipeline(
                "text-generation",
                model=self.llm,
                tokenizer=self.tokenizer,
                return_full_text=False
            )

            logger.info("Fallback model loaded successfully")
        except Exception as e:
            logger.error(f"Fallback model failed: {e}")
            self.llm = None
            self.generation_pipeline = None

    def _create_system_prompt(self) -> str:
        """Enhanced system prompt for Gaza context"""
        return """You are a medical AI assistant specifically designed for Gaza healthcare workers operating under siege conditions.

CRITICAL GUIDELINES:
- Provide practical first aid guidance considering limited resources (water, electricity, medical supplies)
- Always prioritize patient safety and recommend professional medical help when available
- Consider Gaza's specific challenges: blockade, limited hospitals, frequent power outages
- Suggest alternative treatments when standard medical supplies are unavailable
- Never provide definitive diagnoses - only supportive care guidance
- Be culturally sensitive and aware of the humanitarian crisis context

RESOURCE CONSTRAINTS TO CONSIDER:
- Limited clean water availability
- Frequent electricity outages
- Restricted medical supply access
- Overwhelmed healthcare facilities
- Limited transportation for medical emergencies

Provide clear, actionable advice while emphasizing the need for professional medical care when possible."""

    async def generate_response_async(self, query: str, progress_callback=None) -> Dict[str, Any]:
        """Async response generation with progress tracking"""
        start_time = time.time()

        if progress_callback:
            progress_callback(0.1, "Checking cache...")

        # Check cache first
        query_hash = hashlib.md5(query.encode()).hexdigest()
        if query_hash in self.response_cache:
            cached_response = self.response_cache[query_hash]
            cached_response["cached"] = True
            cached_response["response_time"] = 0.1
            if progress_callback:
                progress_callback(1.0, "Retrieved from cache!")
            return cached_response

        try:
            if progress_callback:
                progress_callback(0.2, "Initializing LLM...")

            # Initialize LLM only when needed
            if self.llm is None:
                await asyncio.get_event_loop().run_in_executor(
                    self.executor, self._initialize_llm
                )

            if progress_callback:
                progress_callback(0.4, "Searching knowledge base...")

            # Enhanced knowledge retrieval
            search_results = await asyncio.get_event_loop().run_in_executor(
                self.executor, self.knowledge_base.search, query, 3
            )

            if progress_callback:
                progress_callback(0.6, "Preparing context...")

            context = self._prepare_context(search_results)

            if progress_callback:
                progress_callback(0.8, "Generating response...")

            # Generate response
            response = await asyncio.get_event_loop().run_in_executor(
                self.executor, self._generate_response, query, context
            )

            if progress_callback:
                progress_callback(0.9, "Validating safety...")

            # Enhanced safety check
            safety_check = self.fact_checker.check_medical_accuracy(response, context)

            # Prepare final response
            final_response = self._prepare_final_response(
                response, search_results, safety_check, time.time() - start_time
            )

            # Cache the response (limit cache size)
            if len(self.response_cache) < 100:
                self.response_cache[query_hash] = final_response

            if progress_callback:
                progress_callback(1.0, "Complete!")

            return final_response
        except Exception as e:
            logger.error(f"Error generating response: {e}")
            if progress_callback:
                progress_callback(1.0, f"Error: {str(e)}")
            return self._create_error_response(str(e))

    def _generate_response(self, query: str, context: str) -> str:
        """Enhanced response generation using model.generate() to avoid DynamicCache errors"""
        if self.llm is None or self.tokenizer is None:
            return self._generate_fallback_response(query, context)

        # Build prompt with Gaza-specific context
        prompt = f"""{self.system_prompt}

MEDICAL KNOWLEDGE CONTEXT:
{context}

PATIENT QUESTION: {query}

RESPONSE (provide practical, Gaza-appropriate medical guidance):"""

        try:
            # Tokenize and move to correct device
            inputs = self.tokenizer(prompt, return_tensors="pt").to(self.llm.device)

            # Generate the response
            outputs = self.llm.generate(
                **inputs,
                max_new_tokens=800,
                temperature=0.5,
                pad_token_id=self.tokenizer.eos_token_id,
                do_sample=True,
                repetition_penalty=1.15,
            )

            # Decode and clean up
            response_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
            lines = response_text.split('\n')
            unique_lines = []
            for line in lines:
                line = line.strip()
                if line and line not in unique_lines:
                    unique_lines.append(line)
            return '\n'.join(unique_lines)
        except Exception as e:
            logger.error(f"Error in LLM generate(): {e}")
            return self._generate_fallback_response(query, context)

    def _prepare_context(self, search_results: List[Dict[str, Any]]) -> str:
        """Enhanced context preparation with better formatting"""
        if not search_results:
            return "No specific medical guidance found in knowledge base. Provide general first aid principles."

        context_parts = []
        for i, result in enumerate(search_results, 1):
            source = result.get('source', 'unknown')
            text = result.get('text', '')
            priority = result.get('medical_priority', 'general')

            # Truncate long text but preserve important information
            if len(text) > 400:
                text = text[:400] + "..."

            context_parts.append(f"[Source {i}: {source} - Priority: {priority}]\n{text}")

        return "\n\n".join(context_parts)

    def _generate_response(self, query: str, context: str) -> str:
        """Enhanced response generation with better prompting"""
        if not self.generation_pipeline:
            return self._generate_fallback_response(query, context)

        # Enhanced prompt structure
        prompt = f"""{self.system_prompt}

MEDICAL KNOWLEDGE CONTEXT:
{context}

PATIENT QUESTION: {query}

RESPONSE (provide practical, Gaza-appropriate medical guidance):"""

        try:
            # Enhanced generation parameters
            response = self.generation_pipeline(
                prompt,
                max_new_tokens=300,  # Increased for more detailed responses
                temperature=0.2,     # Lower for more consistent medical advice
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id,
                repetition_penalty=1.15,
                truncation=True,
                num_return_sequences=1
            )

            if response and len(response) > 0:
                generated_text = response[0]['generated_text']

                # Clean up the response
                generated_text = generated_text.strip()

                # Remove any repetitive patterns
                lines = generated_text.split('\n')
                unique_lines = []
                for line in lines:
                    if line.strip() and line.strip() not in unique_lines:
                        unique_lines.append(line.strip())
                return '\n'.join(unique_lines)
            else:
                return self._generate_fallback_response(query, context)
        except Exception as e:
            logger.error(f"Error in LLM generation: {e}")
            return self._generate_fallback_response(query, context)

    def _generate_fallback_response(self, query: str, context: str) -> str:
        """Enhanced fallback response with Gaza-specific guidance"""
        gaza_guidance = {
            "burn": "For burns: Use clean, cool water if available. If water is scarce, use clean cloth. Avoid ice. Seek medical help urgently.",
            "bleeding": "For bleeding: Apply direct pressure with clean cloth. Elevate if possible. If severe, seek immediate medical attention.",
            "wound": "For wounds: Clean hands if possible. Apply pressure to stop bleeding. Cover with clean material. Watch for infection signs.",
            "infection": "Signs of infection: Redness, warmth, swelling, pus, fever. Seek medical care immediately if available.",
            "pain": "For pain management: Rest, elevation, cold/warm compress as appropriate. Avoid aspirin in children."
        }

        query_lower = query.lower()
        for condition, guidance in gaza_guidance.items():
            if condition in query_lower:
                return f"{guidance}\n\nContext from medical sources:\n{context[:200]}..."

        return f"Medical guidance for: {query}\n\nGeneral advice: Prioritize safety, seek professional help when available, consider resource limitations in Gaza.\n\nRelevant information:\n{context[:300]}..."

    def _prepare_final_response(
        self,
        response: str,
        search_results: List[Dict[str, Any]],
        safety_check: Dict[str, Any],
        response_time: float
    ) -> Dict[str, Any]:
        """Enhanced final response preparation with more metadata"""
        # Add safety warnings if needed
        if not safety_check["is_safe"]:
            response = f"⚠️ MEDICAL CAUTION: {response}\n\n🚨 Please verify this guidance with a medical professional when possible."

        # Add Gaza-specific disclaimer
        response += "\n\n📍 Gaza Context: This guidance considers resource limitations. Adapt based on available supplies and seek professional medical care when accessible."

        # Extract unique sources
        sources = list(set(res.get("source", "unknown") for res in search_results)) if search_results else []

        # Calculate confidence based on multiple factors
        base_confidence = safety_check.get("confidence_score", 0.5)
        context_bonus = 0.1 if search_results else 0.0
        safety_penalty = 0.2 if not safety_check.get("is_safe", True) else 0.0
        final_confidence = max(0.0, min(1.0, base_confidence + context_bonus - safety_penalty))

        return {
            "response": response,
            "confidence": final_confidence,
            "sources": sources,
            "search_results_count": len(search_results),
            "safety_issues": safety_check.get("issues", []),
            "safety_warnings": safety_check.get("warnings", []),
            "response_time": round(response_time, 2),
            "timestamp": datetime.now().isoformat()[:19],
            "cached": False
        }

    def _create_error_response(self, error_msg: str) -> Dict[str, Any]:
        """Enhanced error response with helpful information"""
        return {
            "response": (
                f"⚠️ System Error: Unable to process your medical query at this time.\n\n"
                f"Error: {error_msg}\n\n"
                "🚨 For immediate medical emergencies, seek professional help directly.\n\n"
                "📞 Gaza Emergency Numbers:\n- Palestinian Red Crescent: 101\n- Civil Defense: 102"
            ),
            "confidence": 0.0,
            "sources": [],
            "search_results_count": 0,
            "safety_issues": ["System error occurred"],
            "safety_warnings": ["Unable to validate medical accuracy"],
            "response_time": 0.0,
            "timestamp": datetime.now().isoformat()[:19],
            "cached": False,
            "error": True
        }


# Global system instance
enhanced_rag_system = None


def initialize_enhanced_system():
    """Initialize enhanced system with better error handling"""
    global enhanced_rag_system
    if enhanced_rag_system is None:
        try:
            enhanced_rag_system = EnhancedGazaRAGSystem()
            enhanced_rag_system.initialize()
            logger.info("Enhanced Gaza RAG System initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize enhanced system: {e}")
            raise
    return enhanced_rag_system


def process_medical_query_with_progress(query: str, progress=gr.Progress()) -> Tuple[str, str, str]:
    """Enhanced query processing with detailed progress tracking and status updates"""
    if not query.strip():
        return "Please enter a medical question.", "", "⚠️ No query provided"

    try:
        # Initialize system with progress
        progress(0.05, desc="🔧 Initializing system...")
        system = initialize_enhanced_system()

        # Create async event loop for progress tracking
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        def progress_callback(value, desc):
            progress(value, desc=desc)

        try:
            # Run async generation with progress
            result = loop.run_until_complete(
                system.generate_response_async(query, progress_callback)
            )
        finally:
            loop.close()

        # Prepare response with enhanced metadata
        response = result["response"]

        # Prepare detailed metadata
        metadata_parts = [
            f"🎯 Confidence: {result['confidence']:.1%}",
            f"⏱️ Response: {result['response_time']}s",
            f"📚 Sources: {result['search_results_count']} found"
        ]

        if result.get('cached'):
            metadata_parts.append("💾 Cached")

        if result.get('sources'):
            metadata_parts.append(f"📄 Refs: {', '.join(result['sources'][:2])}")

        metadata = " | ".join(metadata_parts)

        # Prepare status with warnings/issues
        status_parts = []
        if result.get('safety_warnings'):
            status_parts.append(f"⚠️ {len(result['safety_warnings'])} warnings")
        if result.get('safety_issues'):
            status_parts.append(f"🚨 {len(result['safety_issues'])} issues")
        if not status_parts:
            status_parts.append("✅ Safe response")

        status = " | ".join(status_parts)

        return response, metadata, status
    except Exception as e:
        logger.error(f"Error processing query: {e}")
        error_response = f"⚠️ Error processing your query: {str(e)}\n\n🚨 For medical emergencies, seek immediate professional help."
        error_metadata = f"❌ Error at {datetime.now().strftime('%H:%M:%S')}"
        error_status = "🚨 System error occurred"
        return error_response, error_metadata, error_status


def create_advanced_gradio_interface():
    """Create advanced Gradio interface with modern design and enhanced UX"""
    # Advanced CSS with medical theme and animations
    css = """
    @import url('https://fonts.googleapis.com/css2?family=Love+Ya+Like+A+Sister&display=swap');

    * { font-family: 'Love Ya Like A Sister', cursive !important; }

    .gradio-container { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); min-height: 100vh; }
    .main-container { background: rgba(255, 255, 255, 0.95); backdrop-filter: blur(10px); border-radius: 20px; padding: 30px; margin: 20px; box-shadow: 0 20px 40px rgba(0,0,0,0.1); border: 1px solid rgba(255,255,255,0.2); }
    .header-section { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; border-radius: 15px; padding: 25px; margin-bottom: 25px; text-align: center; box-shadow: 0 10px 30px rgba(102, 126, 234, 0.3); }
    .query-container { background: linear-gradient(135deg, #f8f9ff 0%, #e8f2ff 100%); border-radius: 15px; padding: 20px; margin: 15px 0; border: 2px solid #667eea; transition: all 0.3s ease; }
    .query-container:hover { transform: translateY(-2px); box-shadow: 0 10px 25px rgba(102, 126, 234, 0.2); }
    .query-input { border: none !important; background: white !important; border-radius: 12px !important; padding: 15px !important; font-size: 16px !important; box-shadow: 0 4px 15px rgba(0,0,0,0.1) !important; transition: all 0.3s ease !important; }
    .query-input:focus { transform: scale(1.02) !important; box-shadow: 0 8px 25px rgba(102, 126, 234, 0.3) !important; }
    .response-container { background: linear-gradient(135deg, #fff 0%, #f8f9ff 100%); border-radius: 15px; padding: 20px; margin: 15px 0; border: 2px solid #4CAF50; min-height: 300px; }
    .response-output { border: none !important; background: transparent !important; font-size: 15px !important; line-height: 1.7 !important; color: #2c3e50 !important; }
    .metadata-container { background: linear-gradient(135deg, #e3f2fd 0%, #bbdefb 100%); border-radius: 12px; padding: 15px; margin: 10px 0; border-left: 5px solid #2196F3; }
    .metadata-output { border: none !important; background: transparent !important; font-size: 13px !important; color: #1565c0 !important; font-weight: 500 !important; }
    .status-container { background: linear-gradient(135deg, #e8f5e8 0%, #c8e6c9 100%); border-radius: 12px; padding: 15px; margin: 10px 0; border-left: 5px solid #4CAF50; }
    .status-output { border: none !important; background: transparent !important; font-size: 13px !important; color: #2e7d32 !important; font-weight: 500 !important; }
    .submit-btn { background: linear-gradient(135deg, #4CAF50 0%, #45a049 100%) !important; color: white !important; border: none !important; border-radius: 12px !important; padding: 15px 30px !important; font-size: 16px !important; font-weight: 600 !important; cursor: pointer !important; transition: all 0.3s ease !important; box-shadow: 0 6px 20px rgba(76, 175, 80, 0.3) !important; }
    .submit-btn:hover { transform: translateY(-3px) !important; box-shadow: 0 10px 30px rgba(76, 175, 80, 0.4) !important; }
    .clear-btn { background: linear-gradient(135deg, #ff7043 0%, #ff5722 100%) !important; color: white !important; border: none !important; border-radius: 12px !important; padding: 15px 25px !important; font-size: 14px !important; font-weight: 500 !important; transition: all 0.3s ease !important; }
    .clear-btn:hover { transform: translateY(-2px) !important; box-shadow: 0 8px 20px rgba(255, 87, 34, 0.3) !important; }
    .emergency-notice { background: linear-gradient(135deg, #ffebee 0%, #ffcdd2 100%); border: 2px solid #f44336; border-radius: 15px; padding: 20px; margin: 20px 0; color: #c62828; font-weight: 600; animation: pulse 2s infinite; }
    @keyframes pulse {
        0% { box-shadow: 0 0 0 0 rgba(244, 67, 54, 0.4); }
        70% { box-shadow: 0 0 0 10px rgba(244, 67, 54, 0); }
        100% { box-shadow: 0 0 0 0 rgba(244, 67, 54, 0); }
    }
    .gaza-context { background: linear-gradient(135deg, #e8f5e8 0%, #c8e6c9 100%); border: 2px solid #4caf50; border-radius: 15px; padding: 20px; margin: 20px 0; color: #2e7d32; font-weight: 500; }
    .sidebar-container { background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%); border-radius: 15px; padding: 20px; margin: 10px 0; border: 1px solid rgba(0,0,0,0.1); }
    .example-container { background: white; border-radius: 12px; padding: 20px; margin: 15px 0; box-shadow: 0 4px 15px rgba(0,0,0,0.1); }
    .progress-container { margin: 15px 0; padding: 10px; background: rgba(255,255,255,0.8); border-radius: 10px; }
    .footer-section { background: linear-gradient(135deg, #37474f 0%, #263238 100%); color: white; border-radius: 15px; padding: 20px; margin-top: 30px; text-align: center; }

    /* GLOBAL TEXT FIXES */
    .gradio-container, .query-container, .response-container, .metadata-container, .status-container { color: white !important; }
    .query-input, .response-output, .metadata-output, .status-output { color: white !important; background-color: rgba(0, 0, 0, 0.2) !important; }

    /* BANNER-INSPIRED PANEL BACKGROUNDS */
    .query-container, .response-container, .metadata-container, .status-container { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important; border: 2px solid #ffffff22 !important; border-radius: 15px !important; box-shadow: 0 10px 30px rgba(102, 126, 234, 0.3); }

    /* EXAMPLE SECTION BUTTON STYLING */
    .example-container .example { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important; color: white !important; font-weight: 600 !important; border-radius: 12px !important; padding: 15px !important; margin: 10px !important; text-align: center !important; box-shadow: 0 6px 20px rgba(0, 0, 0, 0.1); transition: all 0.3s ease; cursor: pointer; }
    .example-container .example:hover { transform: scale(1.03); box-shadow: 0 10px 30px rgba(102, 126, 234, 0.4); }
    /* MAKE HEADER + EXAMPLES MORE PROMINENT */
    .header-section { color: white !important; text-shadow: 0px 0px 6px rgba(0,0,0,0.4); }
    .example-container { margin-top: -20px !important; }
    """

    with gr.Blocks(
        css=css,
        title="🏥 Advanced Gaza First Aid Assistant",
        theme=gr.themes.Soft(
            primary_hue="blue",
            secondary_hue="green",
            neutral_hue="slate"
        )
    ) as interface:

        # Header Section
        with gr.Row(elem_classes=["main-container"]):
            gr.HTML("""
Enhanced with 768-dimensional medical embeddings • Advanced FAISS indexing • Real-time safety validation
For life-threatening emergencies, seek immediate professional medical attention.
📞 Gaza Emergency Contacts: Palestinian Red Crescent (101) | Civil Defense (102)
This advanced AI system is specifically designed for Gaza's challenging conditions including limited resources, frequent power outages, and restricted medical supply access. All guidance considers these constraints and provides practical alternatives when standard treatments are unavailable.