import spaces import streamlit as st from PIL import Image import torch from transformers import ( DonutProcessor, VisionEncoderDecoderModel, LayoutLMv3Processor, LayoutLMv3ForSequenceClassification, AutoProcessor, AutoModelForCausalLM, AutoModelForVisualQuestionAnswering ) from ultralytics import YOLO import io import base64 import json from datetime import datetime import os import logging # Add this near the top of the file, after imports logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) @st.cache_resource def load_model(model_name): """Load the selected model and processor""" try: if model_name == "OmniParser": try: # Load model directly using official implementation processor = AutoProcessor.from_pretrained( "microsoft/OmniParser", trust_remote_code=True ) model = AutoModelForVisualQuestionAnswering.from_pretrained( "microsoft/OmniParser", trust_remote_code=True, torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32 ) if torch.cuda.is_available(): model ="cuda") st.success("Successfully loaded OmniParser model") return { 'processor': processor, 'model': model } except Exception as e: st.error(f"Failed to load OmniParser from HuggingFace Hub: {str(e)}") logger.error(f"OmniParser loading error: {str(e)}", exc_info=True) return None elif model_name == "Donut": processor = DonutProcessor.from_pretrained("naver-clova-ix/donut-base") model = VisionEncoderDecoderModel.from_pretrained("naver-clova-ix/donut-base") # Configure Donut specific parameters model.config.decoder_start_token_id = processor.tokenizer.bos_token_id model.config.pad_token_id = processor.tokenizer.pad_token_id model.config.vocab_size = len(processor.tokenizer) return {'model': model, 'processor': processor} elif model_name == "LayoutLMv3": processor = LayoutLMv3Processor.from_pretrained("microsoft/layoutlmv3-base") model = LayoutLMv3ForSequenceClassification.from_pretrained("microsoft/layoutlmv3-base") return {'model': model, 'processor': processor} else: raise ValueError(f"Unknown model name: {model_name}") except Exception as e: st.error(f"Error loading model {model_name}: {str(e)}") logger.error(f"Error details: {str(e)}", exc_info=True) return None @spaces.GPU @torch.inference_mode() def analyze_document(image, model_name, models_dict): """Analyze document using selected model""" try: if models_dict is None: return {"error": "Model failed to load", "type": "model_error"} if model_name == "OmniParser": # Process image with OmniParser inputs = models_dict['processor']( images=image, return_tensors="pt", ) if torch.cuda.is_available(): inputs = {k:"cuda") if hasattr(v, "to") else v for k, v in inputs.items()} # Generate outputs outputs = models_dict['model'](**inputs) # Process results # The exact processing will depend on the model's output format results = { "predictions": outputs.logits.softmax(-1).tolist(), "detected_elements": len(outputs.logits[0]), "model_output": { k: v.tolist() if hasattr(v, "tolist") else str(v) for k, v in outputs.items() if k != "last_hidden_state" # Skip large tensors } } return results elif model_name == "Donut": model = models_dict['model'] processor = models_dict['processor'] # Process image with Donut pixel_values = processor(image, return_tensors="pt").pixel_values task_prompt = "<s_cord>analyze the document and extract information</s_cord>" decoder_input_ids = processor.tokenizer( task_prompt, add_special_tokens=False, return_tensors="pt" ).input_ids outputs = model.generate( pixel_values, decoder_input_ids=decoder_input_ids, max_length=512, early_stopping=True, pad_token_id=processor.tokenizer.pad_token_id, eos_token_id=processor.tokenizer.eos_token_id, use_cache=True, num_beams=4, bad_words_ids=[[processor.tokenizer.unk_token_id]], return_dict_in_generate=True ) sequence = processor.batch_decode(outputs.sequences)[0] sequence = sequence.replace(task_prompt, "").replace("</s_cord>", "").strip() try: result = json.loads(sequence) except json.JSONDecodeError: result = {"raw_text": sequence} return result elif model_name == "LayoutLMv3": model = models_dict['model'] processor = models_dict['processor'] # Process image with LayoutLMv3 encoded_inputs = processor( image, return_tensors="pt", add_special_tokens=True, return_offsets_mapping=True ) outputs = model(**encoded_inputs) predictions = outputs.logits.argmax(-1).squeeze().tolist() # Convert predictions to labels words = processor.tokenizer.convert_ids_to_tokens( encoded_inputs.input_ids.squeeze().tolist() ) result = { "predictions": [ { "text": word, "label": pred } for word, pred in zip(words, predictions) if word not in ["<s>", "</s>", "<pad>"] ], "confidence_scores": outputs.logits.softmax(-1).max(-1).values.squeeze().tolist() } return result else: return {"error": f"Unknown model: {model_name}", "type": "model_error"} except Exception as e: import traceback error_details = traceback.format_exc() logger.error(f"Analysis error: {str(e)}\n{error_details}") return { "error": str(e), "type": "processing_error", "details": error_details } # Set page config with improved layout st.set_page_config( page_title="Document Analysis Comparison", layout="wide", initial_sidebar_state="expanded" ) # Add custom CSS for better styling st.markdown(""" <style> .stAlert { margin-top: 1rem; } .upload-text { font-size: 1.2rem; margin-bottom: 1rem; } .model-info { padding: 1rem; border-radius: 0.5rem; background-color: #f8f9fa; } </style> """, unsafe_allow_html=True) # Title and description st.title("Document Understanding Model Comparison") st.markdown(""" Compare different models for document analysis and understanding. Upload an image and select a model to analyze it. """) # Create two columns for layout col1, col2 = st.columns([1, 1]) with col1: # File uploader with improved error handling uploaded_file = st.file_uploader( "Choose a document image", type=['png', 'jpg', 'jpeg', 'pdf'], help="Supported formats: PNG, JPEG, PDF" ) if uploaded_file is not None: try: # Display uploaded image image = st.image(image, caption='Uploaded Document', use_column_width=True) except Exception as e: st.error(f"Error loading image: {str(e)}") with col2: # Model selection with detailed information model_info = { "Donut": { "description": "Best for structured OCR and document format understanding", "memory": "6-8GB", "strengths": ["Structured OCR", "Memory efficient", "Good with fixed formats"], "best_for": ["Invoices", "Forms", "Structured documents", "Tables"] }, "LayoutLMv3": { "description": "Strong layout understanding with reasoning capabilities", "memory": "12-15GB", "strengths": ["Layout understanding", "Reasoning", "Pre-trained knowledge"], "best_for": ["Complex documents", "Mixed layouts", "Documents with tables", "Multi-column text"] }, "OmniParser": { "description": "General screen parsing tool for UI understanding", "memory": "8-10GB", "strengths": ["UI element detection", "Interactive element recognition", "Function description"], "best_for": ["Screenshots", "UI analysis", "Interactive elements", "Web interfaces"] } } selected_model = st.selectbox( "Select Model", list(model_info.keys()) ) # Display enhanced model information st.markdown("### Model Details") with st.expander("Model Information", expanded=True): st.markdown(f"**Description:** {model_info[selected_model]['description']}") st.markdown(f"**Memory Required:** {model_info[selected_model]['memory']}") st.markdown("**Strengths:**") for strength in model_info[selected_model]['strengths']: st.markdown(f"- {strength}") st.markdown("**Best For:**") for use_case in model_info[selected_model]['best_for']: st.markdown(f"- {use_case}") # Inside the analysis section, replace the existing if-block with: if uploaded_file is not None and selected_model: if st.button("Analyze Document", help="Click to start document analysis"): # Create two columns for results and debug info result_col, debug_col = st.columns([1, 1]) with st.spinner('Processing...'): try: # Create a progress bar in results column with result_col: st.markdown("### Analysis Progress") progress_bar = st.progress(0) # Initialize debug column with debug_col: st.markdown("### Debug Information") debug_container = st.empty() def update_debug(message, level="info"): """Update debug information with timestamp""" timestamp ="%H:%M:%S.%f")[:-3] color = { "info": "blue", "warning": "orange", "error": "red", "success": "green" }.get(level, "black") return f"<div style='color: {color};'>[{timestamp}] {message}</div>" debug_messages = [] def add_debug(message, level="info"): debug_messages.append(update_debug(message, level)) debug_container.markdown( "\n".join(debug_messages), unsafe_allow_html=True ) # Load model with progress update with result_col: progress_bar.progress(25)"Loading model...") add_debug(f"Loading {selected_model} model and processor...") models_dict = load_model(selected_model) if models_dict is None: with result_col: st.error("Failed to load model. Please try again.") add_debug("Model loading failed!", "error") else: add_debug("Model loaded successfully", "success") # For device info, we need to check which model we're using if selected_model == "OmniParser": model_device = next(models_dict['model'].parameters()).device else: model_device = next(models_dict['model'].parameters()).device add_debug(f"Model device: {model_device}") # Update progress with result_col: progress_bar.progress(50)"Analyzing document...") # Log image details add_debug(f"Image size: {image.size}") add_debug(f"Image mode: {image.mode}") # Analyze document add_debug("Starting document analysis...") results = analyze_document(image, selected_model, models_dict) add_debug("Analysis completed", "success") # Update progress with result_col: progress_bar.progress(75) st.markdown("### Analysis Results") if isinstance(results, dict) and "error" in results: st.error(f"Analysis Error: {results['error']}") add_debug(f"Analysis error: {results['error']}", "error") else: # Pretty print the results in results column st.json(results) # Show detailed results breakdown in debug column add_debug("Results breakdown:", "info") if isinstance(results, dict): for key, value in results.items(): add_debug(f"- {key}: {type(value)}") else: add_debug(f"Result type: {type(results)}") # Complete progress progress_bar.progress(100) st.success("Analysis completed!") # Final debug info add_debug("Process completed successfully", "success") with debug_col: if torch.cuda.is_available(): st.markdown("### Resource Usage") st.markdown(f""" - GPU Memory: {torch.cuda.max_memory_allocated()/1024**2:.2f}MB - GPU Utilization: {torch.cuda.utilization()}% """) except Exception as e: with result_col: st.error(f"Error during analysis: {str(e)}") add_debug(f"Error: {str(e)}", "error") add_debug(f"Error type: {type(e)}", "error") if hasattr(e, '__traceback__'): add_debug("Traceback available in logs", "warning") # Add improved information about usage and limitations def verify_weights_directory(): """Verify the weights directory structure and files""" weights_path = "weights" required_files = { os.path.join(weights_path, "icon_detect", "model.safetensors"): "YOLO model weights", os.path.join(weights_path, "icon_detect", "model.yaml"): "YOLO model config", os.path.join(weights_path, "icon_caption_florence", "model.safetensors"): "Florence model weights", os.path.join(weights_path, "icon_caption_florence", "config.json"): "Florence model config", os.path.join(weights_path, "icon_caption_florence", "generation_config.json"): "Florence generation config" } missing_files = [] for file_path, description in required_files.items(): if not os.path.exists(file_path): missing_files.append(f"{description} at {file_path}") if missing_files: st.warning("Missing required model files:") for missing in missing_files: st.write(f"- {missing}") return False return True # Add this in your app's initialization if st.checkbox("Check Model Files"): if verify_weights_directory(): st.success("All required model files are present") else: st.error("Some model files are missing. Please ensure all required files are in the weights directory") st.markdown(""" --- ### Usage Notes: - Different models excel at different types of documents - Processing time and memory requirements vary by model - Image quality significantly affects results - Some models may require specific document formats """) # Add performance metrics section if st.checkbox("Show Performance Metrics"): st.markdown(""" ### Model Performance Metrics | Model | Avg. Processing Time | Memory Usage | Accuracy* | |-------|---------------------|--------------|-----------| | Donut | 2-3 seconds | 6-8GB | 85-90% | | LayoutLMv3 | 3-4 seconds | 12-15GB | 88-93% | | OmniParser | 2-3 seconds | 8-10GB | 85-90% | *Accuracy varies based on document type and quality """) # Add a footer with version and contact information st.markdown("---") st.markdown(""" v1.1 - Created with Streamlit \nPowered by Hugging Face Spaces 🤗 """) # Add model selection guidance if st.checkbox("Show Model Selection Guide"): st.markdown(""" ### How to Choose the Right Model 1. **Donut**: Choose for structured documents with clear layouts 2. **LayoutLMv3**: Best for documents with complex layouts and relationships 3. **OmniParser**: Best for UI elements and screen parsing """)