import os
import re
import json
import time
import random
import tempfile
import requests
from PIL import Image
from datetime import datetime
import gradio as gr
from dotenv import load_dotenv
from moviepy.editor import *
from moviepy.audio.fx.all import volumex
from moviepy.video.fx.all import crop

# Load environment variables from .env file if present
load_dotenv()

# Constants
CACHE_DIR = os.path.join(tempfile.gettempdir(), "yt_shorts_generator")
ASSETS_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "assets")
MUSIC_DIR = os.path.join(ASSETS_DIR, "background_music")
FONTS_DIR = os.path.join(ASSETS_DIR, "fonts")

# Create necessary directories
os.makedirs(CACHE_DIR, exist_ok=True)
os.makedirs(MUSIC_DIR, exist_ok=True)
os.makedirs(FONTS_DIR, exist_ok=True)
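# Map the UI language names to ISO 639-1 codes for TTS. Slicing the display
# name (e.g. "Spanish"[:2] == "sp") yields invalid codes for several languages,
# so the gTTS branches below look codes up here instead. The table covers
# exactly the languages offered in the interface dropdown.
LANGUAGE_CODES = {
    "English": "en", "Spanish": "es", "French": "fr", "German": "de",
    "Italian": "it", "Portuguese": "pt", "Russian": "ru", "Japanese": "ja",
    "Chinese": "zh", "Hindi": "hi",
}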
# Helper functions for logging
def info(message):
    timestamp = datetime.now().strftime("%H:%M:%S")
    formatted_message = f"[{timestamp}] [INFO] {message}"
    print(formatted_message)
    return formatted_message

def success(message):
    timestamp = datetime.now().strftime("%H:%M:%S")
    formatted_message = f"[{timestamp}] [SUCCESS] {message}"
    print(formatted_message)
    return formatted_message

def warning(message):
    timestamp = datetime.now().strftime("%H:%M:%S")
    formatted_message = f"[{timestamp}] [WARNING] {message}"
    print(formatted_message)
    return formatted_message

def error(message):
    timestamp = datetime.now().strftime("%H:%M:%S")
    formatted_message = f"[{timestamp}] [ERROR] {message}"
    print(formatted_message)
    return formatted_message
def get_music_files():
    """Get the list of available music files in the music directory."""
    if not os.path.exists(MUSIC_DIR):
        return ["none"]
    music_files = [f for f in os.listdir(MUSIC_DIR) if f.endswith(('.mp3', '.wav'))]
    if not music_files:
        return ["none"]
    return ["random"] + music_files

def get_font_files():
    """Get the list of available font files in the fonts directory."""
    if not os.path.exists(FONTS_DIR):
        return ["default"]
    font_files = [f.split('.')[0] for f in os.listdir(FONTS_DIR) if f.endswith(('.ttf', '.otf'))]
    if not font_files:
        return ["default"]
    return ["default"] + font_files

def choose_random_music():
    """Select a random music file from the music directory."""
    if not os.path.exists(MUSIC_DIR):
        error(f"Music directory {MUSIC_DIR} does not exist")
        return None
    music_files = [f for f in os.listdir(MUSIC_DIR) if f.endswith(('.mp3', '.wav'))]
    if not music_files:
        warning(f"No music files found in {MUSIC_DIR}")
        return None
    return os.path.join(MUSIC_DIR, random.choice(music_files))
class YouTube:
    def __init__(self, niche: str, language: str,
                 text_gen="g4f", text_model="gpt-4",
                 image_gen="g4f", image_model="flux",
                 tts_engine="edge", tts_voice="en-US-AriaNeural",
                 subtitle_font="default", font_size=80,
                 text_color="white", highlight_color="blue",
                 subtitles_enabled=True, highlighting_enabled=True,
                 subtitle_position="bottom", music_file="random",
                 api_keys=None, progress=gr.Progress()) -> None:
        """Initialize the YouTube Shorts Generator."""
        self.progress = progress
        self.progress(0, desc="Initializing")

        # Store basic parameters
        info("Initializing YouTube class")
        self._niche = niche
        self._language = language
        self.text_gen = text_gen
        self.text_model = text_model
        self.image_gen = image_gen
        self.image_model = image_model
        self.tts_engine = tts_engine
        self.tts_voice = tts_voice
        self.subtitle_font = subtitle_font
        self.font_size = font_size
        self.text_color = text_color
        self.highlight_color = highlight_color
        self.subtitles_enabled = subtitles_enabled
        self.highlighting_enabled = highlighting_enabled
        self.subtitle_position = subtitle_position
        self.music_file = music_file
        self.api_keys = api_keys or {}
        self.images = []
        self.logs = []

        # Set API keys from parameters or environment variables
        if 'gemini' in self.api_keys and self.api_keys['gemini']:
            os.environ["GEMINI_API_KEY"] = self.api_keys['gemini']
        if 'assemblyai' in self.api_keys and self.api_keys['assemblyai']:
            os.environ["ASSEMBLYAI_API_KEY"] = self.api_keys['assemblyai']
        if 'elevenlabs' in self.api_keys and self.api_keys['elevenlabs']:
            os.environ["ELEVENLABS_API_KEY"] = self.api_keys['elevenlabs']
        if 'segmind' in self.api_keys and self.api_keys['segmind']:
            os.environ["SEGMIND_API_KEY"] = self.api_keys['segmind']
        if 'openai' in self.api_keys and self.api_keys['openai']:
            os.environ["OPENAI_API_KEY"] = self.api_keys['openai']

        info(f"Niche: {niche}, Language: {language}")
        self.log(f"Initialized with niche: {niche}, language: {language}")
        self.log(f"Text generator: {text_gen} - Model: {text_model}")
        self.log(f"Image generator: {image_gen} - Model: {image_model}")
        self.log(f"TTS engine: {tts_engine} - Voice: {tts_voice}")
        self.log(f"Subtitles: {'Enabled' if subtitles_enabled else 'Disabled'} - Highlighting: {'Enabled' if highlighting_enabled else 'Disabled'}")
        self.log(f"Music: {music_file}")

    def log(self, message):
        """Add a log message to the logs list."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        log_entry = f"[{timestamp}] {message}"
        self.logs.append(log_entry)
        return log_entry
    @property
    def niche(self) -> str:
        return self._niche

    @property
    def language(self) -> str:
        return self._language
    def generate_response(self, prompt: str, model: str = None) -> str:
        """Generate a response using the selected text generation model."""
        self.log(f"Generating response for prompt: {prompt[:50]}...")
        try:
            if self.text_gen == "gemini":
                self.log("Using Google's Gemini model")
                # Check that the API key is set
                gemini_api_key = os.environ.get("GEMINI_API_KEY", "")
                if not gemini_api_key:
                    raise ValueError("Gemini API key is not set. Please provide a valid API key.")
                import google.generativeai as genai
                genai.configure(api_key=gemini_api_key)
                model_to_use = model if model else self.text_model
                genai_model = genai.GenerativeModel(model_to_use)
                response = genai_model.generate_content(prompt).text
            elif self.text_gen == "g4f":
                self.log("Using G4F for text generation")
                import g4f
                model_to_use = model if model else self.text_model
                self.log(f"Using G4F model: {model_to_use}")
                response = g4f.ChatCompletion.create(
                    model=model_to_use,
                    messages=[{"role": "user", "content": prompt}]
                )
            elif self.text_gen == "openai":
                self.log("Using OpenAI for text generation")
                openai_api_key = os.environ.get("OPENAI_API_KEY", "")
                if not openai_api_key:
                    raise ValueError("OpenAI API key is not set. Please provide a valid API key.")
                from openai import OpenAI
                client = OpenAI(api_key=openai_api_key)
                model_to_use = model if model else "gpt-3.5-turbo"
                response = client.chat.completions.create(
                    model=model_to_use,
                    messages=[{"role": "user", "content": prompt}]
                ).choices[0].message.content
            else:
                # Default to g4f if other methods aren't available
                self.log("Using default G4F model as fallback")
                import g4f
                response = g4f.ChatCompletion.create(
                    model="gpt-3.5-turbo",
                    messages=[{"role": "user", "content": prompt}]
                )
            self.log(f"Response generated successfully, length: {len(response)} characters")
            return response
        except Exception as e:
            error_msg = f"Error generating response: {str(e)}"
            self.log(error_msg)
            raise Exception(error_msg)
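    # Example (illustrative): generate_response is backend-agnostic, so a call
    # like self.generate_response("Summarize this niche in one line") returns
    # plain text regardless of whether gemini, g4f, or openai is configured.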
    def generate_topic(self) -> str:
        """Generate a topic based on the YouTube channel niche."""
        self.progress(0.05, desc="Generating topic")
        self.log("Generating topic based on niche")
        completion = self.generate_response(
            f"Please generate a specific video idea about the following topic: {self.niche}. "
            f"Make it exactly one sentence. Only return the topic, nothing else."
        )
        if not completion:
            self.log(error("Failed to generate a topic."))
            raise Exception("Failed to generate a topic. Please try again with a different niche.")
        self.subject = completion
        self.log(success(f"Generated topic: {completion}"))
        return completion
    def generate_script(self) -> str:
        """Generate a script for the video, based on the subject and language."""
        self.progress(0.1, desc="Creating script")
        self.log("Generating script for video")
        prompt = f"""
        Generate a script for a YouTube Shorts video based on the subject of the video.
        The script is to be returned as a plain string.
        Here is an example of a string:
        "This is an example string."
        Do not under any circumstance reference this prompt in your response.
        Get straight to the point; don't start with unnecessary things like "welcome to this video".
        Obviously, the script should be related to the subject of the video.
        YOU MUST NOT INCLUDE ANY TYPE OF MARKDOWN OR FORMATTING IN THE SCRIPT. NEVER USE A TITLE.
        YOU MUST WRITE THE SCRIPT IN THE LANGUAGE SPECIFIED IN [LANGUAGE].
        ONLY RETURN THE RAW CONTENT OF THE SCRIPT. DO NOT INCLUDE "VOICEOVER", "NARRATOR" OR SIMILAR INDICATORS.

        Subject: {self.subject}
        Language: {self.language}
        """
        completion = self.generate_response(prompt)

        # Strip markdown asterisks
        completion = re.sub(r"\*", "", completion)
        if not completion:
            self.log(error("The generated script is empty."))
            raise Exception("Failed to generate a script. Please try again.")
        if len(completion) > 5000:
            self.log(warning("Generated script is too long. Retrying..."))
            return self.generate_script()
        self.script = completion
        self.log(success(f"Generated script ({len(completion)} chars)"))
        return completion
    def generate_metadata(self) -> dict:
        """Generate video metadata (title, description)."""
        self.progress(0.15, desc="Creating title and description")
        self.log("Generating metadata (title and description)")
        title = self.generate_response(
            f"Please generate a YouTube video title for the following subject, including hashtags: "
            f"{self.subject}. Only return the title, nothing else. Keep the title under 100 characters."
        )
        if len(title) > 100:
            self.log(warning("Generated title is too long. Retrying..."))
            return self.generate_metadata()
        description = self.generate_response(
            f"Please generate a YouTube video description for the following script: {self.script}. "
            f"Only return the description, nothing else."
        )
        self.metadata = {
            "title": title,
            "description": description
        }
        self.log(success(f"Generated title: {title}"))
        self.log(success(f"Generated description: {description[:50]}..."))
        return self.metadata
    def generate_prompts(self, count=5) -> list:
        """Generate AI image prompts based on the provided video script."""
        self.progress(0.2, desc="Creating image prompts")
        self.log(f"Generating {count} image prompts")
        prompt = f"""
        Generate {count} image prompts for AI image generation,
        based on the subject of a video.
        Subject: {self.subject}

        The image prompts are to be returned as a JSON array of strings.
        Each prompt should consist of a full sentence and
        always include the main subject of the video.
        Be emotional and use interesting adjectives to make the
        image prompt as detailed as possible.

        YOU MUST ONLY RETURN THE JSON ARRAY OF STRINGS.
        YOU MUST NOT RETURN ANYTHING ELSE.
        YOU MUST NOT RETURN THE SCRIPT.
        The prompts must be related to the subject of the video.

        Here is an example of a JSON array of strings:
        ["image prompt 1", "image prompt 2", "image prompt 3"]

        For context, here is the full text:
        {self.script}
        """
        completion = str(self.generate_response(prompt)) \
            .replace("```json", "") \
            .replace("```", "")
        image_prompts = []

        # Some models wrap the array in an object like {{"image_prompts": [...]}}
        if "image_prompts" in completion:
            try:
                image_prompts = json.loads(completion)["image_prompts"]
            except Exception:
                self.log(warning("Failed to parse 'image_prompts' from JSON response."))

        if not image_prompts:
            try:
                image_prompts = json.loads(completion)
                self.log("Parsed image prompts from JSON response.")
            except Exception:
                self.log(warning("JSON parsing failed. Attempting to extract array using regex..."))
                # Get everything between [ and ], and turn it into a list
                r = re.compile(r"\[.*\]", re.DOTALL)
                matches = r.findall(completion)
                if len(matches) == 0:
                    self.log(warning("Failed to extract array. Creating generic image prompts."))
                    # Create generic prompts based on the subject
                    image_prompts = [
                        f"A beautiful image showing {self.subject}, photorealistic",
                        f"A detailed visualization of {self.subject}, high quality",
                        f"An artistic representation of {self.subject}, vibrant colors",
                        f"A photorealistic image about {self.subject}, high resolution",
                        f"A dramatic scene related to {self.subject}, cinema quality"
                    ]
                else:
                    try:
                        image_prompts = json.loads(matches[0])
                    except Exception:
                        self.log(error("Failed to parse array from regex match."))
                        # Use regex to extract the individual strings
                        string_pattern = r'"([^"]*)"'
                        strings = re.findall(string_pattern, matches[0])
                        if strings:
                            image_prompts = strings
                        else:
                            # Last resort - split by commas and clean up
                            image_prompts = [
                                s.strip().strip('"').strip("'")
                                for s in matches[0].strip('[]').split(',')
                            ]

        # Ensure we have the requested number of prompts
        while len(image_prompts) < count:
            image_prompts.append(f"A high-quality image about {self.subject}")
        # Limit to the requested count
        image_prompts = image_prompts[:count]

        self.image_prompts = image_prompts
        self.log(success(f"Generated {len(self.image_prompts)} image prompts"))
        for i, prompt in enumerate(self.image_prompts):
            self.log(f"Image prompt {i + 1}: {prompt}")
        return image_prompts
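    # Example of the regex fallback above: for a reply like
    #   'Sure! Here you go: ["a red car at sunset", "a calm blue sky"]'
    # json.loads fails on the full string, the [.*] pattern extracts
    # '["a red car at sunset", "a calm blue sky"]', and parsing that match
    # yields the two prompts.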
    def generate_image(self, prompt) -> str:
        """Generate an image using the selected image generation model."""
        self.log(f"Generating image for prompt: {prompt[:50]}...")
        try:
            image_path = os.path.join(CACHE_DIR, f"img_{len(self.images)}_{int(time.time())}.png")

            if self.image_gen == "prodia":
                self.log("Using Prodia provider for image generation")
                s = requests.Session()
                headers = {
                    "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                }
                # Create the generation job
                self.log("Sending generation request to Prodia API")
                resp = s.get(
                    "https://api.prodia.com/generate",
                    params={
                        "new": "true",
                        "prompt": prompt,
                        "model": self.image_model,
                        "negative_prompt": "verybadimagenegative_v1.3",
                        "steps": "20",
                        "cfg": "7",
                        "seed": random.randint(1, 10000),
                        "sample": "DPM++ 2M Karras",
                        "aspect_ratio": "square"
                    },
                    headers=headers
                )
                if resp.status_code != 200:
                    raise Exception(f"Prodia API error: {resp.text}")
                job_id = resp.json()['job']
                self.log(f"Job created with ID: {job_id}")

                # Wait for the generation to complete
                max_attempts = 30
                attempts = 0
                while attempts < max_attempts:
                    attempts += 1
                    time.sleep(2)
                    status = s.get(f"https://api.prodia.com/job/{job_id}", headers=headers).json()
                    if status["status"] == "succeeded":
                        self.log("Image generation successful, downloading result")
                        img_data = s.get(f"https://images.prodia.xyz/{job_id}.png?download=1", headers=headers).content
                        with open(image_path, "wb") as f:
                            f.write(img_data)
                        self.images.append(image_path)
                        self.log(success(f"Image saved to: {image_path}"))
                        return image_path
                    elif status["status"] == "failed":
                        raise Exception(f"Prodia job failed: {status.get('error', 'Unknown error')}")
                    # Still processing
                    self.log(f"Still processing, attempt {attempts}/{max_attempts}...")
                raise Exception("Prodia job timed out")

            elif self.image_gen == "hercai":
                self.log("Using Hercai provider for image generation")
                # URL-encode the prompt so spaces and punctuation survive the query string
                url = f"https://hercai.onrender.com/{self.image_model}/text2image?prompt={requests.utils.quote(prompt)}"
                r = requests.get(url)
                if r.status_code != 200:
                    raise Exception(f"Hercai API error: {r.text}")
                parsed = r.json()
                if "url" in parsed and parsed["url"]:
                    self.log("Image URL received from Hercai")
                    image_url = parsed["url"]
                    img_data = requests.get(image_url).content
                    with open(image_path, "wb") as f:
                        f.write(img_data)
                    self.images.append(image_path)
                    self.log(success(f"Image saved to: {image_path}"))
                    return image_path
                else:
                    raise Exception("No image URL in Hercai response")

            elif self.image_gen == "g4f":
                self.log("Using G4F provider for image generation")
                try:
                    from g4f.client import Client
                    client = Client()
                    response = client.images.generate(
                        model=self.image_model,
                        prompt=prompt,
                        response_format="url"
                    )
                    if response and response.data and len(response.data) > 0:
                        image_url = response.data[0].url
                        image_response = requests.get(image_url)
                        if image_response.status_code == 200:
                            with open(image_path, "wb") as f:
                                f.write(image_response.content)
                            self.images.append(image_path)
                            self.log(success(f"Image saved to: {image_path}"))
                            return image_path
                        else:
                            raise Exception(f"Failed to download image from {image_url}")
                    else:
                        raise Exception("No image URL received from G4F")
                except Exception as e:
                    raise Exception(f"G4F image generation failed: {str(e)}")

            elif self.image_gen == "segmind":
                self.log("Using Segmind provider for image generation")
                api_key = os.environ.get("SEGMIND_API_KEY", "")
                if not api_key:
                    raise ValueError("Segmind API key is not set. Please provide a valid API key.")
                headers = {
                    "x-api-key": api_key,
                    "Content-Type": "application/json"
                }
                response = requests.post(
                    "https://api.segmind.com/v1/sdxl-turbo",
                    json={
                        "prompt": prompt,
                        "negative_prompt": "blurry, low quality, distorted face, text, watermark",
                        "samples": 1,
                        "size": "1024x1024",
                        "guidance_scale": 1.0
                    },
                    headers=headers
                )
                if response.status_code == 200:
                    with open(image_path, "wb") as f:
                        f.write(response.content)
                    self.images.append(image_path)
                    self.log(success(f"Image saved to: {image_path}"))
                    return image_path
                else:
                    raise Exception(f"Segmind request failed: {response.status_code} {response.text}")

            elif self.image_gen == "pollinations":
                self.log("Using Pollinations provider for image generation")
                # URL-encode the prompt and pass the random value as a seed query
                # parameter rather than fusing it onto the prompt text
                response = requests.get(
                    f"https://image.pollinations.ai/prompt/{requests.utils.quote(prompt)}?seed={random.randint(1, 10000)}"
                )
                if response.status_code == 200:
                    self.log("Image received from Pollinations")
                    with open(image_path, "wb") as f:
                        f.write(response.content)
                    self.images.append(image_path)
                    self.log(success(f"Image saved to: {image_path}"))
                    return image_path
                else:
                    raise Exception(f"Pollinations request failed with status code: {response.status_code}")

            else:
                # Default to generating a colored placeholder image
                self.log(f"Unknown provider '{self.image_gen}'. Generating placeholder image.")
                # Create a solid-color placeholder image
                img = Image.new('RGB', (800, 800), color=(random.randint(0, 255),
                                                          random.randint(0, 255),
                                                          random.randint(0, 255)))
                img.save(image_path)
                self.images.append(image_path)
                self.log(warning(f"Created placeholder image at: {image_path}"))
                return image_path
        except Exception as e:
            error_msg = f"Image generation failed: {str(e)}"
            self.log(error(error_msg))
            # Create a fallback image
            try:
                img = Image.new('RGB', (800, 800), color=(200, 200, 200))
                image_path = os.path.join(CACHE_DIR, f"error_img_{len(self.images)}_{int(time.time())}.png")
                img.save(image_path)
                self.images.append(image_path)
                self.log(warning(f"Created error placeholder image at: {image_path}"))
                return image_path
            except Exception:
                # If all else fails, return None and handle it gracefully
                return None
    def generate_speech(self, text, output_format='mp3') -> str:
        """Generate speech from text using the selected TTS engine."""
        self.progress(0.6, desc="Creating voiceover")
        self.log("Generating speech from text")

        # Clean the text
        text = re.sub(r'[^\w\s.?!,;:\'"-]', '', text)

        self.log(f"Using TTS Engine: {self.tts_engine}, Voice: {self.tts_voice}")
        audio_path = os.path.join(CACHE_DIR, f"speech_{int(time.time())}.{output_format}")
        try:
            if self.tts_engine == "elevenlabs":
                self.log("Using ElevenLabs provider for speech generation")
                elevenlabs_api_key = os.environ.get("ELEVENLABS_API_KEY", "")
                if not elevenlabs_api_key:
                    raise ValueError("ElevenLabs API key is not set. Please provide a valid API key.")
                headers = {
                    "Accept": "audio/mpeg",
                    "Content-Type": "application/json",
                    "xi-api-key": elevenlabs_api_key
                }
                payload = {
                    "text": text,
                    "model_id": "eleven_monolingual_v1",
                    "voice_settings": {
                        "stability": 0.5,
                        "similarity_boost": 0.5,
                        "style": 0.0,
                        "use_speaker_boost": True
                    }
                }
                voice_id = self.tts_voice if self.tts_voice not in ["Sarah", "default"] else "21m00Tcm4TlvDq8ikWAM"
                response = requests.post(
                    url=f"https://api.elevenlabs.io/v1/text-to-speech/{voice_id}",
                    json=payload,
                    headers=headers
                )
                if response.status_code == 200:
                    with open(audio_path, 'wb') as f:
                        f.write(response.content)
                    self.log(success(f"Speech generated successfully using ElevenLabs at {audio_path}"))
                else:
                    raise Exception(f"ElevenLabs API error: {response.text}")

            elif self.tts_engine == "gtts":
                self.log("Using Google TTS provider for speech generation")
                from gtts import gTTS
                tts = gTTS(text=text, lang=LANGUAGE_CODES.get(self.language, "en"), slow=False)
                tts.save(audio_path)

            elif self.tts_engine == "openai":
                self.log("Using OpenAI provider for speech generation")
                openai_api_key = os.environ.get("OPENAI_API_KEY", "")
                if not openai_api_key:
                    raise ValueError("OpenAI API key is not set. Please provide a valid API key.")
                from openai import OpenAI
                client = OpenAI(api_key=openai_api_key)
                voice = self.tts_voice if self.tts_voice else "alloy"
                response = client.audio.speech.create(
                    model="tts-1",
                    voice=voice,
                    input=text
                )
                response.stream_to_file(audio_path)

            elif self.tts_engine == "edge":
                self.log("Using Edge TTS provider for speech generation")
                import edge_tts
                import asyncio
                voice = self.tts_voice if self.tts_voice else "en-US-AriaNeural"

                async def generate():
                    communicate = edge_tts.Communicate(text, voice)
                    await communicate.save(audio_path)
                asyncio.run(generate())

            else:
                # Fall back to gTTS
                self.log(f"Unknown TTS engine '{self.tts_engine}'. Falling back to gTTS.")
                from gtts import gTTS
                tts = gTTS(text=text, lang=LANGUAGE_CODES.get(self.language, "en"), slow=False)
                tts.save(audio_path)

            self.log(success(f"Speech generated and saved to: {audio_path}"))
            self.tts_path = audio_path
            return audio_path
        except Exception as e:
            error_msg = f"Speech generation failed: {str(e)}"
            self.log(error(error_msg))
            # Create a silent audio file as a fallback
            try:
                from pydub import AudioSegment
                # Generate 30 seconds of silence
                silence = AudioSegment.silent(duration=30000)
                silence.export(audio_path, format=output_format)
                self.log(warning(f"Created silent audio fallback at: {audio_path}"))
                self.tts_path = audio_path
                return audio_path
            except Exception:
                self.log(error("Failed to create silent audio fallback"))
                return None
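    # Note: asyncio.run() in the Edge TTS branch assumes no event loop is
    # already running in this thread; if this method were ever called from an
    # async context, it would need an explicit event loop instead.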
    def generate_subtitles(self, audio_path):
        """Generate word-level subtitles for the video."""
        if not self.subtitles_enabled:
            self.log("Subtitles are disabled. Skipping subtitle generation.")
            return None

        self.progress(0.65, desc="Creating subtitles")
        self.log("Starting subtitle generation process")
        try:
            assemblyai_api_key = os.environ.get("ASSEMBLYAI_API_KEY", "")
            if not assemblyai_api_key:
                self.log(warning("AssemblyAI API key not set. Generating simulated subtitles."))
                return self._generate_simulated_subtitles()

            import assemblyai as aai
            aai.settings.api_key = assemblyai_api_key
            config = aai.TranscriptionConfig(speaker_labels=False, word_boost=[], format_text=True)
            transcriber = aai.Transcriber(config=config)
            self.log("Submitting audio for transcription")
            transcript = transcriber.transcribe(audio_path)
            if not transcript or not transcript.words:
                self.log(warning("Transcription returned no words. Using simulated subtitles."))
                return self._generate_simulated_subtitles()

            # Process word-level information (AssemblyAI timestamps are in milliseconds)
            wordlevel_info = []
            for word in transcript.words:
                word_data = {
                    "word": word.text.strip(),
                    "start": word.start / 1000.0,
                    "end": word.end / 1000.0
                }
                wordlevel_info.append(word_data)
            self.log(success(f"Transcription successful. Got {len(wordlevel_info)} words."))

            # Constants for subtitle line building
            FONT = self.subtitle_font
            FONTSIZE = self.font_size
            COLOR = self.text_color
            BG_COLOR = self.highlight_color if self.highlighting_enabled else None
            MAX_CHARS = 30
            MAX_DURATION = 3.0
            MAX_GAP = 2.5

            # Split the text into lines based on character count, duration, and gap
            subtitles = []
            line = []
            line_duration = 0
            for idx, word_data in enumerate(wordlevel_info):
                line.append(word_data)
                line_duration += word_data["end"] - word_data["start"]
                temp = " ".join(item["word"] for item in line)
                new_line_chars = len(temp)
                duration_exceeded = line_duration > MAX_DURATION
                chars_exceeded = new_line_chars > MAX_CHARS
                if idx > 0:
                    gap = word_data['start'] - wordlevel_info[idx - 1]['end']
                    maxgap_exceeded = gap > MAX_GAP
                else:
                    maxgap_exceeded = False

                # If any condition is exceeded, finalize the current line
                if duration_exceeded or chars_exceeded or maxgap_exceeded:
                    if line:
                        subtitle_line = {
                            "text": " ".join(item["word"] for item in line),
                            "start": line[0]["start"],
                            "end": line[-1]["end"],
                            "words": line
                        }
                        subtitles.append(subtitle_line)
                        line = []
                        line_duration = 0

            # Add the remaining words as the last subtitle line, if any
            if line:
                subtitle_line = {
                    "text": " ".join(item["word"] for item in line),
                    "start": line[0]["start"],
                    "end": line[-1]["end"],
                    "words": line
                }
                subtitles.append(subtitle_line)

            self.log(success(f"Generated {len(subtitles)} subtitle lines"))
            return {
                "wordlevel": wordlevel_info,
                "linelevel": subtitles,
                "settings": {
                    "font": FONT,
                    "fontsize": FONTSIZE,
                    "color": COLOR,
                    "bg_color": BG_COLOR,
                    "position": self.subtitle_position,
                    "highlighting_enabled": self.highlighting_enabled
                }
            }
        except Exception as e:
            error_msg = f"Subtitle generation failed: {str(e)}"
            self.log(error(error_msg))
            return self._generate_simulated_subtitles()
    def _generate_simulated_subtitles(self):
        """Generate simulated subtitles when AssemblyAI is not available."""
        self.log("Generating simulated subtitles")
        # Split the script into words
        words = self.script.split()

        # Generate word-level timings
        wordlevel_info = []
        current_time = 0
        for word in words:
            # Adjust the duration based on word length
            word_duration = 0.2 + min(0.05 * len(word), 0.3)  # Between 0.2 and 0.5 seconds
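            # Example: a 5-letter word gets 0.2 + 0.25 = 0.45 s, and long words
            # cap at 0.2 + 0.3 = 0.5 s, which roughly matches natural pacing.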
            word_data = {
                "word": word,
                "start": current_time,
                "end": current_time + word_duration
            }
            wordlevel_info.append(word_data)
            # Add a small gap between words
            current_time += word_duration + 0.05

        # Generate line-level subtitles
        subtitles = []
        line = []
        line_start = 0
        line_text = ""
        for word_data in wordlevel_info:
            # Check if adding this word would exceed the character limit
            if len(line_text + " " + word_data["word"]) > 30 and line:
                # Finalize the current line
                subtitle_line = {
                    "text": line_text,
                    "start": line_start,
                    "end": line[-1]["end"],
                    "words": line.copy()
                }
                subtitles.append(subtitle_line)
                # Start a new line
                line = [word_data]
                line_start = word_data["start"]
                line_text = word_data["word"]
            else:
                # Add the word to the current line
                line.append(word_data)
                line_text = (line_text + " " + word_data["word"]).strip()
                if len(line) == 1:
                    line_start = word_data["start"]

        # Add the final line if not empty
        if line:
            subtitle_line = {
                "text": line_text,
                "start": line_start,
                "end": line[-1]["end"],
                "words": line
            }
            subtitles.append(subtitle_line)

        self.log(success(f"Generated {len(wordlevel_info)} simulated word timings and {len(subtitles)} subtitle lines"))

        # Settings for subtitle display
        settings = {
            "font": self.subtitle_font,
            "fontsize": self.font_size,
            "color": self.text_color,
            "bg_color": self.highlight_color if self.highlighting_enabled else None,
            "position": self.subtitle_position,
            "highlighting_enabled": self.highlighting_enabled
        }
        return {
            "wordlevel": wordlevel_info,
            "linelevel": subtitles,
            "settings": settings
        }
    def combine(self) -> str:
        """Combine images, audio, and subtitles into a final video."""
        self.progress(0.8, desc="Creating final video")
        self.log("Combining images and audio into final video")
        try:
            output_path = os.path.join(CACHE_DIR, f"output_{int(time.time())}.mp4")

            # Check for required files
            if not self.images:
                raise ValueError("No images available for video creation")
            if not hasattr(self, 'tts_path') or not self.tts_path or not os.path.exists(self.tts_path):
                raise ValueError("No TTS audio file available")

            # Load the audio
            tts_clip = AudioFileClip(self.tts_path)
            max_duration = tts_clip.duration

            # Calculate the duration for each image
            num_images = len(self.images)
            req_dur = max_duration / num_images

            # Create video clips from the images
            clips = []
            tot_dur = 0
            # Loop through the images, repeating if necessary to fill the audio duration
            while tot_dur < max_duration:
                for image_path in self.images:
                    # Check that the image exists
                    if not os.path.exists(image_path):
                        self.log(warning(f"Image not found: {image_path}, skipping"))
                        continue
                    try:
                        clip = ImageClip(image_path)
                        clip = clip.set_duration(req_dur)
                        clip = clip.set_fps(30)

                        # Handle the aspect ratio (vertical video for Shorts)
                        aspect_ratio = 9 / 16  # Standard vertical video ratio
                        if clip.w / clip.h < aspect_ratio:
                            # Image is too tall, crop the height
                            clip = crop(
                                clip,
                                width=clip.w,
                                height=round(clip.w / aspect_ratio),
                                x_center=clip.w / 2,
                                y_center=clip.h / 2
                            )
                        else:
                            # Image is too wide, crop the width
                            clip = crop(
                                clip,
                                width=round(aspect_ratio * clip.h),
                                height=clip.h,
                                x_center=clip.w / 2,
                                y_center=clip.h / 2
                            )

                        # Resize to the standard size for Shorts
                        clip = clip.resize((1080, 1920))
                        clips.append(clip)
                        tot_dur += clip.duration

                        # If we've covered the audio duration, stop
                        if tot_dur >= max_duration:
                            break
                    except Exception as e:
                        self.log(warning(f"Error processing image {image_path}: {str(e)}"))
                # Guard against an infinite loop when no image could be loaded
                if not clips:
                    raise ValueError("None of the generated images could be loaded")

            # Create the video from the clips
            self.log(f"Creating video from {len(clips)} clips")
            final_clip = concatenate_videoclips(clips)
            final_clip = final_clip.set_fps(30)

            # Add background music if available
            music_path = None
            if self.music_file == "random":
                music_path = choose_random_music()
            elif self.music_file != "none" and os.path.exists(os.path.join(MUSIC_DIR, self.music_file)):
                music_path = os.path.join(MUSIC_DIR, self.music_file)
            if music_path and os.path.exists(music_path):
                self.log(f"Adding background music: {music_path}")
                try:
                    music_clip = AudioFileClip(music_path)
                    # Loop the music if it's shorter than the video
                    if music_clip.duration < max_duration:
                        repeats = int(max_duration / music_clip.duration) + 1
                        music_clip = concatenate_audioclips([music_clip] * repeats)
                    # Trim if it's longer
                    music_clip = music_clip.subclip(0, max_duration)
                    # Reduce the volume
                    music_clip = music_clip.fx(volumex, 0.1)
                    # Combine the audio tracks
                    comp_audio = CompositeAudioClip([tts_clip, music_clip])
                    final_clip = final_clip.set_audio(comp_audio)
                except Exception as e:
                    self.log(warning(f"Error adding background music: {str(e)}"))
                    final_clip = final_clip.set_audio(tts_clip)
            else:
                self.log("No background music found, using TTS audio only")
                final_clip = final_clip.set_audio(tts_clip)
            # Set the final duration
            final_clip = final_clip.set_duration(tts_clip.duration)

            # Generate subtitles if enabled
            subtitle_clips = []
            if self.subtitles_enabled:
                subtitles = self.generate_subtitles(self.tts_path)
                if subtitles and 'wordlevel' in subtitles:
                    self.log("Adding word-level subtitles")
                    # TextClip is available via the moviepy.editor star import above.
                    # Define the subtitle styles; pass a full font file path so
                    # TextClip can actually resolve custom fonts.
                    font_name = subtitles['settings']['font']
                    font_path = os.path.join(FONTS_DIR, f"{font_name}.ttf")
                    font = font_path if font_name != "default" and os.path.exists(font_path) else None
                    fontsize = subtitles['settings']['fontsize']
                    color = subtitles['settings']['color']
                    bg_color = subtitles['settings']['bg_color'] if subtitles['settings']['highlighting_enabled'] else None

                    # Calculate the position based on the subtitle_position setting
                    frame_width, frame_height = 1080, 1920
                    if self.subtitle_position == "top":
                        y_pos = frame_height * 0.1   # 10% from the top
                    elif self.subtitle_position == "middle":
                        y_pos = frame_height * 0.5   # Centered vertically
                    else:  # bottom (default)
                        y_pos = frame_height * 0.85  # 85% from the top

                    for subtitle in subtitles['linelevel']:
                        full_duration = subtitle['end'] - subtitle['start']
                        # Initialize the position for each subtitle line
                        x_pos = 0
                        x_buffer = frame_width / 10

                        # Handle word-level subtitles if highlighting is enabled
                        if self.highlighting_enabled:
                            # Add each word with proper timing and highlighting
                            for word_data in subtitle['words']:
                                word = word_data['word']
                                start = word_data['start']
                                end = word_data['end']
                                # Create a text clip for the word
                                try:
                                    word_clip = TextClip(
                                        txt=word,
                                        font=font,
                                        fontsize=fontsize,
                                        color=color,
                                        bg_color=bg_color,
                                        stroke_color='black',
                                        stroke_width=1
                                    ).set_position((x_pos + x_buffer, y_pos)).set_start(start).set_duration(end - start)
                                    subtitle_clips.append(word_clip)
                                    x_pos += word_clip.w + 10  # Spacing between words
                                    # Wrap to the next line if needed
                                    if x_pos + word_clip.w > frame_width - 2 * x_buffer:
                                        x_pos = 0
                                        y_pos += word_clip.h + 10
                                except Exception as e:
                                    self.log(warning(f"Error creating subtitle for word '{word}': {str(e)}"))
                        else:
                            # Show the entire line without word-level highlighting
                            try:
                                line_clip = TextClip(
                                    txt=subtitle['text'],
                                    font=font,
                                    fontsize=fontsize,
                                    color=color,
                                    bg_color=None,
                                    stroke_color='black',
                                    stroke_width=1,
                                    method='caption',
                                    size=(frame_width - 2 * x_buffer, None),
                                    align='center'
                                ).set_position(('center', y_pos)).set_start(subtitle['start']).set_duration(full_duration)
                                subtitle_clips.append(line_clip)
                            except Exception as e:
                                self.log(warning(f"Error creating subtitle line: {str(e)}"))

            # Add the subtitles to the video if any were created
            if subtitle_clips:
                self.log(f"Adding {len(subtitle_clips)} subtitle clips to video")
                final_clip = CompositeVideoClip([final_clip] + subtitle_clips)

            # Write the final video
            self.log("Writing final video file")
            final_clip.write_videofile(output_path, threads=4, codec='libx264', audio_codec='aac')
            success_msg = f"Video successfully created at: {output_path}"
            self.log(success(success_msg))
            self.video_path = output_path
            return output_path
        except Exception as e:
            error_msg = f"Error combining video: {str(e)}"
            self.log(error(error_msg))
            # Create a minimal fallback video if possible
            try:
                # Try to create a simple video with just the first image and audio
                fallback_path = os.path.join(CACHE_DIR, f"fallback_{int(time.time())}.mp4")
                if self.images and os.path.exists(self.images[0]) and hasattr(self, 'tts_path') and os.path.exists(self.tts_path):
                    img_clip = ImageClip(self.images[0]).set_duration(10)
                    img_clip = img_clip.resize((1080, 1920))
                    # Load the audio once instead of twice
                    full_audio = AudioFileClip(self.tts_path)
                    audio_clip = full_audio.subclip(0, min(10, full_audio.duration))
                    video_clip = img_clip.set_audio(audio_clip)
                    video_clip.write_videofile(fallback_path, threads=2, codec='libx264', audio_codec='aac')
                    self.log(warning(f"Created fallback video at: {fallback_path}"))
                    self.video_path = fallback_path
                    return fallback_path
                else:
                    raise Exception("Cannot create fallback video: missing images or audio")
            except Exception as fallback_error:
                self.log(error(f"Failed to create fallback video: {str(fallback_error)}"))
                return None
    def generate_video(self) -> dict:
        """Generate the complete video with all components."""
        try:
            self.log("Starting video generation process")

            # Step 1: Generate topic
            self.log("Generating topic")
            self.generate_topic()

            # Step 2: Generate script
            self.progress(0.1, desc="Creating script")
            self.log("Generating script")
            self.generate_script()

            # Step 3: Generate metadata
            self.progress(0.2, desc="Creating metadata")
            self.log("Generating metadata")
            self.generate_metadata()

            # Step 4: Generate image prompts
            self.progress(0.3, desc="Creating image prompts")
            self.log("Generating image prompts")
            self.generate_prompts()

            # Step 5: Generate images
            self.progress(0.4, desc="Generating images")
            self.log("Generating images")
            for i, prompt in enumerate(self.image_prompts, 1):
                self.progress(0.4 + 0.2 * (i / len(self.image_prompts)),
                              desc=f"Generating image {i}/{len(self.image_prompts)}")
                self.log(f"Generating image {i}/{len(self.image_prompts)}")
                self.generate_image(prompt)

            # Step 6: Generate speech
            self.progress(0.6, desc="Creating speech")
            self.log("Generating speech")
            self.generate_speech(self.script)

            # Step 7: Combine all elements into the final video
            self.progress(0.8, desc="Creating final video")
            self.log("Combining all elements into final video")
            path = self.combine()

            self.progress(0.95, desc="Finalizing")
            self.log(f"Video generation complete. File saved at: {path}")

            # Return the result
            return {
                'video_path': path,
                'title': self.metadata['title'],
                'description': self.metadata['description'],
                'subject': self.subject,
                'script': self.script,
                'logs': self.logs
            }
        except Exception as e:
            error_msg = f"Error during video generation: {str(e)}"
            self.log(error(error_msg))
            raise Exception(error_msg)
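# Example (illustrative): the class can also be used headlessly, without the
# Gradio UI. The argument values below are assumptions for demonstration, not
# defaults enforced by the class, and progress callbacks may be no-ops outside
# a Gradio event:
#
#     yt = YouTube(niche="Space Facts", language="English",
#                  image_gen="pollinations", tts_engine="gtts")
#     result = yt.generate_video()
#     print(result["title"], result["video_path"])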
# Data for dynamic dropdowns
def get_text_generator_models(generator):
    """Get the available models for the selected text generator."""
    models = {
        "gemini": [
            "gemini-2.0-flash",
            "gemini-2.0-flash-lite",
            "gemini-1.5-flash",
            "gemini-1.5-flash-8b",
            "gemini-1.5-pro"
        ],
        "g4f": [
            "gpt-4",
            "gpt-4o",
            "gpt-3.5-turbo",
            "llama-3-70b-chat",
            "claude-3-opus-20240229",
            "claude-3-sonnet-20240229",
            "claude-3-haiku-20240307"
        ],
        "openai": [
            "gpt-4o",
            "gpt-4-turbo",
            "gpt-3.5-turbo"
        ]
    }
    return models.get(generator, ["default"])

def get_image_generator_models(generator):
    """Get the available models for the selected image generator."""
    models = {
        "prodia": [
            "sdxl",
            "realvisxl",
            "juggernaut",
            "dreamshaper",
            "dalle"
        ],
        "hercai": [
            "v1",
            "v2",
            "v3",
            "lexica"
        ],
        "g4f": [
            "flux",
            "dall-e-3",
            "dall-e-2",
            "midjourney"
        ],
        "segmind": [
            "sdxl-turbo",
            "realistic-vision",
            "sd3"
        ],
        "pollinations": [
            "default"
        ]
    }
    return models.get(generator, ["default"])

def get_tts_voices(engine):
    """Get the available voices for the selected TTS engine."""
    voices = {
        "elevenlabs": [
            "Sarah",
            "Brian",
            "Lily",
            "Monika Sogam",
            "George",
            "River",
            "Matilda",
            "Will",
            "Jessica"
        ],
        "openai": [
            "alloy",
            "echo",
            "fable",
            "onyx",
            "nova",
            "shimmer"
        ],
        "edge": [
            "en-US-AriaNeural",
            "en-US-GuyNeural",
            "en-GB-SoniaNeural",
            "en-AU-NatashaNeural"
        ],
        "gtts": [
            "en",
            "es",
            "fr",
            "de",
            "it",
            "pt",
            "ru",
            "ja",
            "zh",
            "hi"
        ]
    }
    return voices.get(engine, ["default"])
# Create the Gradio interface
def create_interface():
    with gr.Blocks(theme=gr.themes.Soft(primary_hue="indigo"), title="YouTube Shorts Generator") as demo:
        with gr.Row():
            gr.Markdown(
                """
                # 📱 YouTube Shorts Generator
                Generate engaging YouTube Shorts videos with AI. Just provide a niche and language to get started!
                """
            )
        with gr.Row(equal_height=True):
            # Left panel: content and generator settings
            with gr.Column(scale=1, min_width=400):
                with gr.Group():
                    gr.Markdown("### 📝 Content")
                    niche = gr.Textbox(
                        label="Niche/Topic",
                        placeholder="What's your video about?",
                        value="Historical Facts"
                    )
                    language = gr.Dropdown(
                        choices=["English", "Spanish", "French", "German", "Italian", "Portuguese",
                                 "Russian", "Japanese", "Chinese", "Hindi"],
                        label="Language",
                        value="English"
                    )
                # Generator settings (same column)
                with gr.Group():
                    gr.Markdown("### 🔧 Generator Settings")
                    with gr.Tabs():
                        with gr.TabItem("Text"):
                            text_gen = gr.Dropdown(
                                choices=["g4f", "gemini", "openai"],
                                label="Text Generator",
                                value="g4f"
                            )
                            text_model = gr.Dropdown(
                                choices=get_text_generator_models("g4f"),
                                label="Text Model",
                                value="gpt-4"
                            )
                        with gr.TabItem("Image"):
                            image_gen = gr.Dropdown(
                                choices=["g4f", "prodia", "hercai", "segmind", "pollinations"],
                                label="Image Generator",
                                value="g4f"
                            )
                            image_model = gr.Dropdown(
                                choices=get_image_generator_models("g4f"),
                                label="Image Model",
                                value="flux"
                            )
                        with gr.TabItem("Audio"):
                            tts_engine = gr.Dropdown(
                                choices=["edge", "elevenlabs", "gtts", "openai"],
                                label="Speech Engine",
                                value="edge"
                            )
                            tts_voice = gr.Dropdown(
                                choices=get_tts_voices("edge"),
                                label="Voice",
                                value="en-US-AriaNeural"
                            )
                            music_file = gr.Dropdown(
                                choices=get_music_files(),
                                label="Background Music",
                                value="random"
                            )
                        with gr.TabItem("Subtitles"):
                            subtitles_enabled = gr.Checkbox(label="Enable Subtitles", value=True)
                            highlighting_enabled = gr.Checkbox(label="Enable Word Highlighting", value=True)
                            subtitle_font = gr.Dropdown(
                                choices=get_font_files(),
                                label="Font",
                                value="default"
                            )
                            with gr.Row():
                                font_size = gr.Slider(
                                    minimum=40,
                                    maximum=120,
                                    value=80,
                                    step=5,
                                    label="Font Size"
                                )
                                subtitle_position = gr.Dropdown(
                                    choices=["bottom", "middle", "top"],
                                    label="Position",
                                    value="bottom"
                                )
                            with gr.Row():
                                text_color = gr.ColorPicker(label="Text Color", value="#FFFFFF")
                                highlight_color = gr.ColorPicker(label="Highlight Color", value="#0000FF")
                # API keys section
                with gr.Accordion("🔑 API Keys", open=False):
                    gemini_api_key = gr.Textbox(
                        label="Gemini API Key",
                        type="password",
                        value=os.environ.get("GEMINI_API_KEY", "")
                    )
                    assemblyai_api_key = gr.Textbox(
                        label="AssemblyAI API Key",
                        type="password",
                        value=os.environ.get("ASSEMBLYAI_API_KEY", "")
                    )
                    elevenlabs_api_key = gr.Textbox(
                        label="ElevenLabs API Key",
                        type="password",
                        value=os.environ.get("ELEVENLABS_API_KEY", "")
                    )
                    segmind_api_key = gr.Textbox(
                        label="Segmind API Key",
                        type="password",
                        value=os.environ.get("SEGMIND_API_KEY", "")
                    )
                    openai_api_key = gr.Textbox(
                        label="OpenAI API Key",
                        type="password",
                        value=os.environ.get("OPENAI_API_KEY", "")
                    )
                # Generate button
                generate_btn = gr.Button("🎬 Generate Video", variant="primary", size="lg")
            # Right panel: output display
            with gr.Column(scale=1, min_width=400):
                with gr.Tabs():
                    with gr.TabItem("Video"):
                        video_output = gr.Video(label="Generated Video", height=600)
                    with gr.TabItem("Metadata"):
                        title_output = gr.Textbox(label="Title", lines=2)
                        description_output = gr.Textbox(label="Description", lines=4)
                        script_output = gr.Textbox(label="Script", lines=8)
                    with gr.TabItem("Log"):
                        log_output = gr.Textbox(label="Process Log", lines=20, max_lines=100)
        # Dynamic dropdown updates
        def update_text_models(generator):
            return gr.Dropdown(choices=get_text_generator_models(generator))

        def update_image_models(generator):
            return gr.Dropdown(choices=get_image_generator_models(generator))

        def update_tts_voices(engine):
            return gr.Dropdown(choices=get_tts_voices(engine))

        # Connect the change events
        text_gen.change(fn=update_text_models, inputs=text_gen, outputs=text_model)
        image_gen.change(fn=update_image_models, inputs=image_gen, outputs=image_model)
        tts_engine.change(fn=update_tts_voices, inputs=tts_engine, outputs=tts_voice)
        # Main generation function
        def generate_youtube_short(niche, language, gemini_api_key, assemblyai_api_key,
                                   elevenlabs_api_key, segmind_api_key, openai_api_key,
                                   text_gen, text_model, image_gen, image_model,
                                   tts_engine, tts_voice, subtitles_enabled, highlighting_enabled,
                                   subtitle_font, font_size, subtitle_position,
                                   text_color, highlight_color, music_file, progress=gr.Progress()):
            if not niche.strip():
                return {
                    video_output: None,
                    title_output: "ERROR: Please enter a niche/topic",
                    description_output: "",
                    script_output: "",
                    log_output: "Error: Niche/Topic is required. Please enter a valid topic and try again."
                }

            # Create the API keys dictionary
            api_keys = {
                'gemini': gemini_api_key,
                'assemblyai': assemblyai_api_key,
                'elevenlabs': elevenlabs_api_key,
                'segmind': segmind_api_key,
                'openai': openai_api_key
            }
            try:
                # Initialize the YouTube class
                yt = YouTube(
                    niche=niche,
                    language=language,
                    text_gen=text_gen,
                    text_model=text_model,
                    image_gen=image_gen,
                    image_model=image_model,
                    tts_engine=tts_engine,
                    tts_voice=tts_voice,
                    subtitle_font=subtitle_font,
                    font_size=font_size,
                    text_color=text_color,
                    highlight_color=highlight_color,
                    subtitles_enabled=subtitles_enabled,
                    highlighting_enabled=highlighting_enabled,
                    subtitle_position=subtitle_position,
                    music_file=music_file,
                    api_keys=api_keys,
                    progress=progress
                )

                # Generate the video
                result = yt.generate_video()

                # Check whether the video was successfully created
                if not result or not result.get('video_path') or not os.path.exists(result.get('video_path', '')):
                    return {
                        video_output: None,
                        title_output: "ERROR: Video generation failed",
                        description_output: "",
                        script_output: "",
                        log_output: "\n".join(yt.logs)
                    }
                return {
                    video_output: result['video_path'],
                    title_output: result['title'],
                    description_output: result['description'],
                    script_output: result['script'],
                    log_output: "\n".join(result['logs'])
                }
            except Exception as e:
                import traceback
                error_details = f"Error: {str(e)}\n\n{traceback.format_exc()}"
                return {
                    video_output: None,
                    title_output: f"ERROR: {str(e)}",
                    description_output: "",
                    script_output: "",
                    log_output: error_details
                }

        # Connect the button click event
        generate_btn.click(
            fn=generate_youtube_short,
            inputs=[
                niche, language, gemini_api_key, assemblyai_api_key, elevenlabs_api_key,
                segmind_api_key, openai_api_key, text_gen, text_model, image_gen, image_model,
                tts_engine, tts_voice, subtitles_enabled, highlighting_enabled,
                subtitle_font, font_size, subtitle_position, text_color, highlight_color, music_file
            ],
            outputs=[video_output, title_output, description_output, script_output, log_output]
        )
        # Add examples
        gr.Examples(
            [
                ["Historical Facts", "English", "g4f", "gpt-4", "g4f", "flux", "edge", "en-US-AriaNeural", True, True, "default", 80, "bottom", "#FFFFFF", "#0000FF", "random"],
                ["Cooking Tips", "English", "g4f", "gpt-4", "g4f", "flux", "edge", "en-US-AriaNeural", True, True, "default", 80, "bottom", "#FFFFFF", "#FF0000", "random"],
                ["Technology News", "English", "g4f", "gpt-4", "g4f", "flux", "edge", "en-US-GuyNeural", True, True, "default", 80, "bottom", "#FFFFFF", "#00FF00", "random"],
            ],
            [niche, language, text_gen, text_model, image_gen, image_model, tts_engine, tts_voice,
             subtitles_enabled, highlighting_enabled, subtitle_font, font_size,
             subtitle_position, text_color, highlight_color, music_file],
            label="Quick Start Templates"
        )
    return demo
# Create and launch the interface
if __name__ == "__main__":
    # Create necessary directories
    os.makedirs(CACHE_DIR, exist_ok=True)
    os.makedirs(MUSIC_DIR, exist_ok=True)
    os.makedirs(FONTS_DIR, exist_ok=True)

    # Launch the app
    demo = create_interface()
    demo.launch()
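    # Note: the default launch() is sufficient on Hugging Face Spaces; for a
    # purely local run, demo.launch(share=True) would additionally create a
    # temporary public Gradio link.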