# from flask import Flask, render_template, jsonify, request, send_from_directory, send_file, redirect, url_for, session # import os, json, threading, time, signal, sys # from datetime import datetime # from extract_signed_segments_from_annotations import ClipExtractor, VideoClip # import logging # from dotenv import load_dotenv # # Load environment variables # load_dotenv() # # Add this near the top with other environment variables # bypass_auth = os.getenv('BYPASS_AUTH', 'false').lower() == 'true' # # Configure logging first # logging.basicConfig( # level=logging.INFO, # format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' # ) # logger = logging.getLogger(__name__) # # Hugging Face specific configuration # is_hf_space = os.getenv('SPACE_ID') is not None # if is_hf_space: # logger.info("Running in Hugging Face Spaces environment") # # Allow insecure transport for development in HF # os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1' # # Ensure port is set correctly # os.environ['PORT'] = '7860' # app = Flask(__name__) # app.secret_key = os.getenv('SECRET_KEY', 'dev_key_for_testing') # # Configure session for HF # if is_hf_space: # app.config['SESSION_COOKIE_SECURE'] = False # app.config['SESSION_COOKIE_HTTPONLY'] = True # app.config['SESSION_COOKIE_SAMESITE'] = None # Add this line # app.config['PERMANENT_SESSION_LIFETIME'] = 86400 # 24 hours # # Directory paths # VIDEO_DIR = os.path.abspath("data/videos") # ANNOTATIONS_DIR = os.path.abspath("data/annotations") # TEMP_DIR = os.path.abspath("data/temp") # WORD_TIMESTAMPS_DIR = os.path.abspath("data/word_timestamps") # ALIGNMENTS_DIR = os.path.abspath("data/alignments") # TRANSCRIPTS_DIR = os.path.abspath("data/transcripts") # # Ensure all required directories exist # for directory in [VIDEO_DIR, ANNOTATIONS_DIR, TEMP_DIR, WORD_TIMESTAMPS_DIR, ALIGNMENTS_DIR, TRANSCRIPTS_DIR]: # os.makedirs(directory, exist_ok=True) # # Global dictionaries for progress tracking # clip_extraction_status = {} # 
transcription_progress_status = {} # # Graceful shutdown handler # def graceful_shutdown(signum, frame): # """Handle graceful shutdown on signals.""" # logger.info(f"Received signal {signum}, shutting down gracefully...") # # Clean up as needed here # sys.exit(0) # # Register signal handlers # signal.signal(signal.SIGTERM, graceful_shutdown) # signal.signal(signal.SIGINT, graceful_shutdown) # # Login required decorator # def login_required(f): # from functools import wraps # @wraps(f) # def decorated_function(*args, **kwargs): # if 'user' not in session: # logger.info(f"User not in session, redirecting to login") # return redirect(url_for('login')) # return f(*args, **kwargs) # return decorated_function # # Allow specific users (for testing) # def is_allowed_user(username): # allowed_users_env = os.getenv('ALLOWED_USERS', 'Perilon') # Default to your username # allowed_users = [user.strip() for user in allowed_users_env.split(',')] # return username in allowed_users or not is_hf_space # Allow all users in local dev # def update_extraction_progress(video_id, current, total): # percent = int((current / total) * 100) # clip_extraction_status[video_id] = {"current": current, "total": total, "percent": percent} # def run_clip_extraction(video_id): # try: # base_dir = app.root_path # extractor = ClipExtractor(base_dir) # extractor.extract_clips_from_annotations( # video_id, # progress_callback=lambda current, total: update_extraction_progress(video_id, current, total) # ) # if video_id in clip_extraction_status: # status = clip_extraction_status[video_id] # if status.get("percent", 0) < 100: # update_extraction_progress(video_id, status["total"], status["total"]) # else: # update_extraction_progress(video_id, 1, 1) # except Exception as e: # logger.error(f"Error during clip extraction for {video_id}: {str(e)}") # clip_extraction_status[video_id] = {"error": str(e)} # def run_transcription(video_id): # try: # base_dir = app.root_path # output_path = 
os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json") # # Check if transcription already exists and is valid. # if os.path.exists(output_path) and os.path.getsize(output_path) > 0: # logger.info(f"Using cached transcription for video {video_id}.") # transcription_progress_status[video_id] = {"status": "completed", "percent": 100} # return # video_path = os.path.join(base_dir, "data", "videos", f"{video_id}.mp4") # transcription_progress_status[video_id] = {"status": "started", "percent": 10} # # Check if AWS credentials are available # if not os.environ.get('AWS_ACCESS_KEY_ID') or not os.environ.get('AWS_SECRET_ACCESS_KEY'): # logger.warning("AWS credentials not found. Transcription will not work properly.") # transcription_progress_status[video_id] = { # "status": "error", # "percent": 0, # "message": "AWS credentials missing" # } # return # # Run transcription via the imported function from get_transcription_with_amazon.py # from get_transcription_with_amazon import get_word_timestamps # word_timestamps = get_word_timestamps(video_path) # with open(output_path, "w") as f: # json.dump(word_timestamps, f, indent=4) # transcription_progress_status[video_id] = {"status": "completed", "percent": 100} # except Exception as e: # logger.error(f"Error during transcription for {video_id}: {str(e)}") # transcription_progress_status[video_id] = {"status": "error", "percent": 0, "message": str(e)} # # Authentication routes # @app.route('/login') # def login(): # """Handle login for both local and HF environments.""" # logger.info(f"Login route called. 
Headers: {dict(request.headers)}") # if is_hf_space: # username = request.headers.get('X-Spaces-Username') # logger.info(f"Username from headers in login: {username}") # if username and is_allowed_user(username): # session['user'] = {'name': username, 'is_hf': True} # return redirect(url_for('index')) # else: # # Redirect to the HF auth endpoint # return redirect('/auth') # else: # # For local development # session['user'] = {'name': 'LocalDeveloper', 'is_mock': True} # return redirect(url_for('index')) # @app.route('/auth/callback') # def auth_callback(): # """This route will be called by Hugging Face after successful authentication.""" # logger.info(f"Auth callback called. Headers: {dict(request.headers)}") # if is_hf_space: # # In Hugging Face Spaces, the user info is available in the request headers # username = request.headers.get('X-Spaces-Username') # if username: # session['user'] = {'name': username, 'is_hf': True} # return redirect(url_for('index')) # else: # return render_template('error.html', message="Authentication failed. No username provided.") # return redirect(url_for('login')) # # Replace the health check route with this improved version # @app.route('/health') # def health_check(): # """Health check endpoint for container verification.""" # # Log environment variables for debugging # env_vars = { # "FLASK_ENV": os.environ.get('FLASK_ENV', 'production'), # "DEBUG": os.environ.get('DEBUG', 'Not set'), # "SPACE_ID": os.environ.get('SPACE_ID', 'Not set'), # "BYPASS_AUTH": os.environ.get('BYPASS_AUTH', 'Not set'), # "SECRET_KEY": os.environ.get('SECRET_KEY', 'Not set')[:5] + '...' if os.environ.get('SECRET_KEY') else 'Not set' # } # logger.info(f"Health check called. 
Environment: {env_vars}") # # Get session information for debugging # session_info = dict(session) if session else None # session_keys = list(session.keys()) if session else [] # return jsonify({ # "status": "healthy", # "environment": env_vars, # "session_keys": session_keys, # "is_hf_space": is_hf_space, # "bypass_auth": bypass_auth, # "directories": { # "videos": os.path.exists(VIDEO_DIR), # "annotations": os.path.exists(ANNOTATIONS_DIR), # "temp": os.path.exists(TEMP_DIR) # } # }) # @app.route('/auth') # def auth(): # """This route handles HF authentication.""" # logger.info(f"Auth route called. Headers: {dict(request.headers)}") # # Force bypass auth to be true for debugging # bypass_auth = True # # If bypass is enabled, authenticate immediately # if bypass_auth: # logger.info("Auth bypass enabled, setting default user") # session['user'] = {'name': 'Perilon', 'is_hf': True} # return redirect(url_for('index')) # # Normal authentication logic # username = request.headers.get('X-Spaces-Username') # logger.info(f"Username from headers in auth: {username}") # if is_hf_space and username and is_allowed_user(username): # logger.info(f"Setting user in session: {username}") # session['user'] = {'name': username, 'is_hf': True} # return redirect(url_for('index')) # elif not is_hf_space: # # For local development # session['user'] = {'name': 'LocalDeveloper', 'is_mock': True} # return redirect(url_for('index')) # else: # # For HF with no valid username yet # return render_template('error.html', message= # "Waiting for Hugging Face authentication. 
If you continue to see this message, " # "please make sure you're logged into Hugging Face and your username is allowed.") # @app.before_request # def check_auth(): # """Check authentication before processing requests.""" # # Skip authentication for certain routes and static files # if request.path in ['/login', '/logout', '/auth', '/auth/callback', '/debug', '/health'] or request.path.startswith('/static/'): # return # # Force bypass auth to be true for debugging # bypass_auth = True # # Log all request paths to help troubleshoot # logger.debug(f"Request path: {request.path}, User in session: {'user' in session}") # if bypass_auth: # # Set default user for bypass mode if not already set # if 'user' not in session: # session['user'] = {'name': 'Perilon', 'is_hf': True} # return # if is_hf_space: # # Check for HF username header # username = request.headers.get('X-Spaces-Username') # if 'user' in session: # logger.debug(f"User in session: {session['user']}") # return # if username and is_allowed_user(username): # logger.info(f"Setting user from headers: {username}") # session['user'] = {'name': username, 'is_hf': True} # return # # No valid user in session or headers # logger.info(f"No authenticated user, redirecting to /auth") # return redirect('/auth') # elif 'user' not in session: # return redirect(url_for('login')) # @app.route('/logout') # def logout(): # """Clear session and redirect to login.""" # session.clear() # Clear the entire session # if is_hf_space: # return redirect('/auth/logout') # return redirect(url_for('login')) # @app.route('/debug') # def debug_info(): # """Return debug information.""" # cookies = {key: request.cookies.get(key) for key in request.cookies.keys()} # info = { # "session": dict(session) if session else None, # "headers": dict(request.headers), # "cookies": cookies, # "is_hf_space": is_hf_space, # "allowed_users": os.getenv('ALLOWED_USERS', 'Perilon'), # "app_config": {k: str(v) for k, v in app.config.items() if k in # 
['SESSION_COOKIE_SECURE', 'SESSION_COOKIE_HTTPONLY', # 'SESSION_COOKIE_SAMESITE', 'PERMANENT_SESSION_LIFETIME']}, # } # return jsonify(info) # # Main application routes # @app.route('/') # @login_required # def index(): # """Main entry point, redirects to video selection.""" # return redirect(url_for('select_video')) # @app.route('/select_video') # @login_required # def select_video(): # """Page to select a video for annotation.""" # if not os.path.exists(VIDEO_DIR): # return render_template('error.html', message="Video directory not found.") # videos = [f for f in os.listdir(VIDEO_DIR) if f.endswith('.mp4')] # video_ids = [os.path.splitext(v)[0] for v in videos] # return render_template('select_video.html', video_ids=video_ids, user=session.get('user')) # @app.route('/player/') # @login_required # def player(video_id): # """Video player page for annotation.""" # return render_template('player.html', video_id=video_id, user=session.get('user')) # @app.route('/videos') # @login_required # def get_videos(): # """API endpoint to get available videos.""" # if not os.path.exists(VIDEO_DIR): # return jsonify({'error': 'Video directory not found'}), 404 # videos = [f for f in os.listdir(VIDEO_DIR) if f.endswith(('.mp4', '.avi', '.mov'))] # if not videos: # return jsonify({'error': 'No videos found'}), 404 # return jsonify(videos) # @app.route('/video/') # @login_required # def serve_video(filename): # """Serve a video file.""" # if not os.path.exists(os.path.join(VIDEO_DIR, filename)): # return jsonify({'error': 'Video not found'}), 404 # return send_from_directory(VIDEO_DIR, filename) # @app.route('/save_annotations', methods=['POST']) # @login_required # def save_annotations(): # """Save annotation data.""" # data = request.json # if not data or 'video' not in data or 'timestamps' not in data: # return jsonify({'success': False, 'message': 'Invalid data'}), 400 # annotation_file = os.path.join(ANNOTATIONS_DIR, f"{data['video']}_annotations.json") # annotation_data = { # 
"video_name": data['video'] + ".mp4", # "timestamps": sorted(data['timestamps']), # "annotation_date": datetime.now().isoformat(), # "annotated_by": session.get('user', {}).get('name', 'unknown') # } # with open(annotation_file, 'w') as f: # json.dump(annotation_data, f, indent=4) # return jsonify({'success': True, 'message': 'Annotations saved successfully'}) # @app.route('/get_annotations/') # @login_required # def get_annotations(video_name): # """Get annotations for a video.""" # annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_name}_annotations.json") # if not os.path.exists(annotation_file): # return jsonify({'error': 'No annotations found'}), 404 # with open(annotation_file, 'r') as f: # annotations = json.load(f) # return jsonify(annotations) # @app.route("/alignment/") # @login_required # def alignment_mode(video_id): # """Page for aligning sign language with transcribed text.""" # annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_id}_annotations.json") # if not os.path.exists(annotation_file): # return render_template("error.html", message="No annotations found for this video. 
Please annotate the video first.") # with open(annotation_file, 'r') as f: # annotations = json.load(f) # return render_template( # "alignment.html", # video_id=video_id, # total_clips=len(annotations['timestamps']) - 1, # user=session.get('user') # ) # @app.route("/api/transcript/") # @login_required # def get_transcript(video_id): # """Get transcript for a video.""" # timestamps_file = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json") # logger.info(f"Attempting to load word timestamps from: {timestamps_file}") # if not os.path.exists(timestamps_file): # logger.warning(f"Word timestamps file not found: {timestamps_file}") # return jsonify({ # "status": "error", # "message": "No word timestamps found for this video" # }), 404 # try: # with open(timestamps_file, 'r') as f: # word_data = json.load(f) # full_text = " ".join(item["punctuated_word"] for item in word_data) # words_with_times = [{ # "word": item["punctuated_word"], # "start": float(item["start_time"]), # "end": float(item["end_time"]) # } for item in word_data] # logger.info(f"Successfully created transcript ({len(full_text)} characters)") # return jsonify({ # "status": "success", # "text": full_text, # "words": words_with_times # }) # except Exception as e: # logger.error(f"Error processing word timestamps: {str(e)}") # return jsonify({ # "status": "error", # "message": f"Error processing word timestamps: {str(e)}" # }), 500 # @app.route("/api/word_timestamps/") # @login_required # def get_word_timestamps(video_id): # """Get word-level timestamps for a video.""" # timestamps_file = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json") # logger.info(f"Attempting to load word timestamps from: {timestamps_file}") # if not os.path.exists(timestamps_file): # logger.warning(f"Word timestamps file not found: {timestamps_file}") # return jsonify({ # "status": "error", # "message": "No word timestamps found for this video" # }), 404 # try: # with open(timestamps_file, 'r') as 
f: # word_data = json.load(f) # logger.info(f"Successfully loaded {len(word_data)} word timestamps") # return jsonify({ # "status": "success", # "words": word_data # }) # except Exception as e: # logger.error(f"Error processing word timestamps: {str(e)}") # return jsonify({ # "status": "error", # "message": f"Error processing word timestamps: {str(e)}" # }), 500 # @app.route("/api/clips/") # @login_required # def get_video_clips(video_id): # """Get clips for a video.""" # try: # annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_id}_annotations.json") # if not os.path.exists(annotation_file): # raise FileNotFoundError("Annotations not found") # with open(annotation_file, 'r') as f: # annotations = json.load(f) # timestamps = annotations['timestamps'] # clips = [] # for i in range(len(timestamps)-1): # clips.append({ # "index": i, # "start": timestamps[i], # "end": timestamps[i+1], # "path": f"/clip/{video_id}/{i}" # }) # return jsonify({ # "status": "success", # "clips": clips # }) # except Exception as e: # logger.error(f"Error getting clips: {str(e)}") # return jsonify({ # "status": "error", # "message": str(e) # }), 500 # @app.route("/clip//") # @login_required # def serve_clip(video_id, clip_index): # """Serve a specific clip.""" # clip_path = os.path.join( # TEMP_DIR, # f"{video_id}_clip_{clip_index:03d}.mp4" # ) # logger.info(f"Attempting to serve clip: {clip_path}") # if not os.path.exists(clip_path): # logger.error(f"Clip not found: {clip_path}") # return jsonify({ # "status": "error", # "message": "Clip not found" # }), 404 # return send_file(clip_path, mimetype="video/mp4") # @app.route("/api/save_alignments", methods=["POST"]) # @login_required # def save_alignments(): # """Save alignment data.""" # try: # data = request.json # if not data or 'video_id' not in data or 'alignments' not in data: # return jsonify({'success': False, 'message': 'Invalid data'}), 400 # # Add user information to the alignments # for alignment in data['alignments']: # if 
alignment: # alignment['aligned_by'] = session.get('user', {}).get('name', 'unknown') # output_path = os.path.join(ALIGNMENTS_DIR, f"{data['video_id']}.json") # with open(output_path, "w") as f: # json.dump(data['alignments'], f, indent=2) # return jsonify({ # "success": True, # "message": "Alignments saved successfully" # }) # except Exception as e: # logger.error(f"Error saving alignments: {str(e)}") # return jsonify({ # "success": False, # "message": str(e) # }), 500 # @app.route("/api/extract_clips/") # @login_required # def extract_clips_for_video(video_id): # """Extract clips and start transcription for a video.""" # status = clip_extraction_status.get(video_id, {}) # if status.get("percent", 0) < 100: # thread = threading.Thread(target=run_clip_extraction, args=(video_id,)) # thread.start() # if video_id not in transcription_progress_status or transcription_progress_status.get(video_id, {}).get("percent", 0) < 100: # thread_trans = threading.Thread(target=run_transcription, args=(video_id,)) # thread_trans.start() # return jsonify({"status": "started"}) # @app.route("/api/clip_progress/") # @login_required # def clip_progress(video_id): # """Get clip extraction progress.""" # progress = clip_extraction_status.get(video_id, {"current": 0, "total": 0, "percent": 0}) # return jsonify(progress) # @app.route("/api/transcription_progress/") # @login_required # def transcription_progress(video_id): # """Get transcription progress.""" # progress = transcription_progress_status.get(video_id, {"status": "not started", "percent": 0}) # return jsonify(progress) # if __name__ == '__main__': # try: # # Print diagnostic information # print("=" * 50) # print(f"Starting app with configuration:") # print(f"- Running in HF Space: {is_hf_space}") # print(f"- Auth bypass: {bypass_auth}") # print(f"- Port: {os.getenv('PORT', 5000)}") # print(f"- Available videos: {os.listdir(VIDEO_DIR) if os.path.exists(VIDEO_DIR) else 'None'}") # print("=" * 50) # port = int(os.getenv('PORT', 
# End of the legacy, fully commented-out copy of this module (kept as found):
# 5000)) # app.run(host='0.0.0.0', port=port, debug=True) # except Exception as e: # print(f"Error starting the application: {e}") # import traceback # traceback.print_exc()

import json
import logging
import os
import signal
import sys
import tempfile
import threading
import time
import uuid
from datetime import datetime
from functools import wraps
from urllib.parse import urlparse

import boto3
import requests
from botocore.exceptions import BotoCoreError, ClientError
from dotenv import load_dotenv
from flask import (
    Flask,
    jsonify,
    redirect,
    render_template,
    request,
    send_file,
    send_from_directory,
    session,
    url_for,
)

from extract_signed_segments_from_annotations import ClipExtractor, VideoClip

# Load environment variables
load_dotenv()

# When BYPASS_AUTH=true every request is authenticated as a default user.
# This flag is the single switch for that behaviour (see auth / check_auth).
bypass_auth = os.getenv('BYPASS_AUTH', 'false').lower() == 'true'

# Configure logging first
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Hugging Face specific configuration
is_hf_space = os.getenv('SPACE_ID') is not None
if is_hf_space:
    logger.info("Running in Hugging Face Spaces environment")
    # Allow insecure transport for development in HF
    os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'
    # Ensure port is set correctly
    os.environ['PORT'] = '7860'

app = Flask(__name__)
app.secret_key = os.getenv('SECRET_KEY', 'dev_key_for_testing')

# Configure session for HF
if is_hf_space:
    app.config['SESSION_COOKIE_SECURE'] = False
    app.config['SESSION_COOKIE_HTTPONLY'] = True
    app.config['SESSION_COOKIE_SAMESITE'] = None  # Add this line
    app.config['PERMANENT_SESSION_LIFETIME'] = 86400  # 24 hours

# Directory paths
VIDEO_DIR = os.path.abspath("data/videos")
ANNOTATIONS_DIR = os.path.abspath("data/annotations")
TEMP_DIR = os.path.abspath("data/temp")
WORD_TIMESTAMPS_DIR = os.path.abspath("data/word_timestamps")
ALIGNMENTS_DIR = os.path.abspath("data/alignments")
TRANSCRIPTS_DIR = os.path.abspath("data/transcripts")

# S3 configuration
S3_BUCKET = os.getenv('S3_BUCKET', "sorenson-ai-sb-scratch")
S3_VIDEO_PREFIX = os.getenv('S3_VIDEO_PREFIX', "awilkinson/kylie_dataset_videos_for_alignment_webapp/")
USE_S3_FOR_VIDEOS = os.getenv('USE_S3_FOR_VIDEOS', 'true').lower() == 'true'

# Ensure all required directories exist
for directory in [VIDEO_DIR, ANNOTATIONS_DIR, TEMP_DIR, WORD_TIMESTAMPS_DIR, ALIGNMENTS_DIR, TRANSCRIPTS_DIR]:
    os.makedirs(directory, exist_ok=True)

# Global dictionaries for progress tracking (keyed by video id; written by the
# background worker threads and read by the progress endpoints)
clip_extraction_status = {}
transcription_progress_status = {}


# S3 helper functions
def get_s3_client():
    """Return a boto3 S3 client built from the AWS_* environment variables.

    The region falls back to 'us-west-2' when AWS_DEFAULT_REGION is unset.
    """
    return boto3.client(
        's3',
        region_name=os.environ.get('AWS_DEFAULT_REGION', 'us-west-2'),
        aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID'),
        aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY')
    )


def list_s3_videos():
    """List the ids of all ``.mp4`` objects under the configured S3 prefix.

    Returns:
        A list of video ids (object basenames without the extension). The
        list is empty when nothing matches or when S3 cannot be reached.
    """
    try:
        s3_client = get_s3_client()
        # A single list_objects_v2 call returns at most 1000 keys, so walk
        # every page instead of reading only the first response.
        paginator = s3_client.get_paginator('list_objects_v2')
        videos = []
        for page in paginator.paginate(Bucket=S3_BUCKET, Prefix=S3_VIDEO_PREFIX):
            for item in page.get('Contents', []):
                key = item['Key']
                if key.endswith('.mp4'):
                    # Extract just the filename without extension
                    filename = os.path.basename(key)
                    videos.append(os.path.splitext(filename)[0])
        if not videos:
            logger.warning(f"No videos found in S3 bucket {S3_BUCKET} with prefix {S3_VIDEO_PREFIX}")
        return videos
    except (ClientError, BotoCoreError) as e:
        # BotoCoreError covers client-side failures such as missing
        # credentials, which are not ClientError instances.
        logger.error(f"Error listing videos from S3: {str(e)}")
        return []


def download_video_from_s3(video_id):
    """Download ``<video_id>.mp4`` from S3 into VIDEO_DIR unless already present.

    Args:
        video_id: Video identifier (filename without the ``.mp4`` extension).

    Returns:
        The local file path, or None when the download fails.
    """
    video_filename = f"{video_id}.mp4"
    s3_key = f"{S3_VIDEO_PREFIX}{video_filename}"
    local_path = os.path.join(VIDEO_DIR, video_filename)

    # Check if the file already exists locally
    if os.path.exists(local_path):
        logger.info(f"Video {video_id} already exists locally.")
        return local_path

    try:
        logger.info(f"Downloading video {video_id} from S3...")
        s3_client = get_s3_client()
        s3_client.download_file(S3_BUCKET, s3_key, local_path)
        logger.info(f"Video {video_id} downloaded successfully to {local_path}")
        return local_path
    except (ClientError, BotoCoreError) as e:
        logger.error(f"Error downloading video from S3: {str(e)}")
        return None


def generate_presigned_url(video_id, expiration=3600):
    """Generate a presigned GET URL for ``<video_id>.mp4`` in S3.

    Args:
        video_id: Video identifier (filename without the ``.mp4`` extension).
        expiration: Lifetime of the URL in seconds.

    Returns:
        The presigned URL, or None when it cannot be generated.
    """
    video_filename = f"{video_id}.mp4"
    s3_key = f"{S3_VIDEO_PREFIX}{video_filename}"

    try:
        s3_client = get_s3_client()
        return s3_client.generate_presigned_url(
            'get_object',
            Params={'Bucket': S3_BUCKET, 'Key': s3_key},
            ExpiresIn=expiration
        )
    except (ClientError, BotoCoreError) as e:
        logger.error(f"Error generating presigned URL: {str(e)}")
        return None


# Graceful shutdown handler
def graceful_shutdown(signum, frame):
    """Handle graceful shutdown on signals by logging and exiting with 0."""
    logger.info(f"Received signal {signum}, shutting down gracefully...")
    # Clean up as needed here
    sys.exit(0)


# Register signal handlers
signal.signal(signal.SIGTERM, graceful_shutdown)
signal.signal(signal.SIGINT, graceful_shutdown)


# Login required decorator
def login_required(f):
    """Decorate a view so it redirects to the login route without a session user."""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'user' not in session:
            logger.info("User not in session, redirecting to login")
            return redirect(url_for('login'))
        return f(*args, **kwargs)
    return decorated_function


# Allow specific users (for testing)
def is_allowed_user(username):
    """Return True when *username* may use the app.

    The allow-list is the comma-separated ALLOWED_USERS environment variable
    (default 'Perilon'). Outside a Hugging Face Space every user is allowed.
    """
    allowed_users_env = os.getenv('ALLOWED_USERS', 'Perilon')  # Default to your username
    allowed_users = [user.strip() for user in allowed_users_env.split(',')]
    return username in allowed_users or not is_hf_space  # Allow all users in local dev


def update_extraction_progress(video_id, current, total):
    """Record clip-extraction progress for *video_id*.

    Args:
        video_id: Video identifier used as the status key.
        current: Number of clips processed so far.
        total: Total number of clips to process.
    """
    # With nothing to extract (total == 0) the job is complete; this also
    # avoids a ZeroDivisionError.
    percent = int((current / total) * 100) if total else 100
    clip_extraction_status[video_id] = {"current": current, "total": total, "percent": percent}


def run_clip_extraction(video_id):
    """Worker: extract clips for *video_id* and publish progress / errors.

    Any exception is logged and stored as ``{"error": ...}`` in
    ``clip_extraction_status`` rather than propagated.
    """
    try:
        base_dir = app.root_path
        extractor = ClipExtractor(base_dir)
        extractor.extract_clips_from_annotations(
            video_id,
            progress_callback=lambda current, total: update_extraction_progress(video_id, current, total)
        )
        # Make sure the final state reads 100% even if the callback never
        # reported completion (or was never invoked at all).
        if video_id in clip_extraction_status:
            status = clip_extraction_status[video_id]
            if status.get("percent", 0) < 100:
                update_extraction_progress(video_id, status["total"], status["total"])
        else:
            update_extraction_progress(video_id, 1, 1)
    except Exception as e:
        logger.error(f"Error during clip extraction for {video_id}: {str(e)}")
        clip_extraction_status[video_id] = {"error": str(e)}


def run_transcription(video_id):
    """Worker: produce (or reuse) the word-timestamp JSON for *video_id*.

    Progress and failures are published in ``transcription_progress_status``;
    exceptions are logged and recorded there instead of being raised.
    """
    try:
        base_dir = app.root_path
        output_path = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json")

        # Check if transcription already exists and is valid.
        if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
            logger.info(f"Using cached transcription for video {video_id}.")
            transcription_progress_status[video_id] = {"status": "completed", "percent": 100}
            return

        # Download video from S3 if needed
        if USE_S3_FOR_VIDEOS:
            video_path = download_video_from_s3(video_id)
            if not video_path:
                transcription_progress_status[video_id] = {
                    "status": "error",
                    "percent": 0,
                    "message": f"Failed to download video {video_id} from S3"
                }
                return
        else:
            video_path = os.path.join(base_dir, "data", "videos", f"{video_id}.mp4")

        transcription_progress_status[video_id] = {"status": "started", "percent": 10}

        # Check if AWS credentials are available
        if not os.environ.get('AWS_ACCESS_KEY_ID') or not os.environ.get('AWS_SECRET_ACCESS_KEY'):
            logger.warning("AWS credentials not found. Transcription will not work properly.")
            transcription_progress_status[video_id] = {
                "status": "error",
                "percent": 0,
                "message": "AWS credentials missing"
            }
            return

        # Run transcription via the imported function from get_transcription_with_amazon.py
        from get_transcription_with_amazon import get_word_timestamps
        word_timestamps = get_word_timestamps(video_path)
        with open(output_path, "w") as f:
            json.dump(word_timestamps, f, indent=4)
        transcription_progress_status[video_id] = {"status": "completed", "percent": 100}
    except Exception as e:
        logger.error(f"Error during transcription for {video_id}: {str(e)}")
        transcription_progress_status[video_id] = {"status": "error", "percent": 0, "message": str(e)}


# Authentication routes
@app.route('/login')
def login():
    """Handle login for both local and HF environments."""
    logger.info(f"Login route called. Headers: {dict(request.headers)}")

    if is_hf_space:
        username = request.headers.get('X-Spaces-Username')
        logger.info(f"Username from headers in login: {username}")
        if username and is_allowed_user(username):
            session['user'] = {'name': username, 'is_hf': True}
            return redirect(url_for('index'))
        # Redirect to the HF auth endpoint
        return redirect('/auth')

    # For local development
    session['user'] = {'name': 'LocalDeveloper', 'is_mock': True}
    return redirect(url_for('index'))


@app.route('/auth/callback')
def auth_callback():
    """This route will be called by Hugging Face after successful authentication."""
    logger.info(f"Auth callback called. Headers: {dict(request.headers)}")

    if is_hf_space:
        # In Hugging Face Spaces, the user info is available in the request headers
        username = request.headers.get('X-Spaces-Username')
        if username:
            session['user'] = {'name': username, 'is_hf': True}
            return redirect(url_for('index'))
        return render_template('error.html', message="Authentication failed. No username provided.")
    return redirect(url_for('login'))


@app.route('/health')
def health_check():
    """Health check endpoint for container verification.

    Returns a JSON document with selected environment settings, the session
    keys, and whether the main data directories exist.
    """
    # This endpoint is reachable without authentication, so only report
    # whether SECRET_KEY is set -- never any part of its value.
    env_vars = {
        "FLASK_ENV": os.environ.get('FLASK_ENV', 'production'),
        "DEBUG": os.environ.get('DEBUG', 'Not set'),
        "SPACE_ID": os.environ.get('SPACE_ID', 'Not set'),
        "BYPASS_AUTH": os.environ.get('BYPASS_AUTH', 'Not set'),
        "SECRET_KEY": 'set' if os.environ.get('SECRET_KEY') else 'Not set',
        "S3_BUCKET": os.environ.get('S3_BUCKET', 'Not set'),
        "S3_VIDEO_PREFIX": os.environ.get('S3_VIDEO_PREFIX', 'Not set'),
        "USE_S3_FOR_VIDEOS": os.environ.get('USE_S3_FOR_VIDEOS', 'Not set')
    }
    logger.info(f"Health check called. Environment: {env_vars}")

    session_keys = list(session.keys()) if session else []

    return jsonify({
        "status": "healthy",
        "environment": env_vars,
        "session_keys": session_keys,
        "is_hf_space": is_hf_space,
        "bypass_auth": bypass_auth,
        "directories": {
            "videos": os.path.exists(VIDEO_DIR),
            "annotations": os.path.exists(ANNOTATIONS_DIR),
            "temp": os.path.exists(TEMP_DIR)
        }
    })


@app.route('/auth')
def auth():
    """This route handles HF authentication.

    The bypass is controlled solely by the BYPASS_AUTH environment variable
    (module-level ``bypass_auth``); it is no longer forced on.
    """
    logger.info(f"Auth route called. Headers: {dict(request.headers)}")

    # If bypass is enabled, authenticate immediately
    if bypass_auth:
        logger.info("Auth bypass enabled, setting default user")
        session['user'] = {'name': 'Perilon', 'is_hf': True}
        return redirect(url_for('index'))

    # Normal authentication logic
    username = request.headers.get('X-Spaces-Username')
    logger.info(f"Username from headers in auth: {username}")

    if is_hf_space and username and is_allowed_user(username):
        logger.info(f"Setting user in session: {username}")
        session['user'] = {'name': username, 'is_hf': True}
        return redirect(url_for('index'))
    elif not is_hf_space:
        # For local development
        session['user'] = {'name': 'LocalDeveloper', 'is_mock': True}
        return redirect(url_for('index'))
    else:
        # For HF with no valid username yet
        return render_template('error.html', message=
            "Waiting for Hugging Face authentication. If you continue to see this message, "
            "please make sure you're logged into Hugging Face and your username is allowed.")


@app.before_request
def check_auth():
    """Check authentication before processing requests.

    Returns None to let the request proceed, or a redirect response when the
    caller still has to authenticate.
    """
    # Skip authentication for certain routes and static files
    if request.path in ['/login', '/logout', '/auth', '/auth/callback', '/debug', '/health'] or request.path.startswith('/static/'):
        return

    # Log all request paths to help troubleshoot
    logger.debug(f"Request path: {request.path}, User in session: {'user' in session}")

    # Bypass is opt-in through BYPASS_AUTH; it was previously hard-coded on,
    # which made every check below unreachable.
    if bypass_auth:
        # Set default user for bypass mode if not already set
        if 'user' not in session:
            session['user'] = {'name': 'Perilon', 'is_hf': True}
        return

    if is_hf_space:
        # Check for HF username header
        username = request.headers.get('X-Spaces-Username')

        if 'user' in session:
            logger.debug(f"User in session: {session['user']}")
            return

        if username and is_allowed_user(username):
            logger.info(f"Setting user from headers: {username}")
            session['user'] = {'name': username, 'is_hf': True}
            return

        # No valid user in session or headers
        logger.info("No authenticated user, redirecting to /auth")
        return redirect('/auth')
    elif 'user' not in session:
        return redirect(url_for('login'))


@app.route('/logout')
def logout():
    """Clear session and redirect to login."""
    session.clear()  # Clear the entire session
    if is_hf_space:
        return redirect('/auth/logout')
    return redirect(url_for('login'))


# NOTE(review): this endpoint is exempt from check_auth and returns session
# contents, request headers and cookies -- confirm it is not exposed publicly.
@app.route('/debug')
def debug_info():
    """Return debug information (session, headers, cookies, config) as JSON."""
    cookies = {key: request.cookies.get(key) for key in request.cookies.keys()}

    info = {
        "session": dict(session) if session else None,
        "headers": dict(request.headers),
        "cookies": cookies,
        "is_hf_space": is_hf_space,
        "allowed_users": os.getenv('ALLOWED_USERS', 'Perilon'),
        "app_config": {k: str(v) for k, v in app.config.items() if k in
                       ['SESSION_COOKIE_SECURE', 'SESSION_COOKIE_HTTPONLY',
                        'SESSION_COOKIE_SAMESITE', 'PERMANENT_SESSION_LIFETIME']},
        "s3_config": {
            "S3_BUCKET": S3_BUCKET,
            "S3_VIDEO_PREFIX": S3_VIDEO_PREFIX,
            "USE_S3_FOR_VIDEOS": USE_S3_FOR_VIDEOS
        }
    }
    return jsonify(info)


# Main application routes
@app.route('/')
@login_required
def index():
    """Main entry point, redirects to video selection."""
    return redirect(url_for('select_video'))


@app.route('/select_video')
@login_required
def select_video():
    """Page to select a video for annotation."""
    if USE_S3_FOR_VIDEOS:
        video_ids = list_s3_videos()
    else:
        if not os.path.exists(VIDEO_DIR):
            return render_template('error.html', message="Video directory not found.")
        videos = [f for f in os.listdir(VIDEO_DIR) if f.endswith('.mp4')]
        video_ids = [os.path.splitext(v)[0] for v in videos]

    return render_template('select_video.html', video_ids=video_ids, user=session.get('user'))


# The URL variable is required: the view takes ``video_id`` as an argument.
@app.route('/player/<video_id>')
@login_required
def player(video_id):
    """Video player page for annotation."""
    return render_template('player.html', video_id=video_id, user=session.get('user'))


@app.route('/videos')
@login_required
def get_videos():
    """API endpoint to get available videos."""
    if USE_S3_FOR_VIDEOS:
        videos = list_s3_videos()
        if not videos:
            return jsonify({'error': 'No videos found in S3'}), 404
        # Return just the filenames with .mp4 extension for compatibility
        return jsonify([f"{vid}.mp4" for vid in videos])

    # Original local file behavior
    if not os.path.exists(VIDEO_DIR):
        return jsonify({'error': 'Video directory not found'}), 404
    videos = [f for f in os.listdir(VIDEO_DIR) if f.endswith(('.mp4', '.avi', '.mov'))]
    if not videos:
        return jsonify({'error': 'No videos found'}), 404
    return jsonify(videos)


# The URL variable is required: the view takes ``filename`` as an argument.
@app.route('/video/<filename>')
@login_required
def serve_video(filename):
    """Serve a video file from S3 or local storage."""
    video_id = os.path.splitext(filename)[0]  # Remove extension

    if USE_S3_FOR_VIDEOS:
        # Option 1: Generate a presigned URL and redirect
        presigned_url = generate_presigned_url(video_id)
        if presigned_url:
            return redirect(presigned_url)

        # Option 2 (fallback): Download from S3 to local temporary storage and serve
        local_path = download_video_from_s3(video_id)
        if local_path and os.path.exists(local_path):
            # The download is always stored as <video_id>.mp4, so serve that
            # name rather than whatever extension the request used.
            return send_from_directory(VIDEO_DIR, os.path.basename(local_path))

        return jsonify({'error': 'Video not found in S3'}), 404

    # Original local file behavior
    if not os.path.exists(os.path.join(VIDEO_DIR, filename)):
        return jsonify({'error': 'Video not found'}), 404
    return send_from_directory(VIDEO_DIR, filename)


# NOTE: only the beginning of save_annotations is visible here; the statement
# started by the final `return` continues beyond this section, so the visible
# part is reproduced without any logic change.
@app.route('/save_annotations', methods=['POST'])
@login_required
def save_annotations():
    """Save annotation data."""
    data = request.json
    if not data or 'video' not in data or 'timestamps' not in data:
        return jsonify({'success': False, 'message': 'Invalid data'}), 400

    annotation_file = os.path.join(ANNOTATIONS_DIR, f"{data['video']}_annotations.json")
    annotation_data = {
        "video_name": data['video'] + ".mp4",
        "timestamps": sorted(data['timestamps']),
        "annotation_date": datetime.now().isoformat(),
        "annotated_by": session.get('user', {}).get('name', 'unknown')
    }

    with open(annotation_file, 'w') as f:
        json.dump(annotation_data, f, indent=4)

    return
jsonify({'success': True, 'message': 'Annotations saved successfully'}) @app.route('/get_annotations/') @login_required def get_annotations(video_name): """Get annotations for a video.""" annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_name}_annotations.json") if not os.path.exists(annotation_file): return jsonify({'error': 'No annotations found'}), 404 with open(annotation_file, 'r') as f: annotations = json.load(f) return jsonify(annotations) @app.route("/alignment/") @login_required def alignment_mode(video_id): """Page for aligning sign language with transcribed text.""" annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_id}_annotations.json") if not os.path.exists(annotation_file): return render_template("error.html", message="No annotations found for this video. Please annotate the video first.") with open(annotation_file, 'r') as f: annotations = json.load(f) return render_template( "alignment.html", video_id=video_id, total_clips=len(annotations['timestamps']) - 1, user=session.get('user') ) @app.route("/api/transcript/") @login_required def get_transcript(video_id): """Get transcript for a video.""" timestamps_file = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json") logger.info(f"Attempting to load word timestamps from: {timestamps_file}") if not os.path.exists(timestamps_file): logger.warning(f"Word timestamps file not found: {timestamps_file}") return jsonify({ "status": "error", "message": "No word timestamps found for this video" }), 404 try: with open(timestamps_file, 'r') as f: word_data = json.load(f) full_text = " ".join(item["punctuated_word"] for item in word_data) words_with_times = [{ "word": item["punctuated_word"], "start": float(item["start_time"]), "end": float(item["end_time"]) } for item in word_data] logger.info(f"Successfully created transcript ({len(full_text)} characters)") return jsonify({ "status": "success", "text": full_text, "words": words_with_times }) except Exception as e: 
logger.error(f"Error processing word timestamps: {str(e)}") return jsonify({ "status": "error", "message": f"Error processing word timestamps: {str(e)}" }), 500 @app.route("/api/word_timestamps/") @login_required def get_word_timestamps(video_id): """Get word-level timestamps for a video.""" timestamps_file = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json") logger.info(f"Attempting to load word timestamps from: {timestamps_file}") if not os.path.exists(timestamps_file): logger.warning(f"Word timestamps file not found: {timestamps_file}") return jsonify({ "status": "error", "message": "No word timestamps found for this video" }), 404 try: with open(timestamps_file, 'r') as f: word_data = json.load(f) logger.info(f"Successfully loaded {len(word_data)} word timestamps") return jsonify({ "status": "success", "words": word_data }) except Exception as e: logger.error(f"Error processing word timestamps: {str(e)}") return jsonify({ "status": "error", "message": f"Error processing word timestamps: {str(e)}" }), 500 @app.route("/api/clips/") @login_required def get_video_clips(video_id): """Get clips for a video.""" try: annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_id}_annotations.json") if not os.path.exists(annotation_file): raise FileNotFoundError("Annotations not found") with open(annotation_file, 'r') as f: annotations = json.load(f) timestamps = annotations['timestamps'] clips = [] for i in range(len(timestamps)-1): clips.append({ "index": i, "start": timestamps[i], "end": timestamps[i+1], "path": f"/clip/{video_id}/{i}" }) return jsonify({ "status": "success", "clips": clips }) except Exception as e: logger.error(f"Error getting clips: {str(e)}") return jsonify({ "status": "error", "message": str(e) }), 500 @app.route("/clip//") @login_required def serve_clip(video_id, clip_index): """Serve a specific clip.""" clip_path = os.path.join( TEMP_DIR, f"{video_id}_clip_{clip_index:03d}.mp4" ) logger.info(f"Attempting to serve clip: 
{clip_path}") if not os.path.exists(clip_path): logger.error(f"Clip not found: {clip_path}") return jsonify({ "status": "error", "message": "Clip not found" }), 404 return send_file(clip_path, mimetype="video/mp4") @app.route("/api/save_alignments", methods=["POST"]) @login_required def save_alignments(): """Save alignment data.""" try: data = request.json if not data or 'video_id' not in data or 'alignments' not in data: return jsonify({'success': False, 'message': 'Invalid data'}), 400 # Add user information to the alignments for alignment in data['alignments']: if alignment: alignment['aligned_by'] = session.get('user', {}).get('name', 'unknown') output_path = os.path.join(ALIGNMENTS_DIR, f"{data['video_id']}.json") with open(output_path, "w") as f: json.dump(data['alignments'], f, indent=2) return jsonify({ "success": True, "message": "Alignments saved successfully" }) except Exception as e: logger.error(f"Error saving alignments: {str(e)}") return jsonify({ "success": False, "message": str(e) }), 500 @app.route("/api/extract_clips/") @login_required def extract_clips_for_video(video_id): """Extract clips and start transcription for a video.""" # If using S3, ensure the video is downloaded first if USE_S3_FOR_VIDEOS: video_path = download_video_from_s3(video_id) if not video_path: return jsonify({ "status": "error", "message": f"Failed to download video {video_id} from S3" }), 404 status = clip_extraction_status.get(video_id, {}) if status.get("percent", 0) < 100: thread = threading.Thread(target=run_clip_extraction, args=(video_id,)) thread.start() if video_id not in transcription_progress_status or transcription_progress_status.get(video_id, {}).get("percent", 0) < 100: thread_trans = threading.Thread(target=run_transcription, args=(video_id,)) thread_trans.start() return jsonify({"status": "started"}) @app.route("/api/clip_progress/") @login_required def clip_progress(video_id): """Get clip extraction progress.""" progress = 
clip_extraction_status.get(video_id, {"current": 0, "total": 0, "percent": 0}) return jsonify(progress) @app.route("/api/transcription_progress/") @login_required def transcription_progress(video_id): """Get transcription progress.""" progress = transcription_progress_status.get(video_id, {"status": "not started", "percent": 0}) return jsonify(progress) if __name__ == '__main__': try: # Print diagnostic information print("=" * 50) print(f"Starting app with configuration:") print(f"- Running in HF Space: {is_hf_space}") print(f"- Auth bypass: {bypass_auth}") print(f"- Port: {os.getenv('PORT', 5000)}") print(f"- S3 for videos: {USE_S3_FOR_VIDEOS}") print(f"- S3 bucket: {S3_BUCKET}") print(f"- S3 prefix: {S3_VIDEO_PREFIX}") print(f"- Available videos: {os.listdir(VIDEO_DIR) if os.path.exists(VIDEO_DIR) else 'None'}") if USE_S3_FOR_VIDEOS: try: s3_videos = list_s3_videos() print(f"- Available S3 videos: {s3_videos if s3_videos else 'None'}") except Exception as e: print(f"- Error listing S3 videos: {str(e)}") print("=" * 50) port = int(os.getenv('PORT', 5000)) app.run(host='0.0.0.0', port=port, debug=True) except Exception as e: print(f"Error starting the application: {e}") import traceback traceback.print_exc()