koala2 / app.py
arcanus's picture
Update app.py
f14a52e verified
raw
history blame
42.4 kB
from flask import Flask, render_template, request, jsonify, send_file, session, redirect, url_for, send_from_directory
from functools import wraps
from functools import update_wrapper
import os
from app_rvc2 import SoniTranslate, TTS_Info
from madkoala.language_configuration import LANGUAGES_LIST, LANGUAGES, LANGUAGES_UNIDIRECTIONAL
from datetime import datetime
import re
import shutil
import browser_cookie3
import json
import time
import sys
import logging
from flask import Response
import queue
import threading
from werkzeug.utils import secure_filename
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.orm import DeclarativeBase
import requests
import signal
app = Flask(__name__)

# TTS_Info instance used to report which TTS engines are enabled
tts_info = TTS_Info(piper_enabled=True, xtts_enabled=True)  # Adjust these values based on your setup
logger = logging.getLogger()
logger.info(f"XTTS is {'enabled' if tts_info.xtts_enabled else 'disabled'}")

# Secrets are read from the environment. The fallbacks below are publicly
# known placeholders: they keep the app bootable, but anyone can forge a
# session or log in when they are in effect, so their use is reported loudly.
app.secret_key = os.environ.get('SECRET_KEY', 'default_secret_key')
APP_PASSWORD = os.environ.get('APP_PASSWORD', 'default_password')
YOUTUBE_USERNAME = os.environ.get('YOUTUBE_USERNAME', 'default_ysecret_key')
YOUTUBE_PASSWORD = os.environ.get('YOUTUBE_PASSWORD', 'default_ypassword')
for _secret_name in ('SECRET_KEY', 'APP_PASSWORD'):
    if _secret_name not in os.environ:
        logger.warning(
            "%s is not set in the environment; using an insecure built-in default",
            _secret_name,
        )

# Proxy settings
PROXY_URL = ""  # Change to your proxy
PROXY_USER = ""  # Change to your user name
PROXY_PASS = ""  # Change to your password
# Decorator that checks the login state before running a view
def login_required(f):
    """Wrap view *f* so that it only runs for a logged-in session.

    When the session has no ``logged_in`` key the caller is redirected to the
    ``login`` endpoint instead; otherwise *f* is called with the original
    arguments and its result is returned unchanged.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        if 'logged_in' in session:
            return f(*args, **kwargs)
        return redirect(url_for('login'))

    return guarded_view
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or check the submitted password (POST).

    A correct password sets ``session['logged_in']`` and redirects to the
    ``home`` endpoint. A wrong or missing password re-renders the form with
    an error message.
    """
    # Local import: hmac is not among the module-level imports of this file.
    import hmac

    if request.method != 'POST':
        return render_template('login.html')

    # .get() keeps a request without the field on the normal "wrong
    # password" path instead of raising a 400 from the form lookup.
    submitted = request.form.get('password', '')
    # compare_digest avoids leaking how many leading characters matched;
    # bytes are used because it rejects non-ASCII str arguments.
    if hmac.compare_digest(submitted.encode('utf-8'), APP_PASSWORD.encode('utf-8')):
        session['logged_in'] = True
        return redirect(url_for('home'))
    return render_template('login.html', error='Nesprávné heslo')
@app.route('/logout')
def logout():
    """Drop the login flag from the session and redirect to the login page."""
    session.pop('logged_in', None)
    login_url = url_for('login')
    return redirect(login_url)
# Module-level SoniTranslate instance shared by the route handlers in this file.
soni = SoniTranslate()
# Route for serving the resulting videos
@app.route('/outputs/<path:filename>')
def serve_output(filename):
    """Serve a file from the ``outputs`` directory.

    ``send_from_directory`` is used instead of concatenating the path so a
    request containing ``..`` cannot escape the directory; such requests and
    missing files produce a 404.
    """
    return send_from_directory('outputs', filename)
@app.route('/uploads/<path:filename>')
def serve_upload(filename):
    """Serve a file from the ``uploads`` directory.

    ``send_from_directory`` is used instead of concatenating the path so a
    request containing ``..`` cannot escape the directory; such requests and
    missing files produce a 404.
    """
    return send_from_directory('uploads', filename)
@app.route('/exports/<path:filename>')
def serve_export(filename):
    """Serve a file from the ``exports`` directory.

    ``send_from_directory`` is used instead of concatenating the path so a
    request containing ``..`` cannot escape the directory; such requests and
    missing files produce a 404.
    """
    return send_from_directory('exports', filename)
@app.route('/')
@login_required
def home():
    """Render the main page with the language list and the available TTS voices."""
    voices = soni.tts_info.tts_list()
    return render_template(
        'index_new.html',
        languages=LANGUAGES_LIST,
        tts_voices=voices,
    )
def create_project_folder(project_name):
    """Create the export folder for a project and return its path.

    The folder is ``exports/<DD-MM-YYYY-HH-MM>-<name>``, where the timestamp
    is the current local time and ``<name>`` is *project_name* with the
    characters ``< > : " / \\ | ? *`` replaced by underscores. An already
    existing folder is reused.
    """
    # Replace characters that are invalid in file names
    safe_name = re.sub(r'[<>:"/\\|?*]', '_', project_name)
    stamp = datetime.now().strftime("%d-%m-%Y-%H-%M-")
    # The folder lives under exports/
    project_path = os.path.join('exports', f"{stamp}{safe_name}")
    os.makedirs(project_path, exist_ok=True)
    return project_path
