Spaces:
Running
Running
import logging
import os
import re
import shlex
import shutil
import subprocess
import tempfile
import traceback  # For detailed error logging
import uuid
from pathlib import Path

import gradio as gr
from moviepy.editor import VideoFileClip, AudioFileClip
from openai import OpenAI
from PIL import Image
# --- Configuration ---

# Configure root logging once for the whole app (library loggers propagate here).
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Supported models configuration.
# Each entry maps a UI display name to the OpenAI-compatible endpoint details:
#   base_url:           the API base URL handed to the OpenAI client
#   env_key:            environment variable that holds the API key
#                       (set the variable to the literal "NONE" for keyless
#                       local servers)
#   model_name_for_api: the model identifier sent in the API request
MODELS = {
    # Format: "Display Name": {"base_url": "...", "env_key": "...", "model_name_for_api": "..."}
    # Add your models here
    "deepseek-ai/DeepSeek-V3": {
        "base_url": "https://api.deepseek.com/v1",
        "env_key": "DEEPSEEK_API_KEY",
        "model_name_for_api": "deepseek-chat",  # Use the specific model name required by DeepSeek API
    },
    "Qwen/Qwen2.5-Coder-32B-Instruct": {
        "base_url": "https://api-inference.huggingface.co/v1/",  # Check if correct for chat completions
        "env_key": "HF_TOKEN",
        # Note: HF Inference API might use a different endpoint or format for chat completions.
        # This base URL might be for text-generation. Adjust if needed.
        # Also, the model name might need /chat/completions appended or similar.
        "model_name_for_api": "Qwen/Qwen2.5-Coder-32B-Instruct",  # Usually the model ID on HF
    },
    # Example using a local server (like LM Studio, Ollama)
    # "Local Model (via Ollama)": {
    #     "base_url": "http://localhost:11434/v1",  # Ollama's OpenAI-compatible endpoint
    #     "env_key": "OLLAMA_API_KEY",  # Often not needed, use "NONE" or similar if no key
    #     "model_name_for_api": "qwen:14b",  # The specific model name served by Ollama
    # },
}

# Allowed media file extensions for the Gradio upload widget.
allowed_medias = [
    ".png", ".jpg", ".webp", ".jpeg", ".tiff", ".bmp", ".gif", ".svg",
    ".mp3", ".wav", ".ogg", ".aac", ".flac", ".m4a",
    ".mp4", ".avi", ".mov", ".mkv", ".flv", ".wmv", ".webm", ".mpg", ".mpeg", ".m4v",
    ".3gp", ".3g2", ".3gpp",
]

# --- Global Variables ---
# client:               module-level OpenAI client, set by initialize_client()
# initial_model_choice: default model display name used by the UI dropdown
client = None
initial_model_choice = None
# --- Helper Functions ---

def get_first_available_key_config():
    """Return the first ``(display_name, config)`` pair with a usable API key.

    A key is usable when the environment variable named by the config's
    ``env_key`` is non-empty and is not the literal placeholder "NONE"
    (case-insensitive). Returns ``(None, None)`` when no model qualifies.
    """
    for display_name, cfg in MODELS.items():
        key = os.environ.get(cfg["env_key"])
        usable = bool(key) and key.upper() != "NONE"
        if usable:
            logging.info(f"Found valid API key for model: {display_name}")
            return display_name, cfg
    logging.warning("No valid API keys found in environment variables for any configured models.")
    return None, None
def initialize_client():
    """Set up the module-level OpenAI client from the first usable config.

    Populates the globals ``client`` and ``initial_model_choice``. When no
    model has a usable key, or client construction fails, ``client`` stays
    ``None`` but a fallback model choice is still selected so the UI can
    render its dropdown.
    """
    global client, initial_model_choice
    initial_model_choice, config = get_first_available_key_config()
    if not config:
        # No usable key at all: leave the client unset but still pick a
        # default model for the dropdown.
        client = None
        initial_model_choice = list(MODELS.keys())[0] if MODELS else None
        return
    try:
        api_key = os.environ.get(config["env_key"])
        # "NONE" (or a missing key) means a keyless local backend; the SDK
        # still requires *some* string, so substitute a placeholder.
        keyless = not api_key or api_key.upper() == "NONE"
        effective_api_key = "required-but-not-used" if keyless else api_key
        client = OpenAI(
            base_url=config["base_url"],
            api_key=effective_api_key,
        )
        logging.info(f"OpenAI client initialized for model: {initial_model_choice} using base_url: {config['base_url']}")
    except Exception as e:
        logging.error(f"Failed to initialize OpenAI client: {e}", exc_info=True)
        client = None
        initial_model_choice = list(MODELS.keys())[0]  # Fallback UI selection
