"""Post-processing helpers for generated code.

Covers framework detection, parsing of multi-file model output
(transformers.js, Svelte, ``=== filename ===`` multi-page sites), inlining of
local assets for iframe previews, and insertion of generated media into HTML.
"""

import re
import base64
import json
from typing import Dict, List, Optional, Tuple, Union
from pathlib import Path

from utils import apply_search_replace_changes, validate_video_html
from media_generation import (
    generate_image_with_qwen,
    generate_image_to_image,
    generate_video_from_image,
    generate_video_from_text,
    generate_music_from_text,
)
from config import SEARCH_START, DIVIDER, REPLACE_END


def _first_fenced_block(patterns: List[str], text: str) -> str:
    """Return the stripped body of the first pattern that matches, else ''."""
    for pattern in patterns:
        match = re.search(pattern, text, re.IGNORECASE)
        if match:
            return match.group(1).strip()
    return ''


class CodeProcessor:
    """Handles processing and transformation of various code formats"""

    @staticmethod
    def is_streamlit_code(code: str) -> bool:
        """Check if Python code is a Streamlit app"""
        if not code:
            return False
        lowered = code.lower()
        return (
            "import streamlit" in lowered
            or "from streamlit" in lowered
            or ("st." in code and "streamlit" in lowered)
        )

    @staticmethod
    def is_gradio_code(code: str) -> bool:
        """Check if Python code is a Gradio app"""
        if not code:
            return False
        lowered = code.lower()
        return (
            "import gradio" in lowered
            or "from gradio" in lowered
            or "gr.Interface(" in code
            or "gr.Blocks(" in code
        )

    @staticmethod
    def extract_html_document(text: str) -> str:
        """Extract HTML document from text, ignoring planning notes

        Returns the substring starting at the first doctype (or, failing
        that, the first ``<html`` tag); the input is returned unchanged when
        neither marker is present.
        """
        if not text:
            return text
        lower = text.lower()
        idx = lower.find("<!doctype html")
        if idx == -1:
            idx = lower.find("<html")
        return text[idx:] if idx != -1 else text


