"""Flask web front-end for SoniTranslate video translation and dubbing.

The module exposes password-protected routes for uploading or downloading a
video, generating and editing subtitles, running the SoniTranslate pipeline,
and serving the resulting files from per-project folders under ``exports/``.
"""

import hmac
import html
import json
import logging
import os
import queue
import re
import shutil
import signal
import sys
import threading
import time
from datetime import datetime
from functools import update_wrapper, wraps

import browser_cookie3
import requests
from flask import (
    Flask,
    Response,
    jsonify,
    redirect,
    render_template,
    request,
    send_file,
    send_from_directory,
    session,
    url_for,
)
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.orm import DeclarativeBase
from werkzeug.utils import secure_filename

from app_rvc2 import SoniTranslate, TTS_Info
from madkoala.language_configuration import (
    LANGUAGES,
    LANGUAGES_LIST,
    LANGUAGES_UNIDIRECTIONAL,
)

# Configure the root logger BEFORE any handler is attached to it:
# logging.basicConfig() silently does nothing once the root logger already has
# a handler, which previously left the level at WARNING and dropped every
# logger.info() call in this module.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger()

app = Flask(__name__)

# Create TTS_Info instance to get XTTS status.
# Adjust these values based on your setup.
tts_info = TTS_Info(piper_enabled=True, xtts_enabled=True)
logger.info(f"XTTS is {'enabled' if tts_info.xtts_enabled else 'disabled'}")

app.secret_key = os.environ.get('SECRET_KEY', 'default_secret_key')
APP_PASSWORD = os.environ.get('APP_PASSWORD', 'default_password')
YOUTUBE_USERNAME = os.environ.get('YOUTUBE_USERNAME', 'default_ysecret_key')
YOUTUBE_PASSWORD = os.environ.get('YOUTUBE_PASSWORD', 'default_ypassword')

if app.secret_key == 'default_secret_key' or APP_PASSWORD == 'default_password':
    logger.warning(
        "SECRET_KEY and/or APP_PASSWORD use the insecure built-in defaults; "
        "set them through environment variables."
    )

# Proxy settings (currently not used by any request in this module).
PROXY_URL = ""   # Change to your proxy
PROXY_USER = ""  # Change to your proxy user name
PROXY_PASS = ""  # Change to your proxy password

# Remote download service. The defaults keep the previous hard-coded values so
# existing deployments keep working; override them through the environment.
DOWNLOAD_API_BASE = os.environ.get(
    'DOWNLOAD_API_BASE', 'https://hound-patient-honestly.ngrok-free.app'
).rstrip('/')
DOWNLOAD_API_KEY = os.environ.get(
    'DOWNLOAD_API_KEY', '5as4d4f12sxdf45sfg46vawd74879ad5sd5AF4g6d8f4hfgb5'
)

# Upper bound on buffered log records for the /logs stream.
LOG_QUEUE_MAX = 1000


def _redacted(payload):
    """Return a copy of *payload* that is safe to print/log (API key masked)."""
    return {**payload, 'api_key': '***'}


def _is_safe_folder_id(folder_id):
    """Return True if *folder_id* is a single path component inside exports/."""
    if not folder_id or folder_id in ('.', '..'):
        return False
    return not any(sep in folder_id for sep in ('/', '\\', '\0'))


