from flask import Flask, render_template, jsonify, request, send_from_directory, send_file, redirect, url_for, session
import os, json, threading, time, signal, sys
from datetime import datetime
from extract_signed_segments_from_annotations import ClipExtractor, VideoClip
import logging
from dotenv import load_dotenv
import boto3
from botocore.exceptions import ClientError
import tempfile
import uuid
import requests
from urllib.parse import urlparse

# Load environment variables
load_dotenv()

# Whether to bypass authentication (useful for local testing and debugging)
bypass_auth = os.getenv('BYPASS_AUTH', 'false').lower() == 'true'

# Configure logging first
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Hugging Face specific configuration
is_hf_space = os.getenv('SPACE_ID') is not None
if is_hf_space:
    logger.info("Running in Hugging Face Spaces environment")
    # Allow insecure transport for development in HF
    os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'
    # Ensure port is set correctly
    os.environ['PORT'] = '7860'

app = Flask(__name__)
app.secret_key = os.getenv('SECRET_KEY', 'dev_key_for_testing')

# Configure session cookies for HF Spaces
if is_hf_space:
    app.config['SESSION_COOKIE_SECURE'] = False
    app.config['SESSION_COOKIE_HTTPONLY'] = True
    app.config['SESSION_COOKIE_SAMESITE'] = None
    app.config['PERMANENT_SESSION_LIFETIME'] = 86400  # 24 hours
# Directory paths
VIDEO_DIR = os.path.abspath("data/videos")
ANNOTATIONS_DIR = os.path.abspath("data/annotations")
TEMP_DIR = os.path.abspath("data/temp")
WORD_TIMESTAMPS_DIR = os.path.abspath("data/word_timestamps")
ALIGNMENTS_DIR = os.path.abspath("data/alignments")
TRANSCRIPTS_DIR = os.path.abspath("data/transcripts")

# S3 configuration
S3_BUCKET = os.getenv('S3_BUCKET', "sorenson-ai-sb-scratch")
S3_VIDEO_PREFIX = os.getenv('S3_VIDEO_PREFIX', "awilkinson/kylie_dataset_videos_for_alignment_webapp/")
USE_S3_FOR_VIDEOS = os.getenv('USE_S3_FOR_VIDEOS', 'true').lower() == 'true'
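# A sample .env for local development might look like the following
# (all values are illustrative placeholders, not real credentials):
#
#   SECRET_KEY=change-me
#   BYPASS_AUTH=true
#   ALLOWED_USERS=Perilon
#   USE_S3_FOR_VIDEOS=false
#   S3_BUCKET=my-example-bucket
#   S3_VIDEO_PREFIX=path/to/videos/
#   AWS_ACCESS_KEY_ID=...
#   AWS_SECRET_ACCESS_KEY=...
#   AWS_DEFAULT_REGION=us-west-2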
# Ensure all required directories exist
for directory in [VIDEO_DIR, ANNOTATIONS_DIR, TEMP_DIR, WORD_TIMESTAMPS_DIR, ALIGNMENTS_DIR, TRANSCRIPTS_DIR]:
    os.makedirs(directory, exist_ok=True)

# Global dictionaries for progress tracking
clip_extraction_status = {}
transcription_progress_status = {}
# S3 helper functions
def get_s3_client():
    """Get a boto3 S3 client."""
    return boto3.client(
        's3',
        region_name=os.environ.get('AWS_DEFAULT_REGION', 'us-west-2'),
        aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID'),
        aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY')
    )


def list_s3_videos():
    """List all videos in the S3 bucket with the given prefix."""
    try:
        s3_client = get_s3_client()
        # Note: list_objects_v2 returns at most 1000 keys per call; a paginator
        # would be needed if the prefix ever holds more objects than that.
        response = s3_client.list_objects_v2(
            Bucket=S3_BUCKET,
            Prefix=S3_VIDEO_PREFIX
        )
        if 'Contents' not in response:
            logger.warning(f"No videos found in S3 bucket {S3_BUCKET} with prefix {S3_VIDEO_PREFIX}")
            return []
        # Extract video IDs (filenames without extension) from the S3 keys
        videos = []
        for item in response['Contents']:
            key = item['Key']
            if key.endswith('.mp4'):
                filename = os.path.basename(key)
                video_id = os.path.splitext(filename)[0]
                videos.append(video_id)
        return videos
    except ClientError as e:
        logger.error(f"Error listing videos from S3: {str(e)}")
        return []
def download_video_from_s3(video_id):
    """Download a video from S3 to the local videos directory."""
    video_filename = f"{video_id}.mp4"
    s3_key = f"{S3_VIDEO_PREFIX}{video_filename}"
    local_path = os.path.join(VIDEO_DIR, video_filename)
    # Check if the file already exists locally
    if os.path.exists(local_path):
        logger.info(f"Video {video_id} already exists locally.")
        return local_path
    try:
        logger.info(f"Downloading video {video_id} from S3...")
        s3_client = get_s3_client()
        s3_client.download_file(S3_BUCKET, s3_key, local_path)
        logger.info(f"Video {video_id} downloaded successfully to {local_path}")
        return local_path
    except ClientError as e:
        logger.error(f"Error downloading video from S3: {str(e)}")
        return None
def generate_presigned_url(video_id, expiration=3600):
    """Generate a presigned URL for direct access to the video in S3."""
    video_filename = f"{video_id}.mp4"
    s3_key = f"{S3_VIDEO_PREFIX}{video_filename}"
    try:
        s3_client = get_s3_client()
        url = s3_client.generate_presigned_url(
            'get_object',
            Params={'Bucket': S3_BUCKET, 'Key': s3_key},
            ExpiresIn=expiration
        )
        return url
    except ClientError as e:
        logger.error(f"Error generating presigned URL: {str(e)}")
        return None
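# Serving strategy: the /video route below first redirects the browser to a
# presigned URL so video bytes stream directly from S3 rather than through
# Flask; if URL generation fails, it falls back to downloading the file into
# VIDEO_DIR and serving it locally.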
# Graceful shutdown handler
def graceful_shutdown(signum, frame):
    """Handle graceful shutdown on signals."""
    logger.info(f"Received signal {signum}, shutting down gracefully...")
    # Clean up as needed here
    sys.exit(0)


# Register signal handlers
signal.signal(signal.SIGTERM, graceful_shutdown)
signal.signal(signal.SIGINT, graceful_shutdown)


# Login required decorator
def login_required(f):
    from functools import wraps

    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'user' not in session:
            logger.info("User not in session, redirecting to login")
            return redirect(url_for('login'))
        return f(*args, **kwargs)
    return decorated_function


# Allow specific users (for testing)
def is_allowed_user(username):
    allowed_users_env = os.getenv('ALLOWED_USERS', 'Perilon')  # Default allowed user
    allowed_users = [user.strip() for user in allowed_users_env.split(',')]
    return username in allowed_users or not is_hf_space  # Allow all users in local dev


def update_extraction_progress(video_id, current, total):
    percent = int((current / total) * 100)
    clip_extraction_status[video_id] = {"current": current, "total": total, "percent": percent}
def run_clip_extraction(video_id):
    try:
        base_dir = app.root_path
        extractor = ClipExtractor(base_dir)
        extractor.extract_clips_from_annotations(
            video_id,
            progress_callback=lambda current, total: update_extraction_progress(video_id, current, total)
        )
        # Make sure progress reaches 100% even if the callback never reported it
        if video_id in clip_extraction_status:
            status = clip_extraction_status[video_id]
            if status.get("percent", 0) < 100:
                update_extraction_progress(video_id, status["total"], status["total"])
        else:
            update_extraction_progress(video_id, 1, 1)
    except Exception as e:
        logger.error(f"Error during clip extraction for {video_id}: {str(e)}")
        clip_extraction_status[video_id] = {"error": str(e)}


def run_transcription(video_id):
    try:
        base_dir = app.root_path
        output_path = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json")
        # Check if a transcription already exists and is non-empty
        if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
            logger.info(f"Using cached transcription for video {video_id}.")
            transcription_progress_status[video_id] = {"status": "completed", "percent": 100}
            return
        # Download the video from S3 if needed
        if USE_S3_FOR_VIDEOS:
            video_path = download_video_from_s3(video_id)
            if not video_path:
                transcription_progress_status[video_id] = {
                    "status": "error",
                    "percent": 0,
                    "message": f"Failed to download video {video_id} from S3"
                }
                return
        else:
            video_path = os.path.join(base_dir, "data", "videos", f"{video_id}.mp4")
        transcription_progress_status[video_id] = {"status": "started", "percent": 10}
        # Check that AWS credentials are available
        if not os.environ.get('AWS_ACCESS_KEY_ID') or not os.environ.get('AWS_SECRET_ACCESS_KEY'):
            logger.warning("AWS credentials not found. Transcription will not work properly.")
            transcription_progress_status[video_id] = {
                "status": "error",
                "percent": 0,
                "message": "AWS credentials missing"
            }
            return
        # Run transcription via the imported function from get_transcription_with_amazon.py
        from get_transcription_with_amazon import get_word_timestamps
        word_timestamps = get_word_timestamps(video_path)
        with open(output_path, "w") as f:
            json.dump(word_timestamps, f, indent=4)
        transcription_progress_status[video_id] = {"status": "completed", "percent": 100}
    except Exception as e:
        logger.error(f"Error during transcription for {video_id}: {str(e)}")
        transcription_progress_status[video_id] = {"status": "error", "percent": 0, "message": str(e)}
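# Note: the transcript routes below assume each entry in the word-timestamps
# JSON is an object with "punctuated_word", "start_time", and "end_time"
# fields, e.g. (illustrative shape only):
#   [{"punctuated_word": "Hello,", "start_time": "0.12", "end_time": "0.48"}, ...]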
# Authentication routes
@app.route('/login')
def login():
    """Handle login for both local and HF environments."""
    logger.info(f"Login route called. Headers: {dict(request.headers)}")
    if is_hf_space:
        username = request.headers.get('X-Spaces-Username')
        logger.info(f"Username from headers in login: {username}")
        if username and is_allowed_user(username):
            session['user'] = {'name': username, 'is_hf': True}
            return redirect(url_for('index'))
        else:
            # Redirect to the HF auth endpoint
            return redirect('/auth')
    else:
        # For local development
        session['user'] = {'name': 'LocalDeveloper', 'is_mock': True}
        return redirect(url_for('index'))


@app.route('/auth/callback')
def auth_callback():
    """This route is called by Hugging Face after successful authentication."""
    logger.info(f"Auth callback called. Headers: {dict(request.headers)}")
    if is_hf_space:
        # In Hugging Face Spaces, the user info is available in the request headers
        username = request.headers.get('X-Spaces-Username')
        if username:
            session['user'] = {'name': username, 'is_hf': True}
            return redirect(url_for('index'))
        else:
            return render_template('error.html', message="Authentication failed. No username provided.")
    return redirect(url_for('login'))
@app.route('/health')
def health_check():
    """Health check endpoint for container verification."""
    # Log environment variables for debugging (secret key is truncated)
    env_vars = {
        "FLASK_ENV": os.environ.get('FLASK_ENV', 'production'),
        "DEBUG": os.environ.get('DEBUG', 'Not set'),
        "SPACE_ID": os.environ.get('SPACE_ID', 'Not set'),
        "BYPASS_AUTH": os.environ.get('BYPASS_AUTH', 'Not set'),
        "SECRET_KEY": os.environ.get('SECRET_KEY', 'Not set')[:5] + '...' if os.environ.get('SECRET_KEY') else 'Not set',
        "S3_BUCKET": os.environ.get('S3_BUCKET', 'Not set'),
        "S3_VIDEO_PREFIX": os.environ.get('S3_VIDEO_PREFIX', 'Not set'),
        "USE_S3_FOR_VIDEOS": os.environ.get('USE_S3_FOR_VIDEOS', 'Not set')
    }
    logger.info(f"Health check called. Environment: {env_vars}")
    # Get session information for debugging
    session_keys = list(session.keys()) if session else []
    return jsonify({
        "status": "healthy",
        "environment": env_vars,
        "session_keys": session_keys,
        "is_hf_space": is_hf_space,
        "bypass_auth": bypass_auth,
        "directories": {
            "videos": os.path.exists(VIDEO_DIR),
            "annotations": os.path.exists(ANNOTATIONS_DIR),
            "temp": os.path.exists(TEMP_DIR)
        }
    })
@app.route('/auth')
def auth():
    """This route handles HF authentication."""
    logger.info(f"Auth route called. Headers: {dict(request.headers)}")
    # Force auth bypass for debugging (overrides the environment setting)
    bypass_auth = True
    # If bypass is enabled, authenticate immediately
    if bypass_auth:
        logger.info("Auth bypass enabled, setting default user")
        session['user'] = {'name': 'Perilon', 'is_hf': True}
        return redirect(url_for('index'))
    # Normal authentication logic
    username = request.headers.get('X-Spaces-Username')
    logger.info(f"Username from headers in auth: {username}")
    if is_hf_space and username and is_allowed_user(username):
        logger.info(f"Setting user in session: {username}")
        session['user'] = {'name': username, 'is_hf': True}
        return redirect(url_for('index'))
    elif not is_hf_space:
        # For local development
        session['user'] = {'name': 'LocalDeveloper', 'is_mock': True}
        return redirect(url_for('index'))
    else:
        # For HF with no valid username yet
        return render_template('error.html', message=(
            "Waiting for Hugging Face authentication. If you continue to see this message, "
            "please make sure you're logged into Hugging Face and your username is allowed."))
@app.before_request
def check_auth():
    """Check authentication before processing requests."""
    # Skip authentication for certain routes and static files
    if request.path in ['/login', '/logout', '/auth', '/auth/callback', '/debug', '/health'] or request.path.startswith('/static/'):
        return
    # Force auth bypass for debugging (overrides the environment setting)
    bypass_auth = True
    # Log all request paths to help troubleshoot
    logger.debug(f"Request path: {request.path}, User in session: {'user' in session}")
    if bypass_auth:
        # Set default user for bypass mode if not already set
        if 'user' not in session:
            session['user'] = {'name': 'Perilon', 'is_hf': True}
        return
    if is_hf_space:
        # Check for HF username header
        username = request.headers.get('X-Spaces-Username')
        if 'user' in session:
            logger.debug(f"User in session: {session['user']}")
            return
        if username and is_allowed_user(username):
            logger.info(f"Setting user from headers: {username}")
            session['user'] = {'name': username, 'is_hf': True}
            return
        # No valid user in session or headers
        logger.info("No authenticated user, redirecting to /auth")
        return redirect('/auth')
    elif 'user' not in session:
        return redirect(url_for('login'))
@app.route('/logout')
def logout():
    """Clear session and redirect to login."""
    session.clear()  # Clear the entire session
    if is_hf_space:
        return redirect('/auth/logout')
    return redirect(url_for('login'))


@app.route('/debug')
def debug_info():
    """Return debug information."""
    cookies = {key: request.cookies.get(key) for key in request.cookies.keys()}
    info = {
        "session": dict(session) if session else None,
        "headers": dict(request.headers),
        "cookies": cookies,
        "is_hf_space": is_hf_space,
        "allowed_users": os.getenv('ALLOWED_USERS', 'Perilon'),
        "app_config": {k: str(v) for k, v in app.config.items() if k in
                       ['SESSION_COOKIE_SECURE', 'SESSION_COOKIE_HTTPONLY',
                        'SESSION_COOKIE_SAMESITE', 'PERMANENT_SESSION_LIFETIME']},
        "s3_config": {
            "S3_BUCKET": S3_BUCKET,
            "S3_VIDEO_PREFIX": S3_VIDEO_PREFIX,
            "USE_S3_FOR_VIDEOS": USE_S3_FOR_VIDEOS
        }
    }
    return jsonify(info)
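# Caution: /debug and /health are exempt from the auth check above and expose
# request headers, session keys, and configuration details. They are intended
# for debugging; consider disabling or protecting them outside of development.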
# Main application routes
@app.route('/')
@login_required
def index():
    """Main entry point, redirects to video selection."""
    return redirect(url_for('select_video'))


@app.route('/select_video')
@login_required
def select_video():
    """Page to select a video for annotation."""
    if USE_S3_FOR_VIDEOS:
        video_ids = list_s3_videos()
    else:
        if not os.path.exists(VIDEO_DIR):
            return render_template('error.html', message="Video directory not found.")
        videos = [f for f in os.listdir(VIDEO_DIR) if f.endswith('.mp4')]
        video_ids = [os.path.splitext(v)[0] for v in videos]
    return render_template('select_video.html', video_ids=video_ids, user=session.get('user'))


@app.route('/player/<video_id>')
@login_required
def player(video_id):
    """Video player page for annotation."""
    return render_template('player.html', video_id=video_id, user=session.get('user'))
@app.route('/videos')
@login_required
def get_videos():
    """API endpoint to get available videos."""
    if USE_S3_FOR_VIDEOS:
        videos = list_s3_videos()
        if not videos:
            return jsonify({'error': 'No videos found in S3'}), 404
        # Return filenames with the .mp4 extension for compatibility
        return jsonify([f"{vid}.mp4" for vid in videos])
    else:
        # Original local file behavior
        if not os.path.exists(VIDEO_DIR):
            return jsonify({'error': 'Video directory not found'}), 404
        videos = [f for f in os.listdir(VIDEO_DIR) if f.endswith(('.mp4', '.avi', '.mov'))]
        if not videos:
            return jsonify({'error': 'No videos found'}), 404
        return jsonify(videos)


@app.route('/video/<path:filename>')
@login_required
def serve_video(filename):
    """Serve a video file from S3 or local storage."""
    video_id = os.path.splitext(filename)[0]  # Remove extension
    if USE_S3_FOR_VIDEOS:
        # Option 1: generate a presigned URL and redirect the client to S3
        presigned_url = generate_presigned_url(video_id)
        if presigned_url:
            return redirect(presigned_url)
        # Option 2 (fallback): download from S3 to local storage and serve
        local_path = download_video_from_s3(video_id)
        if local_path and os.path.exists(local_path):
            return send_from_directory(VIDEO_DIR, filename)
        return jsonify({'error': 'Video not found in S3'}), 404
    else:
        # Original local file behavior
        if not os.path.exists(os.path.join(VIDEO_DIR, filename)):
            return jsonify({'error': 'Video not found'}), 404
        return send_from_directory(VIDEO_DIR, filename)
@app.route('/save_annotations', methods=['POST'])
@login_required
def save_annotations():
    """Save annotation data."""
    data = request.json
    if not data or 'video' not in data or 'timestamps' not in data:
        return jsonify({'success': False, 'message': 'Invalid data'}), 400
    annotation_file = os.path.join(ANNOTATIONS_DIR, f"{data['video']}_annotations.json")
    annotation_data = {
        "video_name": data['video'] + ".mp4",
        "timestamps": sorted(data['timestamps']),
        "annotation_date": datetime.now().isoformat(),
        "annotated_by": session.get('user', {}).get('name', 'unknown')
    }
    with open(annotation_file, 'w') as f:
        json.dump(annotation_data, f, indent=4)
    return jsonify({'success': True, 'message': 'Annotations saved successfully'})


@app.route('/get_annotations/<path:video_name>')
@login_required
def get_annotations(video_name):
    """Get annotations for a video."""
    annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_name}_annotations.json")
    if not os.path.exists(annotation_file):
        return jsonify({'error': 'No annotations found'}), 404
    with open(annotation_file, 'r') as f:
        annotations = json.load(f)
    return jsonify(annotations)
@app.route("/alignment/<video_id>")
@login_required
def alignment_mode(video_id):
    """Page for aligning sign language with transcribed text."""
    annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_id}_annotations.json")
    if not os.path.exists(annotation_file):
        return render_template("error.html", message="No annotations found for this video. Please annotate the video first.")
    with open(annotation_file, 'r') as f:
        annotations = json.load(f)
    return render_template(
        "alignment.html",
        video_id=video_id,
        total_clips=len(annotations['timestamps']) - 1,
        user=session.get('user')
    )
@app.route("/api/transcript/<video_id>")
@login_required
def get_transcript(video_id):
    """Get transcript for a video."""
    timestamps_file = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json")
    logger.info(f"Attempting to load word timestamps from: {timestamps_file}")
    if not os.path.exists(timestamps_file):
        logger.warning(f"Word timestamps file not found: {timestamps_file}")
        return jsonify({
            "status": "error",
            "message": "No word timestamps found for this video"
        }), 404
    try:
        with open(timestamps_file, 'r') as f:
            word_data = json.load(f)
        full_text = " ".join(item["punctuated_word"] for item in word_data)
        words_with_times = [{
            "word": item["punctuated_word"],
            "start": float(item["start_time"]),
            "end": float(item["end_time"])
        } for item in word_data]
        logger.info(f"Successfully created transcript ({len(full_text)} characters)")
        return jsonify({
            "status": "success",
            "text": full_text,
            "words": words_with_times
        })
    except Exception as e:
        logger.error(f"Error processing word timestamps: {str(e)}")
        return jsonify({
            "status": "error",
            "message": f"Error processing word timestamps: {str(e)}"
        }), 500


@app.route("/api/word_timestamps/<video_id>")
@login_required
def get_word_timestamps(video_id):
    """Get word-level timestamps for a video."""
    timestamps_file = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json")
    logger.info(f"Attempting to load word timestamps from: {timestamps_file}")
    if not os.path.exists(timestamps_file):
        logger.warning(f"Word timestamps file not found: {timestamps_file}")
        return jsonify({
            "status": "error",
            "message": "No word timestamps found for this video"
        }), 404
    try:
        with open(timestamps_file, 'r') as f:
            word_data = json.load(f)
        logger.info(f"Successfully loaded {len(word_data)} word timestamps")
        return jsonify({
            "status": "success",
            "words": word_data
        })
    except Exception as e:
        logger.error(f"Error processing word timestamps: {str(e)}")
        return jsonify({
            "status": "error",
            "message": f"Error processing word timestamps: {str(e)}"
        }), 500
@app.route("/api/clips/<video_id>")
@login_required
def get_video_clips(video_id):
    """Get clips for a video."""
    try:
        annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_id}_annotations.json")
        if not os.path.exists(annotation_file):
            raise FileNotFoundError("Annotations not found")
        with open(annotation_file, 'r') as f:
            annotations = json.load(f)
        timestamps = annotations['timestamps']
        # Each pair of consecutive timestamps defines one clip
        clips = []
        for i in range(len(timestamps) - 1):
            clips.append({
                "index": i,
                "start": timestamps[i],
                "end": timestamps[i + 1],
                "path": f"/clip/{video_id}/{i}"
            })
        return jsonify({
            "status": "success",
            "clips": clips
        })
    except Exception as e:
        logger.error(f"Error getting clips: {str(e)}")
        return jsonify({
            "status": "error",
            "message": str(e)
        }), 500


@app.route("/clip/<video_id>/<int:clip_index>")
@login_required
def serve_clip(video_id, clip_index):
    """Serve a specific clip."""
    clip_path = os.path.join(
        TEMP_DIR,
        f"{video_id}_clip_{clip_index:03d}.mp4"
    )
    logger.info(f"Attempting to serve clip: {clip_path}")
    if not os.path.exists(clip_path):
        logger.error(f"Clip not found: {clip_path}")
        return jsonify({
            "status": "error",
            "message": "Clip not found"
        }), 404
    return send_file(clip_path, mimetype="video/mp4")
@app.route("/api/save_alignments", methods=["POST"])
@login_required
def save_alignments():
    """Save alignment data."""
    try:
        data = request.json
        if not data or 'video_id' not in data or 'alignments' not in data:
            return jsonify({'success': False, 'message': 'Invalid data'}), 400
        # Add user information to the alignments
        for alignment in data['alignments']:
            if alignment:
                alignment['aligned_by'] = session.get('user', {}).get('name', 'unknown')
        output_path = os.path.join(ALIGNMENTS_DIR, f"{data['video_id']}.json")
        with open(output_path, "w") as f:
            json.dump(data['alignments'], f, indent=2)
        return jsonify({
            "success": True,
            "message": "Alignments saved successfully"
        })
    except Exception as e:
        logger.error(f"Error saving alignments: {str(e)}")
        return jsonify({
            "success": False,
            "message": str(e)
        }), 500
@app.route("/api/extract_clips/<video_id>")
@login_required
def extract_clips_for_video(video_id):
    """Extract clips and start transcription for a video."""
    # If using S3, ensure the video is downloaded first
    if USE_S3_FOR_VIDEOS:
        video_path = download_video_from_s3(video_id)
        if not video_path:
            return jsonify({
                "status": "error",
                "message": f"Failed to download video {video_id} from S3"
            }), 404
    status = clip_extraction_status.get(video_id, {})
    if status.get("percent", 0) < 100:
        thread = threading.Thread(target=run_clip_extraction, args=(video_id,))
        thread.start()
    if video_id not in transcription_progress_status or transcription_progress_status.get(video_id, {}).get("percent", 0) < 100:
        thread_trans = threading.Thread(target=run_transcription, args=(video_id,))
        thread_trans.start()
    return jsonify({"status": "started"})
@app.route("/api/clip_progress/<video_id>")
@login_required
def clip_progress(video_id):
    """Get clip extraction progress."""
    progress = clip_extraction_status.get(video_id, {"current": 0, "total": 0, "percent": 0})
    return jsonify(progress)


@app.route("/api/transcription_progress/<video_id>")
@login_required
def transcription_progress(video_id):
    """Get transcription progress."""
    progress = transcription_progress_status.get(video_id, {"status": "not started", "percent": 0})
    return jsonify(progress)
if __name__ == '__main__':
    try:
        # Print diagnostic information
        print("=" * 50)
        print("Starting app with configuration:")
        print(f"- Running in HF Space: {is_hf_space}")
        print(f"- Auth bypass: {bypass_auth}")
        print(f"- Port: {os.getenv('PORT', 5000)}")
        print(f"- S3 for videos: {USE_S3_FOR_VIDEOS}")
        print(f"- S3 bucket: {S3_BUCKET}")
        print(f"- S3 prefix: {S3_VIDEO_PREFIX}")
        print(f"- Available local videos: {os.listdir(VIDEO_DIR) if os.path.exists(VIDEO_DIR) else 'None'}")
        if USE_S3_FOR_VIDEOS:
            try:
                s3_videos = list_s3_videos()
                print(f"- Available S3 videos: {s3_videos if s3_videos else 'None'}")
            except Exception as e:
                print(f"- Error listing S3 videos: {str(e)}")
        print("=" * 50)
        port = int(os.getenv('PORT', 5000))
        app.run(host='0.0.0.0', port=port, debug=True)
    except Exception as e:
        print(f"Error starting the application: {e}")
        import traceback
        traceback.print_exc()
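# A quick smoke test, assuming the app is running locally on the default port
# (in a Hugging Face Space the PORT environment variable is forced to 7860 above):
#   curl http://localhost:5000/health
#   curl http://localhost:5000/videos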