def get_files_infos(files):
    """Extracts metadata from uploaded files, handling potential errors.

    For every uploaded file a dict is appended to the result with at least:
      - "original_name": the filename as uploaded
      - "name": sanitized filename (spaces -> underscores) used in commands
      - "error": None on success, otherwise a short error description
    plus, depending on media type: "size", "type" ("video", "video/audio",
    "audio", "image" or "unknown"), "duration", "dimensions",
    "audio_channels".

    Files that cannot be read are still returned (with "error" set) so the
    caller can decide whether to proceed with the remaining files.
    """
    results = []
    if not files:
        return results
    for file_obj in files:
        # Gradio file objects expose their temp-file path via .name
        file_path = Path(file_obj.name)
        info = {"error": None, "original_name": file_path.name}
        try:
            info["size"] = os.path.getsize(file_path)
            # Sanitize filename (used in ffmpeg command)
            info["name"] = file_path.name.replace(" ", "_")
            # Validate sanitized name (basic check for path separators)
            if not info["name"] or "/" in info["name"] or "\\" in info["name"]:
                raise ValueError(f"Invalid sanitized filename generated: '{info['name']}'")
            file_extension = file_path.suffix.lower()
            # Video Processing
            if file_extension in (".mp4", ".avi", ".mov", ".mkv", ".flv", ".wmv", ".webm", ".mpg", ".mpeg", ".m4v", ".3gp", ".3g2", ".3gpp"):
                info["type"] = "video"
                try:
                    # Ensure ffmpeg is found by moviepy, handle potential issues
                    if not shutil.which("ffmpeg"):
                        raise FileNotFoundError("ffmpeg command not found in PATH. MoviePy cannot process video/audio.")
                    # NOTE(review): `verbose=` exists on VideoFileClip in moviepy 1.x
                    # but was removed in 2.x — confirm the pinned moviepy version.
                    video = VideoFileClip(str(file_path), verbose=False)
                    info["duration"] = video.duration
                    info["dimensions"] = f"{video.size[0]}x{video.size[1]}" if video.size else "N/A"
                    if video.audio:
                        info["type"] = "video/audio"
                        info["audio_channels"] = video.audio.nchannels if hasattr(video.audio, 'nchannels') else "N/A"
                    video.close()  # Release file handle
                except UnicodeDecodeError as ude:
                    # moviepy can choke on non-UTF8 metadata; keep the file usable
                    info["error"] = f"Metadata decoding error ({ude}). Duration/dimensions might be missing."
                    logging.warning(f"UnicodeDecodeError processing video '{info['original_name']}': {ude}")
                except FileNotFoundError as fnf:
                    info["error"] = str(fnf)
                    logging.error(f"FFmpeg not found: {fnf}")
                except Exception as e:
                    info["error"] = f"Error reading video metadata ({type(e).__name__})."
                    logging.warning(f"Error processing video '{info['original_name']}': {e}", exc_info=False)  # Log less verbose traceback for common errors
            # Audio Processing
            elif file_extension in (".mp3", ".wav", ".ogg", ".aac", ".flac", ".m4a"):
                info["type"] = "audio"
                try:
                    if not shutil.which("ffmpeg"):
                        raise FileNotFoundError("ffmpeg command not found in PATH. MoviePy cannot process video/audio.")
                    # NOTE(review): AudioFileClip may not accept `verbose=` in all
                    # moviepy releases — a TypeError here would be swallowed by the
                    # generic handler below and mark every audio file as errored;
                    # verify against the installed version.
                    audio = AudioFileClip(str(file_path), verbose=False)
                    info["duration"] = audio.duration
                    info["audio_channels"] = audio.nchannels if hasattr(audio, 'nchannels') else "N/A"
                    audio.close()
                except UnicodeDecodeError as ude:
                    info["error"] = f"Metadata decoding error ({ude}). Duration/channels might be missing."
                    logging.warning(f"UnicodeDecodeError processing audio '{info['original_name']}': {ude}")
                except FileNotFoundError as fnf:
                    info["error"] = str(fnf)
                    logging.error(f"FFmpeg not found: {fnf}")
                except Exception as e:
                    info["error"] = f"Error reading audio metadata ({type(e).__name__})."
                    logging.warning(f"Error processing audio '{info['original_name']}': {e}", exc_info=False)
            # Image Processing
            elif file_extension in (".png", ".jpg", ".jpeg", ".tiff", ".bmp", ".gif", ".svg", ".webp"):
                info["type"] = "image"
                try:
                    # Context manager releases the file handle promptly
                    with Image.open(file_path) as img:
                        info["dimensions"] = f"{img.size[0]}x{img.size[1]}"
                except Exception as e:
                    info["error"] = f"Error reading image metadata ({type(e).__name__})."
                    logging.warning(f"Error processing image '{info['original_name']}': {e}", exc_info=False)
            else:
                info["type"] = "unknown"
                info["error"] = "Unsupported file type."
                logging.warning(f"Unsupported file type: {info['original_name']}")
        except OSError as ose:
            # e.g. getsize() failed; file may have vanished or be unreadable
            info["error"] = f"File system error: {ose}"
            logging.error(f"OSError accessing file {file_path}: {ose}", exc_info=True)
            if "name" not in info: info["name"] = info["original_name"].replace(" ", "_")  # Ensure sanitized name exists
        except ValueError as ve:  # Catch invalid sanitized name error
            info["error"] = str(ve)
            logging.error(f"Filename sanitization error for {info['original_name']}: {ve}")
            if "name" not in info: info["name"] = f"invalid_name_{uuid.uuid4()}"  # Provide a fallback name
        except Exception as e:
            info["error"] = f"Unexpected error processing file: {e}"
            logging.error(f"Unexpected error processing file {file_path}: {e}", exc_info=True)
            if "name" not in info: info["name"] = info["original_name"].replace(" ", "_")
        results.append(info)
    return results
