Perilon's picture
Bug fixes
2daffd5
raw
history blame
51.4 kB
# from flask import Flask, render_template, jsonify, request, send_from_directory, send_file, redirect, url_for, session
# import os, json, threading, time, signal, sys
# from datetime import datetime
# from extract_signed_segments_from_annotations import ClipExtractor, VideoClip
# import logging
# from dotenv import load_dotenv
# # Load environment variables
# load_dotenv()
# # Add this near the top with other environment variables
# bypass_auth = os.getenv('BYPASS_AUTH', 'false').lower() == 'true'
# # Configure logging first
# logging.basicConfig(
# level=logging.INFO,
# format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
# )
# logger = logging.getLogger(__name__)
# # Hugging Face specific configuration
# is_hf_space = os.getenv('SPACE_ID') is not None
# if is_hf_space:
# logger.info("Running in Hugging Face Spaces environment")
# # Allow insecure transport for development in HF
# os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'
# # Ensure port is set correctly
# os.environ['PORT'] = '7860'
# app = Flask(__name__)
# app.secret_key = os.getenv('SECRET_KEY', 'dev_key_for_testing')
# # Configure session for HF
# if is_hf_space:
# app.config['SESSION_COOKIE_SECURE'] = False
# app.config['SESSION_COOKIE_HTTPONLY'] = True
# app.config['SESSION_COOKIE_SAMESITE'] = None # Add this line
# app.config['PERMANENT_SESSION_LIFETIME'] = 86400 # 24 hours
# # Directory paths
# VIDEO_DIR = os.path.abspath("data/videos")
# ANNOTATIONS_DIR = os.path.abspath("data/annotations")
# TEMP_DIR = os.path.abspath("data/temp")
# WORD_TIMESTAMPS_DIR = os.path.abspath("data/word_timestamps")
# ALIGNMENTS_DIR = os.path.abspath("data/alignments")
# TRANSCRIPTS_DIR = os.path.abspath("data/transcripts")
# # Ensure all required directories exist
# for directory in [VIDEO_DIR, ANNOTATIONS_DIR, TEMP_DIR, WORD_TIMESTAMPS_DIR, ALIGNMENTS_DIR, TRANSCRIPTS_DIR]:
# os.makedirs(directory, exist_ok=True)
# # Global dictionaries for progress tracking
# clip_extraction_status = {}
# transcription_progress_status = {}
# # Graceful shutdown handler
# def graceful_shutdown(signum, frame):
# """Handle graceful shutdown on signals."""
# logger.info(f"Received signal {signum}, shutting down gracefully...")
# # Clean up as needed here
# sys.exit(0)
# # Register signal handlers
# signal.signal(signal.SIGTERM, graceful_shutdown)
# signal.signal(signal.SIGINT, graceful_shutdown)
# # Login required decorator
# def login_required(f):
# from functools import wraps
# @wraps(f)
# def decorated_function(*args, **kwargs):
# if 'user' not in session:
# logger.info(f"User not in session, redirecting to login")
# return redirect(url_for('login'))
# return f(*args, **kwargs)
# return decorated_function
# # Allow specific users (for testing)
# def is_allowed_user(username):
# allowed_users_env = os.getenv('ALLOWED_USERS', 'Perilon') # Default to your username
# allowed_users = [user.strip() for user in allowed_users_env.split(',')]
# return username in allowed_users or not is_hf_space # Allow all users in local dev
# def update_extraction_progress(video_id, current, total):
# percent = int((current / total) * 100)
# clip_extraction_status[video_id] = {"current": current, "total": total, "percent": percent}
# def run_clip_extraction(video_id):
# try:
# base_dir = app.root_path
# extractor = ClipExtractor(base_dir)
# extractor.extract_clips_from_annotations(
# video_id,
# progress_callback=lambda current, total: update_extraction_progress(video_id, current, total)
# )
# if video_id in clip_extraction_status:
# status = clip_extraction_status[video_id]
# if status.get("percent", 0) < 100:
# update_extraction_progress(video_id, status["total"], status["total"])
# else:
# update_extraction_progress(video_id, 1, 1)
# except Exception as e:
# logger.error(f"Error during clip extraction for {video_id}: {str(e)}")
# clip_extraction_status[video_id] = {"error": str(e)}
# def run_transcription(video_id):
# try:
# base_dir = app.root_path
# output_path = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json")
# # Check if transcription already exists and is valid.
# if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
# logger.info(f"Using cached transcription for video {video_id}.")
# transcription_progress_status[video_id] = {"status": "completed", "percent": 100}
# return
# video_path = os.path.join(base_dir, "data", "videos", f"{video_id}.mp4")
# transcription_progress_status[video_id] = {"status": "started", "percent": 10}
# # Check if AWS credentials are available
# if not os.environ.get('AWS_ACCESS_KEY_ID') or not os.environ.get('AWS_SECRET_ACCESS_KEY'):
# logger.warning("AWS credentials not found. Transcription will not work properly.")
# transcription_progress_status[video_id] = {
# "status": "error",
# "percent": 0,
# "message": "AWS credentials missing"
# }
# return
# # Run transcription via the imported function from get_transcription_with_amazon.py
# from get_transcription_with_amazon import get_word_timestamps
# word_timestamps = get_word_timestamps(video_path)
# with open(output_path, "w") as f:
# json.dump(word_timestamps, f, indent=4)
# transcription_progress_status[video_id] = {"status": "completed", "percent": 100}
# except Exception as e:
# logger.error(f"Error during transcription for {video_id}: {str(e)}")
# transcription_progress_status[video_id] = {"status": "error", "percent": 0, "message": str(e)}
# # Authentication routes
# @app.route('/login')
# def login():
# """Handle login for both local and HF environments."""
# logger.info(f"Login route called. Headers: {dict(request.headers)}")
# if is_hf_space:
# username = request.headers.get('X-Spaces-Username')
# logger.info(f"Username from headers in login: {username}")
# if username and is_allowed_user(username):
# session['user'] = {'name': username, 'is_hf': True}
# return redirect(url_for('index'))
# else:
# # Redirect to the HF auth endpoint
# return redirect('/auth')
# else:
# # For local development
# session['user'] = {'name': 'LocalDeveloper', 'is_mock': True}
# return redirect(url_for('index'))
# @app.route('/auth/callback')
# def auth_callback():
# """This route will be called by Hugging Face after successful authentication."""
# logger.info(f"Auth callback called. Headers: {dict(request.headers)}")
# if is_hf_space:
# # In Hugging Face Spaces, the user info is available in the request headers
# username = request.headers.get('X-Spaces-Username')
# if username:
# session['user'] = {'name': username, 'is_hf': True}
# return redirect(url_for('index'))
# else:
# return render_template('error.html', message="Authentication failed. No username provided.")
# return redirect(url_for('login'))
# # Replace the health check route with this improved version
# @app.route('/health')
# def health_check():
# """Health check endpoint for container verification."""
# # Log environment variables for debugging
# env_vars = {
# "FLASK_ENV": os.environ.get('FLASK_ENV', 'production'),
# "DEBUG": os.environ.get('DEBUG', 'Not set'),
# "SPACE_ID": os.environ.get('SPACE_ID', 'Not set'),
# "BYPASS_AUTH": os.environ.get('BYPASS_AUTH', 'Not set'),
# "SECRET_KEY": os.environ.get('SECRET_KEY', 'Not set')[:5] + '...' if os.environ.get('SECRET_KEY') else 'Not set'
# }
# logger.info(f"Health check called. Environment: {env_vars}")
# # Get session information for debugging
# session_info = dict(session) if session else None
# session_keys = list(session.keys()) if session else []
# return jsonify({
# "status": "healthy",
# "environment": env_vars,
# "session_keys": session_keys,
# "is_hf_space": is_hf_space,
# "bypass_auth": bypass_auth,
# "directories": {
# "videos": os.path.exists(VIDEO_DIR),
# "annotations": os.path.exists(ANNOTATIONS_DIR),
# "temp": os.path.exists(TEMP_DIR)
# }
# })
# @app.route('/auth')
# def auth():
# """This route handles HF authentication."""
# logger.info(f"Auth route called. Headers: {dict(request.headers)}")
# # Force bypass auth to be true for debugging
# bypass_auth = True
# # If bypass is enabled, authenticate immediately
# if bypass_auth:
# logger.info("Auth bypass enabled, setting default user")
# session['user'] = {'name': 'Perilon', 'is_hf': True}
# return redirect(url_for('index'))
# # Normal authentication logic
# username = request.headers.get('X-Spaces-Username')
# logger.info(f"Username from headers in auth: {username}")
# if is_hf_space and username and is_allowed_user(username):
# logger.info(f"Setting user in session: {username}")
# session['user'] = {'name': username, 'is_hf': True}
# return redirect(url_for('index'))
# elif not is_hf_space:
# # For local development
# session['user'] = {'name': 'LocalDeveloper', 'is_mock': True}
# return redirect(url_for('index'))
# else:
# # For HF with no valid username yet
# return render_template('error.html', message=
# "Waiting for Hugging Face authentication. If you continue to see this message, "
# "please make sure you're logged into Hugging Face and your username is allowed.")
# @app.before_request
# def check_auth():
# """Check authentication before processing requests."""
# # Skip authentication for certain routes and static files
# if request.path in ['/login', '/logout', '/auth', '/auth/callback', '/debug', '/health'] or request.path.startswith('/static/'):
# return
# # Force bypass auth to be true for debugging
# bypass_auth = True
# # Log all request paths to help troubleshoot
# logger.debug(f"Request path: {request.path}, User in session: {'user' in session}")
# if bypass_auth:
# # Set default user for bypass mode if not already set
# if 'user' not in session:
# session['user'] = {'name': 'Perilon', 'is_hf': True}
# return
# if is_hf_space:
# # Check for HF username header
# username = request.headers.get('X-Spaces-Username')
# if 'user' in session:
# logger.debug(f"User in session: {session['user']}")
# return
# if username and is_allowed_user(username):
# logger.info(f"Setting user from headers: {username}")
# session['user'] = {'name': username, 'is_hf': True}
# return
# # No valid user in session or headers
# logger.info(f"No authenticated user, redirecting to /auth")
# return redirect('/auth')
# elif 'user' not in session:
# return redirect(url_for('login'))
# @app.route('/logout')
# def logout():
# """Clear session and redirect to login."""
# session.clear() # Clear the entire session
# if is_hf_space:
# return redirect('/auth/logout')
# return redirect(url_for('login'))
# @app.route('/debug')
# def debug_info():
# """Return debug information."""
# cookies = {key: request.cookies.get(key) for key in request.cookies.keys()}
# info = {
# "session": dict(session) if session else None,
# "headers": dict(request.headers),
# "cookies": cookies,
# "is_hf_space": is_hf_space,
# "allowed_users": os.getenv('ALLOWED_USERS', 'Perilon'),
# "app_config": {k: str(v) for k, v in app.config.items() if k in
# ['SESSION_COOKIE_SECURE', 'SESSION_COOKIE_HTTPONLY',
# 'SESSION_COOKIE_SAMESITE', 'PERMANENT_SESSION_LIFETIME']},
# }
# return jsonify(info)
# # Main application routes
# @app.route('/')
# @login_required
# def index():
# """Main entry point, redirects to video selection."""
# return redirect(url_for('select_video'))
# @app.route('/select_video')
# @login_required
# def select_video():
# """Page to select a video for annotation."""
# if not os.path.exists(VIDEO_DIR):
# return render_template('error.html', message="Video directory not found.")
# videos = [f for f in os.listdir(VIDEO_DIR) if f.endswith('.mp4')]
# video_ids = [os.path.splitext(v)[0] for v in videos]
# return render_template('select_video.html', video_ids=video_ids, user=session.get('user'))
# @app.route('/player/<video_id>')
# @login_required
# def player(video_id):
# """Video player page for annotation."""
# return render_template('player.html', video_id=video_id, user=session.get('user'))
# @app.route('/videos')
# @login_required
# def get_videos():
# """API endpoint to get available videos."""
# if not os.path.exists(VIDEO_DIR):
# return jsonify({'error': 'Video directory not found'}), 404
# videos = [f for f in os.listdir(VIDEO_DIR) if f.endswith(('.mp4', '.avi', '.mov'))]
# if not videos:
# return jsonify({'error': 'No videos found'}), 404
# return jsonify(videos)
# @app.route('/video/<path:filename>')
# @login_required
# def serve_video(filename):
# """Serve a video file."""
# if not os.path.exists(os.path.join(VIDEO_DIR, filename)):
# return jsonify({'error': 'Video not found'}), 404
# return send_from_directory(VIDEO_DIR, filename)
# @app.route('/save_annotations', methods=['POST'])
# @login_required
# def save_annotations():
# """Save annotation data."""
# data = request.json
# if not data or 'video' not in data or 'timestamps' not in data:
# return jsonify({'success': False, 'message': 'Invalid data'}), 400
# annotation_file = os.path.join(ANNOTATIONS_DIR, f"{data['video']}_annotations.json")
# annotation_data = {
# "video_name": data['video'] + ".mp4",
# "timestamps": sorted(data['timestamps']),
# "annotation_date": datetime.now().isoformat(),
# "annotated_by": session.get('user', {}).get('name', 'unknown')
# }
# with open(annotation_file, 'w') as f:
# json.dump(annotation_data, f, indent=4)
# return jsonify({'success': True, 'message': 'Annotations saved successfully'})
# @app.route('/get_annotations/<path:video_name>')
# @login_required
# def get_annotations(video_name):
# """Get annotations for a video."""
# annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_name}_annotations.json")
# if not os.path.exists(annotation_file):
# return jsonify({'error': 'No annotations found'}), 404
# with open(annotation_file, 'r') as f:
# annotations = json.load(f)
# return jsonify(annotations)
# @app.route("/alignment/<video_id>")
# @login_required
# def alignment_mode(video_id):
# """Page for aligning sign language with transcribed text."""
# annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_id}_annotations.json")
# if not os.path.exists(annotation_file):
# return render_template("error.html", message="No annotations found for this video. Please annotate the video first.")
# with open(annotation_file, 'r') as f:
# annotations = json.load(f)
# return render_template(
# "alignment.html",
# video_id=video_id,
# total_clips=len(annotations['timestamps']) - 1,
# user=session.get('user')
# )
# @app.route("/api/transcript/<video_id>")
# @login_required
# def get_transcript(video_id):
# """Get transcript for a video."""
# timestamps_file = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json")
# logger.info(f"Attempting to load word timestamps from: {timestamps_file}")
# if not os.path.exists(timestamps_file):
# logger.warning(f"Word timestamps file not found: {timestamps_file}")
# return jsonify({
# "status": "error",
# "message": "No word timestamps found for this video"
# }), 404
# try:
# with open(timestamps_file, 'r') as f:
# word_data = json.load(f)
# full_text = " ".join(item["punctuated_word"] for item in word_data)
# words_with_times = [{
# "word": item["punctuated_word"],
# "start": float(item["start_time"]),
# "end": float(item["end_time"])
# } for item in word_data]
# logger.info(f"Successfully created transcript ({len(full_text)} characters)")
# return jsonify({
# "status": "success",
# "text": full_text,
# "words": words_with_times
# })
# except Exception as e:
# logger.error(f"Error processing word timestamps: {str(e)}")
# return jsonify({
# "status": "error",
# "message": f"Error processing word timestamps: {str(e)}"
# }), 500
# @app.route("/api/word_timestamps/<video_id>")
# @login_required
# def get_word_timestamps(video_id):
# """Get word-level timestamps for a video."""
# timestamps_file = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json")
# logger.info(f"Attempting to load word timestamps from: {timestamps_file}")
# if not os.path.exists(timestamps_file):
# logger.warning(f"Word timestamps file not found: {timestamps_file}")
# return jsonify({
# "status": "error",
# "message": "No word timestamps found for this video"
# }), 404
# try:
# with open(timestamps_file, 'r') as f:
# word_data = json.load(f)
# logger.info(f"Successfully loaded {len(word_data)} word timestamps")
# return jsonify({
# "status": "success",
# "words": word_data
# })
# except Exception as e:
# logger.error(f"Error processing word timestamps: {str(e)}")
# return jsonify({
# "status": "error",
# "message": f"Error processing word timestamps: {str(e)}"
# }), 500
# @app.route("/api/clips/<video_id>")
# @login_required
# def get_video_clips(video_id):
# """Get clips for a video."""
# try:
# annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_id}_annotations.json")
# if not os.path.exists(annotation_file):
# raise FileNotFoundError("Annotations not found")
# with open(annotation_file, 'r') as f:
# annotations = json.load(f)
# timestamps = annotations['timestamps']
# clips = []
# for i in range(len(timestamps)-1):
# clips.append({
# "index": i,
# "start": timestamps[i],
# "end": timestamps[i+1],
# "path": f"/clip/{video_id}/{i}"
# })
# return jsonify({
# "status": "success",
# "clips": clips
# })
# except Exception as e:
# logger.error(f"Error getting clips: {str(e)}")
# return jsonify({
# "status": "error",
# "message": str(e)
# }), 500
# @app.route("/clip/<video_id>/<int:clip_index>")
# @login_required
# def serve_clip(video_id, clip_index):
# """Serve a specific clip."""
# clip_path = os.path.join(
# TEMP_DIR,
# f"{video_id}_clip_{clip_index:03d}.mp4"
# )
# logger.info(f"Attempting to serve clip: {clip_path}")
# if not os.path.exists(clip_path):
# logger.error(f"Clip not found: {clip_path}")
# return jsonify({
# "status": "error",
# "message": "Clip not found"
# }), 404
# return send_file(clip_path, mimetype="video/mp4")
# @app.route("/api/save_alignments", methods=["POST"])
# @login_required
# def save_alignments():
# """Save alignment data."""
# try:
# data = request.json
# if not data or 'video_id' not in data or 'alignments' not in data:
# return jsonify({'success': False, 'message': 'Invalid data'}), 400
# # Add user information to the alignments
# for alignment in data['alignments']:
# if alignment:
# alignment['aligned_by'] = session.get('user', {}).get('name', 'unknown')
# output_path = os.path.join(ALIGNMENTS_DIR, f"{data['video_id']}.json")
# with open(output_path, "w") as f:
# json.dump(data['alignments'], f, indent=2)
# return jsonify({
# "success": True,
# "message": "Alignments saved successfully"
# })
# except Exception as e:
# logger.error(f"Error saving alignments: {str(e)}")
# return jsonify({
# "success": False,
# "message": str(e)
# }), 500
# @app.route("/api/extract_clips/<video_id>")
# @login_required
# def extract_clips_for_video(video_id):
# """Extract clips and start transcription for a video."""
# status = clip_extraction_status.get(video_id, {})
# if status.get("percent", 0) < 100:
# thread = threading.Thread(target=run_clip_extraction, args=(video_id,))
# thread.start()
# if video_id not in transcription_progress_status or transcription_progress_status.get(video_id, {}).get("percent", 0) < 100:
# thread_trans = threading.Thread(target=run_transcription, args=(video_id,))
# thread_trans.start()
# return jsonify({"status": "started"})
# @app.route("/api/clip_progress/<video_id>")
# @login_required
# def clip_progress(video_id):
# """Get clip extraction progress."""
# progress = clip_extraction_status.get(video_id, {"current": 0, "total": 0, "percent": 0})
# return jsonify(progress)
# @app.route("/api/transcription_progress/<video_id>")
# @login_required
# def transcription_progress(video_id):
# """Get transcription progress."""
# progress = transcription_progress_status.get(video_id, {"status": "not started", "percent": 0})
# return jsonify(progress)
# if __name__ == '__main__':
# try:
# # Print diagnostic information
# print("=" * 50)
# print(f"Starting app with configuration:")
# print(f"- Running in HF Space: {is_hf_space}")
# print(f"- Auth bypass: {bypass_auth}")
# print(f"- Port: {os.getenv('PORT', 5000)}")
# print(f"- Available videos: {os.listdir(VIDEO_DIR) if os.path.exists(VIDEO_DIR) else 'None'}")
# print("=" * 50)
# port = int(os.getenv('PORT', 5000))
# app.run(host='0.0.0.0', port=port, debug=True)
# except Exception as e:
# print(f"Error starting the application: {e}")
# import traceback
# traceback.print_exc()
from flask import Flask, render_template, jsonify, request, send_from_directory, send_file, redirect, url_for, session
import os, json, threading, time, signal, sys
from datetime import datetime
from extract_signed_segments_from_annotations import ClipExtractor, VideoClip
import logging
from dotenv import load_dotenv
import boto3
from botocore.exceptions import ClientError
import tempfile
import uuid
import requests
from urllib.parse import urlparse
# Pull settings from a .env file (if present) into the process environment.
load_dotenv()