class TransformersJSProcessor:
    """Handles transformers.js specific code processing"""

    # Fence variants tried in order for each of the three expected files.
    # `(?:```|\Z)` tolerates a final block whose closing fence is missing.
    _HTML_PATTERNS = [
        r'```html\s*\n([\s\S]*?)(?:```|\Z)',
        r'```htm\s*\n([\s\S]*?)(?:```|\Z)',
        r'```\s*(?:index\.html|html)\s*\n([\s\S]*?)(?:```|\Z)',
    ]
    _JS_PATTERNS = [
        r'```javascript\s*\n([\s\S]*?)(?:```|\Z)',
        r'```js\s*\n([\s\S]*?)(?:```|\Z)',
        r'```\s*(?:index\.js|javascript|js)\s*\n([\s\S]*?)(?:```|\Z)',
    ]
    _CSS_PATTERNS = [
        r'```css\s*\n([\s\S]*?)(?:```|\Z)',
        r'```\s*(?:style\.css|css)\s*\n([\s\S]*?)(?:```|\Z)',
    ]

    @staticmethod
    def parse_transformers_js_output(text: str) -> Dict[str, str]:
        """Parse transformers.js output and extract the three files

        Returns a dict with exactly the keys ``index.html``, ``index.js`` and
        ``style.css``; a file that cannot be found maps to ''.
        """
        files = {
            'index.html': '',
            'index.js': '',
            'style.css': '',
        }
        if not text:
            return files

        files['index.html'] = _first_fenced_block(TransformersJSProcessor._HTML_PATTERNS, text)
        files['index.js'] = _first_fenced_block(TransformersJSProcessor._JS_PATTERNS, text)
        files['style.css'] = _first_fenced_block(TransformersJSProcessor._CSS_PATTERNS, text)

        # Fallback: support === filename === format
        if not (files['index.html'] and files['index.js'] and files['style.css']):
            fallback_files = MultipageProcessor.parse_multipage_html_output(text)
            for key in files:
                if key in fallback_files:
                    files[key] = fallback_files[key]

        return files

    @staticmethod
    def format_transformers_js_output(files: Dict[str, str]) -> str:
        """Format the three files into a single display string"""
        output = [
            "=== index.html ===",
            files.get('index.html', ''),
            "\n=== index.js ===",
            files.get('index.js', ''),
            "\n=== style.css ===",
            files.get('style.css', ''),
        ]
        return '\n'.join(output)

    @staticmethod
    def build_transformers_inline_html(files: Dict[str, str]) -> str:
        """Merge transformers.js files into a single HTML document

        Inline module scripts found in ``index.html`` are merged with
        ``index.js``, library imports are pointed at one pinned CDN build,
        and the CSS/JS are inlined so the result is self-contained.
        """
        html = files.get('index.html') or ''
        js = files.get('index.js') or ''
        css = files.get('style.css') or ''

        # Normalize JS imports to stable CDN
        cdn_url = "https://cdn.jsdelivr.net/npm/@huggingface/transformers@3.7.2"
        import_patterns = (
            r"from\s+['\"]@huggingface/transformers['\"]",
            r"from\s+['\"]@xenova/transformers['\"]",
            r"from\s+['\"]https://cdn.jsdelivr.net/npm/@huggingface/transformers@[^'\"]+['\"]",
            r"from\s+['\"]https://cdn.jsdelivr.net/npm/@xenova/transformers@[^'\"]+['\"]",
        )

        def _normalize_imports(_code: str) -> str:
            if not _code:
                return _code or ""
            for import_pattern in import_patterns:
                _code = re.sub(import_pattern, f"from '{cdn_url}'", _code)
            return _code

        # Extract and merge inline module scripts
        inline_modules = []
        try:
            for _m in re.finditer(
                r"<script\b[^>]*type=[\"']module[\"'][^>]*>([\s\S]*?)</script>",
                html,
                flags=re.IGNORECASE,
            ):
                inline_modules.append(_m.group(1))
            if inline_modules:
                html = re.sub(
                    r"<script\b[^>]*type=[\"']module[\"'][^>]*>[\s\S]*?</script>\s*",
                    "",
                    html,
                    flags=re.IGNORECASE,
                )
        except Exception as e:
            # Best effort: fall back to whatever index.js already contains.
            print(f"[TransformersJS] Could not extract inline module scripts: {e}")

        # Combine JS code
        combined_js_parts = []
        if inline_modules:
            combined_js_parts.append("\n\n".join(inline_modules))
        if js:
            combined_js_parts.append(js)
        js = "\n\n".join([p for p in combined_js_parts if (p and p.strip())])
        js = _normalize_imports(js)

        # Add prelude for better compatibility
        if js.strip():
            prelude = (
                f"import {{ env }} from '{cdn_url}';\n"
                "try { env.useBrowserCache = false; } catch (e) {}\n"
                "try { if (env && env.backends && env.backends.onnx && env.backends.onnx.wasm) { env.backends.onnx.wasm.numThreads = 1; env.backends.onnx.wasm.proxy = false; } } catch (e) {}\n"
                f"(async () => {{ try {{ if (typeof globalThis.transformers === 'undefined') {{ const m = await import('{cdn_url}'); globalThis.transformers = m; }} }} catch (e) {{}} }})();\n"
            )
            js = prelude + js

        # Create minimal shell if needed
        doc = html.strip()
        if not doc or ('<html' not in doc.lower()):
            doc = (
                "<!DOCTYPE html>\n"
                "<html>\n<head>\n<meta charset=\"utf-8\" />\n"
                "<meta name=\"viewport\" content=\"width=device-width, initial-scale=1\" />\n"
                "<title>Transformers.js App</title>\n</head>\n"
                "<body>\n<div id=\"app\"></div>\n</body>\n</html>"
            )

        # Remove local file references (they are inlined below; keeping them
        # would load each asset twice or 404 inside the preview)
        doc = re.sub(r"<link[^>]+href=\"[^\"]*style\.css\"[^>]*>\s*", "", doc, flags=re.IGNORECASE)
        doc = re.sub(r"<script[^>]+src=\"[^\"]*index\.js\"[^>]*>\s*</script>\s*", "", doc, flags=re.IGNORECASE)

        # Inline CSS
        if css:
            style_tag = f"<style>\n{css}\n</style>"
            if '</head>' in doc.lower():
                # Search case-insensitively so the document's own casing is preserved
                match = re.search(r"</head>", doc, flags=re.IGNORECASE)
                if match:
                    idx = match.start()
                    doc = doc[:idx] + style_tag + doc[idx:]
            else:
                match = re.search(r"<body[^>]*>", doc, flags=re.IGNORECASE)
                if match:
                    idx = match.end()
                    doc = doc[:idx] + "\n" + style_tag + doc[idx:]
                else:
                    doc = style_tag + doc

        # Inline JS with debugging and cleanup
        if js:
            script_tag = f"<script type=\"module\">\n{js}\n</script>"
            debug_overlay = TransformersJSProcessor._create_debug_overlay()
            cleanup_tag = TransformersJSProcessor._create_cleanup_script()
            match = re.search(r"</body>", doc, flags=re.IGNORECASE)
            if match:
                idx = match.start()
                doc = doc[:idx] + debug_overlay + script_tag + cleanup_tag + doc[idx:]
            else:
                doc = doc + debug_overlay + script_tag + cleanup_tag

        return doc

    @staticmethod
    def _create_debug_overlay() -> str:
        """Create debug overlay for transformers.js apps"""
        # NOTE(review): the overlay markup was rebuilt as style + container +
        # script; confirm the element id and styling against the deployed app.
        return (
            "<style>\n"
            "#anycoder-debug{position:fixed;left:0;right:0;bottom:0;max-height:45%;overflow:auto;"
            "background:rgba(0,0,0,.85);color:#9eff9e;padding:.5em;font:12px/1.4 monospace;"
            "z-index:2147483647;display:none}\n"
            "#anycoder-debug pre{margin:0;white-space:pre-wrap;word-break:break-word}\n"
            "</style>\n"
            "<div id=\"anycoder-debug\"></div>\n"
            "<script>\n"
            "(function(){\n"
            "  const el = document.getElementById('anycoder-debug');\n"
            "  function log(msg){ try { if (!el) return; el.style.display = 'block'; const pre = document.createElement('pre'); pre.textContent = msg; el.appendChild(pre); } catch (e) {} }\n"
            "  function fmt(a){ try { return (typeof a === 'string') ? a : JSON.stringify(a); } catch (e) { return String(a); } }\n"
            "  const origError = console.error.bind(console);\n"
            "  console.error = function(){ origError.apply(console, arguments); log('console.error: ' + Array.from(arguments).map(fmt).join(' ')); };\n"
            "  window.addEventListener('error', function(e){ log('window.onerror: ' + (e && e.message ? e.message : 'Unknown error')); });\n"
            "  window.addEventListener('unhandledrejection', function(e){ const r = e && e.reason; log('unhandledrejection: ' + (r && r.message ? r.message : fmt(r))); });\n"
            "})();\n"
            "</script>"
        )

    @staticmethod
    def _create_cleanup_script() -> str:
        """Create cleanup script for transformers.js apps"""
        # NOTE(review): rebuilt script; it clears Cache Storage and IndexedDB
        # when the preview unloads - confirm this matches the intended cleanup.
        return (
            "<script>\n"
            "(function(){\n"
            "  function cleanup(){\n"
            "    try { if (window.caches && caches.keys) { caches.keys().then(function(keys){ keys.forEach(function(k){ caches.delete(k); }); }); } } catch (e) {}\n"
            "    try { if (window.indexedDB && indexedDB.databases) { indexedDB.databases().then(function(dbs){ dbs.forEach(function(db){ if (db && db.name) indexedDB.deleteDatabase(db.name); }); }); } } catch (e) {}\n"
            "  }\n"
            "  window.addEventListener('pagehide', cleanup, { once: true });\n"
            "  window.addEventListener('beforeunload', cleanup, { once: true });\n"
            "})();\n"
            "</script>"
        )


