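"""Media generation helpers: images, video, and music.

Wraps the Hugging Face InferenceClient (text-to-image, image-to-image,
image-to-video, text-to-video) and the ElevenLabs Music API. Each public
helper returns a ready-to-embed HTML snippet (or an "Error: ..." string)
backed by a temporary media URL from utils.create_temp_media_url.
"""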
import os
import io
import requests
from typing import Optional

from PIL import Image
import numpy as np
import gradio as gr
from huggingface_hub import InferenceClient

from utils import create_temp_media_url, validate_video_html
from config import HF_TOKEN

class MediaGenerator:
    """Handles generation of images, videos, and music"""

    def __init__(self):
        # The client is only created when HF_TOKEN is configured; every
        # public method checks self.hf_client before calling the API.
        self.hf_client = None
        if HF_TOKEN:
            self.hf_client = InferenceClient(
                provider="auto",
                api_key=HF_TOKEN,
                bill_to="huggingface",
            )

    def generate_image_with_qwen(self, prompt: str, image_index: int = 0,
                                 token: Optional[gr.OAuthToken] = None) -> str:
        """Generate image using Qwen image model"""
        try:
            if not self.hf_client:
                return "Error: HF_TOKEN environment variable is not set."
            print(f"[ImageGen] Generating image with prompt: {prompt}")

            # Generate image using Qwen/Qwen-Image model
            image = self.hf_client.text_to_image(
                prompt,
                model="Qwen/Qwen-Image",
            )

            # Resize image to reduce size while maintaining quality
            max_size = 1024
            if image.width > max_size or image.height > max_size:
                image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)

            # Convert to bytes
            buffer = io.BytesIO()
            image.convert('RGB').save(buffer, format='JPEG', quality=90, optimize=True)
            image_bytes = buffer.getvalue()

            # Create temporary URL
            filename = f"generated_image_{image_index}.jpg"
            temp_url = self._upload_media_to_hf(image_bytes, filename, "image", token, use_temp=True)
            if temp_url.startswith("Error"):
                return temp_url

            return f'<img src="{temp_url}" alt="{prompt}" style="max-width: 100%; height: auto; border-radius: 8px; margin: 10px 0;" loading="lazy" />'
        except Exception as e:
            print(f"Image generation error: {str(e)}")
            return f"Error generating image: {str(e)}"

    def generate_image_to_image(self, input_image_data, prompt: str,
                                token: Optional[gr.OAuthToken] = None) -> str:
        """Generate image using image-to-image with Qwen-Image-Edit"""
        try:
            if not self.hf_client:
                return "Error: HF_TOKEN environment variable is not set."
            print(f"[Image2Image] Processing with prompt: {prompt}")

            # Normalize input image to a PIL image
            pil_image = self._process_input_image(input_image_data)

            # Resize input image to avoid request body size limits
            max_input_size = 1024
            if pil_image.width > max_input_size or pil_image.height > max_input_size:
                pil_image.thumbnail((max_input_size, max_input_size), Image.Resampling.LANCZOS)

            # Convert to bytes
            buf = io.BytesIO()
            pil_image.save(buf, format='JPEG', quality=85, optimize=True)
            input_bytes = buf.getvalue()

            # Call image-to-image
            image = self.hf_client.image_to_image(
                input_bytes,
                prompt=prompt,
                model="Qwen/Qwen-Image-Edit",
            )

            # Resize and optimize output
            max_size = 1024
            if image.width > max_size or image.height > max_size:
                image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
            out_buf = io.BytesIO()
            image.convert('RGB').save(out_buf, format='JPEG', quality=90, optimize=True)
            image_bytes = out_buf.getvalue()

            # Create temporary URL
            filename = "image_to_image_result.jpg"
            temp_url = self._upload_media_to_hf(image_bytes, filename, "image", token, use_temp=True)
            if temp_url.startswith("Error"):
                return temp_url

            return f'<img src="{temp_url}" alt="{prompt}" style="max-width: 100%; height: auto; border-radius: 8px; margin: 10px 0;" loading="lazy" />'
        except Exception as e:
            print(f"Image-to-image generation error: {str(e)}")
            return f"Error generating image (image-to-image): {str(e)}"

    def generate_video_from_image(self, input_image_data, prompt: str,
                                  session_id: Optional[str] = None,
                                  token: Optional[gr.OAuthToken] = None) -> str:
        """Generate video from input image using Lightricks LTX-Video"""
        try:
            print("[Image2Video] Starting video generation")
            if not self.hf_client:
                return "Error: HF_TOKEN environment variable is not set."

            # Process input image
            pil_image = self._process_input_image(input_image_data)
            print(f"[Image2Video] Input image size: {pil_image.size}")

            # Compress image for API limits
            input_bytes = self._compress_image_for_video(pil_image, max_size_mb=3.9)

            # image_to_video only exists in newer huggingface_hub releases
            image_to_video_method = getattr(self.hf_client, "image_to_video", None)
            if not callable(image_to_video_method):
                return ("Error: Your huggingface_hub version does not support image_to_video. "
                        "Please upgrade with `pip install -U huggingface_hub`")

            model_id = "Lightricks/LTX-Video-0.9.8-13B-distilled"
            print(f"[Image2Video] Calling API with model: {model_id}")
            video_bytes = image_to_video_method(
                input_bytes,
                prompt=prompt,
                model=model_id,
            )
            print(f"[Image2Video] Received video bytes: {len(video_bytes) if hasattr(video_bytes, '__len__') else 'unknown'}")

            # Create temporary URL
            filename = "image_to_video_result.mp4"
            temp_url = self._upload_media_to_hf(video_bytes, filename, "video", token, use_temp=True)
            if temp_url.startswith("Error"):
                return temp_url

            video_html = self._create_video_html(temp_url)
            if not validate_video_html(video_html):
                return "Error: Generated video HTML is malformed"
            print(f"[Image2Video] Successfully generated video: {temp_url}")
            return video_html
        except Exception as e:
            print(f"[Image2Video] Error: {str(e)}")
            return f"Error generating video (image-to-video): {str(e)}"

    def generate_video_from_text(self, prompt: str, session_id: Optional[str] = None,
                                 token: Optional[gr.OAuthToken] = None) -> str:
        """Generate video from text prompt using Wan-AI text-to-video model"""
        try:
            print("[Text2Video] Starting video generation")
            if not self.hf_client:
                return "Error: HF_TOKEN environment variable is not set."

            # text_to_video only exists in newer huggingface_hub releases
            text_to_video_method = getattr(self.hf_client, "text_to_video", None)
            if not callable(text_to_video_method):
                return ("Error: Your huggingface_hub version does not support text_to_video. "
                        "Please upgrade with `pip install -U huggingface_hub`")

            model_id = "Wan-AI/Wan2.2-T2V-A14B"
            prompt_str = (prompt or "").strip()
            print(f"[Text2Video] Using model: {model_id}, prompt length: {len(prompt_str)}")
            video_bytes = text_to_video_method(
                prompt_str,
                model=model_id,
            )
            print(f"[Text2Video] Received video bytes: {len(video_bytes) if hasattr(video_bytes, '__len__') else 'unknown'}")

            # Create temporary URL
            filename = "text_to_video_result.mp4"
            temp_url = self._upload_media_to_hf(video_bytes, filename, "video", token, use_temp=True)
            if temp_url.startswith("Error"):
                return temp_url

            video_html = self._create_video_html(temp_url)
            if not validate_video_html(video_html):
                return "Error: Generated video HTML is malformed"
            print(f"[Text2Video] Successfully generated video: {temp_url}")
            return video_html
        except Exception as e:
            print(f"[Text2Video] Error: {str(e)}")
            return f"Error generating video (text-to-video): {str(e)}"

    def generate_music_from_text(self, prompt: str, music_length_ms: int = 30000,
                                 session_id: Optional[str] = None,
                                 token: Optional[gr.OAuthToken] = None) -> str:
        """Generate music using ElevenLabs Music API"""
        try:
            api_key = os.getenv('ELEVENLABS_API_KEY')
            if not api_key:
                return "Error: ELEVENLABS_API_KEY environment variable is not set."
            print(f"[MusicGen] Generating music: {prompt}")

            headers = {
                'Content-Type': 'application/json',
                'xi-api-key': api_key,
            }
            payload = {
                'prompt': prompt or 'Epic orchestral theme with soaring strings and powerful brass',
                'music_length_ms': int(music_length_ms) if music_length_ms else 30000,
            }
            resp = requests.post(
                'https://api.elevenlabs.io/v1/music/compose',
                headers=headers,
                json=payload,
                timeout=60,
            )
            # Surface the API's error body instead of raising
            if not resp.ok:
                return f"Error generating music: {resp.status_code} {resp.text}"

            # Create temporary URL
            filename = "generated_music.mp3"
            temp_url = self._upload_media_to_hf(resp.content, filename, "audio", token, use_temp=True)
            if temp_url.startswith("Error"):
                return temp_url

            audio_html = self._create_audio_html(temp_url)
            print(f"[MusicGen] Successfully generated music: {temp_url}")
            return audio_html
        except Exception as e:
            print(f"[MusicGen] Error: {str(e)}")
            return f"Error generating music: {str(e)}"

    def _process_input_image(self, input_image_data) -> Image.Image:
        """Convert various image formats to PIL Image"""
        if hasattr(input_image_data, 'read'):
            # File-like object
            raw = input_image_data.read()
            pil_image = Image.open(io.BytesIO(raw))
        elif isinstance(input_image_data, Image.Image):
            pil_image = input_image_data
        elif isinstance(input_image_data, np.ndarray):
            pil_image = Image.fromarray(input_image_data)
        elif isinstance(input_image_data, (bytes, bytearray)):
            pil_image = Image.open(io.BytesIO(input_image_data))
        else:
            # Last resort: anything convertible to bytes
            pil_image = Image.open(io.BytesIO(bytes(input_image_data)))

        # Ensure RGB
        if pil_image.mode != 'RGB':
            pil_image = pil_image.convert('RGB')
        return pil_image

    def _compress_image_for_video(self, pil_image: Image.Image, max_size_mb: float = 3.9) -> bytes:
        """Compress image for video generation API limits"""
        MAX_BYTES = int(max_size_mb * 1024 * 1024)
        max_dim = 1024
        quality = 90

        def encode_current(pil: Image.Image, q: int) -> bytes:
            tmp = io.BytesIO()
            pil.save(tmp, format='JPEG', quality=q, optimize=True)
            return tmp.getvalue()

        # Downscale so the longest edge is at most max_dim
        while max(pil_image.size) > max_dim:
            ratio = max_dim / float(max(pil_image.size))
            new_size = (max(1, int(pil_image.size[0] * ratio)), max(1, int(pil_image.size[1] * ratio)))
            pil_image = pil_image.resize(new_size, Image.Resampling.LANCZOS)
        encoded = encode_current(pil_image, quality)

        # If still over budget, first lower JPEG quality (down to 40),
        # then shrink dimensions by 15% per step until under the limit
        while len(encoded) > MAX_BYTES and (quality > 40 or max(pil_image.size) > 640):
            if quality > 40:
                quality -= 10
            else:
                new_w = max(1, int(pil_image.size[0] * 0.85))
                new_h = max(1, int(pil_image.size[1] * 0.85))
                pil_image = pil_image.resize((new_w, new_h), Image.Resampling.LANCZOS)
            encoded = encode_current(pil_image, quality)
        return encoded

    def _upload_media_to_hf(self, media_bytes: bytes, filename: str, media_type: str,
                            token: Optional[gr.OAuthToken] = None, use_temp: bool = True) -> str:
        """Upload media to HF or create temporary file"""
        if use_temp:
            return create_temp_media_url(media_bytes, filename, media_type)
        # HF upload logic would go here for permanent URLs;
        # for now, always fall back to temp files
        return create_temp_media_url(media_bytes, filename, media_type)

    def _create_video_html(self, video_url: str) -> str:
        """Create HTML video element"""
        return f'''<video controls autoplay muted loop playsinline
    style="max-width: 100%; height: auto; border-radius: 8px; margin: 10px 0; display: block;"
    onloadstart="this.style.backgroundColor='#f0f0f0'"
    onerror="this.style.display='none'; console.error('Video failed to load')">
    <source src="{video_url}" type="video/mp4" />
    <p style="text-align: center; color: #666;">Your browser does not support the video tag.</p>
</video>'''

    def _create_audio_html(self, audio_url: str) -> str:
        """Create HTML audio player"""
        return f'''<div class="anycoder-music" style="max-width:420px;margin:16px auto;padding:12px 16px;border:1px solid #e5e7eb;border-radius:12px;background:linear-gradient(180deg,#fafafa,#f3f4f6);box-shadow:0 2px 8px rgba(0,0,0,0.06)">
  <div style="font-size:13px;color:#374151;margin-bottom:8px;display:flex;align-items:center;gap:6px">
    <span>🎵 Generated music</span>
  </div>
  <audio controls autoplay loop style="width:100%;outline:none;">
    <source src="{audio_url}" type="audio/mpeg" />
    Your browser does not support the audio element.
  </audio>
</div>'''

# Global media generator instance
media_generator = MediaGenerator()


# Module-level convenience wrappers around the shared instance
def generate_image_with_qwen(prompt: str, image_index: int = 0, token: Optional[gr.OAuthToken] = None) -> str:
    return media_generator.generate_image_with_qwen(prompt, image_index, token)


def generate_image_to_image(input_image_data, prompt: str, token: Optional[gr.OAuthToken] = None) -> str:
    return media_generator.generate_image_to_image(input_image_data, prompt, token)


def generate_video_from_image(input_image_data, prompt: str, session_id: Optional[str] = None,
                              token: Optional[gr.OAuthToken] = None) -> str:
    return media_generator.generate_video_from_image(input_image_data, prompt, session_id, token)


def generate_video_from_text(prompt: str, session_id: Optional[str] = None,
                             token: Optional[gr.OAuthToken] = None) -> str:
    return media_generator.generate_video_from_text(prompt, session_id, token)


def generate_music_from_text(prompt: str, music_length_ms: int = 30000, session_id: Optional[str] = None,
                             token: Optional[gr.OAuthToken] = None) -> str:
    return media_generator.generate_music_from_text(prompt, music_length_ms, session_id, token)
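

# Minimal manual smoke test; a sketch only. It assumes HF_TOKEN is set so the
# InferenceClient is available, and that create_temp_media_url yields a URL a
# browser can open. The prompt and output filename are arbitrary examples.
if __name__ == "__main__":
    snippet = generate_image_with_qwen("A lighthouse at dusk, oil painting style")
    if snippet.startswith("Error"):
        print(snippet)
    else:
        with open("preview.html", "w", encoding="utf-8") as f:
            f.write(snippet)
        print("Wrote preview.html")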