# Opt-in switch: only the value "true" (in any letter case) turns the bypass on.
bypass_auth = os.getenv('BYPASS_AUTH', 'false').lower() == 'true'

# Logging is configured before the first record is emitted.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# A defined SPACE_ID variable is what this module treats as "running in a
# Hugging Face Space".
is_hf_space = 'SPACE_ID' in os.environ
if is_hf_space:
    logger.info("Running in Hugging Face Spaces environment")
    os.environ.update({
        # Allow insecure transport for development in HF
        'OAUTHLIB_INSECURE_TRANSPORT': '1',
        # Ensure port is set correctly
        'PORT': '7860',
    })

app = Flask(__name__)
# NOTE(review): when SECRET_KEY is unset the session signing key falls back to
# a fixed, publicly visible development value.
app.secret_key = os.getenv('SECRET_KEY', 'dev_key_for_testing')

# Session cookie settings applied only inside a Space.
if is_hf_space:
    app.config.update(
        SESSION_COOKIE_SECURE=False,
        SESSION_COOKIE_HTTPONLY=True,
        # NOTE(review): this is Python None, not the string 'None' -- confirm
        # which SameSite behaviour is intended.
        SESSION_COOKIE_SAMESITE=None,
        PERMANENT_SESSION_LIFETIME=86400,  # 24 hours, in seconds
    )
