mgbam committed on
Commit
9009981
·
verified ·
1 Parent(s): 0730dbb

Create media_processing.py

Browse files
Files changed (1) hide show
  1. media_processing.py +1167 -0
media_processing.py ADDED
@@ -0,0 +1,1167 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import base64
3
+ import cv2
4
+ import numpy as np
5
+ from PIL import Image
6
+ import pytesseract
7
+ import requests
8
+ from urllib.parse import urlparse, urljoin
9
+ from bs4 import BeautifulSoup
10
+ import html2text
11
+ import json
12
+ import time
13
+ import webbrowser
14
+ import urllib.parse
15
+ import copy
16
+ import html
17
+ import tempfile
18
+ import uuid
19
+ import datetime
20
+ import threading
21
+ import atexit
22
+ from huggingface_hub import HfApi
23
+ import gradio as gr
24
+ import subprocess
25
+ import re
26
+
27
+ # ---------------------------------------------------------------------------
28
+ # Video temp-file management (per-session tracking and cleanup)
29
+ # ---------------------------------------------------------------------------
30
# Directory (under the system temp dir) that holds generated video files.
VIDEO_TEMP_DIR = os.path.join(tempfile.gettempdir(), "anycoder_videos")
# Default age threshold used by reap_old_videos().
VIDEO_FILE_TTL_SECONDS = 6 * 60 * 60  # 6 hours
# Maps a session id to the video file paths registered for that session.
# Built-in generics are used because typing.Dict / typing.List are never
# imported by this module and would raise NameError at import time.
_SESSION_VIDEO_FILES: dict[str, list[str]] = {}
# Guards access to _SESSION_VIDEO_FILES.
_VIDEO_FILES_LOCK = threading.Lock()
34
+
35
def _ensure_video_dir_exists() -> None:
    """Create ``VIDEO_TEMP_DIR`` if it does not already exist.

    Creation is best-effort: a failure is reported on stdout rather than
    raised, so this function never propagates an ``OSError`` to its caller.
    """
    try:
        os.makedirs(VIDEO_TEMP_DIR, exist_ok=True)
    except OSError as exc:
        # Report instead of silently swallowing, so permission or disk
        # problems are visible in the logs.
        print(f"[VideoTemp] Could not create {VIDEO_TEMP_DIR}: {exc}")
40
+
41
+ def _register_video_for_session(session_id: Optional[str], file_path: str) -> None:
42
+ if not session_id or not file_path:
43
+ return
44
+ with _VIDEO_FILES_LOCK:
45
+ if session_id not in _SESSION_VIDEO_FILES:
46
+ _SESSION_VIDEO_FILES[session_id] = []
47
+ _SESSION_VIDEO_FILES[session_id].append(file_path)
48
+
49
+ def cleanup_session_videos(session_id: Optional[str]) -> None:
50
+ if not session_id:
51
+ return
52
+ with _VIDEO_FILES_LOCK:
53
+ file_list = _SESSION_VIDEO_FILES.pop(session_id, [])
54
+ for path in file_list:
55
+ try:
56
+ if path and os.path.exists(path):
57
+ os.unlink(path)
58
+ except Exception:
59
+ # Best-effort cleanup
60
+ pass
61
+
62
def reap_old_videos(ttl_seconds: int = VIDEO_FILE_TTL_SECONDS) -> None:
    """Remove regular files in ``VIDEO_TEMP_DIR`` whose mtime is older than the TTL.

    Args:
        ttl_seconds: Maximum allowed age, in seconds, measured from the file's
            modification time to the moment this function starts scanning.

    All errors (per file and for the directory as a whole) are ignored.
    """
    try:
        _ensure_video_dir_exists()
        scan_started = time.time()
        for entry_name in os.listdir(VIDEO_TEMP_DIR):
            candidate = os.path.join(VIDEO_TEMP_DIR, entry_name)
            try:
                if os.path.isfile(candidate):
                    modified_at = os.path.getmtime(candidate)
                    if scan_started - modified_at > ttl_seconds:
                        os.unlink(candidate)
            except Exception:
                # One bad entry must not stop the sweep.
                pass
    except Exception:
        # The temp dir may be missing or inaccessible; nothing to reap then.
        pass
80
+
81
+ # ---------------------------------------------------------------------------
82
+ # Audio temp-file management (per-session tracking and cleanup)
83
+ # ---------------------------------------------------------------------------
84
# Directory (under the system temp dir) that holds generated audio files.
AUDIO_TEMP_DIR = os.path.join(tempfile.gettempdir(), "anycoder_audio")
# Default age threshold used by reap_old_audio().
AUDIO_FILE_TTL_SECONDS = 6 * 60 * 60  # 6 hours
# Maps a session id to the audio file paths registered for that session.
# Built-in generics are used because typing.Dict / typing.List are never
# imported by this module and would raise NameError at import time.
_SESSION_AUDIO_FILES: dict[str, list[str]] = {}
# Guards access to _SESSION_AUDIO_FILES.
_AUDIO_FILES_LOCK = threading.Lock()
88
+
89
def _ensure_audio_dir_exists() -> None:
    """Create ``AUDIO_TEMP_DIR`` if it does not already exist.

    Creation is best-effort: a failure is reported on stdout rather than
    raised, so this function never propagates an ``OSError`` to its caller.
    """
    try:
        os.makedirs(AUDIO_TEMP_DIR, exist_ok=True)
    except OSError as exc:
        # Report instead of silently swallowing, so permission or disk
        # problems are visible in the logs.
        print(f"[AudioTemp] Could not create {AUDIO_TEMP_DIR}: {exc}")
94
+
95
+ def _register_audio_for_session(session_id: Optional[str], file_path: str) -> None:
96
+ if not session_id or not file_path:
97
+ return
98
+ with _AUDIO_FILES_LOCK:
99
+ if session_id not in _SESSION_AUDIO_FILES:
100
+ _SESSION_AUDIO_FILES[session_id] = []
101
+ _SESSION_AUDIO_FILES[session_id].append(file_path)
102
+
103
+ def cleanup_session_audio(session_id: Optional[str]) -> None:
104
+ if not session_id:
105
+ return
106
+ with _AUDIO_FILES_LOCK:
107
+ file_list = _SESSION_AUDIO_FILES.pop(session_id, [])
108
+ for path in file_list:
109
+ try:
110
+ if path and os.path.exists(path):
111
+ os.unlink(path)
112
+ except Exception:
113
+ pass
114
+
115
def reap_old_audio(ttl_seconds: int = AUDIO_FILE_TTL_SECONDS) -> None:
    """Remove regular files in ``AUDIO_TEMP_DIR`` whose mtime is older than the TTL.

    Args:
        ttl_seconds: Maximum allowed age, in seconds, measured from the file's
            modification time to the moment this function starts scanning.

    All errors (per file and for the directory as a whole) are ignored.
    """
    try:
        _ensure_audio_dir_exists()
        scan_started = time.time()
        for entry_name in os.listdir(AUDIO_TEMP_DIR):
            candidate = os.path.join(AUDIO_TEMP_DIR, entry_name)
            try:
                if os.path.isfile(candidate):
                    modified_at = os.path.getmtime(candidate)
                    if scan_started - modified_at > ttl_seconds:
                        os.unlink(candidate)
            except Exception:
                # One bad entry must not stop the sweep.
                pass
    except Exception:
        # The temp dir may be missing or inaccessible; nothing to reap then.
        pass