def get_completion(prompt, files_info, top_p, temperature, model_choice):
    """Ask the selected AI model for a single-line ffmpeg command.

    Args:
        prompt: The user's natural-language editing objective.
        files_info: Metadata dicts produced by get_files_infos().
        top_p: Nucleus-sampling parameter forwarded to the model.
        temperature: Sampling temperature forwarded to the model.
        model_choice: Display name of a key in MODELS.

    Returns:
        A cleaned single-line ffmpeg command string ending in "-y output.mp4".

    Raises:
        gr.Error: When the client is unusable, no valid files exist, or the
            API call / response cleanup fails.
        ValueError: When ``model_choice`` is not configured.
    """
    global client  # Ensure we use the potentially updated client
    if client is None:
        # Should be caught earlier in update(), but double-check here.
        raise gr.Error("API Client not initialized. Cannot contact AI.")
    if model_choice not in MODELS:
        raise ValueError(f"Model '{model_choice}' is not found in configuration.")
    model_config = MODELS[model_choice]
    model_name_for_api = model_config["model_name_for_api"]

    # --- Build a markdown table describing the assets for the AI ---
    files_info_string = "| Type | Name (for command) | Dimensions | Duration (s) | Audio Channels | Status |\n"
    files_info_string += "|------|--------------------|------------|--------------|----------------|--------|\n"
    valid_files_count = 0
    for file_info in files_info:
        name_for_command = file_info.get("name", "N/A")  # Use sanitized name
        file_type = file_info.get("type", "N/A")
        dimensions = file_info.get("dimensions", "-")
        duration_val = file_info.get('duration')
        duration_str = f"{duration_val:.2f}" if duration_val is not None else "-"
        audio_ch_val = file_info.get('audio_channels')
        audio_ch_str = str(audio_ch_val) if audio_ch_val is not None else "-"
        status = "Error" if file_info.get("error") else "OK"
        if not file_info.get("error"):
            valid_files_count += 1
        files_info_string += f"| {file_type} | `{name_for_command}` | {dimensions} | {duration_str} | {audio_ch_str} | {status} |\n"
        if file_info.get("error"):
            # Surface error details to the AI, truncated to keep the prompt small
            files_info_string += f"| `Error Details` | `{file_info['error'][:100]}` | - | - | - | - |\n"
    if valid_files_count == 0:
        raise gr.Error("No valid media files could be processed. Please check the file formats or errors.")

    # --- Construct Messages for the AI ---
    system_prompt = """You are a highly skilled FFMPEG expert simulating a command-line interface.
Given a list of media assets and a user's objective, generate the SIMPLEST POSSIBLE, SINGLE ffmpeg command to achieve the goal.
**Input Files:** Use the filenames provided in the 'Name (for command)' column of the asset list. These names have spaces replaced with underscores.
**Output File:** The final output MUST be named exactly `output.mp4`.
**Output Format:** The final output MUST be a video/mp4 container.
**Key Requirements:**
1. **Single Command:** Output ONLY the ffmpeg command, on a single line. No explanations, no comments, no introductory text, no code blocks (like ```bash ... ```).
2. **Simplicity:** Use the minimum required options. Avoid `-filter_complex` unless absolutely necessary. Prefer direct mapping, simple filters (`-vf`, `-af`), concatenation (`concat` demuxer), etc.
3. **Correctness:** Ensure options, filter syntax, and stream mapping are correct.
4. **Input Names:** Strictly use the provided sanitized input filenames (e.g., `My_Video.mp4`).
5. **Output Name:** End the command with `-y output.mp4` (the `-y` allows overwriting).
6. **Handle Errors:** If an asset has an 'Error' status, try to work around it if possible (e.g., ignore a faulty audio stream if only video is needed), or generate a command that likely fails gracefully if the task is impossible without that asset. Do NOT output error messages yourself, just the command.
7. **Specific Tasks:**
    * *Waveform:* If asked for waveform, use `showwaves` filter (e.g., `"[0:a]showwaves=s=1280x100:mode=line,format=pix_fmts=yuv420p[v]"`), map video and audio (`-map "[v]" -map 0:a?`), and consider making audio mono (`-ac 1`) unless stereo is requested. Use video dimensions if provided, otherwise default to something reasonable like 1280x100.
    * *Image Sequence:* Use `-framerate` and pattern (`img%03d.png`) if applicable. For single images, use `-loop 1 -t duration`.
    * *Text Overlay:* Use `drawtext` filter. Get position (e.g., `x=(w-text_w)/2:y=h-th-10`), font, size, color from user prompt if possible, otherwise use defaults.
    * *Concatenation:* Prefer the `concat` demuxer (requires a temporary file list) over the `concat` filter if possible for simple cases without re-encoding. However, since you MUST output a single command, you might need to use the filter (`[0:v][1:v]concat=n=2:v=1[outv]`) if creating a temp file list isn't feasible within the single command constraint. Prioritize simplicity.
**Example Output:**
ffmpeg -i input_video.mp4 -vf "scale=1280:720" -c:a copy -y output.mp4
**DO NOT include ```bash or ``` anywhere in your response.** Just the raw command.
"""
    user_message_content = f"""Generate the single-line FFMPEG command based on the assets and objective.
**AVAILABLE ASSETS:**
{files_info_string}
**OBJECTIVE:** {prompt}
**FFMPEG Command:**
"""
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_message_content},
    ]
    try:
        logging.info(f"Sending request to AI model: {model_name_for_api} at {client.base_url}")
        completion = client.chat.completions.create(
            model=model_name_for_api,
            messages=messages,
            temperature=temperature,
            top_p=top_p,
            max_tokens=1024,  # Generous for a one-line command
        )
        content = completion.choices[0].message.content.strip()
        logging.info(f"AI Raw Response: '{content}'")

        # --- Command Validation and Cleaning ---
        # Strip markdown code fences if the AI ignored instructions.
        # (Requires `re`, imported at module level.)
        if content.startswith("```") and content.endswith("```"):
            content = re.sub(r"^```(?:bash|sh)?\s*", "", content)
            content = re.sub(r"\s*```$", "", content)
            content = content.strip()
            logging.warning("AI included code blocks despite instructions, attempting cleanup.")
        # Drop any chatter before the actual command.
        ffmpeg_index = content.lower().find("ffmpeg ")
        if ffmpeg_index > 0:
            logging.warning(f"AI included leading text, stripping: '{content[:ffmpeg_index]}'")
            content = content[ffmpeg_index:]
        elif ffmpeg_index == -1 and not content.lower().startswith("ffmpeg"):
            logging.error(f"AI response does not contain 'ffmpeg': '{content}'")
            raise ValueError("AI did not generate a valid ffmpeg command.")
        # Ensure the command ends with the expected "-y output.mp4".
        stripped = content.rstrip()
        if not stripped.endswith("-y output.mp4"):
            logging.warning("AI response doesn't end with '-y output.mp4'. Fixing it.")
            if stripped.endswith("output.mp4"):
                # BUGFIX: insert the missing -y *before* the existing output
                # file instead of appending a second, duplicate output target.
                content = stripped[: -len("output.mp4")].rstrip() + " -y output.mp4"
            elif " output.mp4" not in content:  # Avoid adding if output.mp4 appears elsewhere
                content = stripped + " -y output.mp4"
        # Collapse to a single line.
        command = content.replace('\n', ' ').replace('\r', '').strip()
        if not command:
            raise ValueError("AI generated an empty command string.")
        logging.info(f"Cleaned AI Command: '{command}'")
        return command
    except Exception as e:
        logging.error(f"Error during AI completion or processing: {e}", exc_info=True)
        # Map common failure modes to friendlier user-facing errors.
        if "authentication" in str(e).lower():
            raise gr.Error(f"AI API Authentication Error. Check your API key ({model_config['env_key']}). Error: {e}")
        elif "rate limit" in str(e).lower():
            raise gr.Error(f"AI API Rate Limit Exceeded. Please try again later. Error: {e}")
        else:
            raise gr.Error(f"Failed to get command from AI. Error: {e}")