# Local data directories, resolved to absolute paths against the working
# directory at import time.
(
    VIDEO_DIR,
    ANNOTATIONS_DIR,
    TEMP_DIR,
    WORD_TIMESTAMPS_DIR,
    ALIGNMENTS_DIR,
    TRANSCRIPTS_DIR,
) = [
    os.path.abspath(relative_path)
    for relative_path in (
        "data/videos",
        "data/annotations",
        "data/temp",
        "data/word_timestamps",
        "data/alignments",
        "data/transcripts",
    )
]

# S3 settings; each can be overridden through an environment variable of the
# same name.
S3_BUCKET = os.environ.get('S3_BUCKET', "sorenson-ai-sb-scratch")
S3_VIDEO_PREFIX = os.environ.get(
    'S3_VIDEO_PREFIX',
    "awilkinson/kylie_dataset_videos_for_alignment_webapp/",
)
# Enabled unless USE_S3_FOR_VIDEOS is set to something other than "true".
USE_S3_FOR_VIDEOS = os.environ.get('USE_S3_FOR_VIDEOS', 'true').lower() == 'true'

# Create any data directory that does not exist yet.
for directory in (
    VIDEO_DIR,
    ANNOTATIONS_DIR,
    TEMP_DIR,
    WORD_TIMESTAMPS_DIR,
    ALIGNMENTS_DIR,
    TRANSCRIPTS_DIR,
):
    os.makedirs(directory, exist_ok=True)