class SvelteProcessor:
    """Handles Svelte specific code processing"""

    @staticmethod
    def parse_svelte_output(text: str) -> Dict[str, str]:
        """Parse Svelte output to extract individual files

        Returns a dict with the keys ``src/App.svelte`` and ``src/app.css``;
        a file that cannot be found maps to ''.
        """
        files = {
            'src/App.svelte': '',
            'src/app.css': '',
        }
        if not text:
            return files

        # Extract using code block patterns
        svelte_pattern = r'```svelte\s*\n([\s\S]+?)\n```'
        css_pattern = r'```css\s*\n([\s\S]+?)\n```'

        svelte_match = re.search(svelte_pattern, text, re.IGNORECASE)
        css_match = re.search(css_pattern, text, re.IGNORECASE)

        if svelte_match:
            files['src/App.svelte'] = svelte_match.group(1).strip()
        if css_match:
            files['src/app.css'] = css_match.group(1).strip()

        # Fallback: support === filename === format
        if not (files['src/App.svelte'] and files['src/app.css']):
            fallback_files = MultipageProcessor.parse_multipage_html_output(text)
            for key in files:
                if key in fallback_files:
                    files[key] = fallback_files[key]

        return files

    @staticmethod
    def format_svelte_output(files: Dict[str, str]) -> str:
        """Format Svelte files into a single display string"""
        output = [
            "=== src/App.svelte ===",
            files.get('src/App.svelte', ''),
            "\n=== src/app.css ===",
            files.get('src/app.css', ''),
        ]
        return '\n'.join(output)