131
+
132
+ # ---------------------------------------------------------------------------
133
+ # General temp media file management (per-session tracking and cleanup)
134
+ # ---------------------------------------------------------------------------
135
# Directory (under the system temp dir) that holds general temporary media.
MEDIA_TEMP_DIR = os.path.join(tempfile.gettempdir(), "anycoder_media")
# Default age threshold used by reap_old_media().
MEDIA_FILE_TTL_SECONDS = 6 * 60 * 60  # 6 hours
# Maps a session id to the media file paths tracked for that session.
# Built-in generics are used because typing.Dict / typing.List are never
# imported by this module and would raise NameError at import time.
_SESSION_MEDIA_FILES: dict[str, list[str]] = {}
# Guards access to _SESSION_MEDIA_FILES.
_MEDIA_FILES_LOCK = threading.Lock()

# Global registry of temporary media files, keyed by a generated file id.
# Entries are added by create_temp_media_url().
temp_media_files = {}
142
+
143
def _ensure_media_dir_exists() -> None:
    """Create ``MEDIA_TEMP_DIR`` if it does not already exist.

    Creation is best-effort: a failure is reported on stdout rather than
    raised, so this function never propagates an ``OSError`` to its caller.
    """
    try:
        os.makedirs(MEDIA_TEMP_DIR, exist_ok=True)
    except OSError as exc:
        # Report instead of silently swallowing, so permission or disk
        # problems are visible in the logs.
        print(f"[TempMedia] Could not create {MEDIA_TEMP_DIR}: {exc}")
149
+
150
+ def track_session_media_file(session_id: Optional[str], file_path: str) -> None:
151
+ """Track a media file for session-based cleanup."""
152
+ if not session_id or not file_path:
153
+ return
154
+ with _MEDIA_FILES_LOCK:
155
+ if session_id not in _SESSION_MEDIA_FILES:
156
+ _SESSION_MEDIA_FILES[session_id] = []
157
+ _SESSION_MEDIA_FILES[session_id].append(file_path)
158
+
159
+ def cleanup_session_media(session_id: Optional[str]) -> None:
160
+ """Clean up media files for a specific session."""
161
+ if not session_id:
162
+ return
163
+ with _MEDIA_FILES_LOCK:
164
+ files_to_clean = _SESSION_MEDIA_FILES.pop(session_id, [])
165
+
166
+ for path in files_to_clean:
167
+ try:
168
+ if path and os.path.exists(path):
169
+ os.unlink(path)
170
+ except Exception:
171
+ # Best-effort cleanup
172
+ pass
173
+
174
def reap_old_media(ttl_seconds: int = MEDIA_FILE_TTL_SECONDS) -> None:
    """Remove regular files in ``MEDIA_TEMP_DIR`` whose mtime is older than the TTL.

    Args:
        ttl_seconds: Maximum allowed age, in seconds, measured from the file's
            modification time to the moment this function starts scanning.

    All errors (per file and for the directory as a whole) are ignored.
    """
    try:
        _ensure_media_dir_exists()
        scan_started = time.time()
        for entry_name in os.listdir(MEDIA_TEMP_DIR):
            candidate = os.path.join(MEDIA_TEMP_DIR, entry_name)
            if not os.path.isfile(candidate):
                continue
            try:
                file_age = scan_started - os.path.getmtime(candidate)
                if file_age > ttl_seconds:
                    os.unlink(candidate)
            except Exception:
                # One bad entry must not stop the sweep.
                pass
    except Exception:
        # The temp dir may be missing or inaccessible; nothing to reap then.
        pass
191
+
192
def cleanup_all_temp_media_on_startup() -> None:
    """Reset all temp-media state at application start.

    Empties the ``temp_media_files`` registry, deletes every regular file in
    ``MEDIA_TEMP_DIR`` (all are treated as orphans from a previous run), and
    clears the per-session tracking table. Any failure is printed, not raised.
    """
    try:
        # Forget whatever the in-memory registry held.
        temp_media_files.clear()

        # Everything left on disk is assumed to be orphaned at startup.
        _ensure_media_dir_exists()
        for entry_name in os.listdir(MEDIA_TEMP_DIR):
            leftover = os.path.join(MEDIA_TEMP_DIR, entry_name)
            if not os.path.isfile(leftover):
                continue
            try:
                os.unlink(leftover)
            except Exception:
                pass

        # Drop the session -> files mapping as well.
        with _MEDIA_FILES_LOCK:
            _SESSION_MEDIA_FILES.clear()

        print("[StartupCleanup] Cleaned up orphaned temporary media files")
    except Exception as exc:
        print(f"[StartupCleanup] Error during media cleanup: {str(exc)}")
215
+
216
def cleanup_all_temp_media_on_shutdown() -> None:
    """Remove all known temporary media files when the application exits.

    Deletes the files referenced by the ``temp_media_files`` registry and by
    the per-session tracking table, then empties both. Individual deletion
    failures are ignored; any other failure is printed, not raised.
    """
    try:
        print("[ShutdownCleanup] Cleaning up temporary media files...")

        # Files referenced by the global registry.
        for entry in temp_media_files.values():
            try:
                registered_path = entry['path']
                if os.path.exists(registered_path):
                    os.unlink(registered_path)
            except Exception:
                pass
        temp_media_files.clear()

        # Files tracked per session.
        with _MEDIA_FILES_LOCK:
            for tracked_paths in _SESSION_MEDIA_FILES.values():
                for tracked in tracked_paths:
                    try:
                        if tracked and os.path.exists(tracked):
                            os.unlink(tracked)
                    except Exception:
                        pass
            _SESSION_MEDIA_FILES.clear()

        print("[ShutdownCleanup] Temporary media cleanup completed")
    except Exception as exc:
        print(f"[ShutdownCleanup] Error during cleanup: {str(exc)}")


# Run the shutdown cleanup automatically at interpreter exit.
atexit.register(cleanup_all_temp_media_on_shutdown)
247
+
248
def create_temp_media_url(
    media_bytes: bytes,
    filename: str,
    media_type: str = "image",
    session_id: str | None = None,
) -> str:
    """Write ``media_bytes`` to a uniquely named temp file and return its ``file://`` URL.

    The file is created in ``MEDIA_TEMP_DIR``, optionally tracked for
    session-based cleanup, and recorded in ``temp_media_files`` (including the
    raw bytes) so it can be uploaded later.

    Args:
        media_bytes: Raw content to write.
        filename: Original file name; only its final path component and
            extension are used for the on-disk name.
        media_type: Label used as a prefix in the file name and registry id.
        session_id: When truthy, the file is tracked for that session.

    Returns:
        A ``file://`` URL on success, otherwise a string starting with
        "Error creating temporary".
    """
    # ``str | None`` replaces ``Optional[str]``, which was never imported and
    # made this ``def`` raise NameError.
    try:
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        unique_id = str(uuid.uuid4())[:8]
        # Use only the last path component: a name such as "a/b.jpg" or
        # "../x.jpg" must not place the file outside MEDIA_TEMP_DIR or in a
        # subdirectory that does not exist.
        base_name, ext = os.path.splitext(os.path.basename(filename))
        unique_filename = f"{media_type}_{timestamp}_{unique_id}_{base_name}{ext}"

        _ensure_media_dir_exists()
        temp_path = os.path.join(MEDIA_TEMP_DIR, unique_filename)

        with open(temp_path, 'wb') as f:
            f.write(media_bytes)

        # Track the file for session-based cleanup.
        if session_id:
            track_session_media_file(session_id, temp_path)

        # Keep everything needed for a later permanent upload.
        file_id = f"{media_type}_{unique_id}"
        temp_media_files[file_id] = {
            'path': temp_path,
            'filename': filename,
            'media_type': media_type,
            'media_bytes': media_bytes,
        }

        file_url = f"file://{temp_path}"
        print(f"[TempMedia] Created temporary {media_type} file: {file_url}")
        return file_url

    except Exception as e:
        print(f"[TempMedia] Failed to create temporary file: {str(e)}")
        return f"Error creating temporary {media_type} file: {str(e)}"