# In-memory progress records, keyed by video id.
clip_extraction_status = dict()
transcription_progress_status = dict()
# S3 helper functions
def get_s3_client():
    """Create a boto3 S3 client configured from environment variables.

    The region is read from AWS_DEFAULT_REGION (``us-west-2`` when unset) and
    the key pair from AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY. A missing key
    variable is passed to boto3 as ``None``.
    """
    env = os.environ
    client_options = {
        'region_name': env.get('AWS_DEFAULT_REGION', 'us-west-2'),
        'aws_access_key_id': env.get('AWS_ACCESS_KEY_ID'),
        'aws_secret_access_key': env.get('AWS_SECRET_ACCESS_KEY'),
    }
    return boto3.client('s3', **client_options)
def list_s3_videos():
    """List the ids of every ``.mp4`` object under the configured S3 prefix.

    A video id is the object's file name without its extension. Objects whose
    key does not end in ``.mp4`` are ignored.

    Returns:
        list: Video ids in listing order. Empty when the prefix holds no
        objects (a warning is logged) or when S3 raises ``ClientError`` (an
        error is logged).
    """
    try:
        s3_client = get_s3_client()
        # A single list_objects_v2 call returns one page only (at most 1000
        # keys), so iterate over every page to avoid silently dropping videos.
        paginator = s3_client.get_paginator('list_objects_v2')
        pages = paginator.paginate(Bucket=S3_BUCKET, Prefix=S3_VIDEO_PREFIX)

        videos = []
        found_objects = False
        for page in pages:
            contents = page.get('Contents', [])
            if contents:
                found_objects = True
            for item in contents:
                key = item['Key']
                if key.endswith('.mp4'):
                    # Keep just the file name without directory or extension.
                    filename = os.path.basename(key)
                    videos.append(os.path.splitext(filename)[0])

        if not found_objects:
            logger.warning(f"No videos found in S3 bucket {S3_BUCKET} with prefix {S3_VIDEO_PREFIX}")
        return videos
    except ClientError as e:
        logger.error(f"Error listing videos from S3: {str(e)}")
        return []
def download_video_from_s3(video_id):
    """Make sure ``<video_id>.mp4`` is present in VIDEO_DIR, fetching it from S3 if needed.

    Returns:
        The local file path when the file already exists or the download
        succeeds; ``None`` when S3 raises ``ClientError`` (the error is logged).
    """
    filename = f"{video_id}.mp4"
    destination = os.path.join(VIDEO_DIR, filename)

    # An existing local copy is reused as-is; nothing is downloaded.
    if os.path.exists(destination):
        logger.info(f"Video {video_id} already exists locally.")
        return destination

    object_key = f"{S3_VIDEO_PREFIX}{filename}"
    try:
        logger.info(f"Downloading video {video_id} from S3...")
        get_s3_client().download_file(S3_BUCKET, object_key, destination)
        logger.info(f"Video {video_id} downloaded successfully to {destination}")
    except ClientError as e:
        logger.error(f"Error downloading video from S3: {str(e)}")
        return None
    return destination
def generate_presigned_url(video_id, expiration=3600):
    """Build a presigned ``get_object`` URL for ``<video_id>.mp4`` under the S3 prefix.

    Args:
        video_id: Video id; the object key is S3_VIDEO_PREFIX + id + ".mp4".
        expiration: Value passed to boto3 as ``ExpiresIn`` (default 3600).

    Returns:
        The URL, or ``None`` when boto3 raises ``ClientError`` (the error is
        logged).
    """
    object_key = f"{S3_VIDEO_PREFIX}{video_id}.mp4"
    request_params = {'Bucket': S3_BUCKET, 'Key': object_key}
    try:
        return get_s3_client().generate_presigned_url(
            'get_object',
            Params=request_params,
            ExpiresIn=expiration,
        )
    except ClientError as e:
        logger.error(f"Error generating presigned URL: {str(e)}")
        return None
# Graceful shutdown handler
def graceful_shutdown(signum, frame):
    """Log the received signal number and exit the process with status 0.

    Has the ``(signum, frame)`` signature of a signal handler; ``frame`` is
    not used.
    """
    logger.info(f"Received signal {signum}, shutting down gracefully...")
    # No cleanup is performed yet; add it here, before exiting, when needed.
    raise SystemExit(0)
# Route both interrupt (SIGINT) and termination (SIGTERM) signals to the
# shutdown handler.
signal.signal(signal.SIGINT, graceful_shutdown)
signal.signal(signal.SIGTERM, graceful_shutdown)
# Login required decorator
def login_required(f):
    """Wrap a view so it only runs when the session contains a ``'user'`` entry.

    Without that entry the wrapper logs the fact and returns a redirect to the
    ``login`` endpoint instead of calling ``f``. The wrapped function's
    metadata is preserved via ``functools.wraps``.
    """
    from functools import wraps

    @wraps(f)
    def guarded_view(*args, **kwargs):
        if 'user' in session:
            return f(*args, **kwargs)
        logger.info("User not in session, redirecting to login")
        return redirect(url_for('login'))

    return guarded_view
# Allow specific users (for testing)
def is_allowed_user(username):
    """Tell whether ``username`` may use the app.

    Outside a Hugging Face Space every user is allowed. Inside a Space the
    name must appear in the comma-separated ALLOWED_USERS environment variable
    (default ``'Perilon'``); whitespace around each entry is ignored.
    """
    raw_allowlist = os.getenv('ALLOWED_USERS', 'Perilon')
    allowlist = list(map(str.strip, raw_allowlist.split(',')))
    if not is_hf_space:
        # Local development: no allow-list enforcement.
        return True
    return username in allowlist
def update_extraction_progress(video_id, current, total):
    """Record clip-extraction progress for ``video_id`` in ``clip_extraction_status``.

    Args:
        video_id: Key under which the progress record is stored.
        current: Number of items processed so far.
        total: Number of items to process.

    The stored record is ``{"current", "total", "percent"}`` where ``percent``
    is ``current / total`` as a whole-number percentage (truncated). When
    ``total`` is zero or negative there is nothing to process, so the work is
    reported as complete (100) instead of raising ``ZeroDivisionError``.
    """
    if total > 0:
        percent = int((current / total) * 100)
    else:
        # Empty workload: avoid dividing by zero and report completion.
        percent = 100
    clip_extraction_status[video_id] = {"current": current, "total": total, "percent": percent}
def run_clip_extraction(video_id):
    """Extract the clips for ``video_id`` and keep ``clip_extraction_status`` up to date.

    Progress reported by the extractor is forwarded to
    ``update_extraction_progress``. Once extraction returns, the record is
    forced to 100%: from the recorded total if one exists, otherwise as 1 of 1.
    Any exception is logged and stored as ``{"error": <message>}`` rather than
    propagated.
    """
    def report_progress(current, total):
        update_extraction_progress(video_id, current, total)

    try:
        extractor = ClipExtractor(app.root_path)
        extractor.extract_clips_from_annotations(
            video_id,
            progress_callback=report_progress,
        )

        if video_id not in clip_extraction_status:
            # The extractor never reported progress; mark the job as done.
            update_extraction_progress(video_id, 1, 1)
            return

        recorded = clip_extraction_status[video_id]
        if recorded.get("percent", 0) < 100:
            final_count = recorded["total"]
            update_extraction_progress(video_id, final_count, final_count)
    except Exception as e:
        logger.error(f"Error during clip extraction for {video_id}: {str(e)}")
        clip_extraction_status[video_id] = {"error": str(e)}
