|
""" |
|
Image processing utilities for CompI Phase 2.E: Style Reference/Example Image Integration |
|
|
|
This module provides utilities for: |
|
- Image loading from files and URLs |
|
- Image validation and preprocessing |
|
- Style analysis and feature extraction |
|
- Image format conversion and optimization |
|
""" |
|
|
|
import os |
|
import io |
|
import requests |
|
import hashlib |
|
from typing import Optional, Tuple, Dict, Any, Union, List |
|
from pathlib import Path |
|
import logging |
|
|
|
import torch |
|
import numpy as np |
|
from PIL import Image, ImageStat, ImageFilter |
|
import cv2 |
|
|
|
from src.utils.logging_utils import setup_logger |
|
|
|
logger = setup_logger(__name__) |
|
|
|
class ImageProcessor: |
|
""" |
|
Handles image loading, validation, and preprocessing for style reference |
|
""" |
|
|
|
def __init__(self, max_size: Tuple[int, int] = (1024, 1024)): |
|
self.max_size = max_size |
|
self.supported_formats = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp'} |
|
|
|
def load_image_from_url( |
|
self, |
|
url: str, |
|
timeout: int = 10, |
|
max_file_size: int = 10 * 1024 * 1024 |
|
) -> Optional[Image.Image]: |
|
""" |
|
Load image from URL with validation and error handling |
|
|
|
Args: |
|
url: Image URL |
|
timeout: Request timeout in seconds |
|
max_file_size: Maximum file size in bytes |
|
|
|
Returns: |
|
PIL Image or None if failed |
|
""" |
|
try: |
|
logger.info(f"Loading image from URL: {url}") |
|
|
|
|
|
if not url.startswith(('http://', 'https://')): |
|
logger.error(f"Invalid URL format: {url}") |
|
return None |
|
|
|
|
|
headers = { |
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' |
|
} |
|
|
|
response = requests.get(url, timeout=timeout, headers=headers, stream=True) |
|
response.raise_for_status() |
|
|
|
|
|
content_type = response.headers.get('content-type', '').lower() |
|
if not any(img_type in content_type for img_type in ['image/', 'jpeg', 'png', 'webp']): |
|
logger.error(f"Invalid content type: {content_type}") |
|
return None |
|
|
|
|
|
content_length = response.headers.get('content-length') |
|
if content_length and int(content_length) > max_file_size: |
|
logger.error(f"File too large: {content_length} bytes") |
|
return None |
|
|
|
|
|
image_data = io.BytesIO() |
|
downloaded_size = 0 |
|
|
|
for chunk in response.iter_content(chunk_size=8192): |
|
downloaded_size += len(chunk) |
|
if downloaded_size > max_file_size: |
|
logger.error(f"File too large during download: {downloaded_size} bytes") |
|
return None |
|
image_data.write(chunk) |
|
|
|
image_data.seek(0) |
|
|
|
|
|
image = Image.open(image_data) |
|
image = image.convert('RGB') |
|
|
|
logger.info(f"Successfully loaded image: {image.size}") |
|
return image |
|
|
|
except requests.exceptions.RequestException as e: |
|
logger.error(f"Request error loading image from {url}: {e}") |
|
return None |
|
except Exception as e: |
|
logger.error(f"Error loading image from {url}: {e}") |
|
return None |
|
|
|
def load_image_from_file(self, file_path: Union[str, Path]) -> Optional[Image.Image]: |
|
""" |
|
Load image from local file with validation |
|
|
|
Args: |
|
file_path: Path to image file |
|
|
|
Returns: |
|
PIL Image or None if failed |
|
""" |
|
try: |
|
file_path = Path(file_path) |
|
|
|
if not file_path.exists(): |
|
logger.error(f"File does not exist: {file_path}") |
|
return None |
|
|
|
if file_path.suffix.lower() not in self.supported_formats: |
|
logger.error(f"Unsupported format: {file_path.suffix}") |
|
return None |
|
|
|
image = Image.open(file_path) |
|
image = image.convert('RGB') |
|
|
|
logger.info(f"Successfully loaded image from file: {image.size}") |
|
return image |
|
|
|
except Exception as e: |
|
logger.error(f"Error loading image from {file_path}: {e}") |
|
return None |
|
|
|
def preprocess_image( |
|
self, |
|
image: Image.Image, |
|
target_size: Optional[Tuple[int, int]] = None, |
|
maintain_aspect_ratio: bool = True |
|
) -> Image.Image: |
|
""" |
|
Preprocess image for stable diffusion |
|
|
|
Args: |
|
image: Input PIL Image |
|
target_size: Target size (width, height) |
|
maintain_aspect_ratio: Whether to maintain aspect ratio |
|
|
|
Returns: |
|
Preprocessed PIL Image |
|
""" |
|
if target_size is None: |
|
target_size = (512, 512) |
|
|
|
try: |
|
|
|
if maintain_aspect_ratio: |
|
image.thumbnail(target_size, Image.Resampling.LANCZOS) |
|
|
|
|
|
new_image = Image.new('RGB', target_size, (255, 255, 255)) |
|
paste_x = (target_size[0] - image.width) // 2 |
|
paste_y = (target_size[1] - image.height) // 2 |
|
new_image.paste(image, (paste_x, paste_y)) |
|
image = new_image |
|
else: |
|
image = image.resize(target_size, Image.Resampling.LANCZOS) |
|
|
|
logger.info(f"Preprocessed image to size: {image.size}") |
|
return image |
|
|
|
except Exception as e: |
|
logger.error(f"Error preprocessing image: {e}") |
|
return image |
|
|
|
def analyze_image_properties(self, image: Image.Image) -> Dict[str, Any]: |
|
""" |
|
Analyze image properties for style reference |
|
|
|
Args: |
|
image: PIL Image to analyze |
|
|
|
Returns: |
|
Dictionary of image properties |
|
""" |
|
try: |
|
|
|
width, height = image.size |
|
aspect_ratio = width / height |
|
|
|
|
|
stat = ImageStat.Stat(image) |
|
avg_brightness = sum(stat.mean) / len(stat.mean) |
|
avg_contrast = sum(stat.stddev) / len(stat.stddev) |
|
|
|
|
|
img_array = np.array(image) |
|
|
|
|
|
r_mean, g_mean, b_mean = np.mean(img_array, axis=(0, 1)) |
|
color_variance = np.var(img_array, axis=(0, 1)) |
|
|
|
|
|
gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY) |
|
edges = cv2.Canny(gray, 50, 150) |
|
edge_density = np.sum(edges > 0) / (width * height) |
|
|
|
properties = { |
|
'dimensions': (width, height), |
|
'aspect_ratio': aspect_ratio, |
|
'brightness': avg_brightness, |
|
'contrast': avg_contrast, |
|
'color_means': (float(r_mean), float(g_mean), float(b_mean)), |
|
'color_variance': color_variance.tolist(), |
|
'edge_density': float(edge_density), |
|
'file_size_pixels': width * height |
|
} |
|
|
|
logger.info(f"Analyzed image properties: {properties}") |
|
return properties |
|
|
|
except Exception as e: |
|
logger.error(f"Error analyzing image properties: {e}") |
|
return {} |
|
|
|
def generate_image_hash(self, image: Image.Image) -> str: |
|
""" |
|
Generate hash for image deduplication |
|
|
|
Args: |
|
image: PIL Image |
|
|
|
Returns: |
|
MD5 hash string |
|
""" |
|
try: |
|
|
|
img_bytes = io.BytesIO() |
|
image.save(img_bytes, format='PNG') |
|
img_bytes = img_bytes.getvalue() |
|
|
|
|
|
hash_md5 = hashlib.md5(img_bytes) |
|
return hash_md5.hexdigest() |
|
|
|
except Exception as e: |
|
logger.error(f"Error generating image hash: {e}") |
|
return "" |
|
|
|
class StyleAnalyzer: |
|
""" |
|
Analyzes style characteristics of reference images |
|
""" |
|
|
|
def __init__(self): |
|
self.style_keywords = { |
|
'realistic': ['photo', 'realistic', 'detailed', 'sharp'], |
|
'artistic': ['painting', 'artistic', 'brushstrokes', 'canvas'], |
|
'anime': ['anime', 'manga', 'cartoon', 'stylized'], |
|
'abstract': ['abstract', 'geometric', 'surreal', 'conceptual'], |
|
'vintage': ['vintage', 'retro', 'aged', 'classic'], |
|
'modern': ['modern', 'contemporary', 'clean', 'minimal'] |
|
} |
|
|
|
def suggest_style_keywords(self, image_properties: Dict[str, Any]) -> List[str]: |
|
""" |
|
Suggest style keywords based on image analysis |
|
|
|
Args: |
|
image_properties: Properties from analyze_image_properties |
|
|
|
Returns: |
|
List of suggested style keywords |
|
""" |
|
suggestions = [] |
|
|
|
try: |
|
brightness = image_properties.get('brightness', 128) |
|
contrast = image_properties.get('contrast', 50) |
|
edge_density = image_properties.get('edge_density', 0.1) |
|
|
|
|
|
if brightness < 100: |
|
suggestions.extend(['dark', 'moody', 'dramatic']) |
|
elif brightness > 180: |
|
suggestions.extend(['bright', 'light', 'airy']) |
|
|
|
|
|
if contrast > 80: |
|
suggestions.extend(['high contrast', 'bold', 'striking']) |
|
elif contrast < 30: |
|
suggestions.extend(['soft', 'gentle', 'muted']) |
|
|
|
|
|
if edge_density > 0.2: |
|
suggestions.extend(['detailed', 'complex', 'intricate']) |
|
elif edge_density < 0.05: |
|
suggestions.extend(['smooth', 'simple', 'minimalist']) |
|
|
|
return list(set(suggestions)) |
|
|
|
except Exception as e: |
|
logger.error(f"Error suggesting style keywords: {e}") |
|
return [] |
|
|