"""Flask web app for annotating videos and aligning clips with transcripts.

The app serves videos from ``data/videos``, stores annotation timestamps,
extracts clips between consecutive timestamps in a background thread, runs a
transcription in a second background thread, and stores clip/transcript
alignments. Every route except the login/logout endpoints requires a user in
the session; when the ``SPACE_ID`` environment variable is set the app treats
itself as running on Hugging Face Spaces and reads the username from the
``X-Spaces-Username`` request header.
"""
import json
import logging
import os
import threading
import time  # noqa: F401  (kept: other parts of the project may rely on it)
from datetime import datetime
from functools import wraps

from dotenv import load_dotenv
from flask import (
    Flask,
    jsonify,
    redirect,
    render_template,
    request,
    send_file,
    send_from_directory,
    session,
    url_for,
)

from extract_signed_segments_from_annotations import ClipExtractor, VideoClip  # noqa: F401

# Load environment variables from a .env file, if present.
load_dotenv()

# True when running on Hugging Face Spaces (detected via SPACE_ID).
is_hf_space = os.getenv('SPACE_ID') is not None

app = Flask(__name__)
app.secret_key = os.getenv('SECRET_KEY', 'dev_key_for_testing')
logging.basicConfig(level=logging.INFO)

if is_hf_space and not os.getenv('SECRET_KEY'):
    # Sessions signed with the built-in fallback key can be forged by anyone
    # who has read this file.
    logging.warning("SECRET_KEY is not set; using the insecure development key.")

# Directory paths (resolved against the current working directory).
VIDEO_DIR = os.path.abspath("data/videos")
ANNOTATIONS_DIR = os.path.abspath("data/annotations")
TEMP_DIR = os.path.abspath("data/temp")
WORD_TIMESTAMPS_DIR = os.path.abspath("data/word_timestamps")
ALIGNMENTS_DIR = os.path.abspath("data/alignments")
TRANSCRIPTS_DIR = os.path.abspath("data/transcripts")

# Ensure all required directories exist.
for directory in [VIDEO_DIR, ANNOTATIONS_DIR, TEMP_DIR, WORD_TIMESTAMPS_DIR,
                  ALIGNMENTS_DIR, TRANSCRIPTS_DIR]:
    os.makedirs(directory, exist_ok=True)

# Global dictionaries for progress tracking, keyed by video_id.
clip_extraction_status = {}
transcription_progress_status = {}


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _is_safe_id(name):
    """Return True if *name* is a string that cannot escape a directory.

    Used for identifiers taken from JSON request bodies before they are
    interpolated into file names.
    """
    return (
        isinstance(name, str)
        and name not in ("", ".", "..")
        and not any(ch in name for ch in ("/", "\\", "\x00"))
    )


def _read_json(path):
    """Load and return the JSON document stored at *path*."""
    with open(path, 'r', encoding='utf-8') as f:
        return json.load(f)


def _write_json(path, payload, indent):
    """Write *payload* as JSON to *path* without leaving a partial file.

    The data is written to a sibling temporary file and then moved into
    place, so a failure while serialising never leaves a truncated file at
    *path*.
    """
    tmp_path = f"{path}.tmp"
    with open(tmp_path, 'w', encoding='utf-8') as f:
        json.dump(payload, f, indent=indent)
    os.replace(tmp_path, path)


def _current_user_name():
    """Return the session user's name, or ``'unknown'`` if there is none."""
    return session.get('user', {}).get('name', 'unknown')


def login_required(f):
    """Decorator that redirects to the login route when no user is in session."""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'user' not in session:
            return redirect(url_for('login'))
        return f(*args, **kwargs)
    return decorated_function


def is_allowed_user(username):
    """Return True if *username* may use the app.

    Outside Hugging Face Spaces (local development) every user is allowed.
    """
    allowed_users = ['Perilon']  # Add your username for testing
    return username in allowed_users or not is_hf_space


# ---------------------------------------------------------------------------
# Background jobs
# ---------------------------------------------------------------------------