def run_transcription(video_id):
    """Produce the word-timestamp JSON for ``video_id`` and track progress.

    Progress is published in ``transcription_progress_status[video_id]`` as a
    dict with ``status``, ``percent`` and, on failure, ``message``.

    Flow:
      1. If a non-empty ``<video_id>_word_timestamps.json`` already exists in
         WORD_TIMESTAMPS_DIR, it is reused and the job is marked completed.
      2. Otherwise the video is located: downloaded from S3 when
         USE_S3_FOR_VIDEOS is true, else expected under
         ``<app root>/data/videos``.
      3. AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY must be set in the
         environment, or the job is marked as an error.
      4. ``get_word_timestamps`` is called and its result is written as JSON.

    Does not raise: any exception is logged and recorded as an error status.
    """
    try:
        base_dir = app.root_path
        output_path = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json")

        # Check if transcription already exists and is valid.
        if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
            logger.info(f"Using cached transcription for video {video_id}.")
            transcription_progress_status[video_id] = {"status": "completed", "percent": 100}
            return

        # Publish the "started" state before the download, which may be slow,
        # so that progress polling reflects that work has begun.
        transcription_progress_status[video_id] = {"status": "started", "percent": 10}

        # Download video from S3 if needed
        if USE_S3_FOR_VIDEOS:
            video_path = download_video_from_s3(video_id)
            if not video_path:
                transcription_progress_status[video_id] = {
                    "status": "error",
                    "percent": 0,
                    "message": f"Failed to download video {video_id} from S3"
                }
                return
        else:
            video_path = os.path.join(base_dir, "data", "videos", f"{video_id}.mp4")

        # Check if AWS credentials are available
        if not os.environ.get('AWS_ACCESS_KEY_ID') or not os.environ.get('AWS_SECRET_ACCESS_KEY'):
            logger.warning("AWS credentials not found. Transcription will not work properly.")
            transcription_progress_status[video_id] = {
                "status": "error",
                "percent": 0,
                "message": "AWS credentials missing"
            }
            return

        # Run transcription via the imported function from get_transcription_with_amazon.py
        from get_transcription_with_amazon import get_word_timestamps
        word_timestamps = get_word_timestamps(video_path)

        # Write to a sibling temp file and rename it into place. The cache
        # check above trusts any non-empty file, so a half-written result must
        # never become visible under the final name.
        tmp_path = f"{output_path}.{os.getpid()}.{threading.get_ident()}.tmp"
        try:
            with open(tmp_path, "w") as f:
                json.dump(word_timestamps, f, indent=4)
            os.replace(tmp_path, output_path)
        except Exception:
            # Do not leave a partial temp file behind; the outer handler
            # records the failure.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
            raise

        transcription_progress_status[video_id] = {"status": "completed", "percent": 100}
    except Exception as e:
        logger.error(f"Error during transcription for {video_id}: {str(e)}")
        transcription_progress_status[video_id] = {"status": "error", "percent": 0, "message": str(e)}
# Authentication routes
@app.route('/login')
def login():
    """Create a session user and redirect to ``index``, or hand off to ``/auth``.

    Outside a Hugging Face Space a mock ``LocalDeveloper`` user is stored.
    Inside a Space the ``X-Spaces-Username`` request header must be present and
    pass ``is_allowed_user``; otherwise the client is redirected to ``/auth``.
    """
    # NOTE(review): this logs every request header at INFO level, which
    # presumably includes cookies -- confirm that is acceptable.
    logger.info(f"Login route called. Headers: {dict(request.headers)}")

    if not is_hf_space:
        # For local development
        session['user'] = {'name': 'LocalDeveloper', 'is_mock': True}
        return redirect(url_for('index'))

    username = request.headers.get('X-Spaces-Username')
    logger.info(f"Username from headers in login: {username}")
    if not (username and is_allowed_user(username)):
        # Redirect to the HF auth endpoint
        return redirect('/auth')

    session['user'] = {'name': username, 'is_hf': True}
    return redirect(url_for('index'))
@app.route('/auth/callback')
def auth_callback():
    """Finish sign-in from the ``X-Spaces-Username`` header (Hugging Face only).

    Outside a Space this simply redirects to ``login``. Inside a Space the
    header must be present and the name must pass ``is_allowed_user`` -- the
    same allow-list check the other authentication routes apply -- before a
    session user is stored and the client is redirected to ``index``.
    Otherwise ``error.html`` is rendered.
    """
    # NOTE(review): this logs every request header at INFO level, which
    # presumably includes cookies -- confirm that is acceptable.
    logger.info(f"Auth callback called. Headers: {dict(request.headers)}")

    if not is_hf_space:
        return redirect(url_for('login'))

    # In Hugging Face Spaces, the user info is read from the request headers
    username = request.headers.get('X-Spaces-Username')
    if not username:
        return render_template('error.html', message="Authentication failed. No username provided.")

    # Enforce the allow-list here too, so this route cannot be used to obtain
    # a session for a user the other routes would reject.
    if not is_allowed_user(username):
        logger.info(f"User {username} is not allowed, rejecting auth callback")
        return render_template('error.html', message="Authentication failed. User is not allowed.")

    session['user'] = {'name': username, 'is_hf': True}
    return redirect(url_for('index'))
@app.route('/health')
def health_check():
    """Health check endpoint for container verification.

    Returns a JSON document with a fixed ``"healthy"`` status, selected
    environment settings, the names of the keys in the current session, the
    ``is_hf_space`` / ``bypass_auth`` flags and whether the main data
    directories exist. The same environment summary is logged at INFO level.

    The secret key is never echoed, not even partially: only whether it is
    configured is reported.
    """
    env = os.environ
    env_vars = {
        "FLASK_ENV": env.get('FLASK_ENV', 'production'),
        "DEBUG": env.get('DEBUG', 'Not set'),
        "SPACE_ID": env.get('SPACE_ID', 'Not set'),
        "BYPASS_AUTH": env.get('BYPASS_AUTH', 'Not set'),
        # Presence only -- this value is both logged and returned to callers.
        "SECRET_KEY": 'Set' if env.get('SECRET_KEY') else 'Not set',
        "S3_BUCKET": env.get('S3_BUCKET', 'Not set'),
        "S3_VIDEO_PREFIX": env.get('S3_VIDEO_PREFIX', 'Not set'),
        "USE_S3_FOR_VIDEOS": env.get('USE_S3_FOR_VIDEOS', 'Not set')
    }
    logger.info(f"Health check called. Environment: {env_vars}")

    # Only key names are exposed, not the session contents.
    session_keys = list(session.keys()) if session else []

    return jsonify({
        "status": "healthy",
        "environment": env_vars,
        "session_keys": session_keys,
        "is_hf_space": is_hf_space,
        "bypass_auth": bypass_auth,
        "directories": {
            "videos": os.path.exists(VIDEO_DIR),
            "annotations": os.path.exists(ANNOTATIONS_DIR),
            "temp": os.path.exists(TEMP_DIR)
        }
    })
