Spaces:
Sleeping
Sleeping
from flask import Flask, render_template, request, jsonify, send_file, session, redirect, url_for, send_from_directory | |
from functools import wraps | |
from functools import update_wrapper | |
import os | |
from app_rvc2 import SoniTranslate, TTS_Info | |
from madkoala.language_configuration import LANGUAGES_LIST, LANGUAGES, LANGUAGES_UNIDIRECTIONAL | |
from datetime import datetime | |
import re | |
import shutil | |
import browser_cookie3 | |
import json | |
import time | |
import sys | |
import logging | |
from flask import Response | |
import queue | |
import threading | |
from werkzeug.utils import secure_filename | |
from flask_sqlalchemy import SQLAlchemy | |
from sqlalchemy.orm import DeclarativeBase | |
import requests | |
import signal | |
# --- Application bootstrap ---------------------------------------------------
app = Flask(__name__)

# TTS_Info instance used to report which TTS engines are enabled.
# Adjust the two flags to match the local setup.
tts_info = TTS_Info(piper_enabled=True, xtts_enabled=True)

logger = logging.getLogger()
logger.info(f"XTTS is {'enabled' if tts_info.xtts_enabled else 'disabled'}")

# Secrets and credentials are read from the environment; the literals are
# only fallbacks used when the variable is not set.
app.secret_key = os.environ.get('SECRET_KEY', 'default_secret_key')
APP_PASSWORD = os.environ.get('APP_PASSWORD', 'default_password')
YOUTUBE_USERNAME = os.environ.get('YOUTUBE_USERNAME', 'default_ysecret_key')
YOUTUBE_PASSWORD = os.environ.get('YOUTUBE_PASSWORD', 'default_ypassword')

# Proxy settings (constants)
PROXY_URL = ""   # change to your proxy
PROXY_USER = ""  # change to your proxy user name
PROXY_PASS = ""  # change to your proxy password
# Middleware that enforces login on protected views
def login_required(f):
    """Decorate a view so that it is only reachable by logged-in users.

    Args:
        f: The view function to protect.

    Returns:
        A wrapper that redirects to the ``login`` endpoint when the session
        has no ``'logged_in'`` key and otherwise calls ``f`` unchanged.
    """
    # wraps() keeps f's __name__/__doc__; without it every protected view is
    # named "decorated_function" and Flask endpoint names collide.
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'logged_in' not in session:
            return redirect(url_for('login'))
        return f(*args, **kwargs)

    return decorated_function
def login():
    """Show the login form and handle password submission.

    On POST the submitted ``password`` form field is compared with
    ``APP_PASSWORD``. A match sets ``session['logged_in']`` and redirects to
    ``home``; anything else re-renders the form with an error message.
    On other methods the empty login form is rendered.
    """
    import hmac  # local import: only this view needs it

    if request.method == 'POST':
        # .get() so a missing field is treated as a wrong password instead of
        # aborting the request.
        submitted = request.form.get('password', '')
        # Constant-time comparison avoids leaking the password via timing.
        # Bytes are compared because compare_digest rejects non-ASCII str.
        if hmac.compare_digest(submitted.encode('utf-8'),
                               APP_PASSWORD.encode('utf-8')):
            session['logged_in'] = True
            return redirect(url_for('home'))
        return render_template('login.html', error='Nesprávné heslo')
    return render_template('login.html')
def logout():
    """Drop the login flag from the session and go back to the login page."""
    session.pop('logged_in', None)
    login_url = url_for('login')
    return redirect(login_url)
# Shared SoniTranslate pipeline instance used by the translation views.
soni = SoniTranslate()
# Views for serving the resulting videos
def serve_output(filename):
    """Serve ``filename`` from the ``outputs`` directory.

    ``send_from_directory`` joins the path safely, so a ``filename`` that
    tries to escape the directory (e.g. with ``..``) results in a 404
    instead of exposing arbitrary files.
    """
    return send_from_directory('outputs', filename)
def serve_upload(filename):
    """Serve ``filename`` from the ``uploads`` directory.

    ``send_from_directory`` joins the path safely, so a ``filename`` that
    tries to escape the directory (e.g. with ``..``) results in a 404
    instead of exposing arbitrary files.
    """
    return send_from_directory('uploads', filename)
def serve_export(filename):
    """Serve ``filename`` from the ``exports`` directory.

    ``filename`` may contain sub-directories (project folder + file name).
    ``send_from_directory`` joins the path safely, so a value that tries to
    escape the directory (e.g. with ``..``) results in a 404 instead of
    exposing arbitrary files.
    """
    return send_from_directory('exports', filename)
def home():
    """Render the main page with the language list and the available TTS voices."""
    available_voices = soni.tts_info.tts_list()
    return render_template(
        'index_new.html',
        languages=LANGUAGES_LIST,
        tts_voices=available_voices,
    )
def create_project_folder(project_name):
    """Create a timestamped project folder under ``exports`` and return its path.

    The folder is named ``DD-MM-YYYY-HH-MM-<sanitized project name>``.

    Args:
        project_name: Free-form project name; ``None`` is treated as an
            empty name.

    Returns:
        The relative path ``exports/<folder name>``. The directory exists
        when the function returns (an existing one is reused).
    """
    raw_name = '' if project_name is None else str(project_name)
    # Replace characters that are invalid in file names, including control
    # characters (a NUL byte would make os.makedirs raise ValueError).
    sanitized_name = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '_', raw_name)
    folder_name = datetime.now().strftime("%d-%m-%Y-%H-%M-") + sanitized_name

    # Create the folder inside "exports"
    project_path = os.path.join('exports', folder_name)
    os.makedirs(project_path, exist_ok=True)
    return project_path