def update_extraction_progress(video_id, current, total):
    """Record clip-extraction progress for *video_id*.

    Stores ``current``, ``total`` and an integer ``percent`` in
    ``clip_extraction_status``. A non-positive ``total`` means there is
    nothing to extract and is reported as 100% instead of dividing by zero.
    """
    percent = int((current / total) * 100) if total > 0 else 100
    clip_extraction_status[video_id] = {
        "current": current,
        "total": total,
        "percent": percent,
    }


def run_clip_extraction(video_id):
    """Extract all clips for *video_id*; intended to run in a worker thread.

    Progress is published through ``update_extraction_progress``. After the
    extractor returns, the status is forced to 100% so pollers see the job as
    finished. Any exception is logged and stored as ``{"error": ...}``.
    """
    try:
        extractor = ClipExtractor(app.root_path)
        extractor.extract_clips_from_annotations(
            video_id,
            progress_callback=lambda current, total: update_extraction_progress(
                video_id, current, total
            ),
        )
        status = clip_extraction_status.get(video_id)
        if status is None:
            # The extractor never reported progress; mark the job as done.
            update_extraction_progress(video_id, 1, 1)
        elif status.get("percent", 0) < 100:
            update_extraction_progress(video_id, status["total"], status["total"])
    except Exception as e:
        app.logger.error("Error during clip extraction for %s: %s", video_id, e)
        clip_extraction_status[video_id] = {"error": str(e)}


def run_transcription(video_id):
    """Transcribe *video_id* and cache the word timestamps as JSON.

    Intended to run in a worker thread. If a non-empty cache file already
    exists, it is reused. Status is published in
    ``transcription_progress_status``; failures are logged and stored with
    ``status == "error"``.
    """
    try:
        output_path = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json")

        # Check if transcription already exists and is valid.
        if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
            app.logger.info(f"Using cached transcription for video {video_id}.")
            transcription_progress_status[video_id] = {"status": "completed", "percent": 100}
            return

        video_path = os.path.join(app.root_path, "data", "videos", f"{video_id}.mp4")
        transcription_progress_status[video_id] = {"status": "started", "percent": 10}

        # Imported lazily; aliased so it is not confused with the
        # get_word_timestamps view defined in this module.
        from get_transcription_with_amazon import get_word_timestamps as transcribe_video
        word_timestamps = transcribe_video(video_path)

        # Written atomically so a failed write cannot leave a partial file
        # that the cache check above would later accept.
        _write_json(output_path, word_timestamps, indent=4)
        transcription_progress_status[video_id] = {"status": "completed", "percent": 100}
    except Exception as e:
        app.logger.error("Error during transcription for %s: %s", video_id, e)
        transcription_progress_status[video_id] = {
            "status": "error",
            "percent": 0,
            "message": str(e),
        }


# ---------------------------------------------------------------------------
# Authentication routes
# ---------------------------------------------------------------------------

@app.route('/login')
def login():
    """Start a login: OAuth redirect on Spaces, mock user locally."""
    if is_hf_space:
        # For Hugging Face Spaces, redirect to the built-in OAuth.
        return redirect('/auth/login')
    # For local development, just set a mock user.
    session['user'] = {'name': 'LocalDeveloper', 'is_mock': True}
    return redirect(url_for('index'))


@app.route('/auth/callback')
def auth_callback():
    """Complete a login on Spaces using the ``X-Spaces-Username`` header.

    Only users accepted by ``is_allowed_user`` receive a session; previously
    any username was stored, which bypassed the allow list.
    """
    if not is_hf_space:
        return redirect(url_for('login'))

    username = request.headers.get('X-Spaces-Username')
    if not username:
        return render_template(
            'error.html',
            message="Authentication failed. No username provided.",
        )
    if not is_allowed_user(username):
        return render_template(
            'error.html',
            message="Access denied. This user is not allowed to use the application.",
        ), 403

    session['user'] = {'name': username, 'is_hf': True}
    return redirect(url_for('index'))


