""" |
|
CompI Phase 2.A: Audio-to-Image Generation |
|
|
|
This module implements multimodal AI art generation that combines: |
|
- Text prompts with style and mood conditioning |
|
- Audio analysis and feature extraction |
|
- Audio-to-text captioning |
|
- Intelligent prompt fusion for enhanced creativity |
|
|
|
Features: |
|
- Support for various audio formats (mp3, wav, flac, etc.) |
|
- Real-time audio analysis with tempo, energy, and spectral features |
|
- OpenAI Whisper integration for audio captioning |
|
- Comprehensive metadata logging and filename conventions |
|
- Batch processing capabilities |
|
""" |
import os
import sys
import json
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import torch

# Make the project root importable so the src.utils imports below resolve
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..'))

from diffusers import StableDiffusionPipeline

from src.utils.audio_utils import AudioProcessor, AudioCaptioner, MultimodalPromptFusion, AudioFeatures
from src.utils.logging_utils import setup_logger
from src.utils.file_utils import ensure_directory_exists

logger = setup_logger(__name__)


class CompIPhase2AAudioToImage:
    """
    CompI Phase 2.A: Audio-to-Image Generation System

    Combines text prompts with audio analysis to generate contextually rich AI art.
    """
    def __init__(
        self,
        model_name: str = "runwayml/stable-diffusion-v1-5",
        device: str = "auto",
        output_dir: str = "outputs",
        whisper_model: str = "base"
    ):
        """
        Initialize the audio-to-image generation system.

        Args:
            model_name: Stable Diffusion model to use
            device: Device for inference ("auto", "cpu", or "cuda")
            output_dir: Directory for saving generated images
            whisper_model: Whisper model size for audio captioning
        """
        self.model_name = model_name
        self.device = self._setup_device(device)
        self.output_dir = Path(output_dir)
        ensure_directory_exists(self.output_dir)

        # The diffusion pipeline is loaded lazily; audio components load up front
        self.pipe = None
        self.audio_processor = AudioProcessor()
        self.audio_captioner = AudioCaptioner(model_size=whisper_model, device=self.device)
        self.prompt_fusion = MultimodalPromptFusion()

        logger.info(f"Initialized CompI Phase 2.A on {self.device}")
    def _setup_device(self, device: str) -> str:
        """Resolve and validate the inference device."""
        if device == "auto":
            device = "cuda" if torch.cuda.is_available() else "cpu"

        if device == "cuda" and not torch.cuda.is_available():
            logger.warning("CUDA requested but not available, falling back to CPU")
            device = "cpu"

        return device
    def _load_pipeline(self):
        """Lazily load the Stable Diffusion pipeline on first use."""
        if self.pipe is None:
            logger.info(f"Loading Stable Diffusion model: {self.model_name}")

            # No-op safety checker: pass images through, flagging nothing as NSFW
            def dummy_safety_checker(images, **kwargs):
                return images, [False] * len(images)

            self.pipe = StableDiffusionPipeline.from_pretrained(
                self.model_name,
                torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
                safety_checker=dummy_safety_checker,
                requires_safety_checker=False
            )

            self.pipe = self.pipe.to(self.device)
            self.pipe.enable_attention_slicing()

            # Offload idle submodules to CPU to reduce peak VRAM usage
            if self.device == "cuda":
                self.pipe.enable_model_cpu_offload()

            logger.info("Stable Diffusion pipeline loaded successfully")
    def analyze_audio(self, audio_path: str, include_caption: bool = True) -> Tuple[AudioFeatures, str]:
        """
        Run comprehensive audio analysis.

        Args:
            audio_path: Path to the audio file
            include_caption: Whether to generate an audio caption

        Returns:
            Tuple of (AudioFeatures, audio_caption)
        """
        logger.info(f"Analyzing audio: {audio_path}")

        # Extract tempo, energy, and spectral features
        audio_features = self.audio_processor.analyze_audio_file(audio_path)

        # Captioning is best-effort: fall back to an empty caption on failure
        audio_caption = ""
        if include_caption:
            try:
                audio_caption = self.audio_captioner.caption_audio(audio_path)
            except Exception as e:
                logger.warning(f"Audio captioning failed: {e}")
                audio_caption = ""

        return audio_features, audio_caption
    def generate_image(
        self,
        text_prompt: str,
        style: str = "",
        mood: str = "",
        audio_path: Optional[str] = None,
        num_images: int = 1,
        height: int = 512,
        width: int = 512,
        num_inference_steps: int = 30,
        guidance_scale: float = 7.5,
        seed: Optional[int] = None
    ) -> List[Dict]:
        """
        Generate images with optional audio conditioning.

        Args:
            text_prompt: Base text prompt
            style: Art style
            mood: Mood/atmosphere
            audio_path: Optional path to an audio file for conditioning
            num_images: Number of images to generate
            height: Image height in pixels
            width: Image width in pixels
            num_inference_steps: Number of diffusion steps
            guidance_scale: Guidance scale for generation
            seed: Random seed for reproducibility; offset by the variation
                index when generating multiple images

        Returns:
            List of generation results with metadata
        """
        self._load_pipeline()

        # Analyze audio if a valid file was provided
        audio_features = None
        audio_caption = ""
        if audio_path and os.path.exists(audio_path):
            audio_features, audio_caption = self.analyze_audio(audio_path)

        # Fuse text, style, mood, and audio context into the final prompt
        if audio_features:
            enhanced_prompt = self.prompt_fusion.fuse_prompt_with_audio(
                text_prompt, style, mood, audio_features, audio_caption
            )
        else:
            enhanced_prompt = text_prompt
            if style:
                enhanced_prompt += f", {style}"
            if mood:
                enhanced_prompt += f", {mood}"

        logger.info(f"Generating {num_images} image(s) with prompt: {enhanced_prompt}")

        results = []

        for i in range(num_images):
            # Offset a fixed seed per variation so multiple images differ;
            # without a seed, draw a fresh random one for each image
            current_seed = seed + i if seed is not None else torch.seed()
            generator = torch.Generator(device=self.device).manual_seed(current_seed)

            # Autocast accelerates fp16 inference on CUDA; plain no_grad on CPU
            with torch.autocast(self.device) if self.device == "cuda" else torch.no_grad():
                result = self.pipe(
                    enhanced_prompt,
                    height=height,
                    width=width,
                    num_inference_steps=num_inference_steps,
                    guidance_scale=guidance_scale,
                    generator=generator
                )

            image = result.images[0]

            # Record full provenance for reproducibility
            metadata = {
                "timestamp": datetime.now().isoformat(),
                "text_prompt": text_prompt,
                "style": style,
                "mood": mood,
                "enhanced_prompt": enhanced_prompt,
                "audio_path": audio_path,
                "audio_caption": audio_caption,
                "generation_params": {
                    "height": height,
                    "width": width,
                    "num_inference_steps": num_inference_steps,
                    "guidance_scale": guidance_scale,
                    "seed": current_seed,
                    "model": self.model_name
                },
                "device": self.device,
                "phase": "2A_audio_to_image"
            }

            if audio_features:
                metadata["audio_features"] = audio_features.to_dict()
                metadata["audio_tags"] = self.prompt_fusion.generate_audio_tags(audio_features)

            filename = self._generate_filename(
                text_prompt, style, mood, current_seed, i + 1,
                has_audio=audio_path is not None
            )

            # Save the image alongside a JSON metadata sidecar
            image_path = self.output_dir / f"{filename}.png"
            metadata_path = self.output_dir / f"{filename}_metadata.json"

            image.save(image_path)
            with open(metadata_path, 'w') as f:
                json.dump(metadata, f, indent=2)

            results.append({
                "image": image,
                "image_path": str(image_path),
                "metadata_path": str(metadata_path),
                "metadata": metadata,
                "filename": filename
            })

            logger.info(f"Generated image {i+1}/{num_images}: {filename}")

        return results
    def _generate_filename(
        self,
        prompt: str,
        style: str,
        mood: str,
        seed: int,
        variation: int,
        has_audio: bool = False
    ) -> str:
        """Generate a descriptive filename following CompI conventions."""
        # Slug from the first five prompt words
        prompt_words = prompt.lower().replace(',', '').split()[:5]
        prompt_slug = "_".join(prompt_words)

        # Compact style/mood slugs, with fallbacks for empty values
        style_slug = style.replace(" ", "").replace(",", "")[:10] if style else "standard"
        mood_slug = mood.replace(" ", "").replace(",", "")[:10] if mood else "neutral"

        # Timestamp keeps filenames unique across runs
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        # Mark audio-conditioned generations
        audio_tag = "_AUDIO" if has_audio else ""

        filename = f"{prompt_slug}_{style_slug}_{mood_slug}_{timestamp}_seed{seed}{audio_tag}_v{variation}"

        return filename
    def batch_process(
        self,
        audio_directory: str,
        text_prompt: str,
        style: str = "",
        mood: str = "",
        **generation_kwargs
    ) -> List[Dict]:
        """
        Process multiple audio files in batch.

        Args:
            audio_directory: Directory containing audio files
            text_prompt: Base text prompt for all generations
            style: Art style
            mood: Mood/atmosphere
            **generation_kwargs: Additional generation parameters

        Returns:
            List of all generation results
        """
        audio_dir = Path(audio_directory)
        if not audio_dir.exists():
            raise ValueError(f"Audio directory not found: {audio_directory}")

        # Collect supported audio files
        audio_extensions = {'.mp3', '.wav', '.flac', '.m4a', '.ogg'}
        audio_files = [
            f for f in audio_dir.iterdir()
            if f.suffix.lower() in audio_extensions
        ]

        if not audio_files:
            raise ValueError(f"No audio files found in {audio_directory}")

        logger.info(f"Processing {len(audio_files)} audio files")

        all_results = []
        for audio_file in audio_files:
            logger.info(f"Processing: {audio_file.name}")

            # Failures on individual files are logged and skipped
            try:
                results = self.generate_image(
                    text_prompt=text_prompt,
                    style=style,
                    mood=mood,
                    audio_path=str(audio_file),
                    **generation_kwargs
                )
                all_results.extend(results)

            except Exception as e:
                logger.error(f"Error processing {audio_file.name}: {e}")
                continue

        logger.info(f"Batch processing complete: {len(all_results)} images generated")
        return all_results
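

# Minimal smoke-test sketch: the audio path below is a hypothetical placeholder,
# and everything else uses the defaults defined above.
if __name__ == "__main__":
    generator = CompIPhase2AAudioToImage(device="auto")

    # Generate one audio-conditioned image and report where it was saved;
    # if the example file is missing, generation falls back to text-only.
    results = generator.generate_image(
        text_prompt="a misty forest at dawn",
        style="oil painting",
        mood="serene",
        audio_path="path/to/track.wav",  # hypothetical example path
        num_images=1,
        seed=42,
    )
    for result in results:
        print(f"Saved: {result['image_path']}")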