Spaces:
Running
on
Zero
Running
on
Zero
| """ | |
| Main processing logic for FLUX Prompt Optimizer | |
| Handles image analysis, prompt optimization, and scoring | |
| """ | |
| import logging | |
| import time | |
| from typing import Tuple, Dict, Any, Optional | |
| from PIL import Image | |
| from datetime import datetime | |
| from config import APP_CONFIG, PROCESSING_CONFIG, get_device_config | |
| from utils import ( | |
| optimize_image, validate_image, apply_flux_rules, | |
| calculate_prompt_score, get_score_grade, format_analysis_report, | |
| clean_memory, safe_execute | |
| ) | |
| from models import analyze_image | |
| logger = logging.getLogger(__name__) | |
| class FluxOptimizer: | |
| """Main optimizer class for FLUX prompt generation""" | |
| def __init__(self, model_name: str = None): | |
| self.model_name = model_name | |
| self.device_config = get_device_config() | |
| self.processing_stats = { | |
| "total_processed": 0, | |
| "successful_analyses": 0, | |
| "failed_analyses": 0, | |
| "average_processing_time": 0.0 | |
| } | |
| logger.info(f"FluxOptimizer initialized - Device: {self.device_config['device']}") | |
| def process_image(self, image: Any) -> Tuple[str, str, str, Dict[str, Any]]: | |
| """ | |
| Complete image processing pipeline | |
| Args: | |
| image: Input image (PIL, numpy array, or file path) | |
| Returns: | |
| Tuple of (optimized_prompt, analysis_report, score_html, metadata) | |
| """ | |
| start_time = time.time() | |
| metadata = { | |
| "processing_time": 0.0, | |
| "success": False, | |
| "model_used": self.model_name or "auto", | |
| "device": self.device_config["device"], | |
| "error": None | |
| } | |
| try: | |
| # Step 1: Validate and optimize input image | |
| logger.info("Starting image processing pipeline...") | |
| if not validate_image(image): | |
| error_msg = "Invalid or unsupported image format" | |
| logger.error(error_msg) | |
| return self._create_error_response(error_msg, metadata) | |
| optimized_image = optimize_image(image) | |
| if optimized_image is None: | |
| error_msg = "Image optimization failed" | |
| logger.error(error_msg) | |
| return self._create_error_response(error_msg, metadata) | |
| logger.info(f"Image optimized to size: {optimized_image.size}") | |
| # Step 2: Analyze image with selected model | |
| logger.info("Running image analysis...") | |
| analysis_success, analysis_result = safe_execute( | |
| analyze_image, | |
| optimized_image, | |
| self.model_name | |
| ) | |
| if not analysis_success: | |
| error_msg = f"Image analysis failed: {analysis_result}" | |
| logger.error(error_msg) | |
| return self._create_error_response(error_msg, metadata) | |
| description, analysis_metadata = analysis_result | |
| logger.info(f"Analysis complete: {len(description)} characters") | |
| # Step 3: Apply FLUX optimization rules | |
| logger.info("Applying FLUX optimization rules...") | |
| optimized_prompt = apply_flux_rules(description) | |
| if not optimized_prompt: | |
| optimized_prompt = "A professional photograph" | |
| logger.warning("Empty prompt after optimization, using fallback") | |
| # Step 4: Calculate quality score | |
| logger.info("Calculating quality score...") | |
| score, score_breakdown = calculate_prompt_score(optimized_prompt, analysis_metadata) | |
| grade_info = get_score_grade(score) | |
| # Step 5: Generate analysis report | |
| processing_time = time.time() - start_time | |
| metadata.update({ | |
| "processing_time": processing_time, | |
| "success": True, | |
| "prompt_length": len(optimized_prompt), | |
| "score": score, | |
| "grade": grade_info["grade"], | |
| "analysis_metadata": analysis_metadata | |
| }) | |
| analysis_report = self._generate_detailed_report( | |
| optimized_prompt, analysis_metadata, score, | |
| score_breakdown, processing_time | |
| ) | |
| # Step 6: Create score HTML | |
| score_html = self._generate_score_html(score, grade_info) | |
| # Update statistics | |
| self._update_stats(processing_time, True) | |
| logger.info(f"Processing complete - Score: {score}, Time: {processing_time:.1f}s") | |
| return optimized_prompt, analysis_report, score_html, metadata | |
| except Exception as e: | |
| processing_time = time.time() - start_time | |
| error_msg = f"Unexpected error in processing pipeline: {str(e)}" | |
| logger.error(error_msg, exc_info=True) | |
| metadata.update({ | |
| "processing_time": processing_time, | |
| "error": error_msg | |
| }) | |
| self._update_stats(processing_time, False) | |
| return self._create_error_response(error_msg, metadata) | |
| finally: | |
| # Always clean up memory | |
| clean_memory() | |
| def _create_error_response(self, error_msg: str, metadata: Dict[str, Any]) -> Tuple[str, str, str, Dict[str, Any]]: | |
| """Create standardized error response""" | |
| error_prompt = "❌ Processing failed" | |
| error_report = f"**Error:** {error_msg}\n\nPlease try with a different image or check the logs for more details." | |
| error_html = self._generate_score_html(0, get_score_grade(0)) | |
| metadata["success"] = False | |
| metadata["error"] = error_msg | |
| return error_prompt, error_report, error_html, metadata | |
| def _generate_detailed_report(self, prompt: str, analysis_metadata: Dict[str, Any], | |
| score: int, breakdown: Dict[str, int], | |
| processing_time: float) -> str: | |
| """Generate comprehensive analysis report""" | |
| model_used = analysis_metadata.get("model", "Unknown") | |
| device_used = analysis_metadata.get("device", self.device_config["device"]) | |
| confidence = analysis_metadata.get("confidence", 0.0) | |
| # Device status emoji | |
| device_emoji = "⚡" if device_used == "cuda" else "💻" | |
| report = f"""**Analysis Complete** | |
| **Processing:** {device_emoji} {device_used.upper()} • {processing_time:.1f}s • Model: {model_used} | |
| **Score:** {score}/100 • Confidence: {confidence:.0%} | |
| **Score Breakdown:** | |
| • **Prompt Quality:** {breakdown.get('prompt_quality', 0)}/30 - Content detail and description | |
| • **Technical Details:** {breakdown.get('technical_details', 0)}/25 - Camera and photography settings | |
| • **Artistic Value:** {breakdown.get('artistic_value', 0)}/25 - Creative elements | |
| • **FLUX Optimization:** {breakdown.get('flux_optimization', 0)}/20 - Platform optimizations | |
| **Analysis Summary:** | |
| **Description Length:** {len(prompt)} characters | |
| **Model Used:** {analysis_metadata.get('model', 'N/A')} | |
| **Applied Optimizations:** | |
| ✅ Camera settings added | |
| ✅ Lighting configuration applied | |
| ✅ Technical parameters optimized | |
| ✅ FLUX rules implemented | |
| ✅ Content cleaned and enhanced | |
| **Performance:** | |
| • **Processing Time:** {processing_time:.1f}s | |
| • **Device:** {device_used.upper()} | |
| • **Model Confidence:** {confidence:.0%} | |
| **Frame 0 Laboratory for MIA**""" | |
| return report | |
| def _generate_score_html(self, score: int, grade_info: Dict[str, str]) -> str: | |
| """Generate HTML for score display""" | |
| html = f''' | |
| <div style="text-align: center; padding: 2rem; background: linear-gradient(135deg, #f0fdf4 0%, #dcfce7 100%); border: 3px solid {grade_info["color"]}; border-radius: 16px; margin: 1rem 0; box-shadow: 0 8px 25px -5px rgba(0, 0, 0, 0.1);"> | |
| <div style="font-size: 3rem; font-weight: 800; color: {grade_info["color"]}; margin: 0; text-shadow: 0 2px 4px rgba(0,0,0,0.1);">{score}</div> | |
| <div style="font-size: 1.25rem; color: #15803d; margin: 0.5rem 0; text-transform: uppercase; letter-spacing: 0.1em; font-weight: 700;">{grade_info["grade"]}</div> | |
| <div style="font-size: 1rem; color: #15803d; margin: 0; text-transform: uppercase; letter-spacing: 0.05em; font-weight: 500;">FLUX Quality Score</div> | |
| </div> | |
| ''' | |
| return html | |
| def _update_stats(self, processing_time: float, success: bool) -> None: | |
| """Update processing statistics""" | |
| self.processing_stats["total_processed"] += 1 | |
| if success: | |
| self.processing_stats["successful_analyses"] += 1 | |
| else: | |
| self.processing_stats["failed_analyses"] += 1 | |
| # Update rolling average of processing time | |
| current_avg = self.processing_stats["average_processing_time"] | |
| total_count = self.processing_stats["total_processed"] | |
| self.processing_stats["average_processing_time"] = ( | |
| (current_avg * (total_count - 1) + processing_time) / total_count | |
| ) | |
| def get_stats(self) -> Dict[str, Any]: | |
| """Get current processing statistics""" | |
| stats = self.processing_stats.copy() | |
| if stats["total_processed"] > 0: | |
| stats["success_rate"] = stats["successful_analyses"] / stats["total_processed"] | |
| else: | |
| stats["success_rate"] = 0.0 | |
| stats["device_info"] = self.device_config | |
| return stats | |
| def reset_stats(self) -> None: | |
| """Reset processing statistics""" | |
| self.processing_stats = { | |
| "total_processed": 0, | |
| "successful_analyses": 0, | |
| "failed_analyses": 0, | |
| "average_processing_time": 0.0 | |
| } | |
| logger.info("Processing statistics reset") | |
| class BatchProcessor: | |
| """Handle batch processing of multiple images""" | |
| def __init__(self, optimizer: FluxOptimizer): | |
| self.optimizer = optimizer | |
| self.batch_results = [] | |
| def process_batch(self, images: list) -> list: | |
| """Process multiple images in batch""" | |
| results = [] | |
| for i, image in enumerate(images): | |
| logger.info(f"Processing batch item {i+1}/{len(images)}") | |
| try: | |
| result = self.optimizer.process_image(image) | |
| results.append({ | |
| "index": i, | |
| "success": result[3]["success"], | |
| "result": result | |
| }) | |
| except Exception as e: | |
| logger.error(f"Batch item {i} failed: {e}") | |
| results.append({ | |
| "index": i, | |
| "success": False, | |
| "error": str(e) | |
| }) | |
| self.batch_results = results | |
| return results | |
| def get_batch_summary(self) -> Dict[str, Any]: | |
| """Get summary of batch processing results""" | |
| if not self.batch_results: | |
| return {"total": 0, "successful": 0, "failed": 0} | |
| successful = sum(1 for r in self.batch_results if r["success"]) | |
| total = len(self.batch_results) | |
| return { | |
| "total": total, | |
| "successful": successful, | |
| "failed": total - successful, | |
| "success_rate": successful / total if total > 0 else 0.0 | |
| } | |
| # Global optimizer instance | |
| flux_optimizer = FluxOptimizer() | |
| def process_image_simple(image: Any, model_name: str = None) -> Tuple[str, str, str]: | |
| """ | |
| Simple interface for image processing | |
| Args: | |
| image: Input image | |
| model_name: Optional model name | |
| Returns: | |
| Tuple of (prompt, report, score_html) | |
| """ | |
| if model_name and model_name != flux_optimizer.model_name: | |
| # Create temporary optimizer with specified model | |
| temp_optimizer = FluxOptimizer(model_name) | |
| prompt, report, score_html, _ = temp_optimizer.process_image(image) | |
| else: | |
| prompt, report, score_html, _ = flux_optimizer.process_image(image) | |
| return prompt, report, score_html | |
| # Export main components | |
| __all__ = [ | |
| "FluxOptimizer", | |
| "BatchProcessor", | |
| "flux_optimizer", | |
| "process_image_simple" | |
| ] |