class MultipageProcessor:
    """Handles multi-page HTML projects"""

    # One `=== name ===` header line plus everything up to the next header.
    _SECTION_RE = re.compile(
        r"^===\s*([^=\n]+?)\s*===\s*\n([\s\S]*?)(?=\n===\s*[^=\n]+?\s*===|\Z)",
        re.MULTILINE,
    )

    # References that never point at a file inside the project.
    _EXTERNAL_REF_PREFIXES = (
        'http://', 'https://', '//', 'data:', 'mailto:', 'tel:', 'javascript:', '#',
    )

    @staticmethod
    def parse_multipage_html_output(text: str) -> Dict[str, str]:
        """Parse multi-page HTML output formatted as === filename === sections"""
        if not text:
            return {}

        # Imported lazily, as in the original layout of this module.
        from utils import remove_code_block

        cleaned = remove_code_block(text)
        files: Dict[str, str] = {}

        for m in MultipageProcessor._SECTION_RE.finditer(cleaned):
            name = m.group(1).strip()
            content = m.group(2).strip()
            # Remove accidental leading/trailing fences
            content = re.sub(r"^```\w*\s*\n|\n```\s*$", "", content)
            files[name] = content

        return files

    @staticmethod
    def format_multipage_output(files: Dict[str, str]) -> str:
        """Format files back into === filename === sections"""
        if not isinstance(files, dict) or not files:
            return ""

        # Order with index.html first, remaining paths alphabetically
        ordered_paths = sorted(files, key=lambda p: (p != 'index.html', p))

        parts: List[str] = []
        for path in ordered_paths:
            parts.append(f"=== {path} ===")
            parts.append((files.get(path) or '').rstrip())

        return "\n".join(parts)

    @staticmethod
    def validate_and_autofix_files(files: Dict[str, str]) -> Dict[str, str]:
        """Ensure minimal contract for multi-file sites

        Normalizes paths, generates an ``index.html`` listing when other HTML
        pages exist without one, and adds stub files for locally referenced
        ``.css``/``.js``/``.html`` assets that are missing.
        """
        if not isinstance(files, dict) or not files:
            return files or {}

        normalized: Dict[str, str] = {}
        for k, v in files.items():
            safe_key = k.strip().lstrip('/')
            normalized[safe_key] = v

        html_files = [p for p in normalized if p.lower().endswith('.html')]
        has_index = 'index.html' in normalized

        # Create index.html if missing but other HTML files exist
        if not has_index and html_files:
            links = '\n'.join([f"<li><a href=\"{p}\">{p}</a></li>" for p in html_files])
            normalized['index.html'] = (
                "<!DOCTYPE html>\n<html lang=\"en\">\n<head>\n<meta charset=\"utf-8\"/>\n"
                "<meta name=\"viewport\" content=\"width=device-width, initial-scale=1\"/>\n"
                "<title>Site Index</title>\n</head>\n<body>\n<h1>Site</h1>\n<ul>\n"
                + links +
                "\n</ul>\n</body>\n</html>"
            )

        # Collect asset references
        asset_refs: set[str] = set()
        patterns = [
            re.compile(r"<link[^>]+href=\"([^\"]+)\""),
            re.compile(r"<script[^>]+src=\"([^\"]+)\""),
            re.compile(r"<img[^>]+src=\"([^\"]+)\""),
            re.compile(r"<a[^>]+href=\"([^\"]+)\""),
        ]

        for path, content in list(normalized.items()):
            if not path.lower().endswith('.html'):
                continue
            for patt in patterns:
                for m in patt.finditer(content or ""):
                    ref = (m.group(1) or "").strip()
                    if not ref or ref.lower().startswith(MultipageProcessor._EXTERNAL_REF_PREFIXES):
                        continue
                    # "page.html#team" and "app.js?v=2" refer to the files
                    # "page.html" and "app.js"; drop the suffix before lookup.
                    ref = ref.split('#', 1)[0].split('?', 1)[0].lstrip('/')
                    if ref:
                        asset_refs.add(ref)

        # Add minimal stubs for missing references (sorted for stable output)
        for ref in sorted(asset_refs):
            if ref in normalized:
                continue
            lowered_ref = ref.lower()
            if lowered_ref.endswith('.css'):
                normalized[ref] = "/* generated stub */\n"
            elif lowered_ref.endswith('.js'):
                normalized[ref] = "// generated stub\n"
            elif lowered_ref.endswith('.html'):
                normalized[ref] = (
                    "<!DOCTYPE html>\n<html lang=\"en\">\n"
                    "<head><meta charset=\"utf-8\"/><title>Page</title></head>\n"
                    "<body><main><h1>Placeholder page</h1>"
                    "<p>This page was auto-created to satisfy an internal link.</p>"
                    "</main></body>\n</html>"
                )

        return normalized

    @staticmethod
    def inline_multipage_into_single_preview(files: Dict[str, str]) -> str:
        """Inline local CSS/JS for iframe preview

        Returns '' when there is no ``index.html``; otherwise the index page
        with project-local stylesheets and scripts inlined and a navigation
        script appended so links to sibling pages work inside the preview.
        """
        html = files.get('index.html', '')
        if not html:
            return ""

        doc = html

        # Inline CSS links
        def _inline_css(match):
            href = match.group(1)
            if href in files:
                return f"<style>\n{files[href]}\n</style>"
            return match.group(0)

        doc = re.sub(r"<link[^>]+href=\"([^\"]+)\"[^>]*/?>", _inline_css, doc, flags=re.IGNORECASE)

        # Inline JS scripts
        def _inline_js(match):
            src = match.group(1)
            if src in files:
                return f"<script>\n{files[src]}\n</script>"
            return match.group(0)

        doc = re.sub(r"<script[^>]+src=\"([^\"]+)\"[^>]*>\s*</script>", _inline_js, doc, flags=re.IGNORECASE)

        # Add client-side navigation for other pages. Strings are immutable,
        # so the helper's return value must be kept; discarding it silently
        # drops the navigation script.
        doc = MultipageProcessor._add_client_side_navigation(doc, files)

        return doc

    @staticmethod
    def _add_client_side_navigation(doc: str, files: Dict[str, str]) -> str:
        """Add client-side navigation for multi-page preview

        Returns ``doc`` with a script inserted before the closing body tag
        (or appended when there is none). On any failure ``doc`` is returned
        unchanged.
        """
        try:
            html_pages = {k: v for k, v in files.items() if k.lower().endswith('.html')}

            # The index entry uses the already-inlined document's body so that
            # navigating back shows the same content as the initial render.
            _index_body = re.search(r"<body[^>]*>([\s\S]*?)</body>", doc, flags=re.IGNORECASE)
            html_pages['index.html'] = _index_body.group(1) if _index_body else doc

            # json.dumps escapes non-ASCII by default, so atob() yields valid JSON text.
            encoded = base64.b64encode(json.dumps(html_pages).encode('utf-8')).decode('ascii')

            # NOTE(review): rebuilt script; it swaps the body for the target
            # page when a link to a known local page is clicked - confirm the
            # desired navigation behaviour.
            nav_script = (
                "<script>\n"
                "(function(){\n"
                f"  const MP_FILES = JSON.parse(atob('{encoded}'));\n"
                "  function extractBody(html){\n"
                "    try {\n"
                "      const parsed = new DOMParser().parseFromString(html, 'text/html');\n"
                "      const title = parsed.querySelector('title');\n"
                "      if (title) document.title = title.textContent || document.title;\n"
                "      return parsed.body ? parsed.body.innerHTML : html;\n"
                "    } catch (e) { return html; }\n"
                "  }\n"
                "  function normalize(href){\n"
                "    if (!href || href.charAt(0) === '#' || href.indexOf('//') === 0 || /^[a-z][a-z0-9+.-]*:/i.test(href)) return null;\n"
                "    let clean = href.split('#')[0].split('?')[0];\n"
                "    while (clean.indexOf('./') === 0) clean = clean.slice(2);\n"
                "    while (clean.charAt(0) === '/') clean = clean.slice(1);\n"
                "    return clean || null;\n"
                "  }\n"
                "  function loadPage(path){\n"
                "    if (!Object.prototype.hasOwnProperty.call(MP_FILES, path)) return false;\n"
                "    document.body.innerHTML = extractBody(MP_FILES[path]);\n"
                "    try { history.replaceState({}, '', '#' + path); } catch (e) {}\n"
                "    try { window.scrollTo(0, 0); } catch (e) {}\n"
                "    return true;\n"
                "  }\n"
                "  document.addEventListener('click', function(e){\n"
                "    const a = e.target && e.target.closest ? e.target.closest('a') : null;\n"
                "    if (!a) return;\n"
                "    const path = normalize(a.getAttribute('href'));\n"
                "    if (path && loadPage(path)) e.preventDefault();\n"
                "  }, true);\n"
                "})();\n"
                "</script>"
            )

            m = re.search(r"</body>", doc, flags=re.IGNORECASE)
            if m:
                i = m.start()
                doc = doc[:i] + nav_script + doc[i:]
            else:
                doc = doc + nav_script
        except Exception as e:
            # Non-fatal in preview
            print(f"[Multipage] Could not add client-side navigation: {e}")

        return doc