@app.route('/auth')
def auth():
    """Local-development shortcut that signs in a mock user, then goes to index."""
    if not is_hf_space:
        session['user'] = {'name': 'Perilon', 'is_mock': True}
    return redirect(url_for('index'))


@app.before_request
def check_auth():
    """Redirect unauthenticated requests to the login route.

    Login/logout endpoints and static files are exempt. On Spaces, an allowed
    username in the request header refreshes the session user.
    """
    path = request.path
    if path in ('/login', '/logout', '/auth/callback') or path.startswith('/static/'):
        return None

    if is_hf_space:
        username = request.headers.get('X-Spaces-Username')
        if username and is_allowed_user(username):
            # Keep the session in sync with the user named in the header.
            if 'user' not in session or session['user'].get('name') != username:
                session['user'] = {'name': username, 'is_hf': True}
            return None

    # Local development sets a mock user in the login route.
    if 'user' not in session:
        return redirect(url_for('login'))
    return None


@app.route('/logout')
def logout():
    """Clear the session and send the user to the appropriate logout/login page."""
    session.clear()
    if is_hf_space:
        return redirect('/auth/logout')
    return redirect(url_for('login'))


# ---------------------------------------------------------------------------
# Main application routes
# ---------------------------------------------------------------------------

@app.route('/')
@login_required
def index():
    """Redirect to the video selection page."""
    return redirect(url_for('select_video'))


@app.route('/select_video')
@login_required
def select_video():
    """Render the list of ``.mp4`` videos available for annotation."""
    if not os.path.exists(VIDEO_DIR):
        return render_template('error.html', message="Video directory not found.")

    video_ids = [
        os.path.splitext(name)[0]
        for name in os.listdir(VIDEO_DIR)
        if name.endswith('.mp4')
    ]
    return render_template('select_video.html', video_ids=video_ids, user=session.get('user'))


@app.route('/player/<video_id>')
@login_required
def player(video_id):
    """Render the annotation player for *video_id*."""
    return render_template('player.html', video_id=video_id, user=session.get('user'))


@app.route('/videos')
@login_required
def get_videos():
    """Return the video file names (.mp4/.avi/.mov) as JSON, or 404 if none."""
    if not os.path.exists(VIDEO_DIR):
        return jsonify({'error': 'Video directory not found'}), 404

    videos = [f for f in os.listdir(VIDEO_DIR) if f.endswith(('.mp4', '.avi', '.mov'))]
    if not videos:
        return jsonify({'error': 'No videos found'}), 404
    return jsonify(videos)


@app.route('/video/<filename>')
@login_required
def serve_video(filename):
    """Serve a video file from the video directory, or 404 if it is missing."""
    if not os.path.exists(os.path.join(VIDEO_DIR, filename)):
        return jsonify({'error': 'Video not found'}), 404
    return send_from_directory(VIDEO_DIR, filename)


@app.route('/save_annotations', methods=['POST'])
@login_required
def save_annotations():
    """Store sorted annotation timestamps for a video.

    Expects a JSON object with ``video`` (identifier without extension) and
    ``timestamps`` (list of sortable values). Responds 400 with
    ``'Invalid data'`` for anything else.
    """
    invalid = (jsonify({'success': False, 'message': 'Invalid data'}), 400)

    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'video' not in data or 'timestamps' not in data:
        return invalid
    # The identifier becomes part of a file name; refuse path separators.
    if not _is_safe_id(data['video']) or not isinstance(data['timestamps'], list):
        return invalid
    try:
        timestamps = sorted(data['timestamps'])
    except TypeError:
        # Mixed, non-comparable timestamp values.
        return invalid

    annotation_file = os.path.join(ANNOTATIONS_DIR, f"{data['video']}_annotations.json")
    annotation_data = {
        "video_name": data['video'] + ".mp4",
        "timestamps": timestamps,
        "annotation_date": datetime.now().isoformat(),
        "annotated_by": _current_user_name(),
    }
    _write_json(annotation_file, annotation_data, indent=4)
    return jsonify({'success': True, 'message': 'Annotations saved successfully'})