286
+
287
def upload_media_to_hf(media_bytes: bytes, filename: str, media_type: str = "image", token: gr.OAuthToken | None = None, use_temp: bool = True) -> str:
    """Return a URL for ``media_bytes``: a local preview file or a permanent HF upload.

    Args:
        media_bytes: Raw media content.
        filename: Original file name; its stem and extension go into the stored name.
        media_type: Label used as the folder name in the repository.
        token: OAuth token; when absent or empty, ``HF_TOKEN`` from the
            environment is used instead. Not consulted when ``use_temp`` is true.
        use_temp: When true, delegate to ``create_temp_media_url`` and return
            its result without contacting Hugging Face.

    Returns:
        The resulting URL, or a string starting with "Error" on failure.
    """
    try:
        # Preview mode: local temp file only.
        if use_temp:
            return create_temp_media_url(media_bytes, filename, media_type)

        # Permanent mode: OAuth token first, environment variable second.
        hf_token = token.token if token and token.token else os.getenv('HF_TOKEN')
        if not hf_token:
            return "Error: Please log in with your Hugging Face account to upload media, or set HF_TOKEN environment variable."

        api = HfApi(token=hf_token)

        # The username decides which namespace the media repo lives in.
        try:
            username = api.whoami().get('name', 'unknown-user')
        except Exception as e:
            print(f"[HFUpload] Could not get user info: {e}")
            username = 'anycoder-user'

        repo_name = f"{username}/anycoder-media"

        # Make sure the dataset repo exists; a failure here is not fatal.
        try:
            api.create_repo(repo_id=repo_name, repo_type="dataset", private=False, exist_ok=True)
            print(f"[HFUpload] Repository {repo_name} ready")
        except Exception as e:
            print(f"[HFUpload] Repository creation/access issue: {e}")

        # Unique path inside the repo: <media_type>/<timestamp>_<id>_<stem><ext>
        stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        short_id = str(uuid.uuid4())[:8]
        stem, ext = os.path.splitext(filename)
        repo_path = f"{media_type}/{stamp}_{short_id}_{stem}{ext}"

        # upload_file is given a path, so spill the bytes to a temp file first.
        with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as staging_file:
            staging_file.write(media_bytes)
            staging_path = staging_file.name

        try:
            api.upload_file(
                path_or_fileobj=staging_path,
                path_in_repo=repo_path,
                repo_id=repo_name,
                repo_type="dataset",
                commit_message=f"Upload {media_type} generated by AnyCoder",
            )
            permanent_url = f"https://huggingface.co/datasets/{repo_name}/resolve/main/{repo_path}"
            print(f"[HFUpload] Successfully uploaded {media_type} to {permanent_url}")
            return permanent_url
        finally:
            # The staging file is removed whether or not the upload succeeded.
            try:
                os.unlink(staging_path)
            except Exception:
                pass

    except Exception as e:
        print(f"[HFUpload] Upload failed: {str(e)}")
        return f"Error uploading {media_type} to Hugging Face: {str(e)}"
368
+
369
def upload_temp_files_to_hf_and_replace_urls(html_content: str, token: gr.OAuthToken | None = None) -> str:
    """Upload every registered temp media file and swap its URL inside ``html_content``.

    Each entry of ``temp_media_files`` is uploaded permanently; on success its
    ``file://`` URL in the HTML is replaced by the returned URL. After the
    loop the temp files are cleaned up regardless of individual results.

    Args:
        html_content: HTML that may reference ``file://`` temp URLs.
        token: Forwarded to ``upload_media_to_hf``.

    Returns:
        The rewritten HTML, or the original ``html_content`` if there is
        nothing to upload or an unexpected error occurs.
    """
    try:
        if not temp_media_files:
            print("[DeployUpload] No temporary media files to upload")
            return html_content

        print(f"[DeployUpload] Uploading {len(temp_media_files)} temporary media files to HF")
        rewritten = html_content

        for media_id, entry in temp_media_files.items():
            try:
                hosted_url = upload_media_to_hf(
                    entry['media_bytes'],
                    entry['filename'],
                    entry['media_type'],
                    token,
                    use_temp=False,  # force a permanent upload
                )

                if hosted_url.startswith("Error"):
                    print(f"[DeployUpload] Failed to upload {media_id}: {hosted_url}")
                    continue

                # Point the HTML at the permanent location.
                local_url = f"file://{entry['path']}"
                rewritten = rewritten.replace(local_url, hosted_url)
                print(f"[DeployUpload] Replaced {local_url} with {hosted_url}")

            except Exception as exc:
                print(f"[DeployUpload] Error uploading {media_id}: {str(exc)}")
                continue

        # The temp files are no longer needed once the uploads were attempted.
        cleanup_temp_media_files()

        return rewritten

    except Exception as exc:
        print(f"[DeployUpload] Failed to upload temporary files: {str(exc)}")
        return html_content
410
+
411
def cleanup_temp_media_files():
    """Delete the files listed in ``temp_media_files`` and empty the registry.

    Each removal is attempted independently and its outcome is printed; the
    registry is cleared afterwards. Unexpected errors are printed, not raised.
    """
    try:
        for record in temp_media_files.values():
            try:
                if not os.path.exists(record['path']):
                    continue
                os.remove(record['path'])
                print(f"[TempCleanup] Removed {record['path']}")
            except Exception as exc:
                print(f"[TempCleanup] Failed to remove {record['path']}: {str(exc)}")

        # Forget every entry, whether or not its file could be removed.
        temp_media_files.clear()
        print("[TempCleanup] Cleared temporary media files registry")

    except Exception as exc:
        print(f"[TempCleanup] Error during cleanup: {str(exc)}")
