import hashlib
import json
import logging
import os
import re
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Tuple, Optional, Union

import gradio as gr

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Optional imports for document processing. Each guard sets a module-level
# flag that the rest of the file checks before touching the library.
try:
    from docx import Document
    DOCX_AVAILABLE = True
except ImportError:
    DOCX_AVAILABLE = False
    logger.warning("python-docx not installed. DOCX processing will be disabled.")

try:
    import PyPDF2
    PDF_AVAILABLE = True
except ImportError:
    PDF_AVAILABLE = False
    logger.warning("PyPDF2 not installed. PDF processing will be disabled.")

try:
    import fitz  # PyMuPDF - alternative PDF processor
    PYMUPDF_AVAILABLE = True
except ImportError:
    PYMUPDF_AVAILABLE = False

# Optional imports for advanced text processing
try:
    import nltk
    from nltk.tokenize import sent_tokenize, word_tokenize
    from nltk.corpus import stopwords
    # FreqDist lives in nltk.probability. The previous `nltk.frequency` import
    # named a module that does not exist, so the ImportError below fired on
    # every start and silently disabled all NLTK features.
    from nltk.probability import FreqDist
    from nltk.sentiment import SentimentIntensityAnalyzer
    NLTK_AVAILABLE = True
except ImportError:
    NLTK_AVAILABLE = False
    logger.warning("NLTK not installed. Advanced text analysis will be limited.")

if NLTK_AVAILABLE:
    # (resource path understood by nltk.data.find, package name for nltk.download)
    required_nltk_data = [
        ('tokenizers/punkt', 'punkt'),
        # Recent NLTK releases load the sentence tokenizer from punkt_tab.
        ('tokenizers/punkt_tab', 'punkt_tab'),
        ('corpora/stopwords', 'stopwords'),
        # The lexicon is stored under sentiment/; the bare name never resolves
        # and would trigger a download on every start.
        ('sentiment/vader_lexicon.zip', 'vader_lexicon'),
    ]
    for resource_path, package_name in required_nltk_data:
        try:
            nltk.data.find(resource_path)
        except LookupError:
            nltk.download(package_name, quiet=True)

try:
    from transformers import pipeline
    import torch
    TRANSFORMERS_AVAILABLE = True
    # transformers pipelines take a CUDA device index, or -1 for CPU.
    DEVICE = 0 if torch.cuda.is_available() else -1
except ImportError:
    TRANSFORMERS_AVAILABLE = False
    DEVICE = -1
    logger.warning("transformers not installed. AI summarization will use basic extraction methods.")