def login_required(f):
    """Decorator: redirect to the login page unless the session is logged in."""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'logged_in' not in session:
            return redirect(url_for('login'))
        return f(*args, **kwargs)
    return decorated_function


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Render the login form (GET) or check the submitted password (POST)."""
    if request.method == 'POST':
        # .get() avoids an automatic 400 when the field is missing;
        # compare_digest avoids leaking the password through timing.
        supplied = request.form.get('password', '')
        if hmac.compare_digest(supplied.encode('utf-8'),
                               APP_PASSWORD.encode('utf-8')):
            session['logged_in'] = True
            return redirect(url_for('home'))
        return render_template('login.html', error='Nesprávné heslo')
    return render_template('login.html')


@app.route('/logout')
def logout():
    """Drop the login flag from the session and go back to the login page."""
    session.pop('logged_in', None)
    return redirect(url_for('login'))


soni = SoniTranslate()


# Routes serving generated and uploaded files.
# NOTE(review): these routes are not wrapped in @login_required in the
# original either; confirm whether anonymous access is intended.
@app.route('/outputs/<path:filename>')
def serve_output(filename):
    """Serve a file from the ``outputs`` directory."""
    # send_from_directory refuses paths that escape the directory.
    return send_from_directory(os.path.abspath('outputs'), filename)


@app.route('/uploads/<path:filename>')
def serve_upload(filename):
    """Serve a file from the ``uploads`` directory."""
    return send_from_directory(os.path.abspath('uploads'), filename)


@app.route('/exports/<path:filename>')
def serve_export(filename):
    """Serve a file from the ``exports`` directory (project sub-folders)."""
    return send_from_directory(os.path.abspath('exports'), filename)


@app.route('/')
@login_required
def home():
    """Render the main page with the language list and available TTS voices."""
    return render_template('index_new.html',
                           languages=LANGUAGES_LIST,
                           tts_voices=soni.tts_info.tts_list())


def create_project_folder(project_name):
    """Create ``exports/DD-MM-YYYY-HH-MM-<project_name>`` and return its path.

    Characters that are invalid in file names are replaced with ``_``. The
    folder is reused if it already exists (same name within the same minute).
    """
    current_time = datetime.now()
    sanitized_name = re.sub(r'[<>:"/\\|?*]', '_', project_name)
    folder_name = current_time.strftime("%d-%m-%Y-%H-%M-") + sanitized_name

    project_path = os.path.join('exports', folder_name)
    os.makedirs(project_path, exist_ok=True)
    return project_path


@app.route('/translate', methods=['POST'])
@login_required
def translate():
    """Run the SoniTranslate pipeline on an uploaded or downloaded video.

    Form fields read: ``translation_correction``, ``project_name``, ``video``
    (file), ``downloaded_video_path``, ``url``, ``source_language``,
    ``target_language``, ``max_speakers``, ``edited_subtitles`` and
    ``tts_voiceNN``. Returns JSON with ``success`` and either the served paths
    of the produced files or an ``error`` message.
    """
    subtitle_file = None
    try:
        data = request.form
        files = request.files

        # Manual correction toggle
        manual_correction = data.get('translation_correction', 'off') == 'on'

        # Create the project folder
        project_name = data.get('project_name', 'untitled')
        project_path = create_project_folder(project_name)

        # Resolve the input video: upload, previously downloaded file, or URL
        media_file = None
        if 'video' in files and files['video'].filename:
            upload_path = os.path.join(project_path, "video.mp4")
            os.makedirs(project_path, exist_ok=True)
            files['video'].save(upload_path)
            media_file = upload_path
            print("Video saved to project:", media_file)
        elif 'downloaded_video_path' in data and data['downloaded_video_path'].strip():
            # Use the video previously downloaded from YouTube
            downloaded_path = data['downloaded_video_path'].strip()
            if os.path.exists(downloaded_path):
                media_file = downloaded_path
                print("Using downloaded YouTube video:", media_file)
            else:
                return jsonify({'success': False, 'error': 'Stažené video nebylo nalezeno'})
        elif 'url' in data and data['url'].strip():
            url = data['url'].strip()
            if 'youtube.com' in url or 'youtu.be' in url:
                # NOTE(review): the cached session video is reused without
                # checking that it was downloaded from this same URL.
                if 'video_path' in session and os.path.exists(session['video_path']):
                    media_file = session['video_path']
                    print("Using already downloaded video:", media_file)
                else:
                    try:
                        # download_youtube_video returns a single path (or
                        # None on failure), not a (path, thumbnail) pair.
                        downloaded = download_youtube_video(url, project_path)
                        if not downloaded:
                            return jsonify({
                                'success': False,
                                'error': 'YouTube download failed: video could not be downloaded'
                            })
                        final_path = os.path.join(project_path, "video.mp4")
                        # The download normally already lands on final_path;
                        # copying a file onto itself raises SameFileError.
                        if os.path.abspath(downloaded) != os.path.abspath(final_path):
                            shutil.copy(downloaded, final_path)
                        media_file = final_path
                        print("YouTube video downloaded to:", media_file)
                    except Exception as e:
                        return jsonify({'success': False, 'error': f'YouTube download failed: {str(e)}'})
            else:
                return jsonify({'success': False, 'error': 'Invalid YouTube URL'})

        # With manual correction enabled, remember the video path in session
        if manual_correction and media_file:
            session['current_video_path'] = media_file
            print(f"Uloženo do session: {media_file}")

        if not media_file:
            return jsonify({'success': False, 'error': 'No video file or valid YouTube URL provided'})

        # Get parameters
        source_lang = data.get('source_language', 'Automatic detection')
        target_lang = data.get('target_language', 'English (en)')
        max_speakers = int(data.get('max_speakers', 1))

        # Get edited subtitles if available
        edited_subtitles = data.get('edited_subtitles')
        logger.info(f"edited_subtitles: {edited_subtitles}")

        get_translated_text = False
        if edited_subtitles:
            logger.info(f"Získány edited_subtitles: {edited_subtitles}")
            # The edited subtitles are already in the target language, so the
            # target language is also used as the source language.
            source_lang = target_lang
            get_video_from_text_json = True
            text_json = edited_subtitles

            os.makedirs('uploads', exist_ok=True)
            subtitle_file = os.path.join('uploads', 'edited_subtitles.srt')
            with open(subtitle_file, 'w', encoding='utf-8') as f:
                f.write(edited_subtitles)
        else:
            get_video_from_text_json = False
            text_json = ""

        # Collect the per-speaker voices that were actually submitted
        tts_voices = {}
        for i in range(max_speakers):
            voice_key = f'tts_voice{i:02d}'
            if voice_key in data:
                tts_voices[voice_key] = data[voice_key]

        # Process the translation
        result = soni.multilingual_media_conversion(
            media_file=media_file,
            link_media="",
            directory_input="",
            origin_language=source_lang,
            target_language=target_lang,
            max_speakers=max_speakers,
            get_translated_text=get_translated_text,
            get_video_from_text_json=get_video_from_text_json,
            text_json=text_json,
            max_accelerate_audio=1.0,
            acceleration_rate_regulation=False,
            **tts_voices,
            is_gui=True
        )

        if isinstance(result, list):
            # Move the output files into the project folder
            new_paths = []
            for file_path in result:
                if os.path.exists(file_path):
                    new_path = os.path.join(project_path, os.path.basename(file_path))
                    shutil.move(file_path, new_path)
                    # Convert to a path relative to exports/ for the frontend
                    relative_path = os.path.relpath(new_path, 'exports')
                    new_paths.append(f'/exports/{relative_path.replace(os.sep, "/")}')

            # Convert the original video to a path relative to exports/
            original_video_path = None
            if media_file and os.path.exists(media_file):
                original_video_path = f'/exports/{os.path.relpath(media_file, "exports").replace(os.sep, "/")}'

            return jsonify({
                'success': True,
                'video': new_paths[0] if new_paths else None,
                'original_video': original_video_path,
                'files': new_paths
            })
        return jsonify({'success': False, 'error': str(result)})

    except Exception as e:
        # Clean up the temporary subtitle file in case of error
        if subtitle_file and os.path.exists(subtitle_file):
            os.remove(subtitle_file)
        return jsonify({'success': False, 'error': str(e)})


def get_youtube_cookies():
    """Log in to Google with a Chrome WebDriver and save YouTube cookies.

    Writes the cookies as tab-separated lines to ``cookies/youtube.txt`` and
    returns that path, or ``None`` when the login or cookie export fails.

    NOTE(review): ``webdriver``, ``ChromeDriverManager``, ``Service``,
    ``WebDriverWait``, ``EC``, ``By``, ``Keys`` and ``TimeoutException`` are
    not imported anywhere in this file, so this function raises NameError
    until the Selenium / webdriver-manager imports are added. Its only call
    site (under ``__main__``) is commented out.
    """
    print("Získávám cookies pomocí automatického přihlášení...")
    logger.info("Získávám cookies pomocí automatického přihlášení...")

    options = webdriver.ChromeOptions()
    # Headless mode is not used as it often causes issues with Google login
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')
    options.add_argument('--disable-gpu')
    options.add_argument('--disable-features=TranslateUI')
    options.add_argument('--disable-translate')
    options.add_argument('--lang=en')
    options.add_argument('--disable-blink-features=AutomationControlled')

    driver = None
    try:
        driver_manager = ChromeDriverManager()
        driver_path = driver_manager.install()
        service = Service(driver_path)

        print(f"Používám ChromeDriver z: {driver_path}")
        logger.info(f"Používám ChromeDriver z: {driver_path}")
        driver = webdriver.Chrome(service=service, options=options)

        # Hide the navigator.webdriver automation marker
        driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")

        wait = WebDriverWait(driver, 30)

        print("Přihlašuji se do Google účtu...")
        logger.info("*YOUTUBE*Přihlašuji se do Google účtu...")
        driver.get('https://accounts.google.com/signin/v2/identifier?service=youtube')

        # Wait for and interact with the email field. The configured
        # credentials are used here; previously an empty string was typed.
        email_input = wait.until(EC.presence_of_element_located((By.NAME, "identifier")))
        email_input.send_keys(YOUTUBE_USERNAME)
        email_input.send_keys(Keys.RETURN)

        # Wait for and interact with the password field
        try:
            password_input = wait.until(EC.presence_of_element_located((By.NAME, "Passwd")))
            password_input.send_keys(YOUTUBE_PASSWORD)
            password_input.send_keys(Keys.RETURN)
        except TimeoutException:
            print("⚠ Timeout při čekání na pole pro heslo. Možná je vyžadováno ruční ověření.")
            logger.error("⚠ Timeout při čekání na pole pro heslo. Možná je vyžadováno ruční ověření.")
            return None

        # Wait for successful login and redirect to YouTube
        print("*YOUTUBE*Přecházím na YouTube...")
        logger.info("*YOUTUBE*Přecházím na YouTube...")
        try:
            wait.until(lambda driver: "youtube.com" in driver.current_url)
        except TimeoutException:
            pass  # Deliberate best-effort: continue and try to read cookies anyway

        cookies = driver.get_cookies()
        if not cookies:
            print("⚠ Nepodařilo se získat cookies. Přihlášení možná selhalo.")
            logger.error("⚠ Nepodařilo se získat cookies. Přihlášení možná selhalo.")
            return None

        # Save cookies to file
        cookies_dir = "cookies"
        os.makedirs(cookies_dir, exist_ok=True)
        cookies_file = os.path.join(cookies_dir, "youtube.txt")

        with open(cookies_file, 'w', encoding='utf-8') as f:
            for cookie in cookies:
                domain = cookie['domain']
                path = cookie['path']
                secure = str(cookie.get('secure', False)).upper()
                expires = str(int(cookie.get('expiry', 0)) if cookie.get('expiry') else 0)
                f.write(f"{domain}\tTRUE\t{path}\t{secure}\t{expires}\t{cookie['name']}\t{cookie['value']}\n")

        print(f"*YOUTUBE*✓ Cookies úspěšně uloženy do: {cookies_file}")
        logger.info(f"*YOUTUBE*✓ Cookies úspěšně uloženy do: {cookies_file}")
        return cookies_file

    except Exception as e:
        print(f"⚠ Chyba při získávání cookies: {str(e)}")
        logger.error(f"⚠ Chyba při získávání cookies: {str(e)}")
        return None
    finally:
        # Single place that shuts the browser down on every exit path
        if driver is not None:
            driver.quit()


def download_youtube_video(url, project_path, proxy=None, proxy_user=None, proxy_pass=None):
    """Download a YouTube video through the remote download API.

    The response body is streamed to ``<project_path>/video.mp4``.

    Args:
        url: YouTube URL forwarded to the API.
        project_path: Existing directory that receives ``video.mp4``.
        proxy, proxy_user, proxy_pass: Accepted for compatibility; not used.

    Returns:
        The path of the downloaded file, or ``None`` on any failure (the
        error is printed and logged, and a partial file is removed).
    """
    video_path = None
    try:
        print("\nZahajuji stahování videa...")
        logger.info("*YOUTUBE-zahajeni*Zahajuji stahování videa...")

        api_url = f'{DOWNLOAD_API_BASE}/download-video'
        headers = {
            'Accept': '*/*',  # Accept any content type
            'Content-Type': 'application/json'
        }
        payload = {'url': url, 'api_key': DOWNLOAD_API_KEY}
        # Never print/log the API key: log records are streamed by /logs.
        safe_payload = _redacted(payload)

        print(f"Odesílám požadavek na API: {api_url}")
        print(f"Payload: {safe_payload}")
        logger.info(f"*YOUTUBE*Odesílám požadavek na API: {api_url}")
        logger.info(f"*YOUTUBE*Payload: {safe_payload}")

        # The context manager releases the streamed connection on every path
        with requests.post(api_url, headers=headers, json=payload, stream=True) as response:
            print(f"Status code: {response.status_code}")
            print(f"Response headers: {dict(response.headers)}")
            logger.info(f"*YOUTUBE*Status code: {response.status_code}")
            logger.info(f"Response headers: {dict(response.headers)}")

            # A JSON response is an error message rather than a video
            content_type = response.headers.get('Content-Type', '').lower()
            if 'application/json' in content_type:
                try:
                    error_data = response.json()
                except ValueError:
                    # Body is not valid JSON after all; continue with download
                    error_data = None
                if error_data is not None:
                    error_message = error_data.get('error', 'Unknown API error')
                    raise Exception(f"API returned error: {error_message}")

            if response.status_code != 200:
                error_message = f"API request failed with status code {response.status_code}"
                try:
                    error_message += f": {response.text[:200]}"
                except Exception as e:
                    error_message += f" (Error reading response: {str(e)})"
                raise Exception(error_message)

            content_disp = response.headers.get('Content-Disposition', '')
            print(f"Content-Type: {content_type}")
            print(f"Content-Disposition: {content_disp}")
            logger.info(f"Content-Type: {content_type}")
            logger.info(f"Content-Disposition: {content_disp}")

            video_path = os.path.join(project_path, "video.mp4")
            total_size = 0
            log_step = 1024 * 1024  # log roughly once per downloaded MiB
            next_log_at = log_step

            print("Začínám stahovat video po částech...")
            logger.info("*YOUTUBE-download*Začínám stahovat video po částech...")

            with open(video_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if not chunk:
                        continue
                    total_size += len(chunk)
                    f.write(chunk)
                    print(f"\rStaženo: {total_size / (1024*1024):.2f} MB", end='', flush=True)
                    # Threshold instead of "size % 1MiB == 0": chunks are not
                    # guaranteed to be exactly 8192 bytes long.
                    if total_size >= next_log_at:
                        logger.info(f"*YOUTUBE-download*Staženo: {total_size / (1024*1024):.2f} MB")
                        next_log_at = (total_size // log_step + 1) * log_step

        print("\nKontroluji stažený soubor...")
        logger.info("Kontroluji stažený soubor...")

        if not os.path.exists(video_path):
            raise Exception("Soubor nebyl vytvořen")

        final_size = os.path.getsize(video_path)
        print(f"Velikost souboru: {final_size / (1024*1024):.2f} MB")
        logger.info(f"*YOUTUBE-finish*Velikost souboru: {final_size / (1024*1024):.2f} MB")

        if final_size <= 0:
            raise Exception("Stažený soubor je prázdný")

        print(f"✓ Video bylo úspěšně staženo do: {video_path}")
        logger.info(f"*YOUTUBE*✓ Video bylo úspěšně staženo do: {video_path}")
        return video_path

    except Exception as e:
        print(f"Chyba při stahování videa: {str(e)}")
        logger.error(f"Chyba při stahování videa: {str(e)}")
        # Remove a partial/empty file, but only one this call created
        if video_path and os.path.exists(video_path):
            os.remove(video_path)
        return None


def _youtube_video_id(url):
    """Extract the video id from a watch?v= or youtu.be URL, else None."""
    if 'youtube.com/watch?v=' in url:
        return url.split('watch?v=')[1].split('&')[0] or None
    if 'youtu.be/' in url:
        return url.split('youtu.be/')[1].split('?')[0] or None
    return None


def _fallback_thumbnail_url(url):
    """Build the direct img.youtube.com thumbnail URL for *url*, or None."""
    video_id = _youtube_video_id(url)
    if video_id:
        return f'https://img.youtube.com/vi/{video_id}/maxresdefault.jpg'
    return None


@app.route('/download_youtube', methods=['POST'])
@login_required
def download_yt():
    """Download a YouTube video into a new project folder.

    Reads ``url`` and ``project_name`` from the form, stores the resulting
    ``video_path`` and ``project_path`` in the session and returns JSON with
    the file path, file name and a thumbnail URL.
    """
    try:
        url = request.form.get('url')
        project_name = request.form.get('project_name', '')

        if not url:
            return jsonify({'success': False, 'error': 'URL není zadána'})

        if not any(x in url.lower() for x in ['youtube.com', 'youtu.be']):
            return jsonify({'success': False, 'error': 'Není platná YouTube URL'})

        project_path = create_project_folder(project_name)
        if not project_path:
            return jsonify({'success': False, 'error': 'Nepodařilo se vytvořit složku projektu'})

        # Ask the API for a thumbnail; any failure falls back to the direct
        # YouTube thumbnail URL below.
        api_url = f'{DOWNLOAD_API_BASE}/download-thumbnail'
        headers = {
            'accept': 'application/json',
            'Content-Type': 'application/json'
        }
        payload = {'url': url, 'api_key': DOWNLOAD_API_KEY}

        thumbnail_url = None
        try:
            print(f"Sending request to thumbnail API with payload: {_redacted(payload)}")
            thumbnail_response = requests.post(api_url, headers=headers, json=payload, timeout=15)
            print(f"Thumbnail API response status: {thumbnail_response.status_code}")
            print(f"Thumbnail API response content: {thumbnail_response.text}")

            if thumbnail_response.status_code == 200:
                try:
                    thumbnail_data = thumbnail_response.json()
                except ValueError as e:
                    print(f"Error parsing thumbnail JSON: {str(e)}")
                else:
                    if isinstance(thumbnail_data, dict):
                        thumbnail_url = thumbnail_data.get('thumbnail_url')
        except Exception as e:
            print(f"Error getting thumbnail: {str(e)}")

        if not thumbnail_url:
            thumbnail_url = _fallback_thumbnail_url(url)
        print(f"Final thumbnail URL: {thumbnail_url}")

        video_path = download_youtube_video(url, project_path)

        if video_path and os.path.exists(video_path):
            session['video_path'] = video_path
            session['project_path'] = project_path

            return jsonify({
                'success': True,
                'file_path': video_path,
                'filename': os.path.basename(video_path),
                'thumbnail_url': thumbnail_url
            })
        return jsonify({'success': False, 'error': 'Nepodařilo se stáhnout video'})

    except Exception as e:
        print(f"Route error: {str(e)}")
        return jsonify({'success': False, 'error': f'Chyba: {str(e)}'})


@app.route('/get_subtitles', methods=['POST'])
@login_required
def get_subtitles():
    """Produce translated subtitle text for an upload, URL or directory.

    Calls the SoniTranslate pipeline with ``get_translated_text=True`` and
    returns its result together with the input locations kept in the session.
    """
    try:
        data = request.form
        files = request.files

        # Create the project folder
        project_name = data.get('project_name', 'untitled')
        project_path = create_project_folder(project_name)

        media_file = None
        link_media = ""
        directory_input = ""

        if 'video' in files and files['video'].filename:
            # Handle file upload
            custom_filename = "video.mp4"
            project_copy = os.path.join(project_path, custom_filename)
            os.makedirs(project_path, exist_ok=True)
            files['video'].save(project_copy)
            media_file = project_copy
            print("Video saved to project:", media_file)

            # Also keep a copy in the uploads folder
            upload_path = os.path.join('uploads', custom_filename)
            os.makedirs('uploads', exist_ok=True)
            shutil.copy(media_file, upload_path)
            print("Video copied to uploads:", upload_path)

            session['video_path'] = media_file
        elif 'url' in data and data['url'].strip():
            link_media = data['url'].strip()
            print("URL provided:", link_media)
            session['video_url'] = link_media
        elif 'directory' in data and data['directory'].strip():
            directory_input = data['directory'].strip()
            print("Directory provided:", directory_input)
            session['directory_path'] = directory_input

        # Get parameters
        source_lang = data.get('source_language', 'Automatic detection')
        target_lang = data.get('target_language', 'English (en)')

        # Process to get subtitles
        result = soni.multilingual_media_conversion(
            media_file=media_file,
            link_media=link_media,
            directory_input=directory_input,
            origin_language=source_lang,
            target_language=target_lang,
            get_translated_text=True,
            is_gui=True
        )

        # NOTE(review): Flask's default session lives in a size-limited
        # cookie; long subtitle text may not fit — confirm session backend.
        if result:
            session['subtitles'] = result
            print("Subtitles saved to session")

        print("Subtitles result:", result)

        return jsonify({
            'success': True,
            'subtitles': result,
            'video_path': session.get('video_path'),
            'video_url': session.get('video_url'),
            'directory_path': session.get('directory_path')
        })

    except Exception as e:
        print("Error in get_subtitles:", str(e))
        return jsonify({'success': False, 'error': str(e)})


@app.route('/edit_subtitles', methods=['POST'])
@login_required
def edit_subtitles():
    """Accept edited subtitle text (placeholder: the text is not stored yet)."""
    try:
        # get_json(silent=True) tolerates a missing or non-JSON body
        data = request.get_json(silent=True) or {}
        subtitle_text = data.get('subtitle_text', '')

        # Process the edited subtitles.
        # This would integrate with the existing subtitle processing logic.

        return jsonify({'success': True, 'message': 'Subtitles updated successfully'})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})


@app.route('/voice_imitation', methods=['POST'])
@login_required
def voice_imitation():
    """Parse voice-imitation settings (placeholder: values are only validated)."""
    try:
        data = request.form
        files = request.files

        voice_imitation_enabled = data.get('voice_imitation', 'false').lower() == 'true'
        voice_imitation_method = data.get('voice_imitation_method', 'freevc')
        voice_imitation_max_segments = int(data.get('voice_imitation_max_segments', 3))
        voice_imitation_vocals_dereverb = data.get('voice_imitation_vocals_dereverb', 'false').lower() == 'true'
        voice_imitation_remove_previous = data.get('voice_imitation_remove_previous', 'true').lower() == 'true'

        return jsonify({'success': True})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})


@app.route('/get_voice_models', methods=['GET'])
@login_required
def get_voice_models():
    """List the ``.pth`` model files found in the ``weights`` directory."""
    try:
        method = request.args.get('method', 'RVC')
        models_dir = 'weights'

        if not os.path.exists(models_dir):
            return jsonify({'success': False, 'error': 'Models directory not found'})

        models = [f for f in os.listdir(models_dir) if f.endswith('.pth')]

        return jsonify({'success': True, 'models': models})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})


@app.route('/subtitle_settings', methods=['POST'])
@login_required
def subtitle_settings():
    """Parse subtitle/Whisper settings (placeholder: values are only validated)."""
    try:
        data = request.form

        # Subtitle settings
        output_format = data.get('subtitle_format', 'srt')
        soft_subtitles = data.get('soft_subtitles', 'false').lower() == 'true'
        burn_subtitles = data.get('burn_subtitles', 'false').lower() == 'true'

        # Whisper settings
        literalize_numbers = data.get('literalize_numbers', 'true').lower() == 'true'
        vocal_refinement = data.get('vocal_refinement', 'false').lower() == 'true'
        segment_duration = int(data.get('segment_duration', 15))
        whisper_model = data.get('whisper_model', 'large-v3')
        compute_type = data.get('compute_type', 'float16')
        batch_size = int(data.get('batch_size', 8))

        # Text segmentation settings
        text_segmentation = data.get('text_segmentation', 'sentence')
        divide_text_by = data.get('divide_text_by', '')

        # Diarization and translation settings
        diarization_model = data.get('diarization_model', 'pyannote_2.1')
        translation_process = data.get('translation_process', 'google_translator_batch')

        return jsonify({'success': True})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})


@app.route('/output_settings', methods=['POST'])
@login_required
def output_settings():
    """Parse output settings (placeholder: values are only validated)."""
    try:
        data = request.form

        output_type = data.get('output_type', 'video (mp4)')
        output_name = data.get('output_name', '')
        play_sound = data.get('play_sound', 'true').lower() == 'true'
        # Was compared against 'false', which inverted the flag
        enable_cache = data.get('enable_cache', 'true').lower() == 'true'
        preview = data.get('preview', 'false').lower() == 'true'

        return jsonify({'success': True})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})


@app.route('/save_srt', methods=['POST'])
@login_required
def save_srt():
    """Write the posted SRT text to ``titulky.srt`` next to the session video.

    Expects JSON with ``srt_content`` and ``project_name``; the target folder
    is derived from ``session['video_path']``.
    """
    try:
        data = request.get_json(silent=True) or {}
        srt_content = data.get('srt_content')
        project_name = data.get('project_name')

        if not srt_content or not project_name:
            return jsonify({'success': False, 'error': 'Missing required data'})

        video_path = session.get('video_path')
        if not video_path:
            # Previously fell through to an undefined variable (NameError)
            print("Hodnota video_path není v session.")
            return jsonify({'success': False, 'error': 'No video path in session'})

        # Strip the leading "exports/" and the trailing "/video.mp4" so only
        # the project folder name remains (either path separator accepted).
        cleaned_path = re.sub(r'^exports[\\/]', '', video_path)
        cleaned_path = re.sub(r'[\\/]+video\.mp4$', '', cleaned_path)
        print("===================cleaned")
        print(cleaned_path)

        project_path = os.path.join('exports', cleaned_path)
        print("===================project")
        print(project_path)

        # Save the SRT file in the same directory as the video
        srt_path = os.path.join(project_path, 'titulky.srt')
        print("===================srt")
        print(srt_path)
        with open(srt_path, 'w', encoding='utf-8') as f:
            f.write(srt_content)

        return jsonify({'success': True})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})


@app.route('/purge', methods=['POST'])
@login_required
def purge_folders():
    """Delete everything inside the ``outputs`` and ``audio`` folders."""
    try:
        # Folders whose contents are removed
        folders = ['outputs', 'audio']

        for folder in folders:
            if not os.path.exists(folder):
                continue
            for filename in os.listdir(folder):
                file_path = os.path.join(folder, filename)
                try:
                    if os.path.isfile(file_path) or os.path.islink(file_path):
                        os.unlink(file_path)
                    elif os.path.isdir(file_path):
                        shutil.rmtree(file_path)
                except Exception as e:
                    # Best effort: report and continue with the other entries
                    print(f'Failed to delete {file_path}. Reason: {e}')

        return jsonify({'success': True, 'message': 'Složky byly úspěšně vymazány'})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})


@app.route('/get_current_video_path', methods=['GET'])
@login_required
def get_current_video_path():
    """Return the video path stored by /translate for manual correction."""
    video_path = session.get('current_video_path')
    if video_path:
        return jsonify({
            'success': True,
            'video_path': video_path,
            'filename': os.path.basename(video_path)
        })
    return jsonify({'success': False, 'error': 'No video path in session'})


@app.route('/get_status')
def get_status():
    """Report TTS engine flags and whether the download server answers."""
    try:
        response = requests.get(
            f'{DOWNLOAD_API_BASE}/',
            params={'api_key': DOWNLOAD_API_KEY},
            headers={'accept': 'application/json'},
            timeout=5
        )
        ngrok_online = response.status_code == 200
        logger.info(f"Ngrok server status check: {response.status_code}")
    except Exception as e:
        logger.error(f"Error checking ngrok server: {str(e)}")
        ngrok_online = False

    return jsonify({
        'xtts_enabled': tts_info.xtts_enabled,
        'piper_enabled': tts_info.piper_enabled,
        'ngrok_server_online': ngrok_online
    })


# Bounded queue of log records for the /logs stream. Without a bound the
# queue grows forever while no client is connected to /logs.
log_queue = queue.Queue(maxsize=LOG_QUEUE_MAX)


class QueueHandler(logging.Handler):
    """Logging handler that pushes formatted records onto ``log_queue``."""

    def emit(self, record):
        try:
            entry = {
                'level': record.levelname,
                'message': self.format(record)
            }
            try:
                log_queue.put_nowait(entry)
            except queue.Full:
                # Drop the oldest record to make room for the newest one
                try:
                    log_queue.get_nowait()
                except queue.Empty:
                    pass
                log_queue.put_nowait(entry)
        except Exception:
            self.handleError(record)


# Attach the queue handler to the root logger
queue_handler = QueueHandler()
logger.addHandler(queue_handler)


# NOTE(review): /logs is not protected by @login_required; every connected
# client also consumes records from the same queue. Confirm this is intended.
@app.route('/logs')
def logs():
    """Stream log records as server-sent events with a 20 s keepalive."""
    def generate():
        while True:
            try:
                message = log_queue.get(timeout=20)
                yield f"data: {json.dumps(message)}\n\n"
            except queue.Empty:
                # Nothing logged for 20 seconds: keep the connection open
                yield f"data: {json.dumps({'level': 'INFO', 'message': 'keepalive'})}\n\n"

    return Response(generate(), mimetype='text/event-stream')


@app.route('/editace', methods=['GET', 'POST'])
def edit_page():
    """Render the subtitle editor for the project given by ``folder_id``.

    Reads ``exports/<folder_id>/titulky.srt`` when present and passes its
    content, plus the ``/serve_video/<folder_id>`` URL, to the template.
    """
    app.logger.info("-----------EDITACE-----------")

    session['editovano'] = True
    app.logger.info("Nastavena session hodnota 'editovano' na True")

    folder_id = request.args.get('folder_id')
    app.logger.info(f"Request parameters: folder_id={folder_id}")

    video_path = None
    subtitle_content = None

    if folder_id and not _is_safe_folder_id(folder_id):
        # Refuse ids that would resolve outside exports/
        app.logger.error(f"Rejected unsafe folder_id: {folder_id!r}")
    elif folder_id:
        app.logger.info("Processing request with folder_id")
        app.logger.info(f"Processing request for folder_id: {folder_id}")

        # Same location /serve_video reads from and /save_srt writes to.
        # (Previously the video was looked up under ../exports, and that
        # directory was created as a side effect of viewing the page.)
        base_path = os.path.abspath(os.path.join("exports", folder_id))
        video_file = os.path.join(base_path, 'video.mp4')
        subtitle_file = os.path.join("exports", folder_id, 'titulky.srt')

        app.logger.info(f"Full subtitle file path: {os.path.abspath(subtitle_file)}")
        app.logger.info(f"Full video file path: {video_file}")

        # Check that the video exists and is readable
        if os.path.exists(video_file):
            try:
                with open(video_file, 'rb'):
                    app.logger.info("Successfully opened video file")
                    video_path = f'{folder_id}/video.mp4'
                    app.logger.info(f"Set video path to: {video_path}")
            except Exception as e:
                app.logger.error(f"Error accessing video file: {e}")
        else:
            app.logger.error(f"Video file does not exist at: {video_file}")

        if os.path.exists(subtitle_file):
            try:
                with open(subtitle_file, 'r', encoding='utf-8') as f:
                    subtitle_content = f.read()
                    app.logger.info(
                        f"Successfully read subtitle content {subtitle_content}")
            except Exception as e:
                app.logger.error(f"Error reading subtitle file: {e}")
        else:
            app.logger.error(
                f"Subtitle file does not exist at: {subtitle_file}")

        app.logger.info(f"base path: {base_path}")
        app.logger.info(f"video file: {video_file}")

    if subtitle_content:
        # Replace HTML entities with their actual characters
        subtitle_content = html.unescape(subtitle_content.strip())
        app.logger.info(f"Sending subtitle content with length: {len(subtitle_content)}")
        app.logger.debug(f"Subtitle content sample: {subtitle_content[:200]}")

    return render_template('index.html',
                           video_path=f"/serve_video/{folder_id}",
                           subtitle_content=subtitle_content,
                           languages=LANGUAGES_LIST,
                           tts_voices=soni.tts_info.tts_list())


@app.route('/upload_video', methods=['POST'])
def upload_video():
    """
    Upload a video file either through form-data or raw binary content

    curl examples:
    Form-data: curl -X POST -F "video=@/path/to/video.mp4" http://localhost:5000/upload_video
    Raw binary: curl -X POST --data-binary @/path/to/video.mp4 -H "Content-Type: video/mp4" http://localhost:5000/upload_video?filename=video.mp4
    """
    upload_dir = os.path.join('static', 'uploads')
    os.makedirs(upload_dir, exist_ok=True)

    if request.files and 'video' in request.files:
        # Handle form-data upload
        video = request.files['video']
        if video.filename == '':
            return jsonify({'error': 'No selected file'}), 400

        filename = secure_filename(video.filename)
        content = video.read()
    else:
        # Handle raw binary upload
        content = request.get_data()
        if not content:
            return jsonify({'error': 'No content received'}), 400

        filename = secure_filename(request.args.get('filename', 'video.mp4'))

    # secure_filename() returns '' for names made only of unsafe characters;
    # an empty name would make the code try to open the directory itself.
    if not filename:
        filename = 'video.mp4'

    if content:
        filepath = os.path.join(upload_dir, filename)
        with open(filepath, 'wb') as f:
            f.write(content)

        return jsonify({
            'success': True,
            'filename': filename,
            'filepath': f'/static/uploads/{filename}'
        })

    return jsonify({'error': 'No valid content received'}), 400


@app.route('/generate_subtitles', methods=['POST'])
def generate_subtitles():
    """
    Generate subtitles from video file using Whisper
    """
    try:
        video_file = request.files.get('video')
        if not video_file:
            return jsonify({'error': 'No video file provided'}), 400

        # Imported lazily so the app starts without whisper installed
        import whisper
        model = whisper.load_model("base")

        # Save video temporarily
        upload_dir = os.path.join('static', 'uploads')
        os.makedirs(upload_dir, exist_ok=True)
        temp_path = os.path.join(upload_dir, 'temp_' + secure_filename(video_file.filename))
        video_file.save(temp_path)

        try:
            # Transcribe the audio
            result = model.transcribe(temp_path)

            # Convert Whisper segments to start/end/text records
            subtitles = [
                {
                    'start': segment['start'],
                    'end': segment['end'],
                    'text': segment['text'].strip()
                }
                for segment in result["segments"]
            ]

            return jsonify({
                'success': True,
                'subtitles': subtitles
            })
        finally:
            # Clean up temporary file
            if os.path.exists(temp_path):
                os.remove(temp_path)

    except Exception as e:
        app.logger.error(f"Error generating subtitles: {str(e)}")
        return jsonify({'error': str(e)}), 500


@app.route('/serve_video/<folder_id>')
def serve_video(folder_id):
    """
    Serve ``video.mp4`` from the ``exports/<folder_id>`` directory.
    """
    try:
        if not _is_safe_folder_id(folder_id):
            return jsonify({'success': False, 'error': 'Invalid folder id'}), 404

        base_path = os.path.abspath(os.path.join("exports", folder_id))
        app.logger.info(f"Serving video file: {base_path}/video.mp4")

        return send_from_directory(base_path, "video.mp4")
    except Exception as e:
        app.logger.error(f"Error serving video file: {e}")
        return jsonify({'success': False, 'error': str(e)}), 404


@app.route('/upload_subtitle', methods=['POST'])
def upload_subtitle():
    """
    Upload subtitle file either through form-data or raw text content

    curl examples:
    Form-data: curl -X POST -F "subtitle=@/path/to/subtitles.srt" http://localhost:5000/upload_subtitle
    Raw text: curl -X POST --data-binary @/path/to/subtitles.srt -H "Content-Type: text/plain" http://localhost:5000/upload_subtitle
    """
    if request.files and 'subtitle' in request.files:
        # Handle form-data upload
        subtitle = request.files['subtitle']
        if subtitle.filename == '':
            return jsonify({'error': 'No selected file'}), 400

        content = subtitle.read()
    else:
        # Handle raw text upload
        content = request.get_data()
        if not content:
            return jsonify({'error': 'No content received'}), 400

    try:
        # Decode content to string
        if isinstance(content, bytes):
            content = content.decode('utf-8')

        return jsonify({'success': True, 'content': content})

    except UnicodeDecodeError:
        return jsonify(
            {'error': 'Invalid subtitle file encoding, must be UTF-8'}), 400


with app.app_context():
    # Create upload directory if it doesn't exist
    os.makedirs(os.path.join('static', 'uploads'), exist_ok=True)


if __name__ == "__main__":
    # The Werkzeug debugger allows arbitrary code execution, so it must not
    # be enabled unconditionally on a server bound to all interfaces.
    # Opt in explicitly with FLASK_DEBUG=1 during development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(host="0.0.0.0", port=7860, debug=debug_enabled)
    # get_youtube_cookies()