428
+
429
def generate_image_with_qwen(prompt: str, image_index: int = 0, token: gr.OAuthToken | None = None) -> str:
    """Generate an image with ``Qwen/Qwen-Image`` and return an HTML ``<img>`` tag.

    The generated image is capped at 1024 px on its longest edge, encoded as
    JPEG, and stored as a temporary preview file.

    Args:
        prompt: Text prompt; also used (HTML-escaped) as the ``alt`` text.
        image_index: Number embedded in the generated file name.
        token: Forwarded to ``upload_media_to_hf``.

    Returns:
        An ``<img>`` tag on success, otherwise a string starting with "Error".
    """
    try:
        hf_token = os.getenv('HF_TOKEN')
        if not hf_token:
            return "Error: HF_TOKEN environment variable is not set. Please set it to your Hugging Face API token."

        import io
        from html import escape
        # InferenceClient is not imported at module level (only HfApi is), so
        # without this import every call failed with a NameError.
        from huggingface_hub import InferenceClient

        client = InferenceClient(
            provider="auto",
            api_key=hf_token,
            bill_to="huggingface",
        )

        image = client.text_to_image(
            prompt,
            model="Qwen/Qwen-Image",
        )

        # Shrink oversized results; thumbnail() keeps the aspect ratio.
        max_size = 1024
        if image.width > max_size or image.height > max_size:
            image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)

        # JPEG has no alpha channel, hence the RGB conversion.
        buffer = io.BytesIO()
        image.convert('RGB').save(buffer, format='JPEG', quality=90, optimize=True)
        image_bytes = buffer.getvalue()

        # Temporary preview URL.
        filename = f"generated_image_{image_index}.jpg"
        temp_url = upload_media_to_hf(image_bytes, filename, "image", token, use_temp=True)

        if temp_url.startswith("Error"):
            return temp_url

        # Escape the prompt so quotes or angle brackets cannot break out of
        # the alt attribute.
        alt_text = escape(str(prompt), quote=True)
        return f'<img src="{temp_url}" alt="{alt_text}" style="max-width: 100%; height: auto; border-radius: 8px; margin: 10px 0;" loading="lazy" />'

    except Exception as e:
        print(f"Image generation error: {str(e)}")
        return f"Error generating image: {str(e)}"
475
+
476
def generate_image_to_image(input_image_data, prompt: str, token: gr.OAuthToken | None = None) -> str:
    """Edit an input image with ``Qwen/Qwen-Image-Edit`` and return an ``<img>`` tag.

    The input is normalized to an RGB JPEG (longest edge capped at 1024 px),
    sent through ``InferenceClient.image_to_image``, and the result is stored
    as a temporary preview file.

    Args:
        input_image_data: A file-like object with ``read()``, a PIL image, a
            NumPy array, ``bytes``/``bytearray``, or anything ``bytes()`` accepts.
        prompt: Edit instruction; also used (HTML-escaped) as the ``alt`` text.
        token: Forwarded to ``upload_media_to_hf``.

    Returns:
        An ``<img>`` tag on success, otherwise a string starting with "Error".
    """
    try:
        hf_token = os.getenv('HF_TOKEN')
        if not hf_token:
            return "Error: HF_TOKEN environment variable is not set. Please set it to your Hugging Face API token."

        import io
        from html import escape
        from PIL import Image
        # InferenceClient is not imported at module level (only HfApi is), so
        # without this import every call failed with a NameError.
        from huggingface_hub import InferenceClient
        try:
            import numpy as np
        except Exception:
            np = None

        client = InferenceClient(
            provider="auto",
            api_key=hf_token,
            bill_to="huggingface",
        )

        # Normalize the input to a PIL image.
        if hasattr(input_image_data, 'read'):
            # File-like object
            pil_image = Image.open(io.BytesIO(input_image_data.read()))
        elif hasattr(input_image_data, 'mode') and hasattr(input_image_data, 'size'):
            # Already a PIL image
            pil_image = input_image_data
        elif np is not None and isinstance(input_image_data, np.ndarray):
            pil_image = Image.fromarray(input_image_data)
        elif isinstance(input_image_data, (bytes, bytearray)):
            pil_image = Image.open(io.BytesIO(input_image_data))
        else:
            # Fallback: try to convert via bytes
            pil_image = Image.open(io.BytesIO(bytes(input_image_data)))

        if pil_image.mode != 'RGB':
            pil_image = pil_image.convert('RGB')

        # Resize the input to avoid request body size limits.
        max_input_size = 1024
        if pil_image.width > max_input_size or pil_image.height > max_input_size:
            # thumbnail() resizes in place; work on a copy so a PIL image
            # passed in by the caller is not modified.
            pil_image = pil_image.copy()
            pil_image.thumbnail((max_input_size, max_input_size), Image.Resampling.LANCZOS)

        buf = io.BytesIO()
        pil_image.save(buf, format='JPEG', quality=85, optimize=True)
        input_bytes = buf.getvalue()

        image = client.image_to_image(
            input_bytes,
            prompt=prompt,
            model="Qwen/Qwen-Image-Edit",
        )

        # Cap the result size as well.
        max_size = 1024
        if image.width > max_size or image.height > max_size:
            image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)

        out_buf = io.BytesIO()
        image.convert('RGB').save(out_buf, format='JPEG', quality=90, optimize=True)
        image_bytes = out_buf.getvalue()

        # Temporary preview URL.
        filename = "image_to_image_result.jpg"
        temp_url = upload_media_to_hf(image_bytes, filename, "image", token, use_temp=True)

        if temp_url.startswith("Error"):
            return temp_url

        # Escape the prompt so quotes or angle brackets cannot break out of
        # the alt attribute.
        alt_text = escape(str(prompt), quote=True)
        return f"<img src=\"{temp_url}\" alt=\"{alt_text}\" style=\"max-width: 100%; height: auto; border-radius: 8px; margin: 10px 0;\" loading=\"lazy\" />"
    except Exception as e:
        print(f"Image-to-image generation error: {str(e)}")
        return f"Error generating image (image-to-image): {str(e)}"