def translate():
    """Run the translation/dubbing pipeline for one video and return the results.

    The video comes from, in order of precedence: an uploaded ``video`` file,
    a previously downloaded file (``downloaded_video_path``), or a YouTube
    ``url``. Output files returned by the pipeline are moved into a fresh
    project folder under ``exports``.

    Form fields read: ``translation_correction``, ``project_name``,
    ``downloaded_video_path``, ``url``, ``source_language``,
    ``target_language``, ``max_speakers``, ``edited_subtitles`` and
    ``tts_voiceNN`` (one per speaker).

    Returns:
        A JSON response with ``success`` and either the served paths of the
        produced files (``video``, ``original_video``, ``files``) or an
        ``error`` message.
    """
    # Defined before the try block so the error handler can always clean it up.
    subtitle_file = None
    try:
        data = request.form
        files = request.files

        # Manual correction mode
        manual_correction = data.get('translation_correction', 'off') == 'on'

        # Create the project folder
        project_name = data.get('project_name', 'untitled')
        project_path = create_project_folder(project_name)

        # Resolve the input video: upload, earlier download, or YouTube URL
        media_file = None
        if 'video' in files and files['video'].filename:
            uploaded_video = files['video']
            media_file = os.path.join(project_path, "video.mp4")
            os.makedirs(project_path, exist_ok=True)
            uploaded_video.save(media_file)
            print("Video saved to project:", media_file)
        elif 'downloaded_video_path' in data and data['downloaded_video_path'].strip():
            # Use a video that was downloaded from YouTube earlier
            downloaded_path = data['downloaded_video_path'].strip()
            # The path comes from the client, so only accept files that live
            # inside the exports directory (where downloads are stored).
            exports_root = os.path.abspath('exports')
            inside_exports = os.path.commonpath(
                [exports_root, os.path.abspath(downloaded_path)]) == exports_root
            if inside_exports and os.path.exists(downloaded_path):
                media_file = downloaded_path
                print("Using downloaded YouTube video:", media_file)
            else:
                return jsonify({'success': False, 'error': 'Stažené video nebylo nalezeno'})
        elif 'url' in data and data['url'].strip():
            url = data['url'].strip()
            if 'youtube.com' in url or 'youtu.be' in url:
                # Reuse the video if it has already been downloaded
                if 'video_path' in session and os.path.exists(session['video_path']):
                    media_file = session['video_path']
                    print("Using already downloaded video:", media_file)
                else:
                    try:
                        # download_youtube_video returns a single path (or
                        # None on failure) and already writes the file to
                        # <project_path>/video.mp4, so no unpacking or copy.
                        media_file = download_youtube_video(url, project_path)
                    except Exception as e:
                        return jsonify({'success': False, 'error': f'YouTube download failed: {str(e)}'})
                    if not media_file:
                        return jsonify({'success': False, 'error': 'YouTube download failed'})
                    print("YouTube video downloaded to:", media_file)
            else:
                return jsonify({'success': False, 'error': 'Invalid YouTube URL'})

        # With manual correction enabled, remember the video path in the session
        if manual_correction and media_file:
            session['current_video_path'] = media_file
            print(f"Uloženo do session: {media_file}")

        if not media_file:
            return jsonify({'success': False, 'error': 'No video file or valid YouTube URL provided'})

        # Pipeline parameters
        source_lang = data.get('source_language', 'Automatic detection')
        target_lang = data.get('target_language', 'English (en)')
        max_speakers = int(data.get('max_speakers', 1))

        # Edited subtitles, if the user supplied any
        edited_subtitles = data.get('edited_subtitles')
        logger.info(f"edited_subtitles: {edited_subtitles}")
        if edited_subtitles:
            logger.info(f"Získány edited_subtitles: {edited_subtitles}")
            # The edited subtitles are already in the target language, so the
            # target language is used as the source language as well.
            source_lang = target_lang
            get_translated_text = False
            get_video_from_text_json = True
            text_json = edited_subtitles
            os.makedirs('uploads', exist_ok=True)
            subtitle_file = os.path.join('uploads', 'edited_subtitles.srt')
            with open(subtitle_file, 'w', encoding='utf-8') as f:
                f.write(edited_subtitles)
        else:
            get_translated_text = False
            get_video_from_text_json = False
            text_json = ""

        # One optional voice per speaker: tts_voice00, tts_voice01, ...
        tts_voices = {}
        for i in range(max_speakers):
            voice_key = f'tts_voice{i:02d}'
            if voice_key in data:
                tts_voices[voice_key] = data[voice_key]

        # Process the translation
        result = soni.multilingual_media_conversion(
            media_file=media_file,
            link_media="",
            directory_input="",
            origin_language=source_lang,
            target_language=target_lang,
            max_speakers=max_speakers,
            get_translated_text=get_translated_text,
            get_video_from_text_json=get_video_from_text_json,
            text_json=text_json,
            max_accelerate_audio=1.0,
            acceleration_rate_regulation=False,
            **tts_voices,
            is_gui=True
        )

        if not isinstance(result, list):
            return jsonify({'success': False, 'error': str(result)})

        # Move the output files into the project folder
        new_paths = []
        for file_path in result:
            if os.path.exists(file_path):
                new_path = os.path.join(project_path, os.path.basename(file_path))
                shutil.move(file_path, new_path)
                # Convert to a path relative to "exports" for the frontend
                relative_path = os.path.relpath(new_path, 'exports')
                new_paths.append(f'/exports/{relative_path.replace(os.sep, "/")}')

        # Convert the original video path the same way
        original_video_path = None
        if media_file and os.path.exists(media_file):
            original_video_path = f'/exports/{os.path.relpath(media_file, "exports").replace(os.sep, "/")}'

        return jsonify({
            'success': True,
            'video': new_paths[0] if new_paths else None,
            'original_video': original_video_path,
            'files': new_paths
        })
    except Exception as e:
        # Clean up the temporary subtitle file in case of error
        if subtitle_file and os.path.exists(subtitle_file):
            os.remove(subtitle_file)
        return jsonify({'success': False, 'error': str(e)})
