Comp-I / src /generators /compi_phase2e_refimg_to_image.py
axrzce's picture
Deploy from GitHub main
338d95d verified
raw
history blame
21.8 kB
"""
CompI Phase 2.E: Style Reference/Example Image to AI Art Generation
This module implements multimodal AI art generation that combines:
- Text prompts with style and mood conditioning
- Reference image style transfer and guidance
- Image-to-image generation with controllable strength
- Support for both local files and web URLs
- Advanced style analysis and prompt enhancement
Features:
- Support for various image formats and web sources
- Real-time image analysis and style suggestion
- Controllable reference strength for creative flexibility
- Comprehensive metadata logging and filename conventions
- Batch processing capabilities with multiple variations
"""
import os
import sys
import torch
import json
from datetime import datetime
from typing import Dict, List, Optional, Tuple, Union
from pathlib import Path
import logging
# Add project root to path
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..'))
from diffusers import StableDiffusionImg2ImgPipeline, StableDiffusionPipeline
from PIL import Image
import numpy as np
from src.utils.image_utils import ImageProcessor, StyleAnalyzer
from src.utils.logging_utils import setup_logger
from src.utils.file_utils import ensure_directory_exists, generate_filename
from src.config import (
STABLE_DIFFUSION_IMG2IMG_MODEL,
OUTPUTS_DIR,
DEFAULT_IMAGE_SIZE,
DEFAULT_INFERENCE_STEPS,
DEFAULT_GUIDANCE_SCALE
)
# Setup logging
logger = setup_logger(__name__)
class CompIPhase2ERefImageToImage:
"""
CompI Phase 2.E: Style Reference/Example Image to AI Art Generation System
Combines text prompts with reference image style guidance for enhanced creativity
"""
def __init__(
self,
model_name: str = STABLE_DIFFUSION_IMG2IMG_MODEL,
device: Optional[str] = None,
enable_attention_slicing: bool = True,
enable_memory_efficient_attention: bool = True
):
"""
Initialize the CompI Phase 2.E system
Args:
model_name: Hugging Face model identifier
device: Device to run on ('cuda', 'cpu', or None for auto)
enable_attention_slicing: Enable attention slicing for memory efficiency
enable_memory_efficient_attention: Enable memory efficient attention
"""
self.model_name = model_name
self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
# Initialize components
self.image_processor = ImageProcessor()
self.style_analyzer = StyleAnalyzer()
# Initialize pipelines (lazy loading)
self._img2img_pipeline = None
self._txt2img_pipeline = None
# Configuration
self.enable_attention_slicing = enable_attention_slicing
self.enable_memory_efficient_attention = enable_memory_efficient_attention
logger.info(f"Initialized CompI Phase 2.E on device: {self.device}")
@property
def img2img_pipeline(self) -> StableDiffusionImg2ImgPipeline:
"""Lazy load img2img pipeline"""
if self._img2img_pipeline is None:
logger.info(f"Loading img2img pipeline: {self.model_name}")
self._img2img_pipeline = StableDiffusionImg2ImgPipeline.from_pretrained(
self.model_name,
torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
safety_checker=None, # Disabled for creative use
requires_safety_checker=False
)
self._img2img_pipeline = self._img2img_pipeline.to(self.device)
if self.enable_attention_slicing:
self._img2img_pipeline.enable_attention_slicing()
if self.enable_memory_efficient_attention and hasattr(self._img2img_pipeline, 'enable_memory_efficient_attention'):
self._img2img_pipeline.enable_memory_efficient_attention()
return self._img2img_pipeline
@property
def txt2img_pipeline(self) -> StableDiffusionPipeline:
"""Lazy load txt2img pipeline for fallback"""
if self._txt2img_pipeline is None:
logger.info(f"Loading txt2img pipeline: {self.model_name}")
self._txt2img_pipeline = StableDiffusionPipeline.from_pretrained(
self.model_name,
torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
safety_checker=None, # Disabled for creative use
requires_safety_checker=False
)
self._txt2img_pipeline = self._txt2img_pipeline.to(self.device)
if self.enable_attention_slicing:
self._txt2img_pipeline.enable_attention_slicing()
if self.enable_memory_efficient_attention and hasattr(self._txt2img_pipeline, 'enable_memory_efficient_attention'):
self._txt2img_pipeline.enable_memory_efficient_attention()
return self._txt2img_pipeline
def load_reference_image(
self,
source: Union[str, Path, Image.Image],
preprocess: bool = True
) -> Optional[Tuple[Image.Image, Dict]]:
"""
Load and analyze reference image from various sources
Args:
source: Image source (file path, URL, or PIL Image)
preprocess: Whether to preprocess the image
Returns:
Tuple of (processed_image, analysis_results) or None if failed
"""
try:
# Load image based on source type
if isinstance(source, Image.Image):
image = source.convert('RGB')
source_info = "PIL Image object"
elif isinstance(source, (str, Path)):
source_str = str(source)
if source_str.startswith(('http://', 'https://')):
image = self.image_processor.load_image_from_url(source_str)
source_info = f"URL: {source_str}"
else:
image = self.image_processor.load_image_from_file(source_str)
source_info = f"File: {source_str}"
if image is None:
return None
else:
logger.error(f"Unsupported source type: {type(source)}")
return None
# Preprocess if requested
if preprocess:
image = self.image_processor.preprocess_image(image, DEFAULT_IMAGE_SIZE)
# Analyze image properties
properties = self.image_processor.analyze_image_properties(image)
style_suggestions = self.style_analyzer.suggest_style_keywords(properties)
image_hash = self.image_processor.generate_image_hash(image)
analysis = {
'source': source_info,
'properties': properties,
'style_suggestions': style_suggestions,
'hash': image_hash,
'processed_size': image.size
}
logger.info(f"Successfully loaded and analyzed reference image: {analysis}")
return image, analysis
except Exception as e:
logger.error(f"Error loading reference image: {e}")
return None
def enhance_prompt_with_style(
self,
base_prompt: str,
style: str = "",
mood: str = "",
style_suggestions: List[str] = None
) -> str:
"""
Enhance prompt with style information from reference image
Args:
base_prompt: Base text prompt
style: Additional style keywords
mood: Mood/atmosphere keywords
style_suggestions: Suggested keywords from image analysis
Returns:
Enhanced prompt string
"""
try:
prompt_parts = [base_prompt.strip()]
# Add explicit style
if style.strip():
prompt_parts.append(style.strip())
# Add mood
if mood.strip():
prompt_parts.append(mood.strip())
# Add style suggestions from image analysis
if style_suggestions:
# Limit to top 3 suggestions to avoid prompt bloat
top_suggestions = style_suggestions[:3]
prompt_parts.extend(top_suggestions)
enhanced_prompt = ", ".join(prompt_parts)
logger.info(f"Enhanced prompt: {enhanced_prompt}")
return enhanced_prompt
except Exception as e:
logger.error(f"Error enhancing prompt: {e}")
return base_prompt
def generate_with_reference(
self,
prompt: str,
reference_image: Image.Image,
style: str = "",
mood: str = "",
strength: float = 0.5,
num_images: int = 1,
num_inference_steps: int = DEFAULT_INFERENCE_STEPS,
guidance_scale: float = DEFAULT_GUIDANCE_SCALE,
seed: Optional[int] = None,
style_suggestions: List[str] = None
) -> List[Dict]:
"""
Generate images using reference image guidance
Args:
prompt: Text prompt
reference_image: Reference PIL Image
style: Style keywords
mood: Mood keywords
strength: Reference strength (0.0-1.0, higher = closer to reference)
num_images: Number of images to generate
num_inference_steps: Number of denoising steps
guidance_scale: Classifier-free guidance scale
seed: Random seed for reproducibility
style_suggestions: Style suggestions from image analysis
Returns:
List of generation results with metadata
"""
try:
# Enhance prompt with style information
enhanced_prompt = self.enhance_prompt_with_style(
prompt, style, mood, style_suggestions
)
results = []
for i in range(num_images):
# Set up random seed
if seed is not None:
current_seed = seed + i
else:
current_seed = torch.seed()
generator = torch.Generator(device=self.device).manual_seed(current_seed)
# Generate image
logger.info(f"Generating image {i+1}/{num_images} with reference guidance")
with torch.autocast(self.device) if self.device == "cuda" else torch.no_grad():
result = self.img2img_pipeline(
prompt=enhanced_prompt,
image=reference_image,
strength=strength,
num_inference_steps=num_inference_steps,
guidance_scale=guidance_scale,
generator=generator
)
generated_image = result.images[0]
# Create metadata
metadata = {
'prompt': prompt,
'enhanced_prompt': enhanced_prompt,
'style': style,
'mood': mood,
'strength': strength,
'num_inference_steps': num_inference_steps,
'guidance_scale': guidance_scale,
'seed': current_seed,
'model': self.model_name,
'generation_type': 'img2img_reference',
'timestamp': datetime.now().isoformat(),
'device': self.device,
'reference_size': reference_image.size,
'output_size': generated_image.size,
'style_suggestions': style_suggestions or []
}
results.append({
'image': generated_image,
'metadata': metadata,
'index': i
})
logger.info(f"Successfully generated {len(results)} images with reference guidance")
return results
except Exception as e:
logger.error(f"Error generating images with reference: {e}")
return []
def generate_without_reference(
self,
prompt: str,
style: str = "",
mood: str = "",
num_images: int = 1,
num_inference_steps: int = DEFAULT_INFERENCE_STEPS,
guidance_scale: float = DEFAULT_GUIDANCE_SCALE,
seed: Optional[int] = None
) -> List[Dict]:
"""
Generate images without reference (fallback to text-to-image)
Args:
prompt: Text prompt
style: Style keywords
mood: Mood keywords
num_images: Number of images to generate
num_inference_steps: Number of denoising steps
guidance_scale: Classifier-free guidance scale
seed: Random seed for reproducibility
Returns:
List of generation results with metadata
"""
try:
# Enhance prompt
enhanced_prompt = self.enhance_prompt_with_style(prompt, style, mood)
results = []
for i in range(num_images):
# Set up random seed
if seed is not None:
current_seed = seed + i
else:
current_seed = torch.seed()
generator = torch.Generator(device=self.device).manual_seed(current_seed)
# Generate image
logger.info(f"Generating image {i+1}/{num_images} without reference")
with torch.autocast(self.device) if self.device == "cuda" else torch.no_grad():
result = self.txt2img_pipeline(
prompt=enhanced_prompt,
height=DEFAULT_IMAGE_SIZE[1],
width=DEFAULT_IMAGE_SIZE[0],
num_inference_steps=num_inference_steps,
guidance_scale=guidance_scale,
generator=generator
)
generated_image = result.images[0]
# Create metadata
metadata = {
'prompt': prompt,
'enhanced_prompt': enhanced_prompt,
'style': style,
'mood': mood,
'num_inference_steps': num_inference_steps,
'guidance_scale': guidance_scale,
'seed': current_seed,
'model': self.model_name,
'generation_type': 'txt2img_fallback',
'timestamp': datetime.now().isoformat(),
'device': self.device,
'output_size': generated_image.size
}
results.append({
'image': generated_image,
'metadata': metadata,
'index': i
})
logger.info(f"Successfully generated {len(results)} images without reference")
return results
except Exception as e:
logger.error(f"Error generating images without reference: {e}")
return []
def save_results(
self,
results: List[Dict],
output_dir: Path = OUTPUTS_DIR,
reference_info: Optional[Dict] = None
) -> List[str]:
"""
Save generation results with comprehensive metadata
Args:
results: List of generation results
output_dir: Output directory
reference_info: Reference image information
Returns:
List of saved file paths
"""
try:
ensure_directory_exists(output_dir)
saved_files = []
for result in results:
image = result['image']
metadata = result['metadata']
index = result['index']
# Generate filename
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
prompt_slug = "_".join(metadata['prompt'].lower().split()[:5])
style_slug = metadata.get('style', '').replace(' ', '')[:10]
mood_slug = metadata.get('mood', '').replace(' ', '')[:10]
# Add reference indicator
ref_indicator = "REFIMG" if metadata['generation_type'] == 'img2img_reference' else "NOREFIMG"
filename = f"{prompt_slug}_{style_slug}_{mood_slug}_{timestamp}_seed{metadata['seed']}_{ref_indicator}_v{index+1}.png"
filepath = output_dir / filename
# Save image
image.save(filepath)
# Add reference info to metadata if available
if reference_info:
metadata['reference_info'] = reference_info
# Save metadata
metadata_filename = filepath.stem + "_metadata.json"
metadata_filepath = output_dir / metadata_filename
with open(metadata_filepath, 'w') as f:
json.dump(metadata, f, indent=2, default=str)
saved_files.extend([str(filepath), str(metadata_filepath)])
logger.info(f"Saved: {filepath}")
return saved_files
except Exception as e:
logger.error(f"Error saving results: {e}")
return []
def generate_batch(
self,
prompt: str,
reference_source: Optional[Union[str, Path, Image.Image]] = None,
style: str = "",
mood: str = "",
strength: float = 0.5,
num_images: int = 1,
num_inference_steps: int = DEFAULT_INFERENCE_STEPS,
guidance_scale: float = DEFAULT_GUIDANCE_SCALE,
seed: Optional[int] = None,
save_results: bool = True,
output_dir: Path = OUTPUTS_DIR
) -> Dict:
"""
Complete batch generation pipeline
Args:
prompt: Text prompt
reference_source: Reference image source (file, URL, or PIL Image)
style: Style keywords
mood: Mood keywords
strength: Reference strength (only used if reference provided)
num_images: Number of images to generate
num_inference_steps: Number of denoising steps
guidance_scale: Classifier-free guidance scale
seed: Random seed for reproducibility
save_results: Whether to save results to disk
output_dir: Output directory for saved files
Returns:
Dictionary with results and metadata
"""
try:
logger.info(f"Starting batch generation: {num_images} images")
reference_image = None
reference_info = None
style_suggestions = []
# Load and analyze reference image if provided
if reference_source is not None:
ref_result = self.load_reference_image(reference_source)
if ref_result:
reference_image, reference_info = ref_result
style_suggestions = reference_info.get('style_suggestions', [])
logger.info(f"Using reference image with suggestions: {style_suggestions}")
else:
logger.warning("Failed to load reference image, falling back to text-only generation")
# Generate images
if reference_image is not None:
results = self.generate_with_reference(
prompt=prompt,
reference_image=reference_image,
style=style,
mood=mood,
strength=strength,
num_images=num_images,
num_inference_steps=num_inference_steps,
guidance_scale=guidance_scale,
seed=seed,
style_suggestions=style_suggestions
)
else:
results = self.generate_without_reference(
prompt=prompt,
style=style,
mood=mood,
num_images=num_images,
num_inference_steps=num_inference_steps,
guidance_scale=guidance_scale,
seed=seed
)
# Save results if requested
saved_files = []
if save_results and results:
saved_files = self.save_results(results, output_dir, reference_info)
# Compile final results
batch_result = {
'results': results,
'reference_info': reference_info,
'saved_files': saved_files,
'generation_summary': {
'total_images': len(results),
'prompt': prompt,
'style': style,
'mood': mood,
'has_reference': reference_image is not None,
'style_suggestions': style_suggestions,
'timestamp': datetime.now().isoformat()
}
}
logger.info(f"Batch generation complete: {len(results)} images generated")
return batch_result
except Exception as e:
logger.error(f"Error in batch generation: {e}")
return {
'results': [],
'reference_info': None,
'saved_files': [],
'error': str(e)
}