554
+
555
def generate_video_from_image(
    input_image_data,
    prompt: str,
    session_id: str | None = None,
    token: gr.OAuthToken | None = None,
) -> str:
    """Generate a video from an image and a prompt; return an HTML ``<video>`` tag.

    The input image is converted to RGB, downscaled to at most 1024 px on its
    longest edge, and JPEG-encoded with decreasing quality and then size until
    it is at most ``MAX_BYTES`` (or the lower bounds are reached). It is then
    sent to ``InferenceClient.image_to_video`` and the result is stored as a
    temporary preview file.

    Args:
        input_image_data: A file-like object with ``read()``, a PIL image, a
            NumPy array, ``bytes``/``bytearray``, or anything ``bytes()`` accepts.
        prompt: Text prompt passed to the model.
        session_id: Accepted for interface compatibility; not used here.
        token: Forwarded to ``upload_media_to_hf``.

    Returns:
        A ``<video>`` tag on success, otherwise a string starting with "Error".
    """
    # ``str | None`` replaces ``Optional[str]``, which was never imported and
    # made this ``def`` raise NameError.
    try:
        print("[Image2Video] Starting video generation")
        hf_token = os.getenv('HF_TOKEN')
        if not hf_token:
            print("[Image2Video] Missing HF_TOKEN")
            return "Error: HF_TOKEN environment variable is not set. Please set it to your Hugging Face API token."

        import io
        from PIL import Image
        # InferenceClient is not imported at module level (only HfApi is), so
        # without this import every call failed with a NameError.
        from huggingface_hub import InferenceClient
        try:
            import numpy as np
        except Exception:
            np = None

        client = InferenceClient(
            provider="auto",
            api_key=hf_token,
            bill_to="huggingface",
        )
        print("[Image2Video] InferenceClient initialized (provider=auto)")

        def _load_pil(img_like) -> Image.Image:
            """Turn any supported input into a PIL image."""
            if hasattr(img_like, 'read'):
                return Image.open(io.BytesIO(img_like.read()))
            if hasattr(img_like, 'mode') and hasattr(img_like, 'size'):
                return img_like
            if np is not None and isinstance(img_like, np.ndarray):
                return Image.fromarray(img_like)
            if isinstance(img_like, (bytes, bytearray)):
                return Image.open(io.BytesIO(img_like))
            return Image.open(io.BytesIO(bytes(img_like)))

        pil_image = _load_pil(input_image_data)
        if pil_image.mode != 'RGB':
            pil_image = pil_image.convert('RGB')
        try:
            print(f"[Image2Video] Input PIL image size={pil_image.size} mode={pil_image.mode}")
        except Exception:
            pass

        # Progressive encode to keep the payload under ~3.9MB (below a 4MB limit).
        MAX_BYTES = 3_900_000
        max_dim = 1024  # initial cap on the longest edge
        quality = 90

        def encode_current(pil: Image.Image, q: int) -> bytes:
            """JPEG-encode ``pil`` at quality ``q``."""
            tmp = io.BytesIO()
            pil.save(tmp, format='JPEG', quality=q, optimize=True)
            return tmp.getvalue()

        # Downscale while the longest edge exceeds max_dim. resize() returns a
        # new image, so the caller's object is left untouched.
        while max(pil_image.size) > max_dim:
            ratio = max_dim / float(max(pil_image.size))
            new_size = (max(1, int(pil_image.size[0] * ratio)), max(1, int(pil_image.size[1] * ratio)))
            pil_image = pil_image.resize(new_size, Image.Resampling.LANCZOS)

        encoded = encode_current(pil_image, quality)
        # If still too big, first lower the quality (down to 40), then shrink
        # the dimensions by 15% per step (down to a 640 px longest edge).
        while len(encoded) > MAX_BYTES and (quality > 40 or max(pil_image.size) > 640):
            if quality > 40:
                quality -= 10
            else:
                new_w = max(1, int(pil_image.size[0] * 0.85))
                new_h = max(1, int(pil_image.size[1] * 0.85))
                pil_image = pil_image.resize((new_w, new_h), Image.Resampling.LANCZOS)
            encoded = encode_current(pil_image, quality)

        input_bytes = encoded

        # image_to_video only exists in newer huggingface_hub releases.
        model_id = "Lightricks/LTX-Video-0.9.8-13B-distilled"
        image_to_video_method = getattr(client, "image_to_video", None)
        if not callable(image_to_video_method):
            print("[Image2Video] InferenceClient.image_to_video not available in this huggingface_hub version")
            return (
                "Error generating video (image-to-video): Your installed huggingface_hub version "
                "does not expose InferenceClient.image_to_video. Please upgrade with "
                "`pip install -U huggingface_hub` and try again."
            )
        print(f"[Image2Video] Calling image_to_video with model={model_id}, prompt length={len(prompt or '')}")
        video_bytes = image_to_video_method(
            input_bytes,
            prompt=prompt,
            model=model_id,
        )
        print(f"[Image2Video] Received video bytes: {len(video_bytes) if hasattr(video_bytes, '__len__') else 'unknown length'}")

        # Temporary preview URL.
        filename = "image_to_video_result.mp4"
        temp_url = upload_media_to_hf(video_bytes, filename, "video", token, use_temp=True)

        if temp_url.startswith("Error"):
            return temp_url

        video_html = (
            f'<video controls autoplay muted loop playsinline '
            f'style="max-width: 100%; height: auto; border-radius: 8px; margin: 10px 0; display: block;" '
            f'onloadstart="this.style.backgroundColor=\'#f0f0f0\'" '
            f'onerror="this.style.display=\'none\'; console.error(\'Video failed to load\')">'
            f'<source src="{temp_url}" type="video/mp4" />'
            f'<p style="text-align: center; color: #666;">Your browser does not support the video tag.</p>'
            f'</video>'
        )

        print(f"[Image2Video] Successfully generated video HTML tag with temporary URL: {temp_url}")

        # Validate the generated video HTML.
        if not validate_video_html(video_html):
            print("[Image2Video] Generated video HTML failed validation")
            return "Error: Generated video HTML is malformed"

        return video_html
    except Exception as e:
        import traceback
        print("[Image2Video] Exception during generation:")
        traceback.print_exc()
        print(f"Image-to-video generation error: {str(e)}")
        return f"Error generating video (image-to-video): {str(e)}"
678
+
679
def generate_video_from_text(
    prompt: str,
    session_id: str | None = None,
    token: gr.OAuthToken | None = None,
) -> str:
    """Generate a video from a text prompt; return an HTML ``<video>`` tag.

    The stripped prompt is sent to ``InferenceClient.text_to_video`` and the
    result is stored as a temporary preview file.

    Args:
        prompt: Text prompt; ``None`` is treated as an empty string.
        session_id: Accepted for interface compatibility; not used here.
        token: Forwarded to ``upload_media_to_hf``.

    Returns:
        A ``<video>`` tag on success, otherwise a string starting with "Error".
    """
    # ``str | None`` replaces ``Optional[str]``, which was never imported and
    # made this ``def`` raise NameError.
    try:
        print("[Text2Video] Starting video generation from text")
        hf_token = os.getenv('HF_TOKEN')
        if not hf_token:
            print("[Text2Video] Missing HF_TOKEN")
            return "Error: HF_TOKEN environment variable is not set. Please set it to your Hugging Face API token."

        # InferenceClient is not imported at module level (only HfApi is), so
        # without this import every call failed with a NameError.
        from huggingface_hub import InferenceClient

        client = InferenceClient(
            provider="auto",
            api_key=hf_token,
            bill_to="huggingface",
        )
        print("[Text2Video] InferenceClient initialized (provider=auto)")

        # text_to_video only exists in newer huggingface_hub releases.
        text_to_video_method = getattr(client, "text_to_video", None)
        if not callable(text_to_video_method):
            print("[Text2Video] InferenceClient.text_to_video not available in this huggingface_hub version")
            return (
                "Error generating video (text-to-video): Your installed huggingface_hub version "
                "does not expose InferenceClient.text_to_video. Please upgrade with "
                "`pip install -U huggingface_hub` and try again."
            )

        model_id = "Wan-AI/Wan2.2-T2V-A14B"
        prompt_str = (prompt or "").strip()
        print(f"[Text2Video] Calling text_to_video with model={model_id}, prompt length={len(prompt_str)}")
        video_bytes = text_to_video_method(
            prompt_str,
            model=model_id,
        )
        print(f"[Text2Video] Received video bytes: {len(video_bytes) if hasattr(video_bytes, '__len__') else 'unknown length'}")

        # Temporary preview URL.
        filename = "text_to_video_result.mp4"
        temp_url = upload_media_to_hf(video_bytes, filename, "video", token, use_temp=True)

        if temp_url.startswith("Error"):
            return temp_url

        video_html = (
            f'<video controls autoplay muted loop playsinline '
            f'style="max-width: 100%; height: auto; border-radius: 8px; margin: 10px 0; display: block;" '
            f'onloadstart="this.style.backgroundColor=\'#f0f0f0\'" '
            f'onerror="this.style.display=\'none\'; console.error(\'Video failed to load\')">'
            f'<source src="{temp_url}" type="video/mp4" />'
            f'<p style="text-align: center; color: #666;">Your browser does not support the video tag.</p>'
            f'</video>'
        )

        print(f"[Text2Video] Successfully generated video HTML tag with temporary URL: {temp_url}")

        # Validate the generated video HTML.
        if not validate_video_html(video_html):
            print("[Text2Video] Generated video HTML failed validation")
            return "Error: Generated video HTML is malformed"

        return video_html
    except Exception as e:
        import traceback
        print("[Text2Video] Exception during generation:")
        traceback.print_exc()
        print(f"Text-to-video generation error: {str(e)}")
        return f"Error generating video (text-to-video): {str(e)}"