def get_youtube_cookies():
    """Log in to Google with a Chrome WebDriver and save the browser cookies.

    The cookies are written to ``cookies/youtube.txt`` as tab-separated lines
    (domain, TRUE, path, secure, expiry, name, value).

    Returns:
        The path of the cookie file, or ``None`` when the login times out,
        no cookies are available, or any other error occurs.

    NOTE(review): ``webdriver``, ``ChromeDriverManager``, ``Service``,
    ``WebDriverWait``, ``EC``, ``By``, ``Keys`` and ``TimeoutException`` are
    not imported in the part of the file visible here — confirm the imports
    exist before enabling this function.
    """
    print("Získávám cookies pomocí automatického přihlášení...")
    logger.info("Získávám cookies pomocí automatického přihlášení...")

    options = webdriver.ChromeOptions()
    # Headless mode is deliberately not used: it often causes issues with
    # Google login.
    for argument in (
        '--no-sandbox',
        '--disable-dev-shm-usage',
        '--disable-gpu',
        '--disable-features=TranslateUI',
        '--disable-translate',
        '--lang=en',
        '--disable-blink-features=AutomationControlled',
    ):
        options.add_argument(argument)

    driver = None
    try:
        driver_manager = ChromeDriverManager()
        driver_path = driver_manager.install()
        service = Service(driver_path)
        print(f"Používám ChromeDriver z: {driver_path}")
        logger.info(f"Používám ChromeDriver z: {driver_path}")
        driver = webdriver.Chrome(service=service, options=options)

        # Hide the navigator.webdriver marker
        driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
        wait = WebDriverWait(driver, 30)  # generous timeout

        print("Přihlašuji se do Google účtu...")
        logger.info("*YOUTUBE*Přihlašuji se do Google účtu...")
        driver.get('https://accounts.google.com/signin/v2/identifier?service=youtube')

        # Wait for and fill in the e-mail field.
        # NOTE(review): an empty string is typed here (and for the password
        # below); presumably the YOUTUBE_USERNAME / YOUTUBE_PASSWORD settings
        # were meant to be used — confirm.
        email_input = wait.until(EC.presence_of_element_located((By.NAME, "identifier")))
        email_input.send_keys("")
        email_input.send_keys(Keys.RETURN)

        # Wait for and fill in the password field
        try:
            password_input = wait.until(EC.presence_of_element_located((By.NAME, "Passwd")))
            password_input.send_keys("")
            password_input.send_keys(Keys.RETURN)
        except TimeoutException:
            print("⚠ Timeout při čekání na pole pro heslo. Možná je vyžadováno ruční ověření.")
            logger.error("⚠ Timeout při čekání na pole pro heslo. Možná je vyžadováno ruční ověření.")
            return None

        # Wait for the redirect to YouTube after a successful login
        print("*YOUTUBE*Přecházím na YouTube...")
        logger.info("*YOUTUBE*Přecházím na YouTube...")
        try:
            wait.until(lambda d: "youtube.com" in d.current_url)
        except TimeoutException:
            pass  # best effort: cookies are collected even without the redirect

        cookies = driver.get_cookies()
        if not cookies:
            print("⚠ Nepodařilo se získat cookies. Přihlášení možná selhalo.")
            logger.error("⚠ Nepodařilo se získat cookies. Přihlášení možná selhalo.")
            return None

        # Save the cookies to a file
        cookies_dir = "cookies"
        os.makedirs(cookies_dir, exist_ok=True)
        cookies_file = os.path.join(cookies_dir, "youtube.txt")
        with open(cookies_file, 'w', encoding='utf-8') as f:
            for cookie in cookies:
                domain = cookie['domain']
                path = cookie['path']
                secure = str(cookie.get('secure', False)).upper()
                expiry = cookie.get('expiry')
                expires = str(int(expiry) if expiry else 0)
                f.write(f"{domain}\tTRUE\t{path}\t{secure}\t{expires}\t{cookie['name']}\t{cookie['value']}\n")

        print(f"*YOUTUBE*✓ Cookies úspěšně uloženy do: {cookies_file}")
        logger.info(f"*YOUTUBE*✓ Cookies úspěšně uloženy do: {cookies_file}")
        return cookies_file
    except Exception as e:
        print(f"⚠ Chyba při získávání cookies: {str(e)}")
        logger.error(f"⚠ Chyba při získávání cookies: {str(e)}")
        return None
    finally:
        # Always release the browser, whichever path returned or raised.
        if driver is not None:
            try:
                driver.quit()
            except Exception as quit_error:
                logger.error(f"Error closing the browser: {quit_error}")