@app.route('/get_annotations/<video_name>')
@login_required
def get_annotations(video_name):
    """Return the stored annotations for *video_name* as JSON, or 404."""
    annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_name}_annotations.json")
    if not os.path.exists(annotation_file):
        return jsonify({'error': 'No annotations found'}), 404
    return jsonify(_read_json(annotation_file))


@app.route("/alignment/<video_id>")
@login_required
def alignment_mode(video_id):
    """Render the alignment page for an already annotated video."""
    annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_id}_annotations.json")
    if not os.path.exists(annotation_file):
        return render_template(
            "error.html",
            message="No annotations found for this video. Please annotate the video first.",
        )

    annotations = _read_json(annotation_file)
    # N timestamps delimit N-1 clips; never report a negative count.
    total_clips = max(len(annotations['timestamps']) - 1, 0)
    return render_template(
        "alignment.html",
        video_id=video_id,
        total_clips=total_clips,
        user=session.get('user'),
    )


@app.route("/api/transcript/<video_id>")
@login_required
def get_transcript(video_id):
    """Return the full transcript text and per-word timings for *video_id*.

    Responds 404 when no word-timestamp file exists and 500 when the file
    cannot be processed.
    """
    timestamps_file = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json")
    app.logger.info(f"Attempting to load word timestamps from: {timestamps_file}")

    if not os.path.exists(timestamps_file):
        app.logger.warning(f"Word timestamps file not found: {timestamps_file}")
        return jsonify({
            "status": "error",
            "message": "No word timestamps found for this video",
        }), 404

    try:
        word_data = _read_json(timestamps_file)
        full_text = " ".join(item["punctuated_word"] for item in word_data)
        words_with_times = [
            {
                "word": item["punctuated_word"],
                "start": float(item["start_time"]),
                "end": float(item["end_time"]),
            }
            for item in word_data
        ]
        app.logger.info(f"Successfully created transcript ({len(full_text)} characters)")
        return jsonify({
            "status": "success",
            "text": full_text,
            "words": words_with_times,
        })
    except Exception as e:
        app.logger.error("Error processing word timestamps: %s", e)
        return jsonify({
            "status": "error",
            "message": f"Error processing word timestamps: {str(e)}",
        }), 500


@app.route("/api/word_timestamps/<video_id>")
@login_required
def get_word_timestamps(video_id):
    """Return the raw word-timestamp records for *video_id*.

    Responds 404 when no word-timestamp file exists and 500 when the file
    cannot be read.
    """
    timestamps_file = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json")
    app.logger.info(f"Attempting to load word timestamps from: {timestamps_file}")

    if not os.path.exists(timestamps_file):
        app.logger.warning(f"Word timestamps file not found: {timestamps_file}")
        return jsonify({
            "status": "error",
            "message": "No word timestamps found for this video",
        }), 404

    try:
        word_data = _read_json(timestamps_file)
        app.logger.info(f"Successfully loaded {len(word_data)} word timestamps")
        return jsonify({
            "status": "success",
            "words": word_data,
        })
    except Exception as e:
        app.logger.error("Error processing word timestamps: %s", e)
        return jsonify({
            "status": "error",
            "message": f"Error processing word timestamps: {str(e)}",
        }), 500


@app.route("/api/clips/<video_id>")
@login_required
def get_video_clips(video_id):
    """Return one clip descriptor per pair of consecutive annotation timestamps.

    Responds 404 when the video has no annotations (consistent with the other
    endpoints) and 500 when the annotation file cannot be processed.
    """
    annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_id}_annotations.json")
    if not os.path.exists(annotation_file):
        app.logger.error("Error getting clips: %s", "Annotations not found")
        return jsonify({
            "status": "error",
            "message": "Annotations not found",
        }), 404

    try:
        timestamps = _read_json(annotation_file)['timestamps']
        clips = [
            {
                "index": i,
                "start": start,
                "end": end,
                "path": f"/clip/{video_id}/{i}",
            }
            for i, (start, end) in enumerate(zip(timestamps, timestamps[1:]))
        ]
        return jsonify({
            "status": "success",
            "clips": clips,
        })
    except Exception as e:
        app.logger.error("Error getting clips: %s", e)
        return jsonify({
            "status": "error",
            "message": str(e),
        }), 500