745
+
746
def generate_music_from_text(
    prompt: str,
    music_length_ms: int = 30000,
    session_id: str | None = None,
    token: gr.OAuthToken | None = None,
) -> str:
    """Generate music via the ElevenLabs Music API and return an HTML ``<audio>`` block.

    Args:
        prompt: Text description of the music; a default orchestral prompt is
            used when it is empty.
        music_length_ms: Requested length in milliseconds; 30000 when falsy.
        session_id: Accepted for interface compatibility; not used here.
        token: Forwarded to ``upload_media_to_hf``.

    Returns:
        An HTML snippet containing an ``<audio>`` element on success,
        otherwise a string starting with "Error".
    """
    # ``str | None`` replaces ``Optional[str]``, which was never imported and
    # made this ``def`` raise NameError.
    try:
        api_key = os.getenv('ELEVENLABS_API_KEY')
        if not api_key:
            return "Error: ELEVENLABS_API_KEY environment variable is not set."

        headers = {
            'Content-Type': 'application/json',
            'xi-api-key': api_key,
        }
        payload = {
            'prompt': (prompt or 'Epic orchestral theme with soaring strings and powerful brass'),
            'music_length_ms': int(music_length_ms) if music_length_ms else 30000,
        }

        # requests never times out by default; bound the connect and read
        # phases so a stalled server cannot hang the caller forever.
        resp = requests.post(
            'https://api.elevenlabs.io/v1/music/compose',
            headers=headers,
            json=payload,
            timeout=(10, 300),
        )
        try:
            resp.raise_for_status()
        except requests.HTTPError:
            # The response body carries the API's error description.
            return f"Error generating music: {resp.text}"

        # Temporary preview URL.
        filename = "generated_music.mp3"
        temp_url = upload_media_to_hf(resp.content, filename, "audio", token, use_temp=True)

        if temp_url.startswith("Error"):
            return temp_url

        # The label row previously read "display:flex:align-items:center";
        # the colon after "flex" invalidated the declaration, so it is now a
        # semicolon.
        audio_html = (
            "<div class=\"anycoder-music\" style=\"max-width:420px;margin:16px auto;padding:12px 16px;border:1px solid #e5e7eb;border-radius:12px;background:linear-gradient(180deg,#fafafa,#f3f4f6);box-shadow:0 2px 8px rgba(0,0,0,0.06)\">"
            " <div style=\"font-size:13px;color:#374151;margin-bottom:8px;display:flex;align-items:center;gap:6px\">"
            " <span>🎵 Generated music</span>"
            " </div>"
            " <audio controls autoplay loop style=\"width:100%;outline:none;\">"
            f" <source src=\"{temp_url}\" type=\"audio/mpeg\" />"
            " Your browser does not support the audio element."
            " </audio>"
            "</div>"
        )

        print(f"[Music] Successfully generated music HTML tag with temporary URL: {temp_url}")
        return audio_html
    except Exception as e:
        return f"Error generating music: {str(e)}"
792
+
793
def extract_image_prompts_from_text(text: str, num_images_needed: int = 1) -> list:
    """Build a list of image-generation prompts derived from one text.

    The whole (stripped) text is used as the base prompt. The first three
    prompts are the text itself, a "Visual representation of ..." form and an
    "Illustration of ..." form; any further prompts cycle through a fixed set
    of eight stylistic prefixes.

    Args:
        text: Source text for the prompts. None, empty, or whitespace-only
            text yields an empty list.
        num_images_needed: Number of prompts to produce.

    Returns:
        A list of ``num_images_needed`` prompt strings, or ``[]`` when the
        text is empty or the count is not positive.
    """
    # Tolerate None so callers do not need to pre-validate the text.
    cleaned_text = (text or "").strip()
    if not cleaned_text:
        return []

    # Fixed wording for the first three images.
    leading = (
        cleaned_text,
        f"Visual representation of {cleaned_text}",
        f"Illustration of {cleaned_text}",
    )
    # Built once (not per iteration) and cycled for every image past the third.
    variations = (
        f"Digital art of {cleaned_text}",
        f"Modern design of {cleaned_text}",
        f"Professional illustration of {cleaned_text}",
        f"Clean design of {cleaned_text}",
        f"Beautiful visualization of {cleaned_text}",
        f"Stylish representation of {cleaned_text}",
        f"Contemporary design of {cleaned_text}",
        f"Elegant illustration of {cleaned_text}",
    )

    return [
        leading[i] if i < len(leading) else variations[(i - len(leading)) % len(variations)]
        for i in range(num_images_needed)
    ]