@app.route('/translate', methods=['POST'])
@login_required
def translate():
    """Run the translation/dubbing pipeline for an uploaded or downloaded video.

    The media source is taken, in this order, from: an uploaded ``video``
    file, a ``downloaded_video_path`` form field, or a YouTube ``url``.
    Output files returned by the pipeline are moved into the project folder
    and reported as ``/exports/...`` URLs.

    Returns a JSON object with ``success`` and either the result paths or an
    ``error`` message. Any exception is reported as ``success: False``.
    """
    subtitle_file = None
    try:
        data = request.form
        files = request.files
        # Manual correction flag
        manual_correction = data.get('translation_correction', 'off') == 'on'
        # Create the project folder
        project_name = data.get('project_name', 'untitled')
        project_path = create_project_folder(project_name)

        # Handle file upload or YouTube URL
        media_file = None
        if 'video' in files and files['video'].filename:
            # Handle file upload
            uploaded = files['video']
            upload_path = os.path.join(project_path, "video.mp4")
            os.makedirs(project_path, exist_ok=True)
            uploaded.save(upload_path)
            media_file = upload_path
            print("Video saved to project:", media_file)
        elif 'downloaded_video_path' in data and data['downloaded_video_path'].strip():
            # Use the video previously downloaded from YouTube
            # NOTE(review): this path comes straight from the form — confirm
            # it should not be restricted to the exports directory.
            downloaded_path = data['downloaded_video_path'].strip()
            if not os.path.exists(downloaded_path):
                return jsonify({'success': False, 'error': 'Stažené video nebylo nalezeno'})
            media_file = downloaded_path
            print("Using downloaded YouTube video:", media_file)
        elif 'url' in data and data['url'].strip():
            # Handle YouTube URL
            url = data['url'].strip()
            if 'youtube.com' not in url and 'youtu.be' not in url:
                return jsonify({'success': False, 'error': 'Invalid YouTube URL'})
            # Check if we already have this video downloaded
            if 'video_path' in session and os.path.exists(session['video_path']):
                media_file = session['video_path']
                print("Using already downloaded video:", media_file)
            else:
                # download_youtube_video returns one path (or None on
                # failure), so it must not be tuple-unpacked.
                try:
                    media_file = download_youtube_video(url, project_path)
                except Exception as e:
                    return jsonify({'success': False, 'error': f'YouTube download failed: {str(e)}'})
                if not media_file:
                    return jsonify({'success': False,
                                    'error': 'YouTube download failed: no video file was produced'})
                # Make sure the project contains video.mp4; copying a file
                # onto itself would raise shutil.SameFileError.
                final_path = os.path.join(project_path, "video.mp4")
                if os.path.abspath(media_file) != os.path.abspath(final_path):
                    shutil.copy(media_file, final_path)
                media_file = final_path
                print("YouTube video downloaded to:", media_file)

        # With manual correction enabled, remember the video path in the session
        if manual_correction and media_file:
            session['current_video_path'] = media_file
            print(f"Uloženo do session: {media_file}")
        if not media_file:
            return jsonify({'success': False, 'error': 'No video file or valid YouTube URL provided'})

        # Get parameters
        source_lang = data.get('source_language', 'Automatic detection')
        target_lang = data.get('target_language', 'English (en)')
        max_speakers = int(data.get('max_speakers', 1))

        # Get edited subtitles if available
        edited_subtitles = data.get('edited_subtitles')
        logger.info(f"edited_subtitles: {edited_subtitles}")
        get_translated_text = False
        if edited_subtitles:
            logger.info(f"Získány edited_subtitles: {edited_subtitles}")
            # Edited subtitles are already in the target language, so the
            # target language doubles as the source language.
            source_lang = target_lang
            get_video_from_text_json = True
            text_json = edited_subtitles
            os.makedirs('uploads', exist_ok=True)
            subtitle_file = os.path.join('uploads', 'edited_subtitles.srt')
            with open(subtitle_file, 'w', encoding='utf-8') as f:
                f.write(edited_subtitles)
        else:
            get_video_from_text_json = False
            text_json = ""

        # Per-speaker voices are sent as tts_voice00, tts_voice01, ...
        voice_keys = (f'tts_voice{i:02d}' for i in range(max_speakers))
        tts_voices = {key: data[key] for key in voice_keys if key in data}

        # Process the translation
        result = soni.multilingual_media_conversion(
            media_file=media_file,
            link_media="",
            directory_input="",
            origin_language=source_lang,
            target_language=target_lang,
            max_speakers=max_speakers,
            get_translated_text=get_translated_text,
            get_video_from_text_json=get_video_from_text_json,
            text_json=text_json,
            max_accelerate_audio=1.0,
            acceleration_rate_regulation=False,
            is_gui=True,
            **tts_voices,
        )
        if not isinstance(result, list):
            return jsonify({'success': False, 'error': str(result)})

        # Move the output files into the project folder
        new_paths = []
        for file_path in result:
            if os.path.exists(file_path):
                new_path = os.path.join(project_path, os.path.basename(file_path))
                shutil.move(file_path, new_path)
                # Convert to a path relative to exports/ for the frontend
                relative_path = os.path.relpath(new_path, 'exports')
                new_paths.append(f'/exports/{relative_path.replace(os.sep, "/")}')

        # Convert the original video to a relative path as well
        original_video_path = None
        if media_file and os.path.exists(media_file):
            original_video_path = f'/exports/{os.path.relpath(media_file, "exports").replace(os.sep, "/")}'
        return jsonify({
            'success': True,
            'video': new_paths[0] if new_paths else None,
            'original_video': original_video_path,
            'files': new_paths
        })
    except Exception as e:
        # Clean up temporary subtitle file in case of error
        if subtitle_file and os.path.exists(subtitle_file):
            os.remove(subtitle_file)
        return jsonify({'success': False, 'error': str(e)})
