import os
import re
import mimetypes
import tempfile
import uuid
import datetime
import base64
import time
import threading
import atexit
from typing import Dict, List, Optional, Tuple, Union
from pathlib import Path

import PyPDF2
import docx
import cv2
import numpy as np
from PIL import Image
import pytesseract
from huggingface_hub import InferenceClient, HfApi
import gradio as gr

from config import HF_TOKEN, SEARCH_START, DIVIDER, REPLACE_END, TEMP_DIR_TTL_SECONDS


# Temporary directories for media generated or uploaded during a session
MEDIA_TEMP_DIR = os.path.join(tempfile.gettempdir(), "anycoder_media")
VIDEO_TEMP_DIR = os.path.join(tempfile.gettempdir(), "anycoder_videos")
AUDIO_TEMP_DIR = os.path.join(tempfile.gettempdir(), "anycoder_audio")

# Per-session registries of temp file paths, each guarded by its own lock so
# concurrent requests can track and clean up files safely
_SESSION_MEDIA_FILES: Dict[str, List[str]] = {}
_SESSION_VIDEO_FILES: Dict[str, List[str]] = {}
_SESSION_AUDIO_FILES: Dict[str, List[str]] = {}
_MEDIA_FILES_LOCK = threading.Lock()
_VIDEO_FILES_LOCK = threading.Lock()
_AUDIO_FILES_LOCK = threading.Lock()

# Global registry of temp media keyed by file id (path, filename, type, bytes)
temp_media_files: Dict[str, Dict] = {}


def ensure_temp_dirs():
    """Ensure all temporary directories exist"""
    for temp_dir in [MEDIA_TEMP_DIR, VIDEO_TEMP_DIR, AUDIO_TEMP_DIR]:
        try:
            os.makedirs(temp_dir, exist_ok=True)
        except Exception:
            # Best-effort: a failure here will surface later when writing files
            pass


def get_inference_client(model_id: str, provider: str = "auto"):
    """Return an InferenceClient based on model_id and provider"""
    if not HF_TOKEN:
        raise RuntimeError("HF_TOKEN environment variable is not set")

    # Models served through OpenAI-compatible endpoints, each with its own key
    openai_models = {
        "qwen3-30b-a3b-instruct-2507": {
            "api_key": os.getenv("DASHSCOPE_API_KEY"),
            "base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1"
        },
        "gpt-5": {
            "api_key": os.getenv("POE_API_KEY"),
            "base_url": "https://api.poe.com/v1"
        },
        "kimi-k2-turbo-preview": {
            "api_key": os.getenv("MOONSHOT_API_KEY"),
            "base_url": "https://api.moonshot.ai/v1"
        },
        "gemini-2.5-flash": {
            "api_key": os.getenv("GEMINI_API_KEY"),
            "base_url": "https://generativelanguage.googleapis.com/v1beta/openai/"
        }
    }

    if model_id in openai_models:
        from openai import OpenAI
        config = openai_models[model_id]
        return OpenAI(api_key=config["api_key"], base_url=config["base_url"])

    # Mistral models use the native Mistral SDK
    if model_id in ("codestral-2508", "mistral-medium-2508"):
        from mistralai import Mistral
        return Mistral(api_key=os.getenv("MISTRAL_API_KEY"))

    # Hugging Face models pinned to a specific inference provider; anything
    # else falls through with the provider passed in (default "auto")
    provider_map = {
        "openai/gpt-oss-120b": "groq",
        "openai/gpt-oss-20b": "groq",
        "Qwen/Qwen3-235B-A22B": "cerebras",
        "Qwen/Qwen3-Coder-480B-A35B-Instruct": "cerebras",
        "deepseek-ai/DeepSeek-V3.1": "novita",
        "zai-org/GLM-4.5": "fireworks-ai"
    }

    if model_id in provider_map:
        provider = provider_map[model_id]

    return InferenceClient(
        provider=provider,
        api_key=HF_TOKEN,
        bill_to="huggingface"
    )
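
# Example usage (a sketch; assumes HF_TOKEN is set and the caller handles
# provider-specific response shapes — both OpenAI clients and InferenceClient
# expose an OpenAI-style chat.completions API):
#
#     client = get_inference_client("openai/gpt-oss-120b")
#     response = client.chat.completions.create(
#         model="openai/gpt-oss-120b",
#         messages=[{"role": "user", "content": "Hello"}],
#     )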


def remove_code_block(text: str) -> str:
    """Remove code block markers from text"""
    if not text:
        return text

    # Try fenced blocks first, from most to least specific
    patterns = [
        r'```(?:html|HTML)\n([\s\S]+?)\n```',
        r'```\n([\s\S]+?)\n```',
        r'```([\s\S]+?)```'
    ]

    for pattern in patterns:
        match = re.search(pattern, text, re.DOTALL)
        if match:
            extracted = match.group(1).strip()

            # Drop a stray language tag left on the first line of the block
            lines = extracted.split('\n', 1)
            if lines[0].strip().lower() in ['python', 'html', 'css', 'javascript', 'json']:
                return lines[1] if len(lines) > 1 else ''

            # Trim any preamble before the HTML document itself
            for tag in ['<!DOCTYPE html', '<html']:
                idx = extracted.find(tag)
                if idx > 0:
                    return extracted[idx:].strip()

            return extracted

    # No fences: if the text already looks like HTML, trim any leading noise
    stripped = text.strip()
    if stripped.startswith(('<!DOCTYPE html>', '<html', '<')):
        for tag in ['<!DOCTYPE html', '<html']:
            idx = stripped.find(tag)
            if idx > 0:
                return stripped[idx:].strip()
        return stripped

    return text.strip()
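
# Example (illustrative): a fenced model reply is unwrapped and the language
# tag is discarded.
#
#     reply = "```html\n<!DOCTYPE html><html><body>Hi</body></html>\n```"
#     remove_code_block(reply)
#     # -> '<!DOCTYPE html><html><body>Hi</body></html>'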


def extract_text_from_image(image_path: str) -> str:
    """Extract text from image using OCR"""
    try:
        # Fail fast with a clear message if the Tesseract binary is missing
        try:
            pytesseract.get_tesseract_version()
        except Exception:
            return "Error: Tesseract OCR is not installed. Please install Tesseract to extract text from images."

        image = cv2.imread(image_path)
        if image is None:
            return "Error: Could not read image file"

        # Binarize with Otsu thresholding to improve OCR accuracy
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

        # --psm 6 assumes a single uniform block of text
        text = pytesseract.image_to_string(binary, config='--psm 6')
        return text.strip() if text.strip() else "No text found in image"

    except Exception as e:
        return f"Error extracting text from image: {e}"


def extract_text_from_file(file_path: str) -> str:
    """Extract text from various file formats"""
    if not file_path or not os.path.exists(file_path):
        return ""

    ext = os.path.splitext(file_path)[1].lower()

    try:
        if ext == ".pdf":
            with open(file_path, "rb") as f:
                reader = PyPDF2.PdfReader(f)
                return "\n".join(page.extract_text() or "" for page in reader.pages)

        elif ext in [".txt", ".md", ".csv"]:
            with open(file_path, "r", encoding="utf-8") as f:
                return f.read()

        elif ext == ".docx":
            doc = docx.Document(file_path)
            return "\n".join(para.text for para in doc.paragraphs)

        elif ext in [".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".tif", ".gif", ".webp"]:
            return extract_text_from_image(file_path)

        else:
            return ""

    except Exception as e:
        return f"Error extracting text: {e}"


def compress_media_for_data_uri(media_bytes: bytes, media_type: str = "video", max_size_mb: int = 8) -> bytes:
    """Compress media bytes for data URI embedding"""
    max_size = max_size_mb * 1024 * 1024

    if len(media_bytes) <= max_size:
        return media_bytes

    print(f"[MediaCompress] {media_type} size {len(media_bytes)} bytes exceeds {max_size_mb}MB limit, attempting compression")

    try:
        import subprocess

        # ffmpeg infers the output container from the extension, so use real
        # container suffixes rather than a truncated media_type string
        suffix = '.mp4' if media_type == "video" else '.mp3'

        with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as temp_input:
            temp_input.write(media_bytes)
            temp_input_path = temp_input.name

        temp_output_path = temp_input_path.replace(suffix, f'_compressed{suffix}')

        try:
            # capture_output=True already swallows ffmpeg's stdout/stderr;
            # it must not be combined with explicit stdout/stderr arguments
            if media_type == "video":
                # Downscale to 480px width, 15 fps, strip audio, CRF 30
                subprocess.run([
                    'ffmpeg', '-i', temp_input_path,
                    '-vcodec', 'libx264', '-crf', '30', '-preset', 'fast',
                    '-vf', 'scale=480:-1', '-r', '15',
                    '-an',
                    '-y', temp_output_path
                ], check=True, capture_output=True)
            else:
                # Re-encode audio to 64 kbps MP3
                subprocess.run([
                    'ffmpeg', '-i', temp_input_path,
                    '-codec:a', 'libmp3lame', '-b:a', '64k',
                    '-y', temp_output_path
                ], check=True, capture_output=True)

            with open(temp_output_path, 'rb') as f:
                compressed_bytes = f.read()

            print(f"[MediaCompress] Compressed from {len(media_bytes)} to {len(compressed_bytes)} bytes")
            return compressed_bytes

        except (subprocess.CalledProcessError, FileNotFoundError):
            print(f"[MediaCompress] ffmpeg compression failed, using original {media_type}")
            return media_bytes
        finally:
            # Always remove the scratch files, whether or not ffmpeg succeeded
            for path in [temp_input_path, temp_output_path]:
                try:
                    if os.path.exists(path):
                        os.remove(path)
                except Exception:
                    pass

    except Exception as e:
        print(f"[MediaCompress] Compression failed: {e}, using original {media_type}")
        return media_bytes
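
# Example (illustrative; requires ffmpeg on PATH — without it the original
# bytes are returned unchanged):
#
#     with open("clip.mp4", "rb") as f:
#         raw = f.read()
#     small = compress_media_for_data_uri(raw, media_type="video", max_size_mb=8)
#     data_uri = "data:video/mp4;base64," + base64.b64encode(small).decode()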


def create_temp_media_url(media_bytes: bytes, filename: str, media_type: str = "image",
                          session_id: Optional[str] = None) -> str:
    """Create a temporary file and return a local URL for preview"""
    try:
        # Build a collision-resistant filename: type + timestamp + short uuid
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        unique_id = str(uuid.uuid4())[:8]
        base_name, ext = os.path.splitext(filename)
        unique_filename = f"{media_type}_{timestamp}_{unique_id}_{base_name}{ext}"

        ensure_temp_dirs()
        temp_path = os.path.join(MEDIA_TEMP_DIR, unique_filename)

        with open(temp_path, 'wb') as f:
            f.write(media_bytes)

        # Register for session-scoped cleanup
        if session_id:
            track_session_media_file(session_id, temp_path)

        # Also register in the global map so cleanup_all_temp_media can find it
        file_id = f"{media_type}_{unique_id}"
        temp_media_files[file_id] = {
            'path': temp_path,
            'filename': filename,
            'media_type': media_type,
            'media_bytes': media_bytes
        }

        file_url = f"file://{temp_path}"
        print(f"[TempMedia] Created temporary {media_type} file: {file_url}")
        return file_url

    except Exception as e:
        print(f"[TempMedia] Failed to create temporary file: {str(e)}")
        return f"Error creating temporary {media_type} file: {str(e)}"


def track_session_media_file(session_id: Optional[str], file_path: str) -> None:
    """Track a media file for session-based cleanup"""
    if not session_id or not file_path:
        return

    with _MEDIA_FILES_LOCK:
        _SESSION_MEDIA_FILES.setdefault(session_id, []).append(file_path)


def cleanup_session_media(session_id: Optional[str]) -> None:
    """Clean up media files for a specific session"""
    if not session_id:
        return

    with _MEDIA_FILES_LOCK:
        files_to_clean = _SESSION_MEDIA_FILES.pop(session_id, [])

    for path in files_to_clean:
        try:
            if path and os.path.exists(path):
                os.unlink(path)
        except Exception:
            pass


def reap_old_media(ttl_seconds: int = TEMP_DIR_TTL_SECONDS) -> None:
    """Delete old media files based on modification time"""
    try:
        ensure_temp_dirs()
        now_ts = time.time()

        for temp_dir in [MEDIA_TEMP_DIR, VIDEO_TEMP_DIR, AUDIO_TEMP_DIR]:
            if not os.path.exists(temp_dir):
                continue

            for name in os.listdir(temp_dir):
                path = os.path.join(temp_dir, name)
                if os.path.isfile(path):
                    try:
                        mtime = os.path.getmtime(path)
                        if (now_ts - mtime) > ttl_seconds:
                            os.unlink(path)
                    except Exception:
                        pass
    except Exception:
        pass


def cleanup_all_temp_media():
    """Clean up all temporary media files"""
    try:
        print("[Cleanup] Cleaning up temporary media files...")

        # Remove files registered in the global map
        for file_id, file_info in temp_media_files.items():
            try:
                if os.path.exists(file_info['path']):
                    os.unlink(file_info['path'])
            except Exception:
                pass
        temp_media_files.clear()

        # Remove any remaining session-tracked files
        with _MEDIA_FILES_LOCK:
            for session_files in _SESSION_MEDIA_FILES.values():
                for path in session_files:
                    try:
                        if path and os.path.exists(path):
                            os.unlink(path)
                    except Exception:
                        pass
            _SESSION_MEDIA_FILES.clear()

        print("[Cleanup] Temporary media cleanup completed")
    except Exception as e:
        print(f"[Cleanup] Error during cleanup: {str(e)}")


def process_image_for_model(image) -> Optional[str]:
    """Convert image to base64 for model input"""
    if image is None:
        return None

    import io

    # Gradio may hand us a numpy array; normalize to a PIL image first
    if isinstance(image, np.ndarray):
        image = Image.fromarray(image)

    buffer = io.BytesIO()
    image.save(buffer, format='PNG')
    img_str = base64.b64encode(buffer.getvalue()).decode('utf-8')
    return f"data:image/png;base64,{img_str}"


def create_multimodal_message(text: str, image=None) -> Dict:
    """Create a chat message with an optional image reference"""
    if image is None:
        return {"role": "user", "content": text}

    # The image itself is not embedded here; the prompt only notes that one
    # was provided as reference
    return {"role": "user", "content": f"{text}\n\n[An image was provided as reference.]"}


def apply_search_replace_changes(original_content: str, changes_text: str) -> str:
    """Apply search/replace changes to content"""
    if not changes_text.strip():
        return original_content

    # Fallback mode: if no SEARCH/REPLACE markers are present, treat the
    # changes as bare CSS rules and merge them into matching selectors
    if (SEARCH_START not in changes_text) and (DIVIDER not in changes_text) and (REPLACE_END not in changes_text):
        try:
            updated_content = original_content
            replaced_any_rule = False

            # Parse "selector { body }" pairs out of the changes text
            css_blocks = re.findall(r"([^{]+)\{([\s\S]*?)\}", changes_text, flags=re.MULTILINE)

            for selector_raw, body_raw in css_blocks:
                selector = selector_raw.strip()
                body = body_raw.strip()
                if not selector:
                    continue

                pattern = re.compile(rf"({re.escape(selector)}\s*\{{)([\s\S]*?)(\}})")

                def _replace_rule(match):
                    nonlocal replaced_any_rule
                    replaced_any_rule = True
                    prefix, existing_body, suffix = match.groups()

                    # Preserve the indentation of the existing rule body
                    first_line_indent = ""
                    for line in existing_body.splitlines():
                        stripped = line.lstrip(" \t")
                        if stripped:
                            first_line_indent = line[: len(line) - len(stripped)]
                            break

                    if body:
                        new_body_lines = [first_line_indent + line if line.strip() else line for line in body.splitlines()]
                        new_body_text = "\n" + "\n".join(new_body_lines) + "\n"
                    else:
                        new_body_text = existing_body

                    return f"{prefix}{new_body_text}{suffix}"

                updated_content, num_subs = pattern.subn(_replace_rule, updated_content, count=1)

            if replaced_any_rule:
                return updated_content
        except Exception:
            pass

    # Standard mode: split the changes text into SEARCH/REPLACE blocks
    blocks = []
    current_block = ""
    lines = changes_text.split('\n')

    for line in lines:
        if line.strip() == SEARCH_START:
            if current_block.strip():
                blocks.append(current_block.strip())
            current_block = line + '\n'
        elif line.strip() == REPLACE_END:
            current_block += line + '\n'
            blocks.append(current_block.strip())
            current_block = ""
        else:
            current_block += line + '\n'

    if current_block.strip():
        blocks.append(current_block.strip())

    modified_content = original_content

    for block in blocks:
        if not block.strip():
            continue

        # Within each block, collect the lines between the markers
        lines = block.split('\n')
        search_lines = []
        replace_lines = []
        in_search = False
        in_replace = False

        for line in lines:
            if line.strip() == SEARCH_START:
                in_search = True
                in_replace = False
            elif line.strip() == DIVIDER:
                in_search = False
                in_replace = True
            elif line.strip() == REPLACE_END:
                in_replace = False
            elif in_search:
                search_lines.append(line)
            elif in_replace:
                replace_lines.append(line)

        if search_lines:
            search_text = '\n'.join(search_lines).strip()
            replace_text = '\n'.join(replace_lines).strip()

            if search_text in modified_content:
                modified_content = modified_content.replace(search_text, replace_text)
            else:
                print(f"Warning: Search text not found: {search_text[:100]}...")

    return modified_content
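
# Example (illustrative; assumes conventional marker strings in config, e.g.
# SEARCH_START = "<<<<<<< SEARCH", DIVIDER = "=======",
# REPLACE_END = ">>>>>>> REPLACE" — the code below works for any values):
#
#     changes = (
#         f"{SEARCH_START}\n"
#         "<h1>Old Title</h1>\n"
#         f"{DIVIDER}\n"
#         "<h1>New Title</h1>\n"
#         f"{REPLACE_END}\n"
#     )
#     apply_search_replace_changes("<h1>Old Title</h1>", changes)
#     # -> '<h1>New Title</h1>'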


def validate_video_html(video_html: str) -> bool:
    """Validate that video HTML is well-formed and safe"""
    try:
        if not video_html or not video_html.strip():
            return False

        if '<video' not in video_html or '</video>' not in video_html:
            return False

        if '<source' not in video_html:
            return False

        # Require a recognized source: inline data URI, HF dataset URL, or file URL
        has_data_uri = 'data:video/mp4;base64,' in video_html
        has_hf_url = 'https://huggingface.co/datasets/' in video_html and '/resolve/main/' in video_html
        has_file_url = 'file://' in video_html

        if not (has_data_uri or has_hf_url or has_file_url):
            return False

        # find() returns -1 when missing, so video_end == -1 + 8 == 7 means
        # the closing tag is absent
        video_start = video_html.find('<video')
        video_end = video_html.find('</video>') + 8
        if video_start == -1 or video_end == 7:
            return False

        return True
    except Exception:
        return False
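
# Example (illustrative):
#
#     html = ('<video controls>'
#             '<source src="file:///tmp/anycoder_videos/clip.mp4" type="video/mp4">'
#             '</video>')
#     validate_video_html(html)                    # -> True
#     validate_video_html("<p>no video here</p>")  # -> False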


# Remove all temporary media when the interpreter exits
atexit.register(cleanup_all_temp_media)