@app.route('/auth')
def auth():
    """Authenticate the caller and redirect to ``index``, or show an error page.

    Order of checks:
      1. If the module-level ``bypass_auth`` flag (from the BYPASS_AUTH
         environment variable) is on, a default ``Perilon`` user is stored.
      2. In a Hugging Face Space, the ``X-Spaces-Username`` header must be
         present and pass ``is_allowed_user``.
      3. Outside a Space, a mock ``LocalDeveloper`` user is stored.
      4. Otherwise ``error.html`` is rendered.

    The bypass is no longer forced on inside this function, so the checks
    after it are reachable; set BYPASS_AUTH=true to get the bypass back.
    """
    # NOTE(review): this logs every request header at INFO level, which
    # presumably includes cookies -- confirm that is acceptable.
    logger.info(f"Auth route called. Headers: {dict(request.headers)}")

    # If bypass is enabled (BYPASS_AUTH=true), authenticate immediately
    if bypass_auth:
        logger.info("Auth bypass enabled, setting default user")
        session['user'] = {'name': 'Perilon', 'is_hf': True}
        return redirect(url_for('index'))

    # Normal authentication logic
    username = request.headers.get('X-Spaces-Username')
    logger.info(f"Username from headers in auth: {username}")

    if is_hf_space and username and is_allowed_user(username):
        logger.info(f"Setting user in session: {username}")
        session['user'] = {'name': username, 'is_hf': True}
        return redirect(url_for('index'))

    if not is_hf_space:
        # For local development
        session['user'] = {'name': 'LocalDeveloper', 'is_mock': True}
        return redirect(url_for('index'))

    # For HF with no valid username yet
    return render_template('error.html', message=
        "Waiting for Hugging Face authentication. If you continue to see this message, "
        "please make sure you're logged into Hugging Face and your username is allowed.")
@app.before_request
def check_auth():
    """Check authentication before each request is dispatched.

    Returns ``None`` to let the request proceed, or a redirect response.

    * Requests to the auth/diagnostic paths listed below and to ``/static/``
      are never checked.
    * When the module-level ``bypass_auth`` flag (BYPASS_AUTH environment
      variable) is on, a default ``Perilon`` user is stored if the session has
      none, and the request proceeds.
    * In a Hugging Face Space, an existing session user is accepted; otherwise
      an allowed ``X-Spaces-Username`` header creates one; otherwise the
      client is redirected to ``/auth``.
    * Outside a Space, a request without a session user is redirected to
      ``login``.

    The bypass is no longer forced on inside this function, so the checks
    after it are reachable; set BYPASS_AUTH=true to get the bypass back.
    """
    # Skip authentication for certain routes and static files
    open_paths = ('/login', '/logout', '/auth', '/auth/callback', '/debug', '/health')
    if request.path in open_paths or request.path.startswith('/static/'):
        return None

    # Log all request paths to help troubleshoot
    logger.debug(f"Request path: {request.path}, User in session: {'user' in session}")

    if bypass_auth:
        # Set default user for bypass mode if not already set
        if 'user' not in session:
            session['user'] = {'name': 'Perilon', 'is_hf': True}
        return None

    if is_hf_space:
        if 'user' in session:
            logger.debug(f"User in session: {session['user']}")
            return None

        # Check for HF username header
        username = request.headers.get('X-Spaces-Username')
        if username and is_allowed_user(username):
            logger.info(f"Setting user from headers: {username}")
            session['user'] = {'name': username, 'is_hf': True}
            return None

        # No valid user in session or headers
        logger.info("No authenticated user, redirecting to /auth")
        return redirect('/auth')

    if 'user' not in session:
        return redirect(url_for('login'))
    return None
@app.route('/logout')
def logout():
    """Drop every value stored in the session, then redirect.

    Inside a Hugging Face Space the redirect goes to ``/auth/logout``;
    otherwise it goes to the ``login`` route.
    """
    session.clear()  # Clear the entire session
    destination = '/auth/logout' if is_hf_space else url_for('login')
    return redirect(destination)
@app.route('/debug')
def debug_info():
    """Return a JSON snapshot of request, session and configuration state.

    The payload contains the session contents, request headers and cookies,
    the Hugging Face flag, the ``ALLOWED_USERS`` environment value, four
    session-related Flask settings and the S3 settings.
    """
    # Collect the request cookies into a plain dictionary.
    cookie_jar = {}
    for cookie_name in request.cookies.keys():
        cookie_jar[cookie_name] = request.cookies.get(cookie_name)

    # Only these Flask settings are reported, stringified for JSON.
    reported_settings = [
        'SESSION_COOKIE_SECURE', 'SESSION_COOKIE_HTTPONLY',
        'SESSION_COOKIE_SAMESITE', 'PERMANENT_SESSION_LIFETIME',
    ]
    session_settings = {}
    for setting_name, setting_value in app.config.items():
        if setting_name in reported_settings:
            session_settings[setting_name] = str(setting_value)

    session_snapshot = dict(session) if session else None

    return jsonify({
        "session": session_snapshot,
        "headers": dict(request.headers),
        "cookies": cookie_jar,
        "is_hf_space": is_hf_space,
        "allowed_users": os.getenv('ALLOWED_USERS', 'Perilon'),
        "app_config": session_settings,
        "s3_config": {
            "S3_BUCKET": S3_BUCKET,
            "S3_VIDEO_PREFIX": S3_VIDEO_PREFIX,
            "USE_S3_FOR_VIDEOS": USE_S3_FOR_VIDEOS,
        },
    })
# Main application routes
@app.route('/')
@login_required
def index():
    """Landing route: forward the visitor to the video selection page."""
    selection_url = url_for('select_video')
    return redirect(selection_url)
@app.route('/select_video')
@login_required
def select_video():
    """Render the page on which a video is chosen for annotation.

    Video ids come from ``list_s3_videos()`` when S3 is enabled, otherwise
    from the ``.mp4`` files in ``VIDEO_DIR`` with the extension removed.
    The error template is rendered if the local directory is missing.
    """
    if not USE_S3_FOR_VIDEOS:
        if not os.path.exists(VIDEO_DIR):
            return render_template('error.html', message="Video directory not found.")
        video_ids = [
            os.path.splitext(entry)[0]
            for entry in os.listdir(VIDEO_DIR)
            if entry.endswith('.mp4')
        ]
    else:
        video_ids = list_s3_videos()

    current_user = session.get('user')
    return render_template('select_video.html', video_ids=video_ids, user=current_user)
@app.route('/player/<video_id>')
@login_required
def player(video_id):
    """Render the annotation player template for the given video id."""
    template_context = {
        'video_id': video_id,
        'user': session.get('user'),
    }
    return render_template('player.html', **template_context)
@app.route('/videos')
@login_required
def get_videos():
    """Return the available video file names as a JSON list.

    With S3 enabled, each name from ``list_s3_videos()`` gets an ``.mp4``
    suffix. Otherwise the ``.mp4``/``.avi``/``.mov`` files in ``VIDEO_DIR``
    are listed. A 404 JSON error is returned when nothing is available.
    """
    if USE_S3_FOR_VIDEOS:
        s3_names = list_s3_videos()
        if s3_names:
            # Return just the filenames with .mp4 extension for compatibility
            return jsonify([f"{vid}.mp4" for vid in s3_names])
        return jsonify({'error': 'No videos found in S3'}), 404

    # Original local file behavior
    if not os.path.exists(VIDEO_DIR):
        return jsonify({'error': 'Video directory not found'}), 404

    video_suffixes = ('.mp4', '.avi', '.mov')
    local_files = []
    for entry in os.listdir(VIDEO_DIR):
        if entry.endswith(video_suffixes):
            local_files.append(entry)

    if local_files:
        return jsonify(local_files)
    return jsonify({'error': 'No videos found'}), 404