def get_youtube_cookies():
    """Log in to Google with Selenium and save the browser cookies to a file.

    The cookies are written tab-separated, one per line, to
    ``cookies/youtube.txt``.

    NOTE(review): ``webdriver``, ``ChromeDriverManager``, ``Service``,
    ``WebDriverWait``, ``EC``, ``By``, ``Keys`` and ``TimeoutException`` are
    not imported in the visible part of this file — confirm they are
    provided elsewhere before calling this function.

    Returns:
        The path of the cookie file, or ``None`` when login or cookie
        retrieval failed.
    """
    print("Získávám cookies pomocí automatického přihlášení...")
    logger.info("Získávám cookies pomocí automatického přihlášení...")
    options = webdriver.ChromeOptions()
    # Removed headless mode as it often causes issues with Google login
    for argument in (
        '--no-sandbox',
        '--disable-dev-shm-usage',
        '--disable-gpu',
        '--disable-features=TranslateUI',
        '--disable-translate',
        '--lang=en',
        '--disable-blink-features=AutomationControlled',
    ):
        options.add_argument(argument)

    driver = None
    try:
        driver_path = ChromeDriverManager().install()
        service = Service(driver_path)
        print(f"Používám ChromeDriver z: {driver_path}")
        logger.info(f"Používám ChromeDriver z: {driver_path}")
        driver = webdriver.Chrome(service=service, options=options)
        # Add undetected characteristics
        driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
        wait = WebDriverWait(driver, 30)  # Increased timeout
        print("Přihlašuji se do Google účtu...")
        logger.info("*YOUTUBE*Přihlašuji se do Google účtu...")
        driver.get('https://accounts.google.com/signin/v2/identifier?service=youtube')

        # Wait for and interact with email field; the credentials are the
        # ones configured through YOUTUBE_USERNAME / YOUTUBE_PASSWORD.
        email_input = wait.until(EC.presence_of_element_located((By.NAME, "identifier")))
        email_input.send_keys(YOUTUBE_USERNAME)
        email_input.send_keys(Keys.RETURN)

        # Wait for and interact with password field
        try:
            password_input = wait.until(EC.presence_of_element_located((By.NAME, "Passwd")))
        except TimeoutException:
            print("⚠ Timeout při čekání na pole pro heslo. Možná je vyžadováno ruční ověření.")
            logger.error("⚠ Timeout při čekání na pole pro heslo. Možná je vyžadováno ruční ověření.")
            return None
        password_input.send_keys(YOUTUBE_PASSWORD)
        password_input.send_keys(Keys.RETURN)

        # Wait for successful login and redirect to YouTube
        print("*YOUTUBE*Přecházím na YouTube...")
        logger.info("*YOUTUBE*Přecházím na YouTube...")
        try:
            wait.until(lambda drv: "youtube.com" in drv.current_url)
        except TimeoutException:
            # Deliberate best effort: cookies are collected even when the
            # redirect was not observed in time.
            pass

        # Get and save cookies
        cookies = driver.get_cookies()
        if not cookies:
            print("⚠ Nepodařilo se získat cookies. Přihlášení možná selhalo.")
            logger.error("⚠ Nepodařilo se získat cookies. Přihlášení možná selhalo.")
            return None

        # Save cookies to file
        cookies_dir = "cookies"
        os.makedirs(cookies_dir, exist_ok=True)
        cookies_file = os.path.join(cookies_dir, "youtube.txt")
        with open(cookies_file, 'w', encoding='utf-8') as f:
            for cookie in cookies:
                domain = cookie['domain']
                path = cookie['path']
                secure = str(cookie.get('secure', False)).upper()
                # A missing or falsy expiry is written as 0
                expires = str(int(cookie.get('expiry') or 0))
                f.write(f"{domain}\tTRUE\t{path}\t{secure}\t{expires}\t{cookie['name']}\t{cookie['value']}\n")
        print(f"*YOUTUBE*✓ Cookies úspěšně uloženy do: {cookies_file}")
        logger.info(f"*YOUTUBE*✓ Cookies úspěšně uloženy do: {cookies_file}")
        return cookies_file
    except Exception as e:
        print(f"⚠ Chyba při získávání cookies: {str(e)}")
        logger.error(f"⚠ Chyba při získávání cookies: {str(e)}")
        return None
    finally:
        # Single place that closes the browser on every exit path
        if driver is not None:
            driver.quit()