# --- Main Gradio Update Function ---
def update(
    files,
    prompt,
    top_p=1,
    temperature=1,
    model_choice=None,  # Default to None, will use initial_model_choice
):
    """Run the full pipeline: validate inputs, ask the AI for an ffmpeg
    command, execute it in an isolated temp directory, and return the result.

    Args:
        files: Uploaded Gradio file objects.
        prompt: Natural-language editing objective.
        top_p: Nucleus-sampling parameter forwarded to the AI model.
        temperature: Sampling temperature forwarded to the AI model.
        model_choice: Display name of a key in MODELS; falls back to the
            startup default (``initial_model_choice``) when None.

    Returns:
        Tuple of (path to the produced video file, gr.update carrying the
        generated command rendered as markdown).

    Raises:
        gr.Error: For any user-facing validation, AI, or ffmpeg failure.
    """
    global client

    # Operational limits, defined once so checks and messages always agree.
    # (The previous version checked 1 GiB while telling the user "150MB".)
    max_file_bytes = 1024 * 1024 * 1024  # 1 GiB per uploaded file
    max_video_seconds = 300              # 5 minutes per video
    ffmpeg_timeout_seconds = 3000        # 50-minute wall clock for ffmpeg

    # Use startup default if the caller did not pick a model.
    if model_choice is None:
        model_choice = initial_model_choice

    # --- Input Validations ---
    if not files:
        raise gr.Error("❌ Please upload at least one media file.")
    if not prompt:
        raise gr.Error("📝 Please enter editing instructions (prompt).")
    if not model_choice or model_choice not in MODELS:
        raise gr.Error(f"❌ Invalid model selected: {model_choice}. Please choose from the list.")

    # --- Check FFMPEG Availability ---
    if not shutil.which("ffmpeg"):
        error_msg = "❌ FFMPEG command not found in system PATH. This application requires FFMPEG to be installed and accessible."
        logging.error(error_msg)
        raise gr.Error(error_msg)

    # --- Check and potentially update API client ---
    model_config = MODELS[model_choice]
    api_key_env_var = model_config["env_key"]
    api_key = os.environ.get(api_key_env_var)
    # "NONE" opts into keyless operation (local servers); the SDK still needs
    # *some* string, so substitute a placeholder in that case.
    effective_api_key = api_key if api_key and api_key.upper() != "NONE" else "required-but-not-used"
    # BUGFIX: the old check compared against the placeholder and could never
    # fire. An entirely unset/empty key is an error; setting the env var to
    # the literal "NONE" explicitly opts into keyless operation.
    if not api_key:
        raise gr.Error(f"🔑 API Key ({api_key_env_var}) for the selected model '{model_choice}' is missing. Please set it as an environment variable.")
    if client is None:
        logging.warning(f"Client was None, attempting re-initialization for model: {model_choice}")
        try:
            client = OpenAI(base_url=model_config["base_url"], api_key=effective_api_key)
            logging.info(f"API Client initialized/updated for model: {model_choice}")
        except Exception as e:
            logging.error(f"Failed to initialize API client: {e}", exc_info=True)
            raise gr.Error(f"Failed to initialize API client: {e}")
    elif str(client.base_url) != model_config["base_url"] or client.api_key != effective_api_key:
        # Rebuild rather than mutate attributes: safer across openai-python
        # versions (the client normalizes base_url internally).
        logging.info(f"Updating API client configuration for selected model: {model_choice}")
        client = OpenAI(base_url=model_config["base_url"], api_key=effective_api_key)

    # --- Get File Infos and Check for Errors ---
    logging.info("Processing uploaded files...")
    files_info = get_files_infos(files)
    file_errors = [f"- '{f.get('original_name', 'Unknown file')}': {f['error']}"
                   for f in files_info if f.get("error")]
    if file_errors:
        error_message = "⚠️ Errors occurred while processing uploaded files:\n" + "\n".join(file_errors)
        logging.error(error_message)
        # Proceed if at least one file is readable; the AI is told about the
        # errored files and can work around them. Abort only when every file
        # failed.
        if len(file_errors) == len(files_info):
            raise gr.Error(error_message + "\n\nCannot proceed as no files could be read.")
        else:
            gr.Warning(error_message + "\n\nAttempting to proceed with valid files. The AI will be informed about the errors.")

    # --- Validate File Sizes and Durations ---
    for file_info in files_info:
        if not file_info.get("error"):  # Only enforce limits on readable files
            if "size" in file_info and file_info["size"] > max_file_bytes:
                raise gr.Error(f"File '{file_info.get('original_name')}' ({file_info['size'] / (1024*1024):.1f}MB) exceeds the {max_file_bytes // (1024*1024)}MB size limit.")
            if file_info.get("type", "").startswith("video") and "duration" in file_info and file_info["duration"] > max_video_seconds:
                raise gr.Error(f"Video '{file_info.get('original_name')}' ({file_info['duration']:.0f}s) exceeds the {max_video_seconds // 60}-minute duration limit.")

    # --- Get FFMPEG Command from AI ---
    command_string = None
    try:
        logging.info(f"Getting FFMPEG command from AI model: {model_choice}")
        command_string = get_completion(
            prompt, files_info, top_p, temperature, model_choice
        )
    except gr.Error as e:
        raise e  # Propagate Gradio errors directly
    except Exception as e:
        logging.error(f"Failed to get command from AI: {e}", exc_info=True)
        raise gr.Error(f"Failed to get or process command from AI. Error: {e}")
    if not command_string:
        raise gr.Error("AI returned an empty command. Please try again or rephrase.")

    # --- Prepare Temporary Directory and Execute FFMPEG ---
    # 'with' guarantees cleanup of the temp dir even on error.
    with tempfile.TemporaryDirectory() as temp_dir:
        logging.info(f"Created temporary directory: {temp_dir}")
        final_output_location = None  # Path to the final video outside temp dir
        try:
            # Copy input files into the temp dir under their sanitized names
            # so the AI-generated command's filenames resolve.
            logging.info("Copying files to temporary directory...")
            input_file_mapping = {}  # sanitized name -> original path
            for i, file_obj in enumerate(files):
                file_info = files_info[i]
                # Only copy files that were processed without error
                if not file_info.get("error"):
                    original_path = Path(file_obj.name)
                    sanitized_name = file_info['name']
                    destination_path = Path(temp_dir) / sanitized_name
                    try:
                        shutil.copy(original_path, destination_path)
                        logging.info(f"Copied '{original_path.name}' -> '{destination_path}'")
                        input_file_mapping[sanitized_name] = original_path
                    except Exception as copy_err:
                        logging.error(f"Failed to copy file {original_path} to {destination_path}: {copy_err}")
                        # ffmpeg would fail anyway with a missing input.
                        raise gr.Error(f"Failed to prepare input file: {original_path.name}. Error: {copy_err}")

            # --- Parse and Validate FFMPEG Command ---
            try:
                # shlex respects quoting, so filter args survive splitting.
                args = shlex.split(command_string)
            except ValueError as e:
                logging.error(f"Command syntax error: {e}. Command: {command_string}")
                raise gr.Error(f"Generated command has syntax errors (e.g., unbalanced quotes): {e}\nCommand: {command_string}")
            if not args or args[0].lower() != "ffmpeg":
                raise gr.Error(f"Generated command does not start with 'ffmpeg'. Command: {command_string}")

            # --- Prepare Final Command Arguments ---
            # The real output lives inside the temp dir under a unique name.
            temp_output_file_name = f"output_{uuid.uuid4()}.mp4"
            temp_output_path = str(Path(temp_dir) / temp_output_file_name)
            # Replace the placeholder 'output.mp4' with the actual temp path,
            # inserting -y if the AI omitted it.
            final_args = []
            output_placeholder_found = False
            for arg in args:
                if arg == "output.mp4":
                    if final_args and final_args[-1] != "-y":
                        final_args.append("-y")
                    final_args.append(temp_output_path)
                    output_placeholder_found = True
                else:
                    final_args.append(arg)
            if not output_placeholder_found:
                # Shouldn't happen with good prompting, but be defensive.
                logging.warning("AI command did not include 'output.mp4'. Appending target output path.")
                if final_args[-1] != "-y":
                    final_args.append("-y")
                final_args.append(temp_output_path)

            # --- Execute FFMPEG ---
            logging.info(f"Executing FFMPEG: {' '.join(final_args)}")
            try:
                process = subprocess.run(
                    final_args,
                    cwd=temp_dir,  # Run where the copied input files live
                    capture_output=True,
                    text=True,
                    encoding='utf-8', errors='replace',
                    check=True,  # Raise CalledProcessError on non-zero exit
                    timeout=ffmpeg_timeout_seconds,
                )
                logging.info("FFMPEG command executed successfully.")
                # ffmpeg writes its progress/diagnostics to stderr.
                if process.stderr: logging.info(f"FFMPEG stderr:\n{process.stderr}")
                if process.stdout: logging.debug(f"FFMPEG stdout:\n{process.stdout}")
            except subprocess.CalledProcessError as e:
                error_output = e.stderr or e.stdout or "No output captured."
                logging.error(f"FFMPEG execution failed! Return code: {e.returncode}\nCommand: {' '.join(e.cmd)}\nOutput:\n{error_output}")
                error_summary = error_output.strip().split('\n')[-1]  # Last line is usually the cause
                raise gr.Error(f"❌ FFMPEG execution failed: {error_summary}\n(Check logs/console for full command and error details)")
            except subprocess.TimeoutExpired as e:
                logging.error(f"FFMPEG command timed out after {e.timeout} seconds.\nCommand: {' '.join(e.cmd)}")
                raise gr.Error(f"⏳ FFMPEG command timed out after {e.timeout} seconds. The operation might be too complex or files too large.")
            except FileNotFoundError as e:
                # Checked earlier, but the binary could disappear mid-run.
                logging.error(f"FFMPEG command failed: {e}. Is ffmpeg installed and in PATH?")
                raise gr.Error(f"❌ FFMPEG execution failed: '{e.filename}' not found. Ensure FFMPEG is installed and accessible.")

            # --- Copy Result Out of Temp Directory ---
            if Path(temp_output_path).exists() and os.path.getsize(temp_output_path) > 0:
                output_dir = Path("./output_videos")
                output_dir.mkdir(parents=True, exist_ok=True)
                # UUID-based name avoids collisions between runs.
                final_output_location = shutil.copy(temp_output_path, output_dir / f"{Path(temp_output_path).stem}.mp4")
                logging.info(f"Copied final output video to: {final_output_location}")
            else:
                logging.error(f"FFMPEG seemed to succeed, but output file '{temp_output_path}' is missing or empty.")
                raise gr.Error("❌ FFMPEG finished, but the output file was not created or is empty. Check the generated command and logs.")

            # --- Prepare Display Command (using original placeholder) ---
            display_command_markdown = f"### Generated Command\n```bash\n{command_string}\n```"
            return final_output_location, gr.update(value=display_command_markdown)
        except Exception as e:
            # Catch any other unexpected errors; the temp dir is cleaned up
            # by the 'with' block regardless.
            logging.error(f"Error during processing: {e}", exc_info=True)
            if isinstance(e, gr.Error): raise e  # Re-raise Gradio errors
            else: raise gr.Error(f"An unexpected error occurred: {e}")