@app.route('/video/<path:filename>')
@login_required
def serve_video(filename):
    """Serve a video file from S3 or local storage.

    Args:
        filename: Requested file name; the extension is stripped to obtain
            the video id passed to the S3 helpers.

    Returns:
        With S3 enabled: a redirect to a presigned URL when one can be
        generated, otherwise the file downloaded by
        ``download_video_from_s3``. Without S3: the file from ``VIDEO_DIR``.
        A 404 JSON error is returned when the video cannot be found.
    """
    video_id = os.path.splitext(filename)[0]  # Remove extension

    if not USE_S3_FOR_VIDEOS:
        # Original local file behavior
        if not os.path.exists(os.path.join(VIDEO_DIR, filename)):
            return jsonify({'error': 'Video not found'}), 404
        return send_from_directory(VIDEO_DIR, filename)

    # Option 1: Generate a presigned URL and redirect
    presigned_url = generate_presigned_url(video_id)
    if presigned_url:
        return redirect(presigned_url)

    # Option 2 (fallback): Download from S3 to local temporary storage and serve
    local_path = download_video_from_s3(video_id)
    if local_path and os.path.exists(local_path):
        # Serve the file that was actually downloaded. Re-deriving the path
        # from VIDEO_DIR and the requested name can miss it, for example
        # when the request carried no extension.
        resolved_path = os.path.abspath(local_path)
        return send_from_directory(
            os.path.dirname(resolved_path),
            os.path.basename(resolved_path),
        )

    return jsonify({'error': 'Video not found in S3'}), 404
@app.route('/save_annotations', methods=['POST'])
@login_required
def save_annotations():
    """Save annotation data posted as JSON.

    The body must be an object with a ``video`` name and a ``timestamps``
    list. The timestamps are sorted and written, together with the date and
    the session user's name, to ``<video>_annotations.json`` in
    ``ANNOTATIONS_DIR``.

    Returns:
        A JSON success message, or a 400 JSON error for invalid input.
    """
    invalid = (jsonify({'success': False, 'message': 'Invalid data'}), 400)

    data = request.json
    if not isinstance(data, dict) or 'video' not in data or 'timestamps' not in data:
        return invalid

    # The video name becomes part of a file path, so it must be a plain
    # name: no path separators and no relative-directory components.
    video_name = data['video']
    if (
        not isinstance(video_name, str)
        or video_name in ('', '.', '..')
        or '/' in video_name
        or '\\' in video_name
    ):
        return invalid

    raw_timestamps = data['timestamps']
    if not isinstance(raw_timestamps, list):
        return invalid
    try:
        ordered_timestamps = sorted(raw_timestamps)
    except TypeError:
        # Mixed or otherwise unorderable values cannot be sorted.
        return invalid

    annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_name}_annotations.json")
    annotation_data = {
        "video_name": video_name + ".mp4",
        "timestamps": ordered_timestamps,
        "annotation_date": datetime.now().isoformat(),
        "annotated_by": session.get('user', {}).get('name', 'unknown')
    }

    with open(annotation_file, 'w') as f:
        json.dump(annotation_data, f, indent=4)

    return jsonify({'success': True, 'message': 'Annotations saved successfully'})
@app.route('/get_annotations/<path:video_name>')
@login_required
def get_annotations(video_name):
    """Return the stored annotations for a video as JSON.

    Args:
        video_name: Name used to build ``<video_name>_annotations.json``
            inside ``ANNOTATIONS_DIR``.

    Returns:
        The parsed annotation file as JSON, or a 404 JSON error when the
        file does not exist or would lie outside ``ANNOTATIONS_DIR``.
    """
    # The ``path`` converter accepts slashes, so resolve the final location
    # and refuse anything that escapes the annotations directory.
    base_dir = os.path.realpath(ANNOTATIONS_DIR)
    annotation_file = os.path.realpath(
        os.path.join(base_dir, f"{video_name}_annotations.json")
    )
    if not annotation_file.startswith(base_dir + os.sep):
        return jsonify({'error': 'No annotations found'}), 404

    if not os.path.exists(annotation_file):
        return jsonify({'error': 'No annotations found'}), 404

    with open(annotation_file, 'r') as f:
        annotations = json.load(f)

    return jsonify(annotations)
@app.route("/alignment/<video_id>")
@login_required
def alignment_mode(video_id):
    """Render the page for aligning clips with transcribed text.

    Args:
        video_id: Identifier used to locate ``<video_id>_annotations.json``.

    Returns:
        The rendered ``alignment.html`` template, or ``error.html`` when no
        annotation file exists for the video.
    """
    annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_id}_annotations.json")
    if not os.path.exists(annotation_file):
        return render_template(
            "error.html",
            message="No annotations found for this video. Please annotate the video first.",
        )

    with open(annotation_file, 'r') as f:
        annotations = json.load(f)

    # N timestamps delimit N - 1 clips. Clamp at zero so an empty timestamp
    # list does not produce a clip count of -1.
    total_clips = max(len(annotations['timestamps']) - 1, 0)

    return render_template(
        "alignment.html",
        video_id=video_id,
        total_clips=total_clips,
        user=session.get('user')
    )
@app.route("/api/transcript/<video_id>")
@login_required
def get_transcript(video_id):
    """Build a transcript from a video's word-timestamp file.

    Reads ``<video_id>_word_timestamps.json`` from ``WORD_TIMESTAMPS_DIR``
    and returns the space-joined ``punctuated_word`` values plus a list of
    words with float ``start``/``end`` times. Responds with 404 when the
    file is missing and 500 when it cannot be processed.
    """
    timestamps_file = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json")
    logger.info(f"Attempting to load word timestamps from: {timestamps_file}")

    if not os.path.exists(timestamps_file):
        logger.warning(f"Word timestamps file not found: {timestamps_file}")
        missing_payload = {
            "status": "error",
            "message": "No word timestamps found for this video"
        }
        return jsonify(missing_payload), 404

    try:
        with open(timestamps_file, 'r') as handle:
            entries = json.load(handle)

        # First pass: the plain text of the transcript.
        spoken_words = [entry["punctuated_word"] for entry in entries]
        transcript_text = " ".join(spoken_words)

        # Second pass: each word with numeric start and end times.
        timed_words = []
        for entry in entries:
            timed_words.append({
                "word": entry["punctuated_word"],
                "start": float(entry["start_time"]),
                "end": float(entry["end_time"])
            })

        logger.info(f"Successfully created transcript ({len(transcript_text)} characters)")
        success_payload = {
            "status": "success",
            "text": transcript_text,
            "words": timed_words
        }
        return jsonify(success_payload)
    except Exception as e:
        logger.error(f"Error processing word timestamps: {str(e)}")
        failure_payload = {
            "status": "error",
            "message": f"Error processing word timestamps: {str(e)}"
        }
        return jsonify(failure_payload), 500
@app.route("/api/word_timestamps/<video_id>")
@login_required
def get_word_timestamps(video_id):
    """Return the raw contents of a video's word-timestamp file.

    Loads ``<video_id>_word_timestamps.json`` from ``WORD_TIMESTAMPS_DIR``
    and returns it under the ``words`` key. Responds with 404 when the file
    is missing and 500 when it cannot be loaded.
    """
    timestamps_file = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json")
    logger.info(f"Attempting to load word timestamps from: {timestamps_file}")

    file_present = os.path.exists(timestamps_file)
    if not file_present:
        logger.warning(f"Word timestamps file not found: {timestamps_file}")
        not_found = {
            "status": "error",
            "message": "No word timestamps found for this video"
        }
        return jsonify(not_found), 404

    try:
        with open(timestamps_file, 'r') as handle:
            loaded_words = json.load(handle)
        logger.info(f"Successfully loaded {len(loaded_words)} word timestamps")
        return jsonify({"status": "success", "words": loaded_words})
    except Exception as e:
        logger.error(f"Error processing word timestamps: {str(e)}")
        failure = {
            "status": "error",
            "message": f"Error processing word timestamps: {str(e)}"
        }
        return jsonify(failure), 500