def download_youtube_video(url, project_path, proxy=None, proxy_user=None, proxy_pass=None):
    """Download a YouTube video through the external download API.

    The video is streamed to ``<project_path>/video.mp4``.

    Args:
        url: URL of the YouTube video.
        project_path: Existing directory the video is written into.
        proxy, proxy_user, proxy_pass: Accepted for compatibility; not used
            by this implementation.

    Returns:
        The path of the downloaded file, or ``None`` when the API reports an
        error, the request fails, or the resulting file is missing/empty
        (a partial file is removed).
    """
    one_mb = 1024 * 1024
    video_path = None
    try:
        print("\nZahajuji stahování videa...")
        logger.info("*YOUTUBE-zahajeni*Zahajuji stahování videa...")

        # Prepare the API request
        api_url = 'https://hound-patient-honestly.ngrok-free.app/download-video'
        headers = {
            'Accept': '*/*',  # accept any content type
            'Content-Type': 'application/json'
        }
        payload = {
            'url': url,
            'api_key': '5as4d4f12sxdf45sfg46vawd74879ad5sd5AF4g6d8f4hfgb5'
        }
        # Log output is streamed to clients, so the API key must never be
        # written to it.
        loggable_payload = {**payload, 'api_key': '***'}

        print(f"Odesílám požadavek na API: {api_url}")
        print(f"Payload: {loggable_payload}")
        logger.info(f"*YOUTUBE*Odesílám požadavek na API: {api_url}")
        logger.info(f"*YOUTUBE*Payload: {loggable_payload}")

        # NOTE(review): no timeout is set; the API may need a long time
        # before it starts answering — confirm a safe value before adding one.
        # The context manager releases the streamed connection on every path.
        with requests.post(api_url, headers=headers, json=payload, stream=True) as response:
            print(f"Status code: {response.status_code}")
            print(f"Response headers: {dict(response.headers)}")
            logger.info(f"*YOUTUBE*Status code: {response.status_code}")
            logger.info(f"Response headers: {dict(response.headers)}")

            # A JSON response is an error message, not a video
            content_type = response.headers.get('Content-Type', '').lower()
            if 'application/json' in content_type:
                try:
                    error_data = response.json()
                except json.JSONDecodeError:
                    pass  # not actually JSON, continue with the file download
                else:
                    error_message = error_data.get('error', 'Unknown API error')
                    raise Exception(f"API returned error: {error_message}")

            if response.status_code != 200:
                error_message = f"API request failed with status code {response.status_code}"
                try:
                    error_message += f": {response.text[:200]}"
                except Exception as e:
                    error_message += f" (Error reading response: {str(e)})"
                raise Exception(error_message)

            content_disp = response.headers.get('Content-Disposition', '')
            print(f"Content-Type: {content_type}")
            print(f"Content-Disposition: {content_disp}")
            logger.info(f"Content-Type: {content_type}")
            logger.info(f"Content-Disposition: {content_disp}")

            video_path = os.path.join(project_path, "video.mp4")
            total_size = 0
            next_log_at = one_mb

            print("Začínám stahovat video po částech...")
            logger.info("*YOUTUBE-download*Začínám stahovat video po částech...")
            with open(video_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if not chunk:
                        continue
                    total_size += len(chunk)
                    f.write(chunk)
                    print(f"\rStaženo: {total_size / one_mb:.2f} MB", end='', flush=True)
                    # Log roughly once per megabyte. Chunk sizes vary, so the
                    # total rarely lands exactly on a megabyte boundary.
                    if total_size >= next_log_at:
                        logger.info(f"*YOUTUBE-download*Staženo: {total_size / one_mb:.2f} MB")
                        next_log_at = (total_size // one_mb + 1) * one_mb

        print("\nKontroluji stažený soubor...")
        logger.info("Kontroluji stažený soubor...")
        if not os.path.exists(video_path):
            raise Exception("Soubor nebyl vytvořen")

        final_size = os.path.getsize(video_path)
        print(f"Velikost souboru: {final_size / one_mb:.2f} MB")
        logger.info(f"*YOUTUBE-finish*Velikost souboru: {final_size / one_mb:.2f} MB")
        if final_size <= 0:
            raise Exception("Stažený soubor je prázdný")

        print(f"✓ Video bylo úspěšně staženo do: {video_path}")
        logger.info(f"*YOUTUBE*✓ Video bylo úspěšně staženo do: {video_path}")
        return video_path
    except Exception as e:
        print(f"Chyba při stahování videa: {str(e)}")
        logger.error(f"Chyba při stahování videa: {str(e)}")
        # Remove a partial or empty download
        if video_path and os.path.exists(video_path):
            os.remove(video_path)
        return None
def download_yt():
    """Download a YouTube video into a new project folder.

    Form fields read: ``url`` (required, must be a YouTube URL) and
    ``project_name``.

    On success the video and project paths are stored in the session and a
    JSON response with ``file_path``, ``filename`` and ``thumbnail_url`` is
    returned; otherwise a JSON response with ``success: False`` and an
    ``error`` message.
    """
    try:
        url = request.form.get('url')
        project_name = request.form.get('project_name', '')

        if not url:
            return jsonify({'success': False, 'error': 'URL není zadána'})
        if not any(x in url.lower() for x in ['youtube.com', 'youtu.be']):
            return jsonify({'success': False, 'error': 'Není platná YouTube URL'})

        project_path = create_project_folder(project_name)
        if not project_path:
            return jsonify({'success': False, 'error': 'Nepodařilo se vytvořit složku projektu'})

        # Ask the API for a thumbnail; any failure falls through to the
        # direct YouTube thumbnail below.
        api_url = 'https://hound-patient-honestly.ngrok-free.app/download-thumbnail'
        headers = {
            'accept': 'application/json',
            'Content-Type': 'application/json'
        }
        payload = {
            'url': url,
            'api_key': '5as4d4f12sxdf45sfg46vawd74879ad5sd5AF4g6d8f4hfgb5'
        }
        thumbnail_url = None
        try:
            # The API key is masked so it does not end up in the output.
            print(f"Sending request to thumbnail API with payload: {dict(payload, api_key='***')}")
            # The thumbnail is optional, so do not let a stalled API block
            # the whole download request.
            thumbnail_response = requests.post(api_url, headers=headers, json=payload, timeout=15)
            print(f"Thumbnail API response status: {thumbnail_response.status_code}")
            print(f"Thumbnail API response content: {thumbnail_response.text}")
            if thumbnail_response.status_code == 200:
                try:
                    thumbnail_data = thumbnail_response.json()
                except ValueError as e:
                    print(f"Error parsing thumbnail JSON: {str(e)}")
                else:
                    if isinstance(thumbnail_data, dict):
                        thumbnail_url = thumbnail_data.get('thumbnail_url')
        except Exception as e:
            print(f"Error getting thumbnail: {str(e)}")

        if not thumbnail_url:
            thumbnail_url = _youtube_thumbnail_fallback(url)
        print(f"Final thumbnail URL: {thumbnail_url}")

        video_path = download_youtube_video(url, project_path)
        if video_path and os.path.exists(video_path):
            session['video_path'] = video_path
            session['project_path'] = project_path
            return jsonify({
                'success': True,
                'file_path': video_path,
                'filename': os.path.basename(video_path),
                'thumbnail_url': thumbnail_url
            })
        return jsonify({'success': False, 'error': 'Nepodařilo se stáhnout video'})
    except Exception as e:
        print(f"Route error: {str(e)}")
        return jsonify({'success': False, 'error': f'Chyba: {str(e)}'})


def _youtube_thumbnail_fallback(url):
    """Return the img.youtube.com thumbnail URL for a watch/youtu.be link, or None."""
    video_id = None
    if 'youtube.com/watch?v=' in url:
        video_id = url.split('watch?v=')[1].split('&')[0]
    elif 'youtu.be/' in url:
        video_id = url.split('youtu.be/')[1].split('?')[0]
    if video_id:
        return f'https://img.youtube.com/vi/{video_id}/maxresdefault.jpg'
    return None
def get_subtitles():
    """Produce translated subtitle text for an uploaded video, a URL or a directory.

    The input is taken from the first of: an uploaded ``video`` file, the
    ``url`` form field, or the ``directory`` form field. The chosen input is
    remembered in the session and passed to the pipeline with
    ``get_translated_text=True``.

    Returns:
        A JSON response with the subtitles and the session's input values,
        or ``success: False`` with an ``error`` message on failure.
    """
    try:
        form = request.form
        uploads = request.files

        # Create the project folder
        project_path = create_project_folder(form.get('project_name', 'untitled'))

        # Exactly one of these three inputs is filled in below
        media_file = None
        link_media = ""
        directory_input = ""

        if 'video' in uploads and uploads['video'].filename:
            # Uploaded file: store it in the project folder ...
            custom_filename = "video.mp4"
            media_file = os.path.join(project_path, custom_filename)
            os.makedirs(project_path, exist_ok=True)
            uploads['video'].save(media_file)
            print("Video saved to project:", media_file)

            # ... and keep a copy in the uploads folder
            upload_path = os.path.join('uploads', custom_filename)
            os.makedirs('uploads', exist_ok=True)
            shutil.copy(media_file, upload_path)
            print("Video copied to uploads:", upload_path)

            session['video_path'] = media_file
        elif 'url' in form and form['url'].strip():
            link_media = form['url'].strip()
            print("URL provided:", link_media)
            session['video_url'] = link_media
        elif 'directory' in form and form['directory'].strip():
            directory_input = form['directory'].strip()
            print("Directory provided:", directory_input)
            session['directory_path'] = directory_input

        # Language parameters
        source_lang = form.get('source_language', 'Automatic detection')
        target_lang = form.get('target_language', 'English (en)')

        # Run the pipeline in "translated text only" mode
        result = soni.multilingual_media_conversion(
            media_file=media_file,
            link_media=link_media,
            directory_input=directory_input,
            origin_language=source_lang,
            target_language=target_lang,
            get_translated_text=True,
            is_gui=True
        )

        # Remember the subtitles in the session
        if result:
            session['subtitles'] = result
            print("Subtitles saved to session")
        print("Subtitles result:", result)

        response_body = {
            'success': True,
            'subtitles': result,
            'video_path': session.get('video_path'),
            'video_url': session.get('video_url'),
            'directory_path': session.get('directory_path')
        }
        return jsonify(response_body)
    except Exception as e:
        print("Error in get_subtitles:", str(e))
        return jsonify({'success': False, 'error': str(e)})
def edit_subtitles():
    """Accept edited subtitle text sent as JSON and acknowledge it.

    The ``subtitle_text`` value is read from the JSON body but not processed
    any further here. Any error (for example a body without JSON) is
    reported as ``success: False``.
    """
    try:
        payload = request.json
        # Read but not used yet: this is where subtitle processing would
        # plug in.
        subtitle_text = payload.get('subtitle_text', '')
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
    try:
        return jsonify({'success': True, 'message': 'Subtitles updated successfully'})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
def voice_imitation():
    """Parse the voice-imitation form fields and acknowledge the request.

    The values are only parsed, not stored; a value that cannot be parsed
    (for example a non-numeric ``voice_imitation_max_segments``) produces
    ``success: False`` with the error message.
    """
    try:
        data = request.form
        files = request.files

        def is_true(field, default):
            # Form booleans arrive as the strings 'true' / 'false'
            return data.get(field, default).lower() == 'true'

        voice_imitation_enabled = is_true('voice_imitation', 'false')
        voice_imitation_method = data.get('voice_imitation_method', 'freevc')
        voice_imitation_max_segments = int(data.get('voice_imitation_max_segments', 3))
        voice_imitation_vocals_dereverb = is_true('voice_imitation_vocals_dereverb', 'false')
        voice_imitation_remove_previous = is_true('voice_imitation_remove_previous', 'true')
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
    try:
        return jsonify({'success': True})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
def get_voice_models():
    """List the ``.pth`` model files found in the ``weights`` directory.

    Returns a JSON response with ``models`` (file names), or
    ``success: False`` when the directory is missing or listing it fails.
    """
    try:
        # The requested method is read but does not influence the listing.
        method = request.args.get('method', 'RVC')
        models_dir = 'weights'
        if os.path.exists(models_dir):
            # Every .pth file in the weights directory counts as a model
            models = [name for name in os.listdir(models_dir) if name.endswith('.pth')]
            return jsonify({'success': True, 'models': models})
        return jsonify({'success': False, 'error': 'Models directory not found'})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
def subtitle_settings():
    """Parse the subtitle/Whisper settings form and acknowledge the request.

    The values are only parsed, not stored; a value that cannot be parsed
    (for example a non-numeric ``segment_duration``) produces
    ``success: False`` with the error message.
    """
    try:
        form = request.form

        def is_true(field, default):
            # Form booleans arrive as the strings 'true' / 'false'
            return form.get(field, default).lower() == 'true'

        # Subtitle output
        output_format = form.get('subtitle_format', 'srt')
        soft_subtitles = is_true('soft_subtitles', 'false')
        burn_subtitles = is_true('burn_subtitles', 'false')

        # Whisper
        literalize_numbers = is_true('literalize_numbers', 'true')
        vocal_refinement = is_true('vocal_refinement', 'false')
        segment_duration = int(form.get('segment_duration', 15))
        whisper_model = form.get('whisper_model', 'large-v3')
        compute_type = form.get('compute_type', 'float16')
        batch_size = int(form.get('batch_size', 8))

        # Text segmentation
        text_segmentation = form.get('text_segmentation', 'sentence')
        divide_text_by = form.get('divide_text_by', '')

        # Diarization and translation
        diarization_model = form.get('diarization_model', 'pyannote_2.1')
        translation_process = form.get('translation_process', 'google_translator_batch')
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
    try:
        return jsonify({'success': True})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
def output_settings():
    """Parse the output settings form and acknowledge the request.

    The values are only parsed, not stored. Any error while reading the form
    produces ``success: False`` with the error message.
    """
    try:
        data = request.form

        # Output settings
        output_type = data.get('output_type', 'video (mp4)')
        output_name = data.get('output_name', '')
        play_sound = data.get('play_sound', 'true').lower() == 'true'
        # Compared with 'true' like every other flag; the previous
        # comparison with 'false' inverted the setting.
        enable_cache = data.get('enable_cache', 'true').lower() == 'true'
        preview = data.get('preview', 'false').lower() == 'true'

        return jsonify({'success': True})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
def save_srt():
    """Save SRT text next to the video remembered in the session.

    JSON fields read: ``srt_content`` and ``project_name`` (both required).
    The target folder is derived from ``session['video_path']`` and the
    subtitles are written there as ``titulky.srt``.

    Returns:
        A JSON response with ``success``; on failure it also carries an
        ``error`` message.
    """
    try:
        data = request.json
        srt_content = data.get('srt_content')
        project_name = data.get('project_name')
        if not srt_content or not project_name:
            return jsonify({'success': False, 'error': 'Missing required data'})

        video_path = session.get('video_path')
        if not video_path:
            # Without a video path there is no project folder to write to;
            # report that instead of failing on an undefined variable.
            print("Hodnota video_path není v session.")
            return jsonify({'success': False, 'error': 'No video path in session'})

        # Reduce "exports/<folder>/video.mp4" to "<folder>"; both slash
        # styles are accepted.
        cleaned_path = re.sub(r'^exports[\\/]', '', video_path)
        cleaned_path = re.sub(r'[\\/]+video\.mp4$', '', cleaned_path)
        print("===================cleaned")
        print(cleaned_path)

        project_path = os.path.join('exports', cleaned_path)
        print("===================project")
        print(project_path)

        # Save the SRT file in the same directory as the video
        srt_path = os.path.join(project_path, 'titulky.srt')
        print("===================srt")
        print(srt_path)
        with open(srt_path, 'w', encoding='utf-8') as f:
            f.write(srt_content)
        return jsonify({'success': True})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
def purge_folders():
    """Delete everything inside the ``outputs`` and ``audio`` folders.

    The folders themselves are kept. A failure to delete a single entry is
    printed and skipped; any other error yields ``success: False``.
    """
    try:
        # Folders whose contents are removed
        for folder in ('outputs', 'audio'):
            if not os.path.exists(folder):
                continue
            for entry in os.listdir(folder):
                target = os.path.join(folder, entry)
                try:
                    if os.path.isfile(target) or os.path.islink(target):
                        os.unlink(target)
                    elif os.path.isdir(target):
                        shutil.rmtree(target)
                except Exception as e:
                    print(f'Failed to delete {target}. Reason: {e}')
        return jsonify({'success': True, 'message': 'Složky byly úspěšně vymazány'})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
def get_current_video_path():
    """Return the video path stored in the session under ``current_video_path``."""
    video_path = session.get('current_video_path')
    if not video_path:
        return jsonify({'success': False, 'error': 'No video path in session'})
    return jsonify({
        'success': True,
        'video_path': video_path,
        'filename': os.path.basename(video_path)
    })
def get_status():
    """Report which TTS engines are enabled and whether the ngrok server answers.

    The ngrok server counts as online only when it answers with HTTP 200
    within 5 seconds; any error while checking counts as offline.
    """
    try:
        probe = requests.get(
            'https://hound-patient-honestly.ngrok-free.app/',
            params={'api_key': '5as4d4f12sxdf45sfg46vawd74879ad5sd5AF4g6d8f4hfgb5'},
            headers={'accept': 'application/json'},
            timeout=5
        )
        ngrok_online = probe.status_code == 200
        logger.info(f"Ngrok server status check: {probe.status_code}")
    except Exception as e:
        logger.error(f"Error checking ngrok server: {str(e)}")
        ngrok_online = False

    status = {
        'xtts_enabled': tts_info.xtts_enabled,
        'piper_enabled': tts_info.piper_enabled,
        'ngrok_server_online': ngrok_online
    }
    return jsonify(status)
#@app.route('/reset_sessions', methods=['POST']) | |
#def reset_sessions(): | |
# session.clear() | |
# return jsonify({'status': 'success'}) | |
# Queue of log messages waiting to be streamed to the browser. It is bounded
# so that it cannot grow without limit while no client is reading it.
_LOG_QUEUE_MAX_SIZE = 1000
log_queue = queue.Queue(maxsize=_LOG_QUEUE_MAX_SIZE)


class QueueHandler(logging.Handler):
    """Logging handler that puts each formatted record into ``log_queue``."""

    def emit(self, record):
        """Queue ``record`` as ``{'level': ..., 'message': ...}``.

        When the queue is full the oldest entry is discarded, so the most
        recent messages are the ones kept.
        """
        entry = {
            'level': record.levelname,
            'message': self.format(record)
        }
        try:
            log_queue.put_nowait(entry)
        except queue.Full:
            # Make room by dropping the oldest entry, then retry once.
            try:
                log_queue.get_nowait()
            except queue.Empty:
                pass
            try:
                log_queue.put_nowait(entry)
            except queue.Full:
                pass  # another producer filled the slot; drop this entry


# Add the queue handler to the root logger
logger = logging.getLogger()
queue_handler = QueueHandler()
logger.addHandler(queue_handler)
def logs():
    """Stream queued log messages to the client as server-sent events.

    Each event is one JSON object with ``level`` and ``message``. When no
    message arrives within 20 seconds a ``keepalive`` message is sent
    instead.
    """
    def generate():
        while True:
            try:
                # Wait up to 20 seconds for the next log message
                event = log_queue.get(timeout=20)
            except queue.Empty:
                # Nothing arrived in time: send a keepalive
                event = {'level': 'INFO', 'message': 'keepalive'}
            yield f"data: {json.dumps(event)}\n\n"

    return Response(generate(), mimetype='text/event-stream')
# basicConfig() does nothing when the root logger already has a handler, and
# one is attached to it earlier in this file, so the DEBUG level is also set
# explicitly on the root logger.
logging.basicConfig(level=logging.DEBUG)
logging.getLogger().setLevel(logging.DEBUG)
def edit_page():
    """Render the subtitle editor for an exported project.

    Query parameters:
        folder_id: Name of the project folder inside ``exports``. A value
            that contains a path separator or is ``.`` / ``..`` is rejected
            and treated as if it were missing.

    The template receives the subtitle text from
    ``exports/<folder_id>/titulky.srt`` (when readable) and the
    ``/serve_video/<folder_id>`` URL of the video. Without a valid
    ``folder_id`` both are ``None``.
    """
    import html

    app.logger.info("-----------EDITACE-----------")
    session['editovano'] = True
    app.logger.info("Nastavena session hodnota 'editovano' na True")

    folder_id = request.args.get('folder_id')
    app.logger.info(f"Request parameters: folder_id={folder_id}")

    # folder_id is used to build file system paths, so it must be a single
    # path component.
    if folder_id and (folder_id in ('.', '..') or '/' in folder_id or '\\' in folder_id):
        app.logger.error(f"Rejected invalid folder_id: {folder_id}")
        folder_id = None

    subtitle_content = None
    if folder_id:
        app.logger.info(f"Processing request for folder_id: {folder_id}")
        # Same location serve_video() reads the video from.
        project_dir = os.path.join("exports", folder_id)
        video_file = os.path.join(project_dir, 'video.mp4')
        subtitle_file = os.path.join(project_dir, 'titulky.srt')
        app.logger.info(
            f"Full subtitle file path: {os.path.abspath(subtitle_file)}; "
            f"Full video file path: {os.path.abspath(video_file)}")

        if not os.path.exists(video_file):
            app.logger.error(f"Video file does not exist at: {video_file}")

        if os.path.exists(subtitle_file):
            try:
                with open(subtitle_file, 'r', encoding='utf-8') as f:
                    subtitle_content = f.read()
                app.logger.info("Successfully read subtitle content")
            except Exception as e:
                app.logger.error(f"Error reading subtitle file: {e}")
        else:
            app.logger.error(
                f"Subtitle file does not exist at: {subtitle_file}")

    if subtitle_content:
        # Replace HTML entities with their actual characters
        subtitle_content = html.unescape(subtitle_content.strip())
        app.logger.info(f"Sending subtitle content with length: {len(subtitle_content)}")
        app.logger.debug(f"Subtitle content sample: {subtitle_content[:200]}")

    return render_template('index.html',
                           video_path=f"/serve_video/{folder_id}" if folder_id else None,
                           subtitle_content=subtitle_content,
                           languages=LANGUAGES_LIST,
                           tts_voices=soni.tts_info.tts_list())
def upload_video():
    """
    Upload a video file either through form-data or raw binary content

    curl examples:
    Form-data:
    curl -X POST -F "video=@/path/to/video.mp4" http://localhost:5000/upload_video
    Raw binary:
    curl -X POST --data-binary @/path/to/video.mp4 -H "Content-Type: video/mp4" http://localhost:5000/upload_video?filename=video.mp4

    The file is stored in ``static/uploads`` under its sanitized name.
    Returns a JSON response with ``filename`` and ``filepath``, or a 400
    response with an ``error`` message when no usable content was sent.
    """
    upload_dir = os.path.join('static', 'uploads')
    os.makedirs(upload_dir, exist_ok=True)

    if request.files and 'video' in request.files:
        # Form-data upload
        video = request.files['video']
        if video.filename == '':
            return jsonify({'error': 'No selected file'}), 400
        filename = secure_filename(video.filename)
        content = video.read()
    else:
        # Raw binary upload
        content = request.get_data()
        if not content:
            return jsonify({'error': 'No content received'}), 400
        filename = secure_filename(request.args.get('filename', 'video.mp4'))

    if not content:
        return jsonify({'error': 'No valid content received'}), 400

    # secure_filename() can return an empty string (e.g. for a name made
    # only of unsafe characters); fall back to the default name so the
    # target is a file, not the upload directory itself.
    filename = filename or 'video.mp4'
    filepath = os.path.join(upload_dir, filename)
    with open(filepath, 'wb') as f:
        f.write(content)
    return jsonify({
        'success': True,
        'filename': filename,
        'filepath': f'/static/uploads/{filename}'
    })
def generate_subtitles():
    """
    Generate subtitles from an uploaded video file using Whisper.

    The ``video`` file is stored temporarily in ``static/uploads``,
    transcribed with the Whisper ``base`` model and removed again.

    Returns a JSON response with ``subtitles`` (a list of
    ``{'start', 'end', 'text'}`` segments), a 400 response when no video was
    sent, or a 500 response with the error message on failure.
    """
    try:
        # Validate the input first so a bad request does not pay for
        # loading the model.
        video_file = request.files.get('video')
        if not video_file:
            return jsonify({'error': 'No video file provided'}), 400

        import whisper
        model = whisper.load_model("base")

        # Save the video temporarily
        upload_dir = os.path.join('static', 'uploads')
        os.makedirs(upload_dir, exist_ok=True)
        temp_path = os.path.join(upload_dir, 'temp_' + secure_filename(video_file.filename))
        video_file.save(temp_path)
        try:
            # Transcribe the audio
            result = model.transcribe(temp_path)
            # Keep only the timing and text of each Whisper segment
            subtitles = [
                {
                    'start': segment['start'],
                    'end': segment['end'],
                    'text': segment['text'].strip()
                }
                for segment in result["segments"]
            ]
            return jsonify({
                'success': True,
                'subtitles': subtitles
            })
        finally:
            # Clean up the temporary file
            if os.path.exists(temp_path):
                os.remove(temp_path)
    except Exception as e:
        app.logger.error(f"Error generating subtitles: {str(e)}")
        return jsonify({'error': str(e)}), 500
def serve_video(folder_id):
    """
    Serve ``video.mp4`` from the project folder ``exports/<folder_id>``.

    Args:
        folder_id: Name of the project folder inside ``exports``; it must be
            a single path component.

    Returns:
        The video file, or a 404 JSON response with an ``error`` message
        when ``folder_id`` is invalid or the file cannot be served.
    """
    try:
        # Refuse anything that would leave the exports directory.
        if folder_id in ('.', '..') or '/' in folder_id or '\\' in folder_id:
            return jsonify({'success': False, 'error': 'Invalid folder id'}), 404

        base_path = os.path.abspath(os.path.join("exports", folder_id))
        app.logger.info(f"Serving video file: {base_path}/video.mp4")
        return send_from_directory(base_path, "video.mp4")
    except Exception as e:
        app.logger.error(f"Error serving video file: {e}")
        return jsonify({'success': False, 'error': str(e)}), 404
def upload_subtitle():
    """
    Upload subtitle file either through form-data or raw text content

    curl examples:
    Form-data:
    curl -X POST -F "subtitle=@/path/to/subtitles.srt" http://localhost:5000/upload_subtitle
    Raw text:
    curl -X POST --data-binary @/path/to/subtitles.srt -H "Content-Type: text/plain" http://localhost:5000/upload_subtitle

    The content is returned to the client as UTF-8 text; nothing is stored.
    """
    sent_as_form_file = bool(request.files) and 'subtitle' in request.files
    if not sent_as_form_file:
        # Raw text upload
        content = request.get_data()
        if not content:
            return jsonify({'error': 'No content received'}), 400
    else:
        # Form-data upload
        subtitle = request.files['subtitle']
        if subtitle.filename == '':
            return jsonify({'error': 'No selected file'}), 400
        content = subtitle.read()

    try:
        # Bytes are decoded to text; text is passed through unchanged
        text = content.decode('utf-8') if isinstance(content, bytes) else content
        return jsonify({'success': True, 'content': text})
    except UnicodeDecodeError:
        return jsonify(
            {'error': 'Invalid subtitle file encoding, must be UTF-8'}), 400
with app.app_context():
    # Make sure static/uploads exists before any request needs it
    os.makedirs(os.path.join('static', 'uploads'), exist_ok=True)
# When the script is run directly, start the web server.
if __name__ == "__main__":
    # The interactive debugger lets anyone who can reach the server run
    # arbitrary code, so it is off by default while listening on 0.0.0.0.
    # Set FLASK_DEBUG=1 to enable it for local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes', 'on')
    app.run(host="0.0.0.0", port=7860, debug=debug_enabled)
    # Cookie export via get_youtube_cookies() is currently disabled.