class MediaIntegrator:
    """Handles integration of generated media into code"""

    @staticmethod
    def apply_generated_media_to_html(html_content: str, user_prompt: str,
                                      enable_text_to_image: bool = False,
                                      enable_image_to_image: bool = False,
                                      input_image_data=None,
                                      image_to_image_prompt: Optional[str] = None,
                                      text_to_image_prompt: Optional[str] = None,
                                      enable_image_to_video: bool = False,
                                      image_to_video_prompt: Optional[str] = None,
                                      session_id: Optional[str] = None,
                                      enable_text_to_video: bool = False,
                                      text_to_video_prompt: Optional[str] = None,
                                      enable_text_to_music: bool = False,
                                      text_to_music_prompt: Optional[str] = None,
                                      token=None) -> str:
        """Apply media generation to HTML content

        At most one kind of media is generated per call, chosen in this
        order: image-to-video, text-to-video, text-to-music, image-to-image,
        text-to-image. For ``=== filename ===`` multi-page input the media is
        placed in the entry page and the whole project is re-serialized.
        Generation errors are printed and the content is returned unmodified.
        """
        # Detect multi-page structure
        is_multipage = False
        multipage_files = {}
        entry_html_path = None

        try:
            multipage_files = MultipageProcessor.parse_multipage_html_output(html_content) or {}
            if multipage_files:
                is_multipage = True
                entry_html_path = 'index.html' if 'index.html' in multipage_files else next(
                    (p for p in multipage_files if p.lower().endswith('.html')), None
                )
        except Exception as e:
            # Treat unparseable input as a single document.
            print(f"[MediaApply] Multi-page detection failed: {e}")

        result = multipage_files.get(entry_html_path, html_content) if is_multipage and entry_html_path else html_content

        try:
            # Process media generation based on priority
            if enable_image_to_video and input_image_data is not None:
                result = MediaIntegrator._apply_image_to_video(result, user_prompt, image_to_video_prompt, input_image_data, session_id, token)
            elif enable_text_to_video:
                result = MediaIntegrator._apply_text_to_video(result, user_prompt, text_to_video_prompt, session_id, token)
            elif enable_text_to_music:
                result = MediaIntegrator._apply_text_to_music(result, user_prompt, text_to_music_prompt, session_id, token)
            elif enable_image_to_image and input_image_data is not None:
                result = MediaIntegrator._apply_image_to_image(result, user_prompt, image_to_image_prompt, input_image_data, token)
            elif enable_text_to_image:
                result = MediaIntegrator._apply_text_to_image(result, user_prompt, text_to_image_prompt, token)
        except Exception as e:
            print(f"[MediaApply] Error during media generation: {str(e)}")

        # Return updated content
        if is_multipage and entry_html_path:
            multipage_files[entry_html_path] = result
            return MultipageProcessor.format_multipage_output(multipage_files)

        return result

    @staticmethod
    def _apply_image_to_video(html_content: str, user_prompt: str, prompt: Optional[str],
                              input_image_data, session_id: Optional[str], token) -> str:
        """Apply image-to-video generation"""
        i2v_prompt = (prompt or user_prompt or "").strip()
        print(f"[MediaApply] Applying image-to-video with prompt: {i2v_prompt}")

        try:
            video_html_tag = generate_video_from_image(input_image_data, i2v_prompt, session_id=session_id, token=token)
            if not video_html_tag.startswith("Error") and validate_video_html(video_html_tag):
                return MediaIntegrator._place_media_in_html(html_content, video_html_tag, "video")
        except Exception as e:
            print(f"[MediaApply] Image-to-video generation failed: {str(e)}")

        return html_content

    @staticmethod
    def _apply_text_to_video(html_content: str, user_prompt: str, prompt: Optional[str],
                             session_id: Optional[str], token) -> str:
        """Apply text-to-video generation"""
        t2v_prompt = (prompt or user_prompt or "").strip()
        print(f"[MediaApply] Applying text-to-video with prompt: {t2v_prompt}")

        try:
            video_html_tag = generate_video_from_text(t2v_prompt, session_id=session_id, token=token)
            if not video_html_tag.startswith("Error") and validate_video_html(video_html_tag):
                return MediaIntegrator._place_media_in_html(html_content, video_html_tag, "video")
        except Exception as e:
            print(f"[MediaApply] Text-to-video generation failed: {str(e)}")

        return html_content

    @staticmethod
    def _apply_text_to_music(html_content: str, user_prompt: str, prompt: Optional[str],
                             session_id: Optional[str], token) -> str:
        """Apply text-to-music generation"""
        t2m_prompt = (prompt or user_prompt or "").strip()
        print(f"[MediaApply] Applying text-to-music with prompt: {t2m_prompt}")

        try:
            audio_html_tag = generate_music_from_text(t2m_prompt, session_id=session_id, token=token)
            if not audio_html_tag.startswith("Error"):
                return MediaIntegrator._place_media_in_html(html_content, audio_html_tag, "audio")
        except Exception as e:
            print(f"[MediaApply] Text-to-music generation failed: {str(e)}")

        return html_content

    @staticmethod
    def _apply_image_to_image(html_content: str, user_prompt: str, prompt: Optional[str],
                              input_image_data, token) -> str:
        """Apply image-to-image generation"""
        i2i_prompt = (prompt or user_prompt or "").strip()
        print(f"[MediaApply] Applying image-to-image with prompt: {i2i_prompt}")

        try:
            image_html_tag = generate_image_to_image(input_image_data, i2i_prompt, token=token)
            if not image_html_tag.startswith("Error"):
                return MediaIntegrator._place_media_in_html(html_content, image_html_tag, "image")
        except Exception as e:
            print(f"[MediaApply] Image-to-image generation failed: {str(e)}")

        return html_content

    @staticmethod
    def _apply_text_to_image(html_content: str, user_prompt: str, prompt: Optional[str], token) -> str:
        """Apply text-to-image generation"""
        t2i_prompt = (prompt or user_prompt or "").strip()
        print(f"[MediaApply] Applying text-to-image with prompt: {t2i_prompt}")

        try:
            image_html_tag = generate_image_with_qwen(t2i_prompt, 0, token=token)
            if not image_html_tag.startswith("Error"):
                return MediaIntegrator._place_media_in_html(html_content, image_html_tag, "image")
        except Exception as e:
            print(f"[MediaApply] Text-to-image generation failed: {str(e)}")

        return html_content

    @staticmethod
    def _place_media_in_html(html_content: str, media_html: str, media_type: str) -> str:
        """Place generated media in appropriate location in HTML

        The media is inserted right after the first matching opening tag,
        tried in order: main, a "hero" section, a "container" div, body.
        Without any of those it goes before the closing body tag, or at the
        very end of the content.
        """
        # Find good insertion points
        insertion_patterns = [
            r'(<main[^>]*>)',
            r'(<section[^>]*class="[^"]*hero[^"]*"[^>]*>)',
            r'(<div[^>]*class="[^"]*container[^"]*"[^>]*>)',
            r'(<body[^>]*>)',
        ]

        for pattern in insertion_patterns:
            match = re.search(pattern, html_content, re.IGNORECASE)
            if match:
                insertion_point = match.end()
                container_class = f"{media_type}-container"
                media_with_container = f'\n    <div class="{container_class}">\n      {media_html}\n    </div>'
                return html_content[:insertion_point] + media_with_container + html_content[insertion_point:]

        # Fallback: append before closing body
        body_close = html_content.rfind('</body>')
        if body_close != -1:
            return html_content[:body_close] + f'\n  {media_html}\n' + html_content[body_close:]

        # Last resort: append at end
        return html_content + f'\n{media_html}'