# --- Initialize Client on Startup ---
# Runs at import time so the UI dropdown has a sensible default model.
initialize_client()
if client is None and initial_model_choice:
    # Start anyway so the UI can render; update() will surface errors later.
    logging.warning("Application starting without a functional AI client due to initialization errors or missing keys.")
    # Consider showing a warning in the UI if possible, or rely on errors during `update`
# --- Gradio Interface Definition ---
# Layout: left column = inputs (files, prompt, advanced params, run button),
# right column = outputs (video + generated command).
with gr.Blocks(title="AI Video Editor - Edit with Natural Language", theme=gr.themes.Soft(primary_hue=gr.themes.colors.sky)) as demo:
    gr.Markdown(
        """
        # 🎬 AI Video Editor: Your Smart Editing Assistant 🎞️
        Welcome to the AI Video Editor! This tool uses AI models like **DeepSeek-V3** or **Qwen** to understand your editing needs in plain English.
        Upload your media, describe the desired result, and the AI generates the **FFMPEG command** to create your video.
        **No complex software needed!** Ideal for quick edits, learning FFMPEG, or automating simple video tasks. Trim, merge, add text, change speed, apply filters, combine media — just tell the AI!
        **Get started:** Upload files, type instructions, click **"🚀 Run Edit"**!
        *(Ensure FFMPEG is installed on the system running this app.)*
        """,
        elem_id="header",
    )
    with gr.Accordion("📋 Usage Instructions & Examples", open=False):
        gr.Markdown(
            """
            ### How to Use
            1. **Upload Files**: Use the "Upload Media Files" area.
            2. **Write Instructions**: Describe the edit in the "Instructions" box.
            3. **(Optional) Adjust Parameters**: Select AI model, tweak Top-p/Temperature for creativity.
            4. **Generate**: Click **"🚀 Run Edit"**.
            5. **Review**: Watch the result in "Generated Video Output". The FFMPEG command used appears below.
            ### Example Instructions
            * `Trim the video to keep only the segment from 10s to 25s.`
            * `Concatenate video1.mp4 and video2.mp4.`
            * `Add text "Hello World" at the bottom center, white font, size 24.`
            * `Convert video to black and white.`
            * `Create slideshow from image1.jpg, image2.png (5s each) with background.mp3.`
            * `Resize video to 1280x720.`
            * `Speed up video 2x.`
            * `Generate waveform visualization for the audio file, 1280x120 pixels.`
            ### Tips
            * **Be Specific**: "remove first 5 seconds" is better than "make shorter".
            * **Use Filenames**: Refer to files like `Combine intro.mp4 and main.mp4` (AI uses names with underscores).
            * **Details Matter**: For text, specify position, color, size. For fades, mention duration.
            * **Keep it Simple**: One main goal per instruction works best.
            """
        )
    with gr.Row():
        with gr.Column(scale=1):
            user_files = gr.File(
                file_count="multiple",
                label="📤 Upload Media Files",
                file_types=allowed_medias,
            )
            user_prompt = gr.Textbox(
                placeholder="e.g., 'Combine video1.mp4 and video2.mp4'",
                label="📝 Instructions / Editing Objective",
                lines=3,
            )
            with gr.Accordion("⚙️ Advanced Parameters", open=False):
                # Guard against a stale/invalid startup choice before using it
                # as the dropdown default.
                valid_initial_model = initial_model_choice if initial_model_choice in MODELS else (list(MODELS.keys())[0] if MODELS else None)
                model_choice_dropdown = gr.Dropdown(  # Dropdown scales better than radio with many models
                    choices=list(MODELS.keys()),
                    value=valid_initial_model,
                    label="🧠 Select AI Model",
                )
                top_p_slider = gr.Slider(
                    minimum=0.0, maximum=1.0, value=0.7, step=0.05,
                    label="Top-p (Diversity)", info="Lower values = more focused, higher = more random."
                )
                temperature_slider = gr.Slider(
                    minimum=0.0, maximum=2.0, value=0.2, step=0.1,  # Low default temp for predictable ffmpeg output
                    label="Temperature (Randomness)", info="Lower values = more deterministic, higher = more creative/random."
                )
            run_button = gr.Button("🚀 Run Edit", variant="primary")
        with gr.Column(scale=1):
            generated_video_output = gr.Video(
                label="🎬 Generated Video Output",
                interactive=False,
                include_audio=True,
            )
            generated_command_output = gr.Markdown(label="💻 Generated FFMPEG Command")

    # --- Event Handling ---
    run_button.click(
        fn=update,
        inputs=[user_files, user_prompt, top_p_slider, temperature_slider, model_choice_dropdown],
        outputs=[generated_video_output, generated_command_output],
        api_name="generate_edit"
    )

    # --- Examples ---
    # IMPORTANT: Update example file paths relative to where you run the script!
    # Create an 'examples' folder or adjust paths.
    example_list = [
        [
            ["./examples/video1.mp4"],  # Make sure this path exists
            "Add text 'Watermark' to the top right corner, white font, size 18, slightly transparent.",
            0.7, 0.2, list(MODELS.keys())[0] if MODELS else None,
        ],
        [
            ["./examples/video1.mp4"],
            "Cut the video to keep only 10 seconds, starting from 00:00:15.",
            0.7, 0.2, list(MODELS.keys())[min(1, len(MODELS)-1)] if len(MODELS) > 1 else (list(MODELS.keys())[0] if MODELS else None),
        ],
        [
            ["./examples/video2.mp4"],  # Make sure this path exists
            "Convert the video to grayscale (black and white).",
            0.7, 0.2, list(MODELS.keys())[0] if MODELS else None,
        ],
        [
            ["./examples/image1.jpg", "./examples/image2.png", "./examples/audio.mp3"],  # Make sure paths exist
            "Create a slideshow: image1.jpg for 5s, then image2.png for 5s. Use audio.mp3 as background music. Output size 1920x1080.",
            0.7, 0.2, list(MODELS.keys())[0] if MODELS else None,
        ],
    ]
    # Filter out examples if no models are configured
    valid_examples = [ex for ex in example_list if ex[4] is not None]
    if valid_examples:
        gr.Examples(
            examples=valid_examples,
            inputs=[user_files, user_prompt, top_p_slider, temperature_slider, model_choice_dropdown],
            outputs=[generated_video_output, generated_command_output],
            fn=update,
            # BUGFIX: was True, contradicting the intent and causing every
            # example (AI call + ffmpeg run) to execute at launch, which
            # fails without API keys and example files present.
            cache_examples=False,
            label="✨ Example Use Cases (Click to Run)",
            run_on_click=False,
        )
    else:
        gr.Markdown("_(Examples disabled as no models seem to be configured with API keys)_")
# --- Launch the App ---
if __name__ == "__main__":
    # Allow up to 20 queued jobs to run concurrently; tune to host resources.
    demo.queue(default_concurrency_limit=20)
    # Launch on 0.0.0.0 to make accessible on network if needed
    # demo.launch(show_api=False, server_name="0.0.0.0")
    demo.launch(show_api=False)  # Default for local/Hugging Face Spaces