class AdvancedDocumentSummarizer:
    """CatalystGPT-4 Advanced Document Summarizer with enhanced features.

    Extracts text from PDF, DOCX and plain-text files, computes statistics,
    and produces a summary, key points and an outline. Optional libraries are
    used only when the module-level flags (``NLTK_AVAILABLE``,
    ``TRANSFORMERS_AVAILABLE``, ``DOCX_AVAILABLE``, ``PDF_AVAILABLE``,
    ``PYMUPDF_AVAILABLE``) say they were imported; otherwise regex-based
    fallbacks are used.
    """

    # Messages returned by the public extract_* methods on failure.
    PDF_FAILURE_MESSAGE = "PDF processing libraries not available or extraction failed."
    DOCX_UNAVAILABLE_MESSAGE = "python-docx library not available."
    EXTRACTION_FAILED_MESSAGE = "No text could be extracted from the document or extraction failed."

    _MAX_CACHE_ENTRIES = 64          # oldest results are evicted beyond this
    _HASH_BLOCK_SIZE = 1024 * 1024   # bytes read per step while hashing a file

    # A sentence ends at ., ! or ? followed by whitespace; the terminator is kept.
    _SENTENCE_BOUNDARY = re.compile(r'(?<=[.!?])\s+')
    _WORD_PATTERN = re.compile(r'\b\w+\b')
    _NUMBER_PATTERN = re.compile(r'\b\d+(?:\.\d+)?%?\b')
    _HEADER_PATTERNS = tuple(re.compile(pattern) for pattern in (
        r'^#{1,6}\s+(.+)$',              # Markdown headers
        r'^(\d+\.?\s+[A-Z][^.]{10,})$',  # Numbered sections
        r'^([A-Z][A-Z\s]{5,})$',         # ALL CAPS headers
        r'^([A-Z][a-z\s]{10,}:)$',       # Title Case with colon
    ))

    # Keywords that boost a sentence's summary score (substring match).
    _IMPORTANCE_KEYWORDS = (
        'conclusion', 'summary', 'result', 'finding', 'important',
        'significant', 'key', 'main', 'primary', 'essential', 'crucial',
        'objective', 'goal', 'recommendation', 'suggest', 'propose',
        'indicate', 'show', 'demonstrate',
    )

    # Key-point indicators grouped by category (substring match).
    _KEY_INDICATORS = {
        'conclusions': ['conclusion', 'conclude', 'result', 'outcome', 'finding', 'discovered'],
        'objectives': ['objective', 'goal', 'purpose', 'aim', 'target', 'mission'],
        'methods': ['method', 'approach', 'technique', 'procedure', 'process', 'way'],
        'importance': ['important', 'significant', 'crucial', 'essential', 'key', 'main', 'primary'],
        'recommendations': ['recommend', 'suggest', 'propose', 'should', 'must', 'need to'],
        'problems': ['problem', 'issue', 'challenge', 'difficulty', 'obstacle', 'concern'],
        'benefits': ['benefit', 'advantage', 'improvement', 'enhancement', 'positive', 'gain'],
    }
    _GENERIC_OPENERS = ('the', 'this', 'that', 'there', 'it', 'they')

    _LENGTH_PARAMS = {
        "short": {"sentences": 2, "max_length": 80, "min_length": 30},
        "medium": {"sentences": 4, "max_length": 150, "min_length": 50},
        "long": {"sentences": 6, "max_length": 250, "min_length": 100},
        "detailed": {"sentences": 8, "max_length": 400, "min_length": 150},
    }

    def __init__(self):
        """Load the optional AI summarizer and sentiment analyzer if available."""
        self.summarizer = None
        self.sentiment_analyzer = None
        self.cache = {}

        # Initialize AI models
        if TRANSFORMERS_AVAILABLE:
            self._initialize_ai_models()

        # Initialize sentiment analyzer
        if NLTK_AVAILABLE:
            try:
                self.sentiment_analyzer = SentimentIntensityAnalyzer()
            except Exception as e:
                logger.warning(f"Failed to initialize sentiment analyzer: {e}")

    def _initialize_ai_models(self):
        """Initialize AI models with error handling and fallbacks.

        Tries each candidate model in order and keeps the first one that
        loads; ``self.summarizer`` stays ``None`` if none of them do.
        """
        models_to_try = [
            "facebook/bart-large-cnn",
            "t5-small",
            "google/pegasus-xsum",
        ]
        for model_name in models_to_try:
            try:
                self.summarizer = pipeline(
                    "summarization",
                    model=model_name,
                    device=DEVICE,
                    torch_dtype=torch.float16 if DEVICE >= 0 else torch.float32,
                )
                logger.info(f"Successfully loaded {model_name}")
                break
            except Exception as e:
                logger.warning(f"Failed to load {model_name}: {e}")

    def _get_file_hash(self, file_path: str) -> str:
        """Return an MD5 hex digest of the file's content, used as a cache key.

        The file is read in blocks so large documents are not loaded into
        memory at once. If the file cannot be read, the current timestamp is
        returned instead, which effectively disables caching for that call.
        """
        try:
            digest = hashlib.md5()
            with open(file_path, 'rb') as f:
                for block in iter(lambda: f.read(self._HASH_BLOCK_SIZE), b''):
                    digest.update(block)
            return digest.hexdigest()
        except (OSError, TypeError, ValueError):
            return str(datetime.now().timestamp())

    # ------------------------------------------------------------------
    # Text extraction
    # ------------------------------------------------------------------

    @staticmethod
    def _join_pages(page_texts) -> str:
        """Concatenate non-empty page texts, each prefixed with a page marker."""
        return "".join(
            f"\n--- Page {number} ---\n{page_text}\n"
            for number, page_text in enumerate(page_texts, start=1)
            if page_text.strip()
        )

    def _extract_pdf_with_pymupdf(self, file_path: str) -> str:
        """Extract text with PyMuPDF, always closing the document."""
        doc = fitz.open(file_path)
        try:
            return self._join_pages([page.get_text() for page in doc])
        finally:
            doc.close()

    def _extract_pdf_with_pypdf2(self, file_path: str) -> str:
        """Extract text with PyPDF2."""
        with open(file_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            # extract_text() may yield None for pages without a text layer.
            return self._join_pages([page.extract_text() or "" for page in pdf_reader.pages])

    def _try_extract_pdf(self, file_path: str) -> Optional[str]:
        """Return the PDF's text, or ``None`` if no backend produced any.

        PyMuPDF is tried first, then PyPDF2. Each backend starts from scratch
        so a partial result from a failed backend never leaks into the next.
        """
        backends = []
        if PYMUPDF_AVAILABLE:
            backends.append(("PyMuPDF", self._extract_pdf_with_pymupdf))
        if PDF_AVAILABLE:
            backends.append(("PyPDF2", self._extract_pdf_with_pypdf2))

        for backend_name, extract in backends:
            try:
                text = extract(file_path)
            except Exception as e:
                logger.error(f"{backend_name} extraction failed: {e}")
                continue
            if text.strip():
                return text
        return None

    def extract_text_from_pdf(self, file_path: str) -> str:
        """Enhanced PDF text extraction with better error handling.

        Returns the extracted text with ``--- Page N ---`` markers, or
        ``PDF_FAILURE_MESSAGE`` when no backend is available or all fail.
        """
        text = self._try_extract_pdf(file_path)
        return self.PDF_FAILURE_MESSAGE if text is None else text

    def _read_docx(self, file_path: str) -> str:
        """Return paragraph and table text of a DOCX file; raises on failure."""
        doc = Document(file_path)
        text_parts = [p.text for p in doc.paragraphs if p.text.strip()]

        for table_num, table in enumerate(doc.tables, start=1):
            text_parts.append(f"\n--- Table {table_num} ---")
            for row in table.rows:
                row_text = " | ".join(cell.text.strip() for cell in row.cells)
                if row_text.strip():
                    text_parts.append(row_text)

        return "\n".join(text_parts)

    def extract_text_from_docx(self, file_path: str) -> str:
        """Enhanced DOCX extraction with better formatting preservation.

        Returns the document text, ``DOCX_UNAVAILABLE_MESSAGE`` when
        python-docx is missing, or an ``Error processing DOCX file: ...``
        message when reading fails.
        """
        if not DOCX_AVAILABLE:
            return self.DOCX_UNAVAILABLE_MESSAGE
        try:
            return self._read_docx(file_path)
        except Exception as e:
            logger.error(f"Error processing DOCX file: {e}")
            return f"Error processing DOCX file: {str(e)}"

    # ------------------------------------------------------------------
    # Tokenisation helpers
    # ------------------------------------------------------------------

    def _split_sentences(self, text: str) -> List[str]:
        """Split text into sentences that keep their terminal punctuation.

        Uses NLTK when available; falls back to a regex splitter when NLTK is
        missing or its tokenizer data cannot be found.
        """
        if NLTK_AVAILABLE:
            try:
                return sent_tokenize(text)
            except LookupError as e:
                logger.warning(f"NLTK tokenizer data unavailable, using regex splitter: {e}")
        return [s.strip() for s in self._SENTENCE_BOUNDARY.split(text) if s.strip()]

    @staticmethod
    def _count_words(words) -> Dict[str, int]:
        """Count occurrences of the given words, ignoring those of 1-2 characters."""
        counts: Dict[str, int] = {}
        for word in words:
            if len(word) > 2:
                counts[word] = counts.get(word, 0) + 1
        return counts

    # ------------------------------------------------------------------
    # Statistics
    # ------------------------------------------------------------------

    def _nltk_text_stats(self, text: str, word_count: int) -> Dict:
        """Sentence, vocabulary and sentiment statistics computed with NLTK.

        Raises ``LookupError`` if required NLTK data is not installed.
        """
        stats: Dict = {}
        sentences = sent_tokenize(text)
        stats['sentence_count'] = len(sentences)
        stats['avg_sentence_length'] = round(word_count / len(sentences), 1) if sentences else 0

        # Word frequency analysis
        stop_words = set(stopwords.words('english'))
        filtered_words = [
            w for w in word_tokenize(text.lower())
            if w.isalpha() and w not in stop_words and len(w) > 2
        ]
        if filtered_words:
            unique_count = len(set(filtered_words))
            stats['top_words'] = FreqDist(filtered_words).most_common(15)
            stats['unique_words'] = unique_count
            stats['lexical_diversity'] = round(unique_count / len(filtered_words), 3)

        # Sentiment analysis
        if self.sentiment_analyzer:
            try:
                scores = self.sentiment_analyzer.polarity_scores(text[:5000])  # Limit for performance
                stats['sentiment'] = {
                    'compound': round(scores['compound'], 3),
                    'positive': round(scores['pos'], 3),
                    'negative': round(scores['neg'], 3),
                    'neutral': round(scores['neu'], 3),
                }
            except Exception as e:
                logger.error(f"Sentiment analysis failed: {e}")
        return stats

    def _basic_text_stats(self, text: str, word_count: int) -> Dict:
        """Sentence and vocabulary statistics computed without NLTK."""
        sentences = [s.strip() for s in self._SENTENCE_BOUNDARY.split(text) if s.strip()]
        words = self._WORD_PATTERN.findall(text.lower())
        word_freq = self._count_words(words)
        return {
            'sentence_count': len(sentences),
            'avg_sentence_length': round(word_count / len(sentences), 1) if sentences else 0,
            'top_words': sorted(word_freq.items(), key=lambda item: item[1], reverse=True)[:15],
            'unique_words': len(set(words)),
        }

    def get_enhanced_document_stats(self, text: str) -> Dict:
        """Get comprehensive document statistics with sentiment analysis.

        Returns an empty dict for blank text. Otherwise the dict always holds
        word/character/paragraph counts, reading and speaking time estimates
        (minutes, at least 1), ``sentence_count`` and ``avg_sentence_length``;
        vocabulary and sentiment entries depend on NLTK being usable.
        """
        if not text.strip():
            return {}

        # Basic stats
        word_count = len(text.split())
        stats = {
            'word_count': word_count,
            'character_count': len(text),
            'character_count_no_spaces': len(text.replace(' ', '')),
            'paragraph_count': len([p for p in text.split('\n\n') if p.strip()]),
            'estimated_reading_time': max(1, round(word_count / 200)),   # 200 WPM average
            'estimated_speaking_time': max(1, round(word_count / 150)),  # 150 WPM speaking
        }

        detailed = None
        if NLTK_AVAILABLE:
            try:
                detailed = self._nltk_text_stats(text, word_count)
            except LookupError as e:
                # NLTK is importable but its data is missing: degrade, don't fail.
                logger.warning(f"NLTK data unavailable, using basic statistics: {e}")
        if detailed is None:
            detailed = self._basic_text_stats(text, word_count)

        stats.update(detailed)
        return stats

    # ------------------------------------------------------------------
    # Summarisation
    # ------------------------------------------------------------------

    def _score_summary_sentence(self, sentence: str, token_count: int, index: int,
                                total_sentences: int, word_freq: Dict[str, int]) -> float:
        """Score one sentence for the extractive summary (higher is better)."""
        score = 0
        sentence_lower = sentence.lower()

        # Position scoring (beginning and end are more important)
        if index < total_sentences * 0.15:      # First 15%
            score += 3
        elif index > total_sentences * 0.85:    # Last 15%
            score += 2
        elif total_sentences * 0.4 <= index <= total_sentences * 0.6:  # Middle section
            score += 1

        # Length scoring (prefer moderate length)
        if 12 <= token_count <= 25:
            score += 3
        elif 8 <= token_count <= 35:
            score += 2
        elif 5 <= token_count <= 45:
            score += 1

        # Keyword importance scoring, capped
        keyword_score = sum(2 for keyword in self._IMPORTANCE_KEYWORDS if keyword in sentence_lower)
        score += min(keyword_score, 6)

        # TF-based scoring. Words are tokenised the same way as the frequency
        # table so trailing punctuation ("results,") does not hide a match.
        tf_score = sum(
            min(word_freq[word], 5)  # Cap individual word contribution
            for word in self._WORD_PATTERN.findall(sentence_lower)
            if len(word) > 3 and word in word_freq
        )
        score += min(tf_score / token_count, 3)  # Normalize by sentence length

        # Structural indicators
        if any(indicator in sentence for indicator in (':', '—', '"', '(')):
            score += 1

        # Numerical data (often important)
        if self._NUMBER_PATTERN.search(sentence):
            score += 1

        return score

    def advanced_extractive_summary(self, text: str, num_sentences: int = 3) -> str:
        """Enhanced extractive summarization with improved sentence scoring.

        Returns ``"No text to summarize."`` for blank text and the text itself
        when it has no more than ``num_sentences`` sentences. Otherwise the
        top-scoring sentences are returned in their original order. If every
        sentence is too short to be scored, the leading sentences are used so
        the summary is never empty.
        """
        if not text.strip():
            return "No text to summarize."

        sentences = self._split_sentences(text)
        if len(sentences) <= num_sentences:
            return text

        total_sentences = len(sentences)
        word_freq = self._count_words(self._WORD_PATTERN.findall(text.lower()))

        scored_sentences = []
        for index, sentence in enumerate(sentences):
            token_count = len(sentence.split())
            if token_count < 5:  # Skip very short sentences
                continue
            score = self._score_summary_sentence(
                sentence, token_count, index, total_sentences, word_freq
            )
            scored_sentences.append((sentence, score, index))

        if not scored_sentences:
            return ' '.join(sentences[:num_sentences])

        # Pick the best sentences, then restore document order for flow.
        scored_sentences.sort(key=lambda item: item[1], reverse=True)
        selected = sorted(scored_sentences[:num_sentences], key=lambda item: item[2])
        return ' '.join(item[0] for item in selected)

    def intelligent_chunking(self, text: str, max_chunk_size: int = 1024) -> List[str]:
        """Intelligently chunk text while preserving semantic boundaries.

        Paragraphs (separated by blank lines) are packed into chunks of at
        most ``max_chunk_size`` characters. A paragraph that is itself too
        long is packed sentence by sentence; a single sentence longer than
        the limit becomes a chunk of its own.
        """
        if len(text) <= max_chunk_size:
            return [text]

        chunks = []
        current_chunk = ""

        for paragraph in text.split('\n\n'):
            if len(paragraph) > max_chunk_size:
                # Flush what we have, then pack the long paragraph by sentence.
                if current_chunk:
                    chunks.append(current_chunk.strip())
                current_chunk = ""
                for sentence in self._split_sentences(paragraph):
                    # Sentences keep their own punctuation; only a space is added.
                    if len(current_chunk) + len(sentence) <= max_chunk_size:
                        current_chunk += sentence + " "
                    else:
                        if current_chunk:
                            chunks.append(current_chunk.strip())
                        current_chunk = sentence + " "
            elif len(current_chunk) + len(paragraph) <= max_chunk_size:
                current_chunk += paragraph + "\n\n"
            else:
                if current_chunk:
                    chunks.append(current_chunk.strip())
                current_chunk = paragraph + "\n\n"

        if current_chunk:
            chunks.append(current_chunk.strip())

        return [chunk for chunk in chunks if chunk.strip()]

    def ai_summary(self, text: str, max_length: int = 150, min_length: int = 50) -> str:
        """Enhanced AI-powered summarization with better chunking and error handling.

        Falls back to the extractive summary when no model is loaded or when
        summarization fails. Each chunk is summarized separately; if the
        combined result is still long it is summarized once more.
        """
        if self.summarizer is None:
            return self.advanced_extractive_summary(text)

        try:
            chunks = self.intelligent_chunking(text, 1000)  # Slightly smaller chunks for better quality
            if not chunks:
                return "No meaningful content found for summarization."

            summaries = []
            for i, chunk in enumerate(chunks):
                if len(chunk.strip()) < 50:  # Skip very short chunks
                    continue
                try:
                    # Adjust parameters based on chunk size
                    chunk_max_length = min(max_length, max(50, len(chunk.split()) // 3))
                    chunk_min_length = min(min_length, chunk_max_length // 2)
                    summary = self.summarizer(
                        chunk,
                        max_length=chunk_max_length,
                        min_length=chunk_min_length,
                        do_sample=False,
                        truncation=True,
                    )
                    summaries.append(summary[0]['summary_text'])
                except Exception as e:
                    logger.warning(f"Error summarizing chunk {i}: {e}")
                    # Fallback to extractive summary for this chunk
                    fallback_summary = self.advanced_extractive_summary(chunk, 2)
                    if fallback_summary and fallback_summary != "No text to summarize.":
                        summaries.append(fallback_summary)

            if not summaries:
                return self.advanced_extractive_summary(text)
            if len(summaries) == 1:
                return summaries[0]

            combined_summary = ' '.join(summaries)
            # If combined summary is still too long, summarize again
            if len(combined_summary.split()) > max_length * 1.5:
                try:
                    final_summary = self.summarizer(
                        combined_summary,
                        max_length=max_length,
                        min_length=min_length,
                        do_sample=False,
                        truncation=True,
                    )
                    return final_summary[0]['summary_text']
                except Exception as e:
                    logger.warning(f"Error condensing combined summary: {e}")
                    return combined_summary[:max_length * 10]  # Rough character limit fallback
            return combined_summary

        except Exception as e:
            logger.error(f"AI summarization failed: {e}")
            return self.advanced_extractive_summary(text)

    # ------------------------------------------------------------------
    # Key points and outline
    # ------------------------------------------------------------------

    def generate_enhanced_key_points(self, text: str, num_points: int = 7) -> List[str]:
        """Generate key points with improved extraction and categorization.

        Sentences are scored by category indicators, structure, length and
        numeric content. The best sentence of each category is taken first,
        then remaining slots are filled by score.
        """
        if not text.strip():
            return []

        scored_sentences = []
        for sentence in self._split_sentences(text):
            words = sentence.split()
            if len(words) < 6:  # Skip very short sentences
                continue

            score = 0
            sentence_lower = sentence.lower()
            category = 'general'

            # Category-based scoring: the strongest category wins
            for cat, indicators in self._KEY_INDICATORS.items():
                category_score = sum(2 for indicator in indicators if indicator in sentence_lower)
                if category_score > score:
                    score = category_score
                    category = cat

            # Structural scoring (bullets and numbered items)
            if sentence.strip().startswith(('•', '-', '1.', '2.', '3.', '4.', '5.')):
                score += 4

            # Punctuation indicators
            if any(punct in sentence for punct in (':', ';', '—', '"')):
                score += 1

            # Length scoring (prefer moderate length for key points)
            word_count = len(words)
            if 8 <= word_count <= 20:
                score += 3
            elif 6 <= word_count <= 30:
                score += 2
            elif 4 <= word_count <= 40:
                score += 1

            # Numerical data bonus
            if self._NUMBER_PATTERN.search(sentence):
                score += 2

            # Avoid very generic sentences
            if words[0].lower() in self._GENERIC_OPENERS:
                score -= 1

            if score > 0:
                scored_sentences.append((sentence.strip(), score, category))

        scored_sentences.sort(key=lambda item: item[1], reverse=True)

        selected_points = []
        used_categories = set()

        # First pass: get the highest scoring point from each category
        for sentence, _score, category in scored_sentences:
            if len(selected_points) >= num_points:
                break
            if category not in used_categories:
                selected_points.append(sentence)
                used_categories.add(category)

        # Second pass: fill remaining slots with highest scoring sentences
        for sentence, _score, _category in scored_sentences:
            if len(selected_points) >= num_points:
                break
            if sentence not in selected_points:
                selected_points.append(sentence)

        return selected_points[:num_points]

    def generate_document_outline(self, text: str) -> List[str]:
        """Generate a structured outline of the document.

        Returns up to 10 lines that look like headers: Markdown headers,
        numbered sections, ALL CAPS lines, or capitalised lines ending in a colon.
        """
        if not text.strip():
            return []

        outline = []
        for raw_line in text.split('\n'):
            line = raw_line.strip()
            if not line:
                continue
            for pattern in self._HEADER_PATTERNS:
                match = pattern.match(line)
                if match:
                    outline.append(match.group(1).strip())
                    break

        return outline[:10]  # Limit to 10 outline items

    # ------------------------------------------------------------------
    # Entry point
    # ------------------------------------------------------------------

    def process_document(self, file_path: str, summary_type: str = "ai",
                         summary_length: str = "medium") -> Tuple[Optional[Dict], Optional[str]]:
        """Enhanced document processing with caching and comprehensive analysis.

        Args:
            file_path: Path to a .pdf, .docx, .txt, .md or .rtf file.
            summary_type: ``"ai"`` to use the loaded model (if any); any other
                value selects extractive summarization.
            summary_length: ``"short"``, ``"medium"``, ``"long"`` or
                ``"detailed"``; unknown values are treated as ``"medium"``.

        Returns:
            ``(result, None)`` on success or ``(None, error_message)`` on failure.
        """
        if not file_path:
            return None, "No file provided."

        try:
            # Check cache
            file_hash = self._get_file_hash(file_path)
            cache_key = f"{file_hash}_{summary_type}_{summary_length}"
            if cache_key in self.cache:
                logger.info("Returning cached result")
                return self.cache[cache_key], None

            # Extract text based on file type. Failure is signalled by None or
            # an exception, never by inspecting the text, so a document that
            # merely contains a phrase like "not available" is not rejected.
            file_extension = Path(file_path).suffix.lower()
            if file_extension == '.pdf':
                text = self._try_extract_pdf(file_path)
            elif file_extension == '.docx':
                text = self._read_docx(file_path) if DOCX_AVAILABLE else None
            elif file_extension in ['.txt', '.md', '.rtf']:
                # NOTE: .rtf is read as plain text; RTF control words are not stripped.
                with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                    text = f.read()
            else:
                return None, f"Unsupported file type: {file_extension}"

            if text is None or not text.strip():
                return None, self.EXTRACTION_FAILED_MESSAGE

            # Clean text
            text = re.sub(r'\n{3,}', '\n\n', text)  # Reduce excessive newlines
            text = re.sub(r' {2,}', ' ', text)      # Reduce excessive spaces

            stats = self.get_enhanced_document_stats(text)

            params = self._LENGTH_PARAMS.get(summary_length, self._LENGTH_PARAMS["medium"])

            # Generate summary
            use_ai = summary_type == "ai" and self.summarizer is not None
            if use_ai:
                summary = self.ai_summary(text, params["max_length"], params["min_length"])
            else:
                summary = self.advanced_extractive_summary(text, params["sentences"])

            # Generate enhanced features
            key_points = self.generate_enhanced_key_points(text, 7)
            outline = self.generate_document_outline(text)

            # Calculate readability (simple approximation)
            avg_sentence_length = stats.get('avg_sentence_length', 0)
            readability_score = max(0, min(100, 100 - (avg_sentence_length * 2)))

            result = {
                'original_text': text[:2000] + "..." if len(text) > 2000 else text,  # Truncate for display
                'full_text_length': len(text),
                'summary': summary,
                'key_points': key_points,
                'outline': outline,
                'stats': stats,
                'readability_score': readability_score,
                'file_name': Path(file_path).name,
                'file_size': os.path.getsize(file_path),
                'processing_time': datetime.now().isoformat(),
                'summary_type': summary_type,
                'summary_length': summary_length,
                # Report the method selected for this result, not merely
                # whether a model happens to be loaded.
                'model_used': 'AI (BART/T5)' if use_ai else 'Extractive',
            }

            # Cache result, evicting the oldest entry once the cache is full
            # (dicts preserve insertion order).
            if len(self.cache) >= self._MAX_CACHE_ENTRIES:
                self.cache.pop(next(iter(self.cache)))
            self.cache[cache_key] = result

            return result, None

        except Exception as e:
            logger.error(f"Document processing error: {e}")
            return None, f"Error processing document: {str(e)}"
} .control-panel { background: linear-gradient(135deg, #f6f9fc 0%, #e9ecef 100%); padding: 25px; border-radius: 15px; margin: 15px 0; border: 1px solid #dee2e6; box-shadow: 0 4px 15px rgba(0,0,0,0.05); } .file-upload-area { border: 3px dashed #007bff; border-radius: 15px; padding: 40px; text-align: center; background: linear-gradient(135deg, #f8f9ff 0%, #e3f2fd 100%); transition: all 0.3s ease; margin: 15px 0; } .file-upload-area:hover { border-color: #0056b3; background: linear-gradient(135deg, #f0f7ff 0%, #e1f5fe 100%); transform: translateY(-2px); } .metric-card { background: white; padding: 15px; border-radius: 10px; margin: 5px; box-shadow: 0 2px 8px rgba(0,0,0,0.1); text-align: center; } .sentiment-indicator { display: inline-block; padding: 5px 12px; border-radius: 20px; font-weight: bold; font-size: 12px; margin: 2px; } .sentiment-positive { background: #d4edda; color: #155724; } .sentiment-negative { background: #f8d7da; color: #721c24; } .sentiment-neutral { background: #d1ecf1; color: #0c5460; } .progress-bar { background: #e9ecef; border-radius: 10px; overflow: hidden; height: 8px; margin: 5px 0; } .progress-fill { height: 100%; background: linear-gradient(90deg, #28a745, #20c997); transition: width 0.3s ease; } """ def format_file_size(size_bytes): """Convert bytes to human readable format""" for unit in ['B', 'KB', 'MB', 'GB']: if size_bytes < 1024.0: return f"{size_bytes:.1f} {unit}" size_bytes /= 1024.0 return f"{size_bytes:.1f} TB" def get_sentiment_indicator(sentiment_score): """Get sentiment indicator HTML""" if sentiment_score > 0.1: return 'Positive' elif sentiment_score < -0.1: return 'Negative' else: return 'Neutral' def process_and_display(file, summary_type, summary_length, enable_ai_features): """Enhanced processing with comprehensive results display""" if file is None: return ( gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(value="""
Upload a document to begin advanced AI-powered analysis
Supports: PDF, Word (.docx), Text (.txt, .md, .rtf)
Error: {error}
Please try a different file or check the file format.
{readability:.1f}/100 - {readability_text} to read
Details: {str(e)}
Please try again or contact support if the issue persists.
Powered by AI • Extractive & Abstractive Summarization • Comprehensive Analytics
Upload any document to unlock AI-powered insights
Supports PDF, Word, Text, Markdown, and RTF files
CatalystGPT-4 - Advanced Document Analysis Platform