@app.route("/clip/<video_id>/<int:clip_index>")
@login_required
def serve_clip(video_id, clip_index):
    """Serve the extracted clip number *clip_index* of *video_id* as MP4."""
    # clip_index must be an int for the zero-padded file name below.
    clip_path = os.path.join(TEMP_DIR, f"{video_id}_clip_{clip_index:03d}.mp4")
    app.logger.info(f"Attempting to serve clip: {clip_path}")

    if not os.path.exists(clip_path):
        app.logger.error(f"Clip not found: {clip_path}")
        return jsonify({
            "status": "error",
            "message": "Clip not found",
        }), 404

    return send_file(clip_path, mimetype="video/mp4")


@app.route("/api/save_alignments", methods=["POST"])
@login_required
def save_alignments():
    """Store the alignments posted for a video, tagged with the current user.

    Expects a JSON object with ``video_id`` and ``alignments``. Responds 400
    with ``'Invalid data'`` for malformed input and 500 if saving fails.
    """
    try:
        data = request.get_json(silent=True)
        if (
            not isinstance(data, dict)
            or 'video_id' not in data
            or 'alignments' not in data
            # The identifier becomes part of a file name; refuse path separators.
            or not _is_safe_id(data['video_id'])
        ):
            return jsonify({'success': False, 'message': 'Invalid data'}), 400

        # Add user information to the alignments (empty/non-dict entries are
        # stored untouched).
        user_name = _current_user_name()
        for alignment in data['alignments']:
            if alignment and isinstance(alignment, dict):
                alignment['aligned_by'] = user_name

        output_path = os.path.join(ALIGNMENTS_DIR, f"{data['video_id']}.json")
        _write_json(output_path, data['alignments'], indent=2)

        return jsonify({
            "success": True,
            "message": "Alignments saved successfully",
        })
    except Exception as e:
        app.logger.error("Error saving alignments: %s", e)
        return jsonify({
            "success": False,
            "message": str(e),
        }), 500


@app.route("/api/extract_clips/<video_id>")
@login_required
def extract_clips_for_video(video_id):
    """Start clip extraction and transcription for *video_id* in the background.

    A job is started whenever its recorded progress is below 100%.
    NOTE(review): repeated calls while a job is still running start another
    thread for the same video — confirm whether callers guard against that.
    """
    if clip_extraction_status.get(video_id, {}).get("percent", 0) < 100:
        threading.Thread(target=run_clip_extraction, args=(video_id,)).start()

    if transcription_progress_status.get(video_id, {}).get("percent", 0) < 100:
        threading.Thread(target=run_transcription, args=(video_id,)).start()

    return jsonify({"status": "started"})


@app.route("/api/clip_progress/<video_id>")
@login_required
def clip_progress(video_id):
    """Return the clip-extraction status for *video_id* (zeros if not started)."""
    progress = clip_extraction_status.get(video_id, {"current": 0, "total": 0, "percent": 0})
    return jsonify(progress)


@app.route("/api/transcription_progress/<video_id>")
@login_required
def transcription_progress(video_id):
    """Return the transcription status for *video_id* ("not started" by default)."""
    progress = transcription_progress_status.get(
        video_id, {"status": "not started", "percent": 0}
    )
    return jsonify(progress)


if __name__ == '__main__':
    port = int(os.getenv('PORT', 5000))
    # The interactive debugger allows arbitrary code execution, so it is only
    # enabled for local development, never on a deployed Space.
    app.run(host='0.0.0.0', port=port, debug=not is_hf_space)