import uuid
import time
import re
from typing import Dict, List, Optional, Tuple, Generator, Any

import gradio as gr

from utils import (
    get_inference_client, remove_code_block, extract_text_from_file,
    create_multimodal_message, apply_search_replace_changes,
    cleanup_session_media, reap_old_media
)
from web_utils import extract_website_content, enhance_query_with_search
from code_processing import (
    is_streamlit_code, is_gradio_code, extract_html_document,
    parse_transformers_js_output, format_transformers_js_output, build_transformers_inline_html,
    parse_svelte_output, format_svelte_output,
    parse_multipage_html_output, format_multipage_output, validate_and_autofix_files,
    inline_multipage_into_single_preview, apply_generated_media_to_html
)
from media_generation import MediaGenerator
from config import (
    HTML_SYSTEM_PROMPT, TRANSFORMERS_JS_SYSTEM_PROMPT, SVELTE_SYSTEM_PROMPT, GENERIC_SYSTEM_PROMPT,
    SEARCH_START, DIVIDER, REPLACE_END, TEMP_DIR_TTL_SECONDS
)

class GenerationEngine:
    """Advanced code generation engine with multi-model support and intelligent processing."""

    def __init__(self):
        self.media_generator = MediaGenerator()
        self._active_generations = {}
        self._generation_stats = {
            'total_requests': 0,
            'successful_generations': 0,
            'errors': 0,
            'avg_response_time': 0.0
        }

    def generate_code(self,
                      query: Optional[str] = None,
                      vlm_image: Optional[gr.Image] = None,
                      gen_image: Optional[gr.Image] = None,
                      file: Optional[str] = None,
                      website_url: Optional[str] = None,
                      settings: Optional[Dict[str, Any]] = None,
                      history: Optional[List[Tuple[str, str]]] = None,
                      current_model: Optional[Dict] = None,
                      enable_search: bool = False,
                      language: str = "html",
                      provider: str = "auto",
                      **media_options) -> Generator[Dict[str, Any], None, None]:
        """Main code generation entry point with comprehensive options and streaming support."""
        start_time = time.time()
        session_id = str(uuid.uuid4())

        try:
            self._active_generations[session_id] = {
                'start_time': start_time,
                'status': 'initializing',
                'progress': 0
            }

            # Normalize optional inputs
            query = query or ''
            history = history or []
            settings = settings or {}
            current_model = current_model or {'id': 'Qwen/Qwen3-Coder-480B-A35B-Instruct', 'name': 'Qwen3-Coder'}

            self._generation_stats['total_requests'] += 1

            # Reclaim temporary media from this and earlier sessions
            self._cleanup_resources(session_id)

            has_existing_content = self._check_existing_content(history)

            # Route follow-up requests through the search/replace path; fall back
            # to full generation when no modification could be applied.
            if has_existing_content and query.strip():
                handled = yield from self._handle_modification_request(
                    query, history, current_model, provider, session_id
                )
                if handled:
                    return

            enhanced_query = self._process_inputs(query, file, website_url, enable_search)
            system_prompt = self._select_system_prompt(language, enable_search, has_existing_content)
            messages = self._prepare_messages(history, system_prompt, enhanced_query, vlm_image)

            yield from self._stream_generation(
                messages, current_model, provider, language,
                enhanced_query, gen_image, session_id, media_options
            )

            self._generation_stats['successful_generations'] += 1
            elapsed_time = time.time() - start_time
            self._update_avg_response_time(elapsed_time)

        except Exception as e:
            self._generation_stats['errors'] += 1
            error_message = f"Generation Error: {str(e)}"
            print(f"[GenerationEngine] Error: {error_message}")

            yield {
                'code_output': error_message,
                'history_output': self._convert_history_to_messages(history),
                'sandbox': f"<div style='padding:2rem;text-align:center;color:#dc2626;background:#fef2f2;border:1px solid #fecaca;border-radius:8px;'><h3>Generation Failed</h3><p>{error_message}</p></div>",
                'status': 'error'
            }
        finally:
            self._active_generations.pop(session_id, None)

    def _cleanup_resources(self, session_id: str):
        """Clean up temporary resources."""
        try:
            cleanup_session_media(session_id)
            reap_old_media()
        except Exception as e:
            print(f"[GenerationEngine] Cleanup warning: {e}")

    def _check_existing_content(self, history: List[Tuple[str, str]]) -> bool:
        """Check whether there is existing generated content to modify."""
        if not history:
            return False

        last_assistant_msg = history[-1][1]
        content_indicators = [
            '<!DOCTYPE html>', '<html', 'import gradio', 'import streamlit',
            'def ', 'IMPORTED PROJECT FROM HUGGING FACE SPACE',
            '=== index.html ===', '=== index.js ===', '=== style.css ==='
        ]

        return any(indicator in last_assistant_msg for indicator in content_indicators)

    def _handle_modification_request(self, query: str, history: List[Tuple[str, str]],
                                     current_model: Dict, provider: str,
                                     session_id: str) -> Generator[Dict[str, Any], None, bool]:
        """Handle search/replace modification requests.

        Returns True (as the generator's return value) when a modification was
        applied, so the caller can fall back to full generation otherwise.
        """
        try:
            print("[GenerationEngine] Processing modification request")

            client = get_inference_client(current_model['id'], provider)
            last_assistant_msg = history[-1][1] if history else ""

            system_prompt = f"""You are a code editor assistant. Generate EXACT search/replace blocks for the requested modifications.

CRITICAL REQUIREMENTS:
1. Use EXACTLY these markers: {SEARCH_START}, {DIVIDER}, {REPLACE_END}
2. The SEARCH block must match existing code EXACTLY (including whitespace)
3. Generate multiple blocks if needed for different changes
4. Only include the specific lines that need to change, with sufficient context
5. DO NOT include explanations outside the blocks

Example:
{SEARCH_START}
<h1>Old Title</h1>
{DIVIDER}
<h1>New Title</h1>
{REPLACE_END}"""

            user_prompt = f"""Existing code:
{last_assistant_msg}

Modification request:
{query}

Generate the exact search/replace blocks needed."""

            messages = [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ]

            response = self._call_llm(client, current_model, messages, max_tokens=4000, temperature=0.1)

            if response:
                # transformers.js projects carry three files; route changes per file
                if '=== index.html ===' in last_assistant_msg:
                    modified_content = self._apply_transformers_js_changes(last_assistant_msg, response)
                else:
                    modified_content = apply_search_replace_changes(last_assistant_msg, response)

                if modified_content != last_assistant_msg:
                    updated_history = history + [(query, modified_content)]

                    yield {
                        'code_output': modified_content,
                        'history': updated_history,
                        'sandbox': self._generate_preview(modified_content, "html"),
                        'history_output': self._convert_history_to_messages(updated_history),
                        'status': 'completed'
                    }
                    return True

            print("[GenerationEngine] Search/replace failed, falling back to normal generation")

        except Exception as e:
            print(f"[GenerationEngine] Modification request failed: {e}")

        return False

    def _process_inputs(self, query: str, file: Optional[str], website_url: Optional[str],
                        enable_search: bool) -> str:
        """Process file and website inputs; enhance with web search if enabled."""
        enhanced_query = query

        if file:
            file_text = extract_text_from_file(file)
            if file_text:
                file_text = file_text[:5000]  # cap reference content
                enhanced_query = f"{enhanced_query}\n\n[Reference file content]\n{file_text}"

        if website_url and website_url.strip():
            website_text = extract_website_content(website_url.strip())
            if website_text and not website_text.startswith("Error"):
                website_text = website_text[:8000]  # cap scraped content
                enhanced_query = f"{enhanced_query}\n\n[Website content to redesign]\n{website_text}"
            elif website_text and website_text.startswith("Error"):
                fallback_guidance = """
Since I couldn't extract the website content, please provide:
1. What type of website is this?
2. What are the main features you want?
3. What's the target audience?
4. Any specific design preferences?"""
                enhanced_query = f"{enhanced_query}\n\n[Website extraction error: {website_text}]{fallback_guidance}"

        if enable_search:
            enhanced_query = enhance_query_with_search(enhanced_query, True)

        return enhanced_query

    def _select_system_prompt(self, language: str, enable_search: bool, has_existing_content: bool) -> str:
        """Select the appropriate system prompt for the current context."""
        if has_existing_content:
            return self._get_followup_system_prompt(language)

        search_enhancement = """

Use web search results when available to incorporate the latest best practices, frameworks, and technologies.""" if enable_search else ""

        if language == "html":
            return HTML_SYSTEM_PROMPT + search_enhancement
        elif language == "transformers.js":
            return TRANSFORMERS_JS_SYSTEM_PROMPT + search_enhancement
        elif language == "svelte":
            return SVELTE_SYSTEM_PROMPT + search_enhancement
        else:
            return GENERIC_SYSTEM_PROMPT.format(language=language) + search_enhancement

    def _get_followup_system_prompt(self, language: str) -> str:
        """Get the follow-up system prompt used for modifications."""
        return f"""You are an expert developer modifying existing {language} code.
Apply the requested changes using SEARCH/REPLACE blocks with these markers:
{SEARCH_START}, {DIVIDER}, {REPLACE_END}

Requirements:
- SEARCH blocks must match existing code EXACTLY
- Provide multiple blocks for different changes
- Include sufficient context to make matches unique
- Do not include explanations outside the blocks"""

    def _prepare_messages(self, history: List[Tuple[str, str]], system_prompt: str,
                          enhanced_query: str, vlm_image: Optional[gr.Image]) -> List[Dict]:
        """Prepare the message list for the LLM call."""
        messages = [{'role': 'system', 'content': system_prompt}]

        for user_msg, assistant_msg in history:
            # Multimodal user turns arrive as lists of content parts; keep only the text
            if isinstance(user_msg, list):
                text_content = ""
                for item in user_msg:
                    if isinstance(item, dict) and item.get("type") == "text":
                        text_content += item.get("text", "")
                user_msg = text_content if text_content else str(user_msg)

            messages.append({'role': 'user', 'content': user_msg})
            messages.append({'role': 'assistant', 'content': assistant_msg})

        if vlm_image is not None:
            messages.append(create_multimodal_message(enhanced_query, vlm_image))
        else:
            messages.append({'role': 'user', 'content': enhanced_query})

        return messages

    def _stream_generation(self, messages: List[Dict], current_model: Dict, provider: str,
                           language: str, query: str, gen_image: Optional[gr.Image],
                           session_id: str, media_options: Dict) -> Generator:
        """Stream code generation with real-time updates."""
        try:
            client = get_inference_client(current_model['id'], provider)

            # Models with bespoke streaming paths
            if current_model["id"] == "zai-org/GLM-4.5":
                yield from self._handle_glm_45_generation(client, messages, language, query, gen_image, session_id, media_options)
                return
            elif current_model["id"] == "zai-org/GLM-4.5V":
                yield from self._handle_glm_45v_generation(client, messages, language, query, session_id, media_options)
                return

            completion = self._create_completion_stream(client, current_model, messages)
            content = ""

            for chunk in completion:
                chunk_content = self._extract_chunk_content(chunk, current_model)

                if chunk_content:
                    content += chunk_content

                    if language == "transformers.js":
                        yield from self._handle_transformers_streaming(content)
                    elif language == "svelte":
                        yield from self._handle_svelte_streaming(content)
                    else:
                        yield from self._handle_standard_streaming(content, language)

            final_content = self._finalize_content(content, language, query, gen_image, session_id, media_options)

            yield {
                'code_output': final_content,
                'history': [(query, final_content)],
                'sandbox': self._generate_preview(final_content, language),
                'history_output': self._convert_history_to_messages([(query, final_content)]),
                'status': 'completed'
            }

        except Exception as e:
            raise RuntimeError(f"Streaming generation failed: {e}") from e

    def _handle_glm_45_generation(self, client, messages, language, query, gen_image, session_id, media_options):
        """Handle GLM-4.5-specific generation."""
        try:
            stream = client.chat.completions.create(
                model="zai-org/GLM-4.5",
                messages=messages,
                stream=True,
                max_tokens=16384,
            )

            content = ""
            clean_code = ""  # guard against an empty stream
            for chunk in stream:
                if chunk.choices and chunk.choices[0].delta.content:
                    content += chunk.choices[0].delta.content
                    clean_code = remove_code_block(content)

                    yield {
                        'code_output': gr.update(value=clean_code, language=self._get_gradio_language(language)),
                        'sandbox': self._generate_preview(clean_code, language),
                        'status': 'streaming'
                    }

            final_content = apply_generated_media_to_html(
                clean_code, query, session_id=session_id, **media_options
            )

            yield {
                'code_output': final_content,
                'history': [(query, final_content)],
                'sandbox': self._generate_preview(final_content, language),
                'history_output': self._convert_history_to_messages([(query, final_content)]),
                'status': 'completed'
            }

        except Exception as e:
            raise RuntimeError(f"GLM-4.5 generation failed: {e}") from e

    def _handle_glm_45v_generation(self, client, messages, language, query, session_id, media_options):
        """Handle GLM-4.5V multimodal generation."""
        try:
            enhanced_messages = [
                {"role": "system", "content": """You are an expert web developer creating modern, responsive applications.

Output complete, standalone HTML documents that render directly in browsers.
- Include proper DOCTYPE, head, and body structure
- Use modern CSS frameworks and responsive design
- Ensure accessibility and mobile compatibility
- Output raw HTML without escape characters

Always output only the HTML code inside ```html ... ``` blocks."""}
            ] + messages[1:]

            stream = client.chat.completions.create(
                model="zai-org/GLM-4.5V",
                messages=enhanced_messages,
                stream=True,
                max_tokens=16384,
            )

            def _unescape(code: str) -> str:
                # The model sometimes emits literal \n / \t sequences instead of
                # real whitespace; convert them for display.
                return code.replace("\\n", "\n").replace("\\t", "\t")

            content = ""
            for chunk in stream:
                if hasattr(chunk, "choices") and chunk.choices and hasattr(chunk.choices[0], "delta"):
                    delta_content = getattr(chunk.choices[0].delta, "content", None)
                    if delta_content:
                        content += delta_content
                        clean_code = _unescape(remove_code_block(content))

                        yield {
                            'code_output': gr.update(value=clean_code, language=self._get_gradio_language(language)),
                            'sandbox': self._generate_preview(clean_code, language),
                            'status': 'streaming'
                        }

            clean_code = _unescape(remove_code_block(content))

            yield {
                'code_output': clean_code,
                'history': [(query, clean_code)],
                'sandbox': self._generate_preview(clean_code, language),
                'history_output': self._convert_history_to_messages([(query, clean_code)]),
                'status': 'completed'
            }

        except Exception as e:
            raise RuntimeError(f"GLM-4.5V generation failed: {e}") from e

    def _create_completion_stream(self, client, current_model, messages):
        """Create a completion stream appropriate for the model's client API."""
        if current_model["id"] in ("codestral-2508", "mistral-medium-2508"):
            # Mistral SDK streams via chat.stream rather than stream=True
            return client.chat.stream(
                model=current_model["id"],
                messages=messages,
                max_tokens=16384
            )
        elif current_model["id"] in ("gpt-5", "grok-4", "claude-opus-4.1"):
            # These providers expect display names rather than model slugs
            model_name_map = {
                "gpt-5": "GPT-5",
                "grok-4": "Grok-4",
                "claude-opus-4.1": "Claude-Opus-4.1"
            }
            return client.chat.completions.create(
                model=model_name_map[current_model["id"]],
                messages=messages,
                stream=True,
                max_tokens=16384
            )
        else:
            return client.chat.completions.create(
                model=current_model["id"],
                messages=messages,
                stream=True,
                max_tokens=16384
            )

    def _extract_chunk_content(self, chunk, current_model) -> Optional[str]:
        """Extract content from a stream chunk based on the model's response format."""
        try:
            if current_model["id"] in ("codestral-2508", "mistral-medium-2508"):
                # Mistral wraps the OpenAI-style payload in chunk.data
                if (hasattr(chunk, "data") and chunk.data and
                        hasattr(chunk.data, "choices") and chunk.data.choices and
                        hasattr(chunk.data.choices[0], "delta") and
                        hasattr(chunk.data.choices[0].delta, "content")):
                    return chunk.data.choices[0].delta.content
            else:
                # OpenAI-compatible chunk format
                if (hasattr(chunk, "choices") and chunk.choices and
                        hasattr(chunk.choices[0], "delta") and
                        hasattr(chunk.choices[0].delta, "content")):
                    content = chunk.choices[0].delta.content

                    # GPT-5 interleaves "Thinking..." status lines; drop them
                    if current_model["id"] == "gpt-5" and content:
                        if self._is_placeholder_thinking_only(content):
                            return None
                        return self._strip_placeholder_thinking(content)

                    return content
        except Exception:
            pass

        return None

    def _handle_transformers_streaming(self, content: str) -> Generator:
        """Handle streaming updates for transformers.js projects."""
        files = parse_transformers_js_output(content)
        has_all_files = all([files.get('index.html'), files.get('index.js'), files.get('style.css')])

        if has_all_files:
            merged_html = build_transformers_inline_html(files)
            yield {
                'code_output': gr.update(value=merged_html, language="html"),
                'sandbox': self._send_transformers_to_sandbox(files),
                'status': 'streaming'
            }
        else:
            yield {
                'code_output': gr.update(value=content, language="html"),
                'sandbox': "<div style='padding:1em;text-align:center;color:#888;'>Generating transformers.js app...</div>",
                'status': 'streaming'
            }

    def _handle_svelte_streaming(self, content: str) -> Generator:
        """Handle streaming updates for Svelte projects."""
        yield {
            'code_output': gr.update(value=content, language="html"),
            'sandbox': "<div style='padding:1em;text-align:center;color:#888;'>Generating Svelte app...</div>",
            'status': 'streaming'
        }

    def _handle_standard_streaming(self, content: str, language: str) -> Generator:
        """Handle streaming updates for standard projects."""
        clean_code = remove_code_block(content)
        preview = self._generate_preview(clean_code, language)

        yield {
            'code_output': gr.update(value=clean_code, language=self._get_gradio_language(language)),
            'sandbox': preview,
            'status': 'streaming'
        }

    def _finalize_content(self, content: str, language: str, query: str,
                          gen_image: Optional[gr.Image], session_id: str, media_options: Dict) -> str:
        """Finalize content with post-processing and media integration."""
        final_content = remove_code_block(content)

        if language == "html":
            final_content = apply_generated_media_to_html(
                final_content, query, session_id=session_id, **media_options
            )

        return final_content

    def _generate_preview(self, content: str, language: str) -> str:
        """Generate preview HTML for the supported content types."""
        if language == "html":
            # Multi-page projects are merged into a single previewable document
            files = parse_multipage_html_output(content)
            files = validate_and_autofix_files(files)
            if files and files.get('index.html'):
                merged = inline_multipage_into_single_preview(files)
                return self._send_to_sandbox(merged)
            return self._send_to_sandbox(content)

        elif language == "streamlit":
            if is_streamlit_code(content):
                return self._send_streamlit_to_stlite(content)
            return "<div style='padding:1em;color:#888;text-align:center;'>Add `import streamlit as st` to enable Streamlit preview.</div>"

        elif language == "gradio":
            if is_gradio_code(content):
                return self._send_gradio_to_lite(content)
            return "<div style='padding:1em;color:#888;text-align:center;'>Add `import gradio as gr` to enable Gradio preview.</div>"

        elif language == "python":
            if is_streamlit_code(content):
                return self._send_streamlit_to_stlite(content)
            elif is_gradio_code(content):
                return self._send_gradio_to_lite(content)
            return "<div style='padding:1em;color:#888;text-align:center;'>Preview available for Streamlit/Gradio apps. Add the appropriate import.</div>"

        elif language == "transformers.js":
            files = parse_transformers_js_output(content)
            if files.get('index.html'):
                return self._send_transformers_to_sandbox(files)

        return "<div style='padding:1em;color:#888;text-align:center;'>Preview is only available for HTML, Streamlit, and Gradio applications.</div>"

    def _send_to_sandbox(self, code: str) -> str:
        """Send HTML to a sandboxed iframe via a data URI, with cache busting."""
        import base64

        # A timestamp comment forces the iframe to refresh on each update
        timestamp = str(int(time.time() * 1000))
        cache_bust_comment = f"<!-- refresh-{timestamp} -->"
        html_doc = cache_bust_comment + (code or "").strip()

        # Inline any local file:// references so they resolve inside the iframe
        try:
            html_doc = self._inline_file_urls_as_data_uris(html_doc)
        except Exception:
            pass

        encoded_html = base64.b64encode(html_doc.encode('utf-8')).decode('utf-8')
        data_uri = f"data:text/html;charset=utf-8;base64,{encoded_html}"
        return f'<iframe src="{data_uri}" width="100%" height="920px" sandbox="allow-scripts allow-same-origin allow-forms allow-popups allow-modals allow-presentation" allow="display-capture" key="preview-{timestamp}"></iframe>'

    def _send_streamlit_to_stlite(self, code: str) -> str:
        """Render Streamlit code in an stlite (in-browser Streamlit) preview."""
        import base64

        html_doc = f"""<!doctype html>
<html>
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no" />
<title>Streamlit Preview</title>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/@stlite/[email protected]/build/stlite.css" />
<style>html,body{{margin:0;padding:0;height:100%;}} streamlit-app{{display:block;height:100%;}}</style>
<script type="module" src="https://cdn.jsdelivr.net/npm/@stlite/[email protected]/build/stlite.js"></script>
</head>
<body>
<streamlit-app>{code or ""}</streamlit-app>
</body>
</html>"""

        encoded_html = base64.b64encode(html_doc.encode('utf-8')).decode('utf-8')
        data_uri = f"data:text/html;charset=utf-8;base64,{encoded_html}"
        return f'<iframe src="{data_uri}" width="100%" height="920px" sandbox="allow-scripts allow-same-origin allow-forms allow-popups allow-modals allow-presentation" allow="display-capture"></iframe>'

    def _send_gradio_to_lite(self, code: str) -> str:
        """Render Gradio code in a gradio-lite (in-browser Gradio) preview."""
        import base64

        html_doc = f"""<!doctype html>
<html>
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no" />
<title>Gradio Preview</title>
<script type="module" crossorigin src="https://cdn.jsdelivr.net/npm/@gradio/lite/dist/lite.js"></script>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/@gradio/lite/dist/lite.css" />
<style>html,body{{margin:0;padding:0;height:100%;}} gradio-lite{{display:block;height:100%;}}</style>
</head>
<body>
<gradio-lite>{code or ""}</gradio-lite>
</body>
</html>"""

        encoded_html = base64.b64encode(html_doc.encode('utf-8')).decode('utf-8')
        data_uri = f"data:text/html;charset=utf-8;base64,{encoded_html}"
        return f'<iframe src="{data_uri}" width="100%" height="920px" sandbox="allow-scripts allow-same-origin allow-forms allow-popups allow-modals allow-presentation" allow="display-capture"></iframe>'

    def _send_transformers_to_sandbox(self, files: Dict[str, str]) -> str:
        """Send transformers.js files to the sandbox as a single merged document."""
        merged_html = build_transformers_inline_html(files)
        return self._send_to_sandbox(merged_html)

    def _inline_file_urls_as_data_uris(self, html_doc: str) -> str:
        """Convert file:// URLs to data URIs so they load inside the sandboxed iframe."""
        import base64
        import mimetypes
        import urllib.parse

        def _file_url_to_data_uri(file_url: str) -> Optional[str]:
            try:
                parsed = urllib.parse.urlparse(file_url)
                path = urllib.parse.unquote(parsed.path)
                if not path:
                    return None
                with open(path, 'rb') as f:
                    raw = f.read()
                mime = mimetypes.guess_type(path)[0] or 'application/octet-stream'
                b64 = base64.b64encode(raw).decode()
                return f"data:{mime};base64,{b64}"
            except Exception:
                return None

        def _replace_file_url(match):
            url = match.group(1)
            data_uri = _file_url_to_data_uri(url)
            return f'src="{data_uri}"' if data_uri else match.group(0)

        html_doc = re.sub(r'src="(file:[^"]+)"', _replace_file_url, html_doc)
        html_doc = re.sub(r"src='(file:[^']+)'", _replace_file_url, html_doc)

        return html_doc
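
    # Illustrative example (the path and file are hypothetical): a tag such as
    #   <img src="file:///tmp/session-media/hero.png">
    # is rewritten to
    #   <img src="data:image/png;base64,iVBORw0...">
    # so the data-URI iframe, which cannot reach the local filesystem, can
    # still display generated media.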

    def _apply_transformers_js_changes(self, original_content: str, changes_text: str) -> str:
        """Apply search/replace changes to the three transformers.js files."""
        files = parse_transformers_js_output(original_content)

        blocks = self._parse_search_replace_blocks(changes_text)

        for search_text, replace_text in blocks:
            # Apply each block to the first file that contains the search text
            for file_key in ['index.html', 'index.js', 'style.css']:
                if search_text in files.get(file_key, ''):
                    files[file_key] = files[file_key].replace(search_text, replace_text)
                    break

        return format_transformers_js_output(files)

    def _parse_search_replace_blocks(self, changes_text: str) -> List[Tuple[str, str]]:
        """Parse search/replace blocks from the model's response text."""
        # First pass: split the response into raw blocks delimited by the markers
        blocks = []
        current_block = ""
        lines = changes_text.split('\n')

        for line in lines:
            if line.strip() == SEARCH_START:
                if current_block.strip():
                    blocks.append(current_block.strip())
                current_block = line + '\n'
            elif line.strip() == REPLACE_END:
                current_block += line + '\n'
                blocks.append(current_block.strip())
                current_block = ""
            else:
                current_block += line + '\n'

        if current_block.strip():
            blocks.append(current_block.strip())

        # Second pass: split each block into its search and replace halves
        parsed_blocks = []
        for block in blocks:
            lines = block.split('\n')
            search_lines = []
            replace_lines = []
            in_search = False
            in_replace = False

            for line in lines:
                if line.strip() == SEARCH_START:
                    in_search = True
                    in_replace = False
                elif line.strip() == DIVIDER:
                    in_search = False
                    in_replace = True
                elif line.strip() == REPLACE_END:
                    in_replace = False
                elif in_search:
                    search_lines.append(line)
                elif in_replace:
                    replace_lines.append(line)

            if search_lines:
                search_text = '\n'.join(search_lines).strip()
                replace_text = '\n'.join(replace_lines).strip()
                parsed_blocks.append((search_text, replace_text))

        return parsed_blocks
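
    # Worked example (the marker strings shown are placeholders; the real
    # values come from config.SEARCH_START / DIVIDER / REPLACE_END):
    #
    #   <<<<<<< SEARCH
    #   <h1>Old Title</h1>
    #   =======
    #   <h1>New Title</h1>
    #   >>>>>>> REPLACE
    #
    # parses to [("<h1>Old Title</h1>", "<h1>New Title</h1>")].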

    def _call_llm(self, client, current_model: Dict, messages: List[Dict],
                  max_tokens: int = 4000, temperature: float = 0.7) -> Optional[str]:
        """Call the LLM once (non-streaming) and return the response content."""
        try:
            if current_model.get('type') == 'mistral':
                # Mistral SDK uses chat.complete rather than chat.completions.create
                response = client.chat.complete(
                    model=current_model['id'],
                    messages=messages,
                    max_tokens=max_tokens,
                    temperature=temperature
                )
            else:
                # OpenAI-compatible clients (including type == 'openai')
                response = client.chat.completions.create(
                    model=current_model['id'],
                    messages=messages,
                    max_tokens=max_tokens,
                    temperature=temperature
                )
            return response.choices[0].message.content
        except Exception as e:
            print(f"[GenerationEngine] LLM call failed: {e}")
            return None

    def _convert_history_to_messages(self, history: List[Tuple[str, str]]) -> List[Dict[str, str]]:
        """Convert history tuples to the chat message format."""
        messages = []
        for user_msg, assistant_msg in history:
            # Multimodal user turns arrive as lists of content parts; keep only the text
            if isinstance(user_msg, list):
                text_content = ""
                for item in user_msg:
                    if isinstance(item, dict) and item.get("type") == "text":
                        text_content += item.get("text", "")
                user_msg = text_content if text_content else str(user_msg)

            messages.append({"role": "user", "content": user_msg})
            messages.append({"role": "assistant", "content": assistant_msg})
        return messages

    def _get_gradio_language(self, language: str) -> Optional[str]:
        """Get the appropriate Gradio language identifier for syntax highlighting."""
        from config import get_gradio_language
        return get_gradio_language(language)

    def _is_placeholder_thinking_only(self, text: str) -> bool:
        """Check whether text contains only 'Thinking...' placeholder lines."""
        if not text:
            return False
        stripped = text.strip()
        if not stripped:
            return False
        return re.fullmatch(r"(?s)(?:\s*Thinking\.\.\.(?:\s*\(\d+s elapsed\))?\s*)+", stripped) is not None

    def _strip_placeholder_thinking(self, text: str) -> str:
        """Remove placeholder 'Thinking...' status lines from text."""
        if not text:
            return text
        return re.sub(r"(?mi)^[\t ]*Thinking\.\.\.(?:\s*\(\d+s elapsed\))?[\t ]*$\n?", "", text)
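
    # These two helpers target interleaved status lines such as
    #   "Thinking..."  or  "Thinking... (12s elapsed)"
    # that some providers stream before the real completion content; the exact
    # wording handled is whatever the regexes above match, nothing more.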

    def _update_avg_response_time(self, elapsed_time: float):
        """Update the running average response time statistic."""
        current_avg = self._generation_stats['avg_response_time']
        total_successful = self._generation_stats['successful_generations']

        if total_successful <= 1:
            self._generation_stats['avg_response_time'] = elapsed_time
        else:
            # Incremental mean: new_avg = (old_avg * (n - 1) + x) / n
            self._generation_stats['avg_response_time'] = (current_avg * (total_successful - 1) + elapsed_time) / total_successful

    def get_generation_stats(self) -> Dict[str, Any]:
        """Get a copy of the current generation statistics."""
        return self._generation_stats.copy()

    def get_active_generations(self) -> Dict[str, Dict[str, Any]]:
        """Get a copy of the currently active generation records."""
        return self._active_generations.copy()


generation_engine = GenerationEngine()
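
# Minimal usage sketch (illustrative only; it assumes the default model
# configured above and an environment where the provider clients created by
# utils.get_inference_client are available):
#
#   for update in generation_engine.generate_code(
#           query="Build a landing page for a coffee shop",
#           language="html",
#   ):
#       if update.get('status') == 'streaming':
#           ...  # push update['code_output'] / update['sandbox'] to the UI
#       elif update.get('status') in ('completed', 'error'):
#           break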