833
+
834
def create_image_replacement_blocks(html_content: str, user_prompt: str) -> str:
    """Create search/replace blocks that swap placeholder images for generated ones.

    Placeholder ``<img>`` tags are detected with a fixed set of regex patterns
    (falling back to every ``<img>`` tag when none match), and ``<div>``
    elements whose class or id mentions image/img/photo/picture are added as
    well. One prompt per detected placeholder is derived from ``user_prompt``
    and passed to ``generate_image_with_qwen``; each successful result is
    paired, in order, with a placeholder.

    Args:
        html_content: HTML document to scan.
        user_prompt: Text the image prompts are derived from.

    Returns:
        Replacement blocks joined by blank lines, or ``""`` when the prompt is
        empty, nothing was detected, or no image could be generated.
    """
    if not user_prompt:
        return ""

    import re

    # Common patterns for placeholder images
    placeholder_patterns = [
        r'<img[^>]*src=["\'](?:placeholder|dummy|sample|example)[^"\']*["\'][^>]*>',
        r'<img[^>]*src=["\']https?://via\.placeholder\.com[^"\']*["\'][^>]*>',
        r'<img[^>]*src=["\']https?://picsum\.photos[^"\']*["\'][^>]*>',
        r'<img[^>]*src=["\']https?://dummyimage\.com[^"\']*["\'][^>]*>',
        r'<img[^>]*alt=["\'][^"\']*placeholder[^"\']*["\'][^>]*>',
        r'<img[^>]*class=["\'][^"\']*placeholder[^"\']*["\'][^>]*>',
        r'<img[^>]*id=["\'][^"\']*placeholder[^"\']*["\'][^>]*>',
        r'<img[^>]*src=["\']data:image[^"\']*["\'][^>]*>',  # Base64 images
        r'<img[^>]*src=["\']#["\'][^>]*>',  # Empty src
        r'<img[^>]*src=["\']about:blank["\'][^>]*>',  # About blank
    ]

    # Track match spans so one tag hit by several patterns (for example a
    # via.placeholder.com src plus alt="placeholder") is counted once and does
    # not trigger a second, wasted image generation. Identical tags at
    # different positions have different spans and are all kept.
    seen_spans = set()
    placeholder_images = []
    for pattern in placeholder_patterns:
        for match in re.finditer(pattern, html_content, re.IGNORECASE):
            if match.span() not in seen_spans:
                seen_spans.add(match.span())
                placeholder_images.append(match.group(0))

    # Filter out HF URLs from placeholders (they are real generated content)
    placeholder_images = [img for img in placeholder_images if 'huggingface.co/datasets/' not in img]

    # If no placeholder images found, look for any img tags
    # (case-insensitive to catch <IMG> or mixed-case tags)
    if not placeholder_images:
        placeholder_images = re.findall(r'<img[^>]*>', html_content, re.IGNORECASE)

    # Also look for div elements that might be image placeholders
    div_placeholder_patterns = [
        r'<div[^>]*class=["\'][^"\']*(?:image|img|photo|picture)[^"\']*["\'][^>]*>.*?</div>',
        r'<div[^>]*id=["\'][^"\']*(?:image|img|photo|picture)[^"\']*["\'][^>]*>.*?</div>',
    ]
    for pattern in div_placeholder_patterns:
        for match in re.finditer(pattern, html_content, re.IGNORECASE | re.DOTALL):
            if match.span() not in seen_spans:
                seen_spans.add(match.span())
                placeholder_images.append(match.group(0))

    # Count how many images we need to generate
    num_images_needed = len(placeholder_images)
    if num_images_needed == 0:
        return ""

    # Generate image prompts based on the number of images found
    image_prompts = extract_image_prompts_from_text(user_prompt, num_images_needed)

    # Generate an image per prompt, keeping only the successful results
    generated_images = []
    for i, prompt in enumerate(image_prompts):
        image_html = generate_image_with_qwen(prompt, i, token=None)  # TODO: Pass token from parent context
        if not image_html.startswith("Error"):
            generated_images.append(image_html)

    if not generated_images:
        return ""

    # Pair the k-th successful image with the k-th placeholder. At most one
    # prompt is produced per placeholder, so there are never surplus images.
    replacement_blocks = []
    for placeholder, generated_image in zip(placeholder_images, generated_images):
        # Whitespace-normalized form for more tolerant matching
        placeholder_clean = re.sub(r'\s+', ' ', placeholder.strip())

        # The verbatim text is tried first (it is what the document actually
        # contains), then the normalized and quote-swapped forms.
        # dict.fromkeys drops repeats while preserving order.
        placeholder_variations = dict.fromkeys([
            placeholder,
            placeholder_clean,
            placeholder_clean.replace('"', "'"),
            placeholder_clean.replace("'", '"'),
        ])

        for variation in placeholder_variations:
            replacement_blocks.append(
                f"{SEARCH_START}\n{variation}\n{DIVIDER}\n{generated_image}\n{REPLACE_END}"
            )

    return '\n\n'.join(replacement_blocks)
940
+
941
def create_image_replacement_blocks_text_to_image_single(html_content: str, prompt: str) -> str:
    """Create search/replace blocks that generate and insert ONLY ONE text-to-image result.

    The image is generated with ``generate_image_with_qwen``. It replaces the
    first detected placeholder ``<img>`` (or the first ``<img>`` of any kind
    when no placeholder pattern matches); otherwise it is inserted right after
    the opening ``<body>`` tag, or emitted with an empty search section when
    the document has no ``<body``.

    Args:
        html_content: HTML document to scan.
        prompt: Text prompt for the image.

    Returns:
        One or more replacement blocks joined by blank lines, or ``""`` when
        the prompt is blank or image generation reports an error.
    """
    if not prompt or not prompt.strip():
        return ""

    import re

    # Detect placeholders similarly to the multi-image version
    placeholder_patterns = [
        r'<img[^>]*src=["\'](?:placeholder|dummy|sample|example)[^"\']*["\'][^>]*>',
        r'<img[^>]*src=["\']https?://via\.placeholder\.com[^"\']*["\'][^>]*>',
        r'<img[^>]*src=["\']https?://picsum\.photos[^"\']*["\'][^>]*>',
        r'<img[^>]*src=["\']https?://dummyimage\.com[^"\']*["\'][^>]*>',
        r'<img[^>]*alt=["\'][^"\']*placeholder[^"\']*["\'][^>]*>',
        r'<img[^>]*class=["\'][^"\']*placeholder[^"\']*["\'][^>]*>',
        r'<img[^>]*id=["\'][^"\']*placeholder[^"\']*["\'][^>]*>',
        r'<img[^>]*src=["\']data:image[^"\']*["\'][^>]*>',
        r'<img[^>]*src=["\']#["\'][^>]*>',
        r'<img[^>]*src=["\']about:blank["\'][^>]*>',
    ]

    placeholder_images = []
    for pattern in placeholder_patterns:
        placeholder_images.extend(re.findall(pattern, html_content, re.IGNORECASE))

    # Filter out HF URLs from placeholders (they are real generated content)
    placeholder_images = [img for img in placeholder_images if 'huggingface.co/datasets/' not in img]

    # Fallback to any <img> if no placeholders. Case-insensitive so <IMG> and
    # mixed-case tags are found, matching the pattern search above.
    if not placeholder_images:
        placeholder_images = re.findall(r'<img[^>]*>', html_content, re.IGNORECASE)

    # Generate a single image
    image_html = generate_image_with_qwen(prompt, 0, token=None)  # TODO: Pass token from parent context
    if image_html.startswith("Error"):
        return ""

    # Replace first placeholder if present
    if placeholder_images:
        placeholder = placeholder_images[0]
        placeholder_clean = re.sub(r'\s+', ' ', placeholder.strip())
        # Verbatim text first (it is what the document actually contains),
        # then normalized and quote-swapped forms; dict.fromkeys removes
        # repeats while keeping order so no identical block is emitted twice.
        placeholder_variations = dict.fromkeys([
            placeholder,
            placeholder_clean,
            placeholder_clean.replace('"', "'"),
            placeholder_clean.replace("'", '"'),
        ])
        blocks = [
            f"{SEARCH_START}\n{variation}\n{DIVIDER}\n{image_html}\n{REPLACE_END}"
            for variation in placeholder_variations
        ]
        return '\n\n'.join(blocks)

    # Otherwise insert after <body>
    if '<body' in html_content:
        body_end = html_content.find('>', html_content.find('<body')) + 1
        # The search text is everything up to and including the <body ...> tag
        insertion_point = html_content[:body_end] + '\n '
        return (
            f"{SEARCH_START}\n{insertion_point}\n{DIVIDER}\n"
            f"{insertion_point}\n{image_html}\n{REPLACE_END}"
        )

    # If no <body>, just append
    return f"{SEARCH_START}\n\n{DIVIDER}\n{image_html}\n{REPLACE_END}"
1017
+
1018
def create_video_replacement_blocks_text_to_video(html_content: str, prompt: str, session_id: Optional[str] = None) -> str:
    """Create search/replace blocks that generate and insert ONLY ONE text-to-video result.

    The video is generated with ``generate_video_from_text``. It replaces the
    first detected placeholder ``<img>`` (or the first ``<img>`` of any kind
    when no placeholder pattern matches). Without any image it is inserted
    inside the first ``<main>``, hero ``<section>``, container ``<div>`` or
    ``<header>`` found after the ``<body>`` tag, else directly after
    ``<body>`` wrapped in a ``video-container`` div, else emitted with an
    empty search section when the document has no ``<body``.

    Args:
        html_content: HTML document to scan.
        prompt: Text prompt for the video.
        session_id: Forwarded to ``generate_video_from_text``.

    Returns:
        One or more replacement blocks joined by blank lines, or ``""`` when
        the prompt is blank or video generation reports an error.
    """
    if not prompt or not prompt.strip():
        return ""

    import re

    # Detect the same placeholders as image counterparts, to replace the first image slot with a video
    placeholder_patterns = [
        r'<img[^>]*src=["\'](?:placeholder|dummy|sample|example)[^"\']*["\'][^>]*>',
        r'<img[^>]*src=["\']https?://via\.placeholder\.com[^"\']*["\'][^>]*>',
        r'<img[^>]*src=["\']https?://picsum\.photos[^"\']*["\'][^>]*>',
        r'<img[^>]*src=["\']https?://dummyimage\.com[^"\']*["\'][^>]*>',
        r'<img[^>]*alt=["\'][^"\']*placeholder[^"\']*["\'][^>]*>',
        r'<img[^>]*class=["\'][^"\']*placeholder[^"\']*["\'][^>]*>',
        r'<img[^>]*id=["\'][^"\']*placeholder[^"\']*["\'][^>]*>',
        r'<img[^>]*src=["\']data:image[^"\']*["\'][^>]*>',
        r'<img[^>]*src=["\']#["\'][^>]*>',
        r'<img[^>]*src=["\']about:blank["\'][^>]*>',
    ]

    placeholder_images = []
    for pattern in placeholder_patterns:
        placeholder_images.extend(re.findall(pattern, html_content, re.IGNORECASE))

    # Filter out HF URLs from placeholders (they are real generated content)
    placeholder_images = [img for img in placeholder_images if 'huggingface.co/datasets/' not in img]

    # Fallback to any <img>; case-insensitive so <IMG> and mixed-case tags are
    # found, matching the pattern search above.
    if not placeholder_images:
        placeholder_images = re.findall(r'<img[^>]*>', html_content, re.IGNORECASE)

    video_html = generate_video_from_text(prompt, session_id=session_id, token=None)  # TODO: Pass token from parent context
    if video_html.startswith("Error"):
        return ""

    # Replace first placeholder if present
    if placeholder_images:
        placeholder = placeholder_images[0]
        placeholder_clean = re.sub(r'\s+', ' ', placeholder.strip())
        # Verbatim text first, then normalized and quote-swapped forms;
        # dict.fromkeys removes repeats while keeping order so no identical
        # block is emitted twice.
        placeholder_variations = dict.fromkeys([
            placeholder,
            placeholder_clean,
            placeholder_clean.replace('"', "'"),
            placeholder_clean.replace("'", '"'),
        ])
        blocks = [
            f"{SEARCH_START}\n{variation}\n{DIVIDER}\n{video_html}\n{REPLACE_END}"
            for variation in placeholder_variations
        ]
        return '\n\n'.join(blocks)

    # Otherwise insert after <body> with proper container
    if '<body' in html_content:
        body_start = html_content.find('<body')
        body_end = html_content.find('>', body_start) + 1

        # Preferred containers, in priority order; the first one found after
        # the <body> tag receives the video as its first child.
        patterns_to_try = [
            r'<main[^>]*>',
            r'<section[^>]*class="[^"]*hero[^"]*"[^>]*>',
            r'<div[^>]*class="[^"]*container[^"]*"[^>]*>',
            r'<header[^>]*>',
        ]
        for pattern in patterns_to_try:
            match = re.search(pattern, html_content[body_end:], re.IGNORECASE)
            if match:
                # Search text is the document prefix through the container's opening tag
                insertion_point = html_content[:body_end + match.end()] + '\n '
                return (
                    f"{SEARCH_START}\n{insertion_point}\n{DIVIDER}\n"
                    f"{insertion_point}\n{video_html}\n{REPLACE_END}"
                )

        # Fallback to right after body tag with container div
        insertion_point = html_content[:body_end] + '\n '
        video_with_container = f'<div class="video-container" style="margin: 20px 0; text-align: center;">\n {video_html}\n </div>'
        return (
            f"{SEARCH_START}\n{insertion_point}\n{DIVIDER}\n"
            f"{insertion_point}\n{video_with_container}\n{REPLACE_END}"
        )

    # If no <body>, just append
    return f"{SEARCH_START}\n\n{DIVIDER}\n{video_html}\n{REPLACE_END}"
1125
+
1126
def create_music_replacement_blocks_text_to_music(html_content: str, prompt: str, session_id: Optional[str] = None) -> str:
    """Create search/replace blocks that insert ONE generated <audio> near the top of <body>.

    The audio markup comes from ``generate_music_from_text``. It is placed
    after the first ``<section>...</section>`` when one exists, otherwise
    right after the opening ``<body>`` tag, otherwise emitted with an empty
    search section.

    Args:
        html_content: HTML document to scan.
        prompt: Text prompt for the music.
        session_id: Forwarded to ``generate_music_from_text``.

    Returns:
        One or more replacement blocks joined by blank lines, or ``""`` when
        the prompt is blank or music generation reports an error.
    """
    if not prompt or not prompt.strip():
        return ""

    audio_html = generate_music_from_text(prompt, session_id=session_id, token=None)  # TODO: Pass token from parent context
    if audio_html.startswith("Error"):
        return ""

    # Prefer inserting after the first <section>...</section> if present; else after <body>
    import re
    section_match = re.search(r"<section\b[\s\S]*?</section>", html_content, flags=re.IGNORECASE)
    if section_match:
        section_html = section_match.group(0)
        section_clean = re.sub(r"\s+", " ", section_html.strip())
        # Each replacement keeps the searched section text, so two identical
        # blocks could both match and add the player twice if the consumer
        # applies blocks one after another (consumer not visible here).
        # dict.fromkeys drops repeated variations while preserving order.
        variations = dict.fromkeys([
            section_html,
            section_clean,
            section_clean.replace('"', "'"),
            section_clean.replace("'", '"'),
        ])
        blocks = [
            f"{SEARCH_START}\n{v}\n{DIVIDER}\n{v}\n {audio_html}\n{REPLACE_END}"
            for v in variations
        ]
        return "\n\n".join(blocks)

    if '<body' in html_content:
        body_end = html_content.find('>', html_content.find('<body')) + 1
        # The search text is everything up to and including the <body ...> tag
        insertion_point = html_content[:body_end] + '\n '
        return (
            f"{SEARCH_START}\n{insertion_point}\n{DIVIDER}\n"
            f"{insertion_point}\n{audio_html}\n{REPLACE_END}"
        )

    # If no <body>, just append
    return f"{SEARCH_START}\n\n{DIVIDER}\n{audio_html}\n{REPLACE_END}"