@app.route("/api/clips/<video_id>")
@login_required
def get_video_clips(video_id):
    """Describe the clips delimited by a video's annotation timestamps.

    Each pair of consecutive timestamps yields one clip entry with its
    index, start, end and the ``/clip/<video_id>/<index>`` path.

    Returns:
        A JSON object with ``status`` and ``clips``; a 404 JSON error when
        no annotation file exists; a 500 JSON error when the file cannot be
        processed.
    """
    annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_id}_annotations.json")

    # A missing annotation file is a "not found" condition, not a server
    # failure, so it is reported with 404 instead of going through the
    # generic 500 handler below.
    if not os.path.exists(annotation_file):
        logger.error("Error getting clips: Annotations not found")
        return jsonify({
            "status": "error",
            "message": "Annotations not found"
        }), 404

    try:
        with open(annotation_file, 'r') as f:
            annotations = json.load(f)

        timestamps = annotations['timestamps']
        # Pair every timestamp with its successor; fewer than two
        # timestamps produce an empty clip list.
        clips = [
            {
                "index": i,
                "start": start,
                "end": end,
                "path": f"/clip/{video_id}/{i}"
            }
            for i, (start, end) in enumerate(zip(timestamps, timestamps[1:]))
        ]

        return jsonify({
            "status": "success",
            "clips": clips
        })
    except Exception as e:
        logger.error(f"Error getting clips: {str(e)}")
        return jsonify({
            "status": "error",
            "message": str(e)
        }), 500
@app.route("/clip/<video_id>/<int:clip_index>")
@login_required
def serve_clip(video_id, clip_index):
    """Send one extracted clip from ``TEMP_DIR`` as ``video/mp4``.

    The file name is ``<video_id>_clip_<index>.mp4`` with the index
    zero-padded to three digits. A 404 JSON error is returned when the
    file does not exist.
    """
    clip_filename = f"{video_id}_clip_{clip_index:03d}.mp4"
    clip_path = os.path.join(TEMP_DIR, clip_filename)
    logger.info(f"Attempting to serve clip: {clip_path}")

    if os.path.exists(clip_path):
        return send_file(clip_path, mimetype="video/mp4")

    logger.error(f"Clip not found: {clip_path}")
    not_found = {
        "status": "error",
        "message": "Clip not found"
    }
    return jsonify(not_found), 404
@app.route("/api/save_alignments", methods=["POST"])
@login_required
def save_alignments():
    """Save alignment data posted as JSON.

    The body must be an object with a ``video_id`` and an ``alignments``
    list. Every non-empty alignment object is tagged with the session
    user's name, and the list is written to ``<video_id>.json`` in
    ``ALIGNMENTS_DIR``.

    Returns:
        A JSON success message, a 400 JSON error for invalid input, or a
        500 JSON error when saving fails.
    """
    try:
        data = request.json
        if not isinstance(data, dict) or 'video_id' not in data or 'alignments' not in data:
            return jsonify({'success': False, 'message': 'Invalid data'}), 400

        # The id becomes part of a file path, so it must be a plain name:
        # no path separators and no relative-directory components.
        video_id = data['video_id']
        if (
            not isinstance(video_id, str)
            or video_id in ('', '.', '..')
            or '/' in video_id
            or '\\' in video_id
        ):
            return jsonify({'success': False, 'message': 'Invalid data'}), 400

        alignments = data['alignments']
        if not isinstance(alignments, list):
            return jsonify({'success': False, 'message': 'Invalid data'}), 400

        # Add user information to the alignments. The name is looked up
        # once; empty entries and non-object entries are left untouched.
        aligned_by = session.get('user', {}).get('name', 'unknown')
        for alignment in alignments:
            if alignment and isinstance(alignment, dict):
                alignment['aligned_by'] = aligned_by

        output_path = os.path.join(ALIGNMENTS_DIR, f"{video_id}.json")
        with open(output_path, "w") as f:
            json.dump(alignments, f, indent=2)

        return jsonify({
            "success": True,
            "message": "Alignments saved successfully"
        })
    except Exception as e:
        logger.error(f"Error saving alignments: {str(e)}")
        return jsonify({
            "success": False,
            "message": str(e)
        }), 500
@app.route("/api/extract_clips/<video_id>")
@login_required
def extract_clips_for_video(video_id):
    """Start clip extraction and transcription threads for a video.

    With S3 enabled the video is downloaded first, and a 404 JSON error is
    returned if that fails. A worker thread is started for each task whose
    recorded progress is below 100 percent (or not recorded at all).
    """
    # If using S3, ensure the video is downloaded first
    if USE_S3_FOR_VIDEOS:
        downloaded_path = download_video_from_s3(video_id)
        if not downloaded_path:
            download_error = {
                "status": "error",
                "message": f"Failed to download video {video_id} from S3"
            }
            return jsonify(download_error), 404

    extraction_state = clip_extraction_status.get(video_id, {})
    if extraction_state.get("percent", 0) < 100:
        extraction_worker = threading.Thread(target=run_clip_extraction, args=(video_id,))
        extraction_worker.start()

    # A missing entry counts as 0 percent, so a single lookup covers both
    # the "never started" and the "not finished" cases.
    transcription_state = transcription_progress_status.get(video_id, {})
    if transcription_state.get("percent", 0) < 100:
        transcription_worker = threading.Thread(target=run_transcription, args=(video_id,))
        transcription_worker.start()

    return jsonify({"status": "started"})
@app.route("/api/clip_progress/<video_id>")
@login_required
def clip_progress(video_id):
    """Return the recorded clip extraction progress for a video as JSON.

    When nothing is recorded for the video, a zeroed progress record is
    returned instead.
    """
    nothing_recorded = {"current": 0, "total": 0, "percent": 0}
    return jsonify(clip_extraction_status.get(video_id, nothing_recorded))
@app.route("/api/transcription_progress/<video_id>")
@login_required
def transcription_progress(video_id):
    """Return the recorded transcription progress for a video as JSON.

    When nothing is recorded for the video, a "not started" record with
    zero percent is returned instead.
    """
    not_started = {"status": "not started", "percent": 0}
    return jsonify(transcription_progress_status.get(video_id, not_started))
if __name__ == '__main__':
    # Script entry point: print a diagnostic banner, then start the Flask
    # development server on all interfaces.
    try:
        port = int(os.getenv('PORT', 5000))

        # The interactive debugger allows arbitrary code execution, and the
        # server listens on 0.0.0.0, so debug mode is opt-in through the
        # FLASK_DEBUG environment variable instead of being always on.
        debug_mode = os.getenv('FLASK_DEBUG', 'false').lower() in ('1', 'true', 'yes')

        # Print diagnostic information
        print("=" * 50)
        print("Starting app with configuration:")
        print(f"- Running in HF Space: {is_hf_space}")
        print(f"- Auth bypass: {bypass_auth}")
        print(f"- Port: {port}")
        print(f"- Debug mode: {debug_mode}")
        print(f"- S3 for videos: {USE_S3_FOR_VIDEOS}")
        print(f"- S3 bucket: {S3_BUCKET}")
        print(f"- S3 prefix: {S3_VIDEO_PREFIX}")
        print(f"- Available videos: {os.listdir(VIDEO_DIR) if os.path.exists(VIDEO_DIR) else 'None'}")

        if USE_S3_FOR_VIDEOS:
            # Listing S3 is best effort: a failure is reported in the
            # banner but does not prevent the server from starting.
            try:
                s3_videos = list_s3_videos()
                print(f"- Available S3 videos: {s3_videos if s3_videos else 'None'}")
            except Exception as e:
                print(f"- Error listing S3 videos: {str(e)}")

        print("=" * 50)

        app.run(host='0.0.0.0', port=port, debug=debug_mode)
    except Exception as e:
        print(f"Error starting the application: {e}")
        import traceback
        traceback.print_exc()
        # Exit with a non-zero status so a failed start is visible to
        # whatever launched the process.
        raise SystemExit(1)