# Export main functions and classes
code_processor = CodeProcessor()
transformers_processor = TransformersJSProcessor()
svelte_processor = SvelteProcessor()
multipage_processor = MultipageProcessor()
media_integrator = MediaIntegrator()


# Main exports: thin function aliases over the processor instances above.
def is_streamlit_code(code: str) -> bool:
    return code_processor.is_streamlit_code(code)


def is_gradio_code(code: str) -> bool:
    return code_processor.is_gradio_code(code)


def extract_html_document(text: str) -> str:
    return code_processor.extract_html_document(text)


def parse_transformers_js_output(text: str) -> Dict[str, str]:
    return transformers_processor.parse_transformers_js_output(text)


def format_transformers_js_output(files: Dict[str, str]) -> str:
    return transformers_processor.format_transformers_js_output(files)


def build_transformers_inline_html(files: Dict[str, str]) -> str:
    return transformers_processor.build_transformers_inline_html(files)


def parse_svelte_output(text: str) -> Dict[str, str]:
    return svelte_processor.parse_svelte_output(text)


def format_svelte_output(files: Dict[str, str]) -> str:
    return svelte_processor.format_svelte_output(files)


def parse_multipage_html_output(text: str) -> Dict[str, str]:
    return multipage_processor.parse_multipage_html_output(text)


def format_multipage_output(files: Dict[str, str]) -> str:
    return multipage_processor.format_multipage_output(files)


def validate_and_autofix_files(files: Dict[str, str]) -> Dict[str, str]:
    return multipage_processor.validate_and_autofix_files(files)


def inline_multipage_into_single_preview(files: Dict[str, str]) -> str:
    return multipage_processor.inline_multipage_into_single_preview(files)


def apply_generated_media_to_html(html_content: str, user_prompt: str, **kwargs) -> str:
    return media_integrator.apply_generated_media_to_html(html_content, user_prompt, **kwargs)