def download_youtube_video(url, project_path, proxy=None, proxy_user=None, proxy_pass=None):
    """Download a YouTube video through the remote download API.

    The video is streamed into ``<project_path>/video.mp4``.

    Args:
        url: YouTube URL forwarded to the API.
        project_path: Directory that receives ``video.mp4``.
        proxy, proxy_user, proxy_pass: Accepted for compatibility with
            callers; not used by this implementation.

    Returns:
        The path of the downloaded file, or ``None`` on any failure (the
        error is printed and logged, and a partial file is removed).
    """
    video_path = None  # set only once the download is about to start
    try:
        print("\nZahajuji stahování videa...")
        logger.info("*YOUTUBE-zahajeni*Zahajuji stahování videa...")
        # Prepare the API request
        api_url = 'https://hound-patient-honestly.ngrok-free.app/download-video'
        headers = {
            'Accept': '*/*',  # Accept any content type
            'Content-Type': 'application/json'
        }
        payload = {
            'url': url,
            'api_key': '5as4d4f12sxdf45sfg46vawd74879ad5sd5AF4g6d8f4hfgb5'
        }
        # The API key must not reach stdout or the logger: log records are
        # forwarded to clients through the log queue.
        loggable_payload = {**payload, 'api_key': '***'}
        print(f"Odesílám požadavek na API: {api_url}")
        print(f"Payload: {loggable_payload}")
        logger.info(f"*YOUTUBE*Odesílám požadavek na API: {api_url}")
        logger.info(f"*YOUTUBE*Payload: {loggable_payload}")

        # Connect timeout only: the server may need a long time before it
        # starts streaming, so the read side is left unbounded. The context
        # manager releases the streamed connection on every exit path.
        with requests.post(
            api_url,
            headers=headers,
            json=payload,
            stream=True,
            timeout=(10, None),
        ) as response:
            print(f"Status code: {response.status_code}")
            print(f"Response headers: {dict(response.headers)}")
            logger.info(f"*YOUTUBE*Status code: {response.status_code}")
            logger.info(f"Response headers: {dict(response.headers)}")

            # First check if the response is JSON (error message)
            content_type = response.headers.get('Content-Type', '').lower()
            if 'application/json' in content_type:
                try:
                    error_data = response.json()
                except ValueError:
                    # Not parseable as JSON, continue with file download
                    error_data = None
                if error_data is not None:
                    error_message = error_data.get('error', 'Unknown API error')
                    raise Exception(f"API returned error: {error_message}")

            if response.status_code != 200:
                error_message = f"API request failed with status code {response.status_code}"
                try:
                    error_message += f": {response.text[:200]}"
                except Exception as e:
                    error_message += f" (Error reading response: {str(e)})"
                raise Exception(error_message)

            # Report content type and suggested filename from headers
            content_disp = response.headers.get('Content-Disposition', '')
            print(f"Content-Type: {content_type}")
            print(f"Content-Disposition: {content_disp}")
            logger.info(f"Content-Type: {content_type}")
            logger.info(f"Content-Disposition: {content_disp}")

            video_path = os.path.join(project_path, "video.mp4")
            one_mb = 1024 * 1024
            total_size = 0
            next_log_at = one_mb
            print("Začínám stahovat video po částech...")
            logger.info("*YOUTUBE-download*Začínám stahovat video po částech...")
            with open(video_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if not chunk:
                        continue
                    total_size += len(chunk)
                    f.write(chunk)
                    print(f"\rStaženo: {total_size / one_mb:.2f} MB", end='', flush=True)
                    # Log roughly once per MB; a threshold is used because
                    # chunks need not line up with exact 1 MB boundaries.
                    if total_size >= next_log_at:
                        logger.info(f"*YOUTUBE-download*Staženo: {total_size / one_mb:.2f} MB")
                        next_log_at = (total_size // one_mb + 1) * one_mb

        print("\nKontroluji stažený soubor...")
        logger.info("Kontroluji stažený soubor...")
        if not os.path.exists(video_path):
            raise Exception("Soubor nebyl vytvořen")
        final_size = os.path.getsize(video_path)
        print(f"Velikost souboru: {final_size / (1024*1024):.2f} MB")
        logger.info(f"*YOUTUBE-finish*Velikost souboru: {final_size / (1024*1024):.2f} MB")
        if final_size <= 0:
            os.remove(video_path)
            raise Exception("Stažený soubor je prázdný")
        print(f"✓ Video bylo úspěšně staženo do: {video_path}")
        logger.info(f"*YOUTUBE*✓ Video bylo úspěšně staženo do: {video_path}")
        return video_path
    except Exception as e:
        print(f"Chyba při stahování videa: {str(e)}")
        logger.error(f"Chyba při stahování videa: {str(e)}")
        # Remove a partial download, but never a file this call did not start writing
        if video_path and os.path.exists(video_path):
            os.remove(video_path)
        return None
def _fallback_thumbnail_url(url):
    """Build the img.youtube.com thumbnail URL from a YouTube link, or return None."""
    video_id = None
    if 'youtube.com/watch?v=' in url:
        video_id = url.split('watch?v=')[1].split('&')[0]
    elif 'youtu.be/' in url:
        video_id = url.split('youtu.be/')[1].split('?')[0]
    if not video_id:
        return None
    return f'https://img.youtube.com/vi/{video_id}/maxresdefault.jpg'


@app.route('/download_youtube', methods=['POST'])
@login_required
def download_yt():
    """Download a YouTube video into a new project folder.

    Reads ``url`` and ``project_name`` from the form, asks the remote API
    for a thumbnail URL (falling back to the public YouTube thumbnail when
    the API gives none), downloads the video and stores ``video_path`` and
    ``project_path`` in the session.

    Returns a JSON object with ``success`` and either the file details or an
    ``error`` message.
    """
    try:
        # Get data from form submission
        url = request.form.get('url')
        project_name = request.form.get('project_name', '')
        if not url:
            return jsonify({'success': False, 'error': 'URL není zadána'})
        if not any(x in url.lower() for x in ['youtube.com', 'youtu.be']):
            return jsonify({'success': False, 'error': 'Není platná YouTube URL'})
        project_path = create_project_folder(project_name)
        if not project_path:
            return jsonify({'success': False, 'error': 'Nepodařilo se vytvořit složku projektu'})

        # Get thumbnail from API
        api_url = 'https://hound-patient-honestly.ngrok-free.app/download-thumbnail'
        headers = {
            'accept': 'application/json',
            'Content-Type': 'application/json'
        }
        payload = {
            'url': url,
            'api_key': '5as4d4f12sxdf45sfg46vawd74879ad5sd5AF4g6d8f4hfgb5'
        }

        # The thumbnail is optional: every failure below is reported and the
        # download continues.
        thumbnail_url = None
        try:
            # The API key is masked so it does not end up in console output
            print(f"Sending request to thumbnail API with payload: {dict(payload, api_key='***')}")
            # A timeout keeps an unresponsive thumbnail service from
            # blocking the whole download request.
            thumbnail_response = requests.post(api_url, headers=headers, json=payload, timeout=10)
            print(f"Thumbnail API response status: {thumbnail_response.status_code}")
            print(f"Thumbnail API response content: {thumbnail_response.text}")
            if thumbnail_response.status_code == 200:
                try:
                    thumbnail_data = thumbnail_response.json()
                    if isinstance(thumbnail_data, dict):
                        thumbnail_url = thumbnail_data.get('thumbnail_url')
                except ValueError as e:
                    print(f"Error parsing thumbnail JSON: {str(e)}")
        except Exception as e:
            print(f"Error getting thumbnail: {str(e)}")

        if not thumbnail_url:
            # Derive the thumbnail directly from the video ID in the URL
            thumbnail_url = _fallback_thumbnail_url(url)
        print(f"Final thumbnail URL: {thumbnail_url}")

        video_path = download_youtube_video(url, project_path)
        if video_path and os.path.exists(video_path):
            session['video_path'] = video_path
            session['project_path'] = project_path
            return jsonify({
                'success': True,
                'file_path': video_path,
                'filename': os.path.basename(video_path),
                'thumbnail_url': thumbnail_url
            })
        return jsonify({'success': False, 'error': 'Nepodařilo se stáhnout video'})
    except Exception as e:
        print(f"Route error: {str(e)}")
        return jsonify({'success': False, 'error': f'Chyba: {str(e)}'})
@app.route('/get_subtitles', methods=['POST'])
@login_required
def get_subtitles():
    """Run the pipeline in "translated text only" mode and return the subtitles.

    The input is an uploaded ``video`` file, a ``url`` or a ``directory``
    form field (checked in that order). The chosen input is remembered in the
    session, and a truthy result is stored there as ``subtitles``.
    """
    try:
        form = request.form
        uploads = request.files
        # Create the project folder
        project_path = create_project_folder(form.get('project_name', 'untitled'))

        # Exactly one of these three is filled in below
        media_file = None
        link_media = ""
        directory_input = ""

        has_upload = 'video' in uploads and uploads['video'].filename
        if has_upload:
            # Store the upload as <project>/video.mp4
            video_name = "video.mp4"
            project_copy = os.path.join(project_path, video_name)
            os.makedirs(project_path, exist_ok=True)
            uploads['video'].save(project_copy)
            media_file = project_copy
            print("Video saved to project:", media_file)
            # Keep a second copy in the uploads folder
            uploads_copy = os.path.join('uploads', video_name)
            os.makedirs('uploads', exist_ok=True)
            shutil.copy(media_file, uploads_copy)
            print("Video copied to uploads:", uploads_copy)
            # Remember the video path in the session
            session['video_path'] = media_file
        elif 'url' in form and form['url'].strip():
            link_media = form['url'].strip()
            print("URL provided:", link_media)
            session['video_url'] = link_media
        elif 'directory' in form and form['directory'].strip():
            directory_input = form['directory'].strip()
            print("Directory provided:", directory_input)
            session['directory_path'] = directory_input

        # Process to get subtitles
        result = soni.multilingual_media_conversion(
            media_file=media_file,
            link_media=link_media,
            directory_input=directory_input,
            origin_language=form.get('source_language', 'Automatic detection'),
            target_language=form.get('target_language', 'English (en)'),
            get_translated_text=True,
            is_gui=True
        )

        # Save subtitles to session
        if result:
            session['subtitles'] = result
            print("Subtitles saved to session")
        print("Subtitles result:", result)

        response_body = {
            'success': True,
            'subtitles': result,
            'video_path': session.get('video_path'),
            'video_url': session.get('video_url'),
            'directory_path': session.get('directory_path')
        }
        return jsonify(response_body)
    except Exception as e:
        print("Error in get_subtitles:", str(e))
        return jsonify({'success': False, 'error': str(e)})
@app.route('/edit_subtitles', methods=['POST'])
@login_required
def edit_subtitles():
    """Accept edited subtitle text sent as JSON.

    The text is read from the ``subtitle_text`` key but not processed any
    further here; the endpoint only acknowledges the request.
    """
    try:
        payload = request.json
        subtitle_text = payload.get('subtitle_text', '')
        # Placeholder: the edited subtitles are not processed yet; this is
        # where the existing subtitle processing logic would be called.
        acknowledgement = {'success': True, 'message': 'Subtitles updated successfully'}
        return jsonify(acknowledgement)
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
@app.route('/voice_imitation', methods=['POST'])
@login_required
def voice_imitation():
    """Parse the voice-imitation form fields and acknowledge the request.

    The parsed values are not used further in this handler; an unparsable
    ``voice_imitation_max_segments`` is reported as an error response.
    """
    try:
        form = request.form
        files = request.files

        def is_true(field, default):
            # Form booleans arrive as the strings 'true' / 'false'
            return form.get(field, default).lower() == 'true'

        voice_imitation_enabled = is_true('voice_imitation', 'false')
        voice_imitation_method = form.get('voice_imitation_method', 'freevc')
        voice_imitation_max_segments = int(form.get('voice_imitation_max_segments', 3))
        voice_imitation_vocals_dereverb = is_true('voice_imitation_vocals_dereverb', 'false')
        voice_imitation_remove_previous = is_true('voice_imitation_remove_previous', 'true')
        return jsonify({'success': True})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
@app.route('/get_voice_models', methods=['GET'])
@login_required
def get_voice_models():
    """List the ``.pth`` model files found in the ``weights`` directory."""
    try:
        method = request.args.get('method', 'RVC')  # read but not used for filtering
        models_dir = 'weights'
        if os.path.exists(models_dir):
            # Only .pth files count as models
            models = []
            for entry in os.listdir(models_dir):
                if entry.endswith('.pth'):
                    models.append(entry)
            return jsonify({'success': True, 'models': models})
        return jsonify({'success': False, 'error': 'Models directory not found'})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
@app.route('/subtitle_settings', methods=['POST'])
@login_required
def subtitle_settings():
    """Parse the subtitle/Whisper settings form and acknowledge the request.

    The parsed values are not used further in this handler; an unparsable
    ``segment_duration`` or ``batch_size`` is reported as an error response.
    """
    try:
        form = request.form

        def is_true(field, default):
            # Form booleans arrive as the strings 'true' / 'false'
            return form.get(field, default).lower() == 'true'

        # Subtitle output
        output_format = form.get('subtitle_format', 'srt')
        soft_subtitles = is_true('soft_subtitles', 'false')
        burn_subtitles = is_true('burn_subtitles', 'false')
        # Whisper
        literalize_numbers = is_true('literalize_numbers', 'true')
        vocal_refinement = is_true('vocal_refinement', 'false')
        segment_duration = int(form.get('segment_duration', 15))
        whisper_model = form.get('whisper_model', 'large-v3')
        compute_type = form.get('compute_type', 'float16')
        batch_size = int(form.get('batch_size', 8))
        # Text segmentation
        text_segmentation = form.get('text_segmentation', 'sentence')
        divide_text_by = form.get('divide_text_by', '')
        # Diarization and translation
        diarization_model = form.get('diarization_model', 'pyannote_2.1')
        translation_process = form.get('translation_process', 'google_translator_batch')
        return jsonify({'success': True})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
@app.route('/output_settings', methods=['POST'])
@login_required
def output_settings():
    """Parse the output settings form and acknowledge the request.

    The parsed values are not used further in this handler.
    """
    try:
        data = request.form
        # Get output settings
        output_type = data.get('output_type', 'video (mp4)')
        output_name = data.get('output_name', '')
        play_sound = data.get('play_sound', 'true').lower() == 'true'
        # Compared against 'true' like every other flag; the previous
        # comparison against 'false' inverted the setting.
        enable_cache = data.get('enable_cache', 'true').lower() == 'true'
        preview = data.get('preview', 'false').lower() == 'true'
        return jsonify({'success': True})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
@app.route('/save_srt', methods=['POST'])
@login_required
def save_srt():
    """Save SRT text next to the session's current video as ``titulky.srt``.

    Expects JSON with ``srt_content`` and ``project_name``. The target
    folder is derived from ``session['video_path']``.

    Returns a JSON object with ``success`` and, on failure, an ``error``.
    """
    try:
        # silent=True turns a missing or invalid JSON body into the regular
        # "missing data" answer instead of an exception.
        data = request.get_json(silent=True) or {}
        srt_content = data.get('srt_content')
        project_name = data.get('project_name')
        if not srt_content or not project_name:
            return jsonify({'success': False, 'error': 'Missing required data'})

        video_path = session.get('video_path')
        if not video_path:
            # Without a video path there is no folder to write into; return
            # a clear error rather than failing on an undefined variable.
            print("Hodnota video_path není v session.")
            return jsonify({'success': False, 'error': 'No video path in session'})

        # Strip the leading "exports/" and the trailing "/video.mp4" to get
        # the project folder name (either path separator is accepted).
        cleaned_path = re.sub(r'^exports[\\/]', '', video_path)
        cleaned_path = re.sub(r'[\\/]+video\.mp4$', '', cleaned_path)
        print("===================cleaned")
        print(cleaned_path)
        project_path = os.path.join('exports', cleaned_path)
        print("===================project")
        print(project_path)

        # Save SRT file in the same directory as video
        srt_path = os.path.join(project_path, 'titulky.srt')
        print("===================srt")
        print(srt_path)
        with open(srt_path, 'w', encoding='utf-8') as f:
            f.write(srt_content)
        return jsonify({'success': True})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
@app.route('/purge', methods=['POST'])
@login_required
def purge_folders():
    """Delete everything inside the ``outputs`` and ``audio`` folders.

    The folders themselves are kept. An entry that cannot be deleted is
    reported on stdout and skipped.
    """
    try:
        # Folders whose contents are wiped
        for folder in ('outputs', 'audio'):
            if not os.path.exists(folder):
                continue
            for entry in os.listdir(folder):
                file_path = os.path.join(folder, entry)
                try:
                    if os.path.isfile(file_path) or os.path.islink(file_path):
                        os.unlink(file_path)
                    elif os.path.isdir(file_path):
                        shutil.rmtree(file_path)
                except Exception as e:
                    print(f'Failed to delete {file_path}. Reason: {e}')
        return jsonify({'success': True, 'message': 'Složky byly úspěšně vymazány'})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
@app.route('/get_current_video_path', methods=['GET'])
@login_required
def get_current_video_path():
    """Return the video path stored in the session as ``current_video_path``."""
    video_path = session.get('current_video_path')
    if not video_path:
        return jsonify({'success': False, 'error': 'No video path in session'})
    details = {
        'success': True,
        'video_path': video_path,
        'filename': os.path.basename(video_path)
    }
    return jsonify(details)
@app.route('/get_status')
def get_status():
    """Report the enabled TTS engines and whether the ngrok server answers.

    The server counts as online only when a GET request to it returns
    status 200 within 5 seconds; any exception counts as offline.
    """
    # Check if ngrok server is online
    try:
        response = requests.get(
            'https://hound-patient-honestly.ngrok-free.app/',
            params={'api_key': '5as4d4f12sxdf45sfg46vawd74879ad5sd5AF4g6d8f4hfgb5'},
            headers={'accept': 'application/json'},
            timeout=5
        )
        status_code = response.status_code
        ngrok_online = status_code == 200
        logger.info(f"Ngrok server status check: {status_code}")
    except Exception as e:
        logger.error(f"Error checking ngrok server: {str(e)}")
        ngrok_online = False

    status = {
        'xtts_enabled': tts_info.xtts_enabled,
        'piper_enabled': tts_info.piper_enabled,
        'ngrok_server_online': ngrok_online
    }
    return jsonify(status)
#@app.route('/reset_sessions', methods=['POST'])
#def reset_sessions():
# session.clear()
# return jsonify({'status': 'success'})
# Queue of log messages waiting to be streamed to clients. It is bounded so
# that memory use stays limited when nothing is consuming the queue.
LOG_QUEUE_MAXSIZE = 1000
log_queue = queue.Queue(maxsize=LOG_QUEUE_MAXSIZE)


# Custom handler that puts messages into the queue
class QueueHandler(logging.Handler):
    """Logging handler that pushes ``{'level', 'message'}`` dicts onto ``log_queue``.

    When the queue is full the oldest entry is discarded to make room, so
    logging never blocks and the newest messages are kept.
    """

    def emit(self, record):
        entry = {
            'level': record.levelname,
            'message': self.format(record)
        }
        try:
            log_queue.put_nowait(entry)
            return
        except queue.Full:
            pass
        # Queue is full: drop the oldest entry and retry once. Both steps
        # are best effort because another thread may race for the slot.
        try:
            log_queue.get_nowait()
        except queue.Empty:
            pass
        try:
            log_queue.put_nowait(entry)
        except queue.Full:
            pass


# Add the queue handler to the root logger
logger = logging.getLogger()
queue_handler = QueueHandler()
logger.addHandler(queue_handler)
@app.route('/logs')
def logs():
    """Stream queued log messages to the client as server-sent events.

    Each event is one JSON object on a ``data:`` line. When no message
    arrives within 20 seconds a keepalive event is sent instead.
    """
    keepalive = {'level': 'INFO', 'message': 'keepalive'}

    def generate():
        while True:
            try:
                # Wait up to 20 seconds for the next message
                message = log_queue.get(timeout=20)
            except queue.Empty:
                message = keepalive
            yield f"data: {json.dumps(message)}\n\n"

    return Response(generate(), mimetype='text/event-stream')
# Request DEBUG-level logging on the root logger.
# NOTE(review): basicConfig() does nothing when the root logger already has
# handlers, and a handler is attached to the root logger earlier in this
# file — confirm the DEBUG level actually takes effect.
logging.basicConfig(level=logging.DEBUG)
@app.route('/editace', methods=['GET', 'POST'])
def edit_page():
    """Render the subtitle editor for the project named by ``folder_id``.

    ``folder_id`` is read from the query string. When present, the subtitle
    file ``exports/<folder_id>/titulky.srt`` is read (if it exists) and
    passed to the template together with the ``/serve_video/<folder_id>``
    URL. A ``folder_id`` that is not a single path component is rejected
    with a 400 JSON response.
    """
    app.logger.info("-----------EDITACE-----------")
    session['editovano'] = True
    app.logger.info("Nastavena session hodnota 'editovano' na True")
    folder_id = request.args.get('folder_id')
    app.logger.info(f"Request parameters: folder_id={folder_id}")
    video_path = None
    subtitle_content = None
    if folder_id:
        # folder_id ends up in filesystem paths (including a makedirs call),
        # so only a plain folder name is accepted. Project folder names never
        # contain path separators.
        if folder_id in ('.', '..') or any(ch in folder_id for ch in ('/', '\\', '\x00')):
            app.logger.error(f"Rejected invalid folder_id: {folder_id!r}")
            return jsonify({'success': False, 'error': 'Invalid folder_id'}), 400

        app.logger.info("Processing request with folder_id")
        app.logger.info(f"Processing request for folder_id: {folder_id}")
        # NOTE(review): base_path resolves to ../exports/<folder_id> (the
        # parent of the working directory), while the subtitle file and the
        # /serve_video route use ./exports/<folder_id> — confirm which
        # location is intended.
        parent_directory = os.path.abspath(os.path.join(os.getcwd(), ".."))
        base_path = os.path.abspath(os.path.join(parent_directory, "exports", folder_id))
        video_file = os.path.join(base_path, 'video.mp4')
        subtitle_file = os.path.join("exports", folder_id, 'titulky.srt')
        video_fullfile = os.path.join("exports", folder_id, 'video.mp4')
        # Logged separately so the two paths do not run together
        app.logger.info(f"Full subtitle file path: {os.path.abspath(subtitle_file)}")
        app.logger.info(f"Full video file path: {os.path.abspath(video_fullfile)}")

        # Create directories if they don't exist
        os.makedirs(base_path, exist_ok=True)

        # Check that the video file can be opened
        if os.path.exists(video_file):
            try:
                with open(video_file, 'rb') as f:
                    app.logger.info("Successfully opened video file")
                    video_path = f'{folder_id}/video.mp4'
                    app.logger.info(f"Set video path to: {video_path}")
            except Exception as e:
                app.logger.error(f"Error accessing video file: {e}")
        else:
            app.logger.error(f"Video file does not exist at: {video_file}")

        if os.path.exists(subtitle_file):
            try:
                with open(subtitle_file, 'r', encoding='utf-8') as f:
                    subtitle_content = f.read()
                    app.logger.info(
                        f"Successfully read subtitle content {subtitle_content}")
            except Exception as e:
                app.logger.error(f"Error reading subtitle file: {e}")
        else:
            app.logger.error(
                f"Subtitle file does not exist at: {subtitle_file}")
        app.logger.info(f"base path: {base_path}")
        app.logger.info(f"video file: {video_file}")

    if subtitle_content:
        # Replace HTML entities with their actual characters
        import html
        subtitle_content = html.unescape(subtitle_content.strip())
        app.logger.info(f"Sending subtitle content with length: {len(subtitle_content)}")
        app.logger.debug(f"Subtitle content sample: {subtitle_content[:200]}")

    return render_template('index.html',
                           video_path=f"/serve_video/{folder_id}",
                           subtitle_content=subtitle_content,
                           languages=LANGUAGES_LIST,
                           tts_voices=soni.tts_info.tts_list())
@app.route('/upload_video', methods=['POST'])
def upload_video():
    """
    Upload a video file either through form-data or raw binary content.

    The file is stored in ``static/uploads`` under its sanitized name. A name
    that is empty after sanitizing is rejected with a 400 response.

    curl examples:
    Form-data:
    curl -X POST -F "video=@/path/to/video.mp4" http://localhost:5000/upload_video
    Raw binary:
    curl -X POST --data-binary @/path/to/video.mp4 -H "Content-Type: video/mp4" http://localhost:5000/upload_video?filename=video.mp4
    """
    upload_dir = os.path.join('static', 'uploads')
    os.makedirs(upload_dir, exist_ok=True)

    if request.files and 'video' in request.files:
        # Handle form-data upload
        video = request.files['video']
        if video.filename == '':
            return jsonify({'error': 'No selected file'}), 400
        filename = secure_filename(video.filename)
        content = video.read()
    else:
        # Handle raw binary upload
        content = request.get_data()
        if not content:
            return jsonify({'error': 'No content received'}), 400
        filename = secure_filename(request.args.get('filename', 'video.mp4'))

    if not content:
        return jsonify({'error': 'No valid content received'}), 400
    if not filename:
        # secure_filename() yields '' for names such as '..'; writing would
        # then target the upload directory itself.
        return jsonify({'error': 'Invalid filename'}), 400

    filepath = os.path.join(upload_dir, filename)
    with open(filepath, 'wb') as f:
        f.write(content)
    return jsonify({
        'success': True,
        'filename': filename,
        'filepath': f'/static/uploads/{filename}'
    })
@app.route('/generate_subtitles', methods=['POST'])
def generate_subtitles():
    """
    Generate subtitles from an uploaded video file using Whisper.

    Returns JSON with a ``subtitles`` list of ``{'start', 'end', 'text'}``
    entries, one per Whisper segment. A missing file gives a 400 response;
    any other failure gives a 500 response.
    """
    try:
        # Validate the request before paying for the model load
        video_file = request.files.get('video')
        if not video_file:
            return jsonify({'error': 'No video file provided'}), 400

        import whisper
        model = whisper.load_model("base")

        # Save video temporarily
        upload_dir = os.path.join('static', 'uploads')
        os.makedirs(upload_dir, exist_ok=True)
        temp_path = os.path.join(upload_dir, 'temp_' + secure_filename(video_file.filename))
        video_file.save(temp_path)
        try:
            # Transcribe the audio
            result = model.transcribe(temp_path)
            # Reduce each Whisper segment to its timing and stripped text
            subtitles = [
                {
                    'start': segment['start'],
                    'end': segment['end'],
                    'text': segment['text'].strip()
                }
                for segment in result["segments"]
            ]
            return jsonify({
                'success': True,
                'subtitles': subtitles
            })
        finally:
            # Clean up temporary file
            if os.path.exists(temp_path):
                os.remove(temp_path)
    except Exception as e:
        app.logger.error(f"Error generating subtitles: {str(e)}")
        return jsonify({'error': str(e)}), 500
@app.route('/serve_video/<folder_id>')
def serve_video(folder_id):
    """
    Serve ``video.mp4`` from ``exports/<folder_id>``.

    Any failure, including a missing file or an invalid ``folder_id``, is
    answered with a JSON error and status 404.
    """
    try:
        # Only a plain folder name may be joined onto the exports directory
        if folder_id in ('.', '..') or '\\' in folder_id:
            return jsonify({'success': False, 'error': 'Invalid folder_id'}), 404
        # Build the path of the project folder
        base_path = os.path.abspath(os.path.join("exports", folder_id))
        # Normal request trace, so it is not logged as an error
        app.logger.info(f"Serving video file: {base_path}/video.mp4")
        # Return the video.mp4 file
        return send_from_directory(base_path, "video.mp4")
    except Exception as e:
        app.logger.error(f"Error serving video file: {e}")
        return jsonify({'success': False, 'error': str(e)}), 404
@app.route('/upload_subtitle', methods=['POST'])
def upload_subtitle():
    """
    Accept a subtitle file (form-data or raw body) and return its text.

    The content is decoded as UTF-8 and echoed back in the ``content`` field;
    content that is not valid UTF-8 gives a 400 response.

    curl examples:
    Form-data:
    curl -X POST -F "subtitle=@/path/to/subtitles.srt" http://localhost:5000/upload_subtitle
    Raw text:
    curl -X POST --data-binary @/path/to/subtitles.srt -H "Content-Type: text/plain" http://localhost:5000/upload_subtitle
    """
    is_form_upload = request.files and 'subtitle' in request.files
    if is_form_upload:
        # Form-data upload
        uploaded = request.files['subtitle']
        if uploaded.filename == '':
            return jsonify({'error': 'No selected file'}), 400
        content = uploaded.read()
    else:
        # Raw text upload
        content = request.get_data()
        if not content:
            return jsonify({'error': 'No content received'}), 400

    # Decode content to string
    try:
        text = content.decode('utf-8') if isinstance(content, bytes) else content
    except UnicodeDecodeError:
        return jsonify(
            {'error': 'Invalid subtitle file encoding, must be UTF-8'}), 400
    return jsonify({'success': True, 'content': text})
with app.app_context():
    # Create upload directory if it doesn't exist
    os.makedirs(os.path.join('static', 'uploads'), exist_ok=True)

# When the script is run directly, start the web server
if __name__ == "__main__":
    # The server listens on all interfaces, and Flask's debug mode enables
    # the interactive Werkzeug debugger (arbitrary code execution). Debug
    # mode is therefore opt-in through the FLASK_DEBUG environment variable.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(host="0.0.0.0", port=7860, debug=debug_enabled)
    #get_youtube_cookies()