Perilon's picture
Initial commit
df66a57
raw
history blame
27.7 kB
# from flask import Flask, render_template, jsonify, request, send_from_directory, send_file, redirect, url_for
# import os, json, threading, time
# from datetime import datetime
# from extract_signed_segments_from_annotations import ClipExtractor, VideoClip
# import logging
# app = Flask(__name__)
# logging.basicConfig(level=logging.INFO)
# VIDEO_DIR = os.path.abspath("data/videos")
# ANNOTATIONS_DIR = os.path.abspath("data/annotations")
# TEMP_DIR = os.path.abspath("data/temp")
# WORD_TIMESTAMPS_DIR = os.path.abspath("data/word_timestamps")
# ALIGNMENTS_DIR = os.path.abspath("data/alignments")
# TRANSCRIPTS_DIR = os.path.abspath("data/transcripts")
# # Ensure all required directories exist
# for directory in [VIDEO_DIR, ANNOTATIONS_DIR, TEMP_DIR, WORD_TIMESTAMPS_DIR, ALIGNMENTS_DIR, TRANSCRIPTS_DIR]:
# os.makedirs(directory, exist_ok=True)
# # Global dictionary for clip extraction progress status keyed by video_id
# clip_extraction_status = {}
# # Global dictionary for transcription progress status keyed by video_id
# transcription_progress_status = {}
# def update_extraction_progress(video_id, current, total):
# percent = int((current / total) * 100)
# clip_extraction_status[video_id] = {"current": current, "total": total, "percent": percent}
# def run_clip_extraction(video_id):
# try:
# base_dir = app.root_path
# extractor = ClipExtractor(base_dir)
# # The extractor uses f"{video_id}.mp4" for the source video and f"{video_id}_annotations.json" for annotations.
# extractor.extract_clips_from_annotations(
# video_id,
# progress_callback=lambda current, total: update_extraction_progress(video_id, current, total)
# )
# if video_id in clip_extraction_status:
# status = clip_extraction_status[video_id]
# if status.get("percent", 0) < 100:
# update_extraction_progress(video_id, status["total"], status["total"])
# else:
# update_extraction_progress(video_id, 1, 1)
# except Exception as e:
# logging.error(f"Error during clip extraction for {video_id}: {str(e)}")
# clip_extraction_status[video_id] = {"error": str(e)}
# def run_transcription(video_id):
# try:
# base_dir = app.root_path
# output_path = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json")
# # Check if transcription already exists and is valid.
# if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
# app.logger.info(f"Using cached transcription for video {video_id}.")
# transcription_progress_status[video_id] = {"status": "completed", "percent": 100}
# return
# video_path = os.path.join(base_dir, "data", "videos", f"{video_id}.mp4")
# transcription_progress_status[video_id] = {"status": "started", "percent": 10}
# # Run transcription via the imported function from get_transcription_with_amazon.py
# from get_transcription_with_amazon import get_word_timestamps
# word_timestamps = get_word_timestamps(video_path)
# with open(output_path, "w") as f:
# json.dump(word_timestamps, f, indent=4)
# transcription_progress_status[video_id] = {"status": "completed", "percent": 100}
# except Exception as e:
# app.logger.error(f"Error during transcription for {video_id}: {str(e)}")
# transcription_progress_status[video_id] = {"status": "error", "percent": 0, "message": str(e)}
# @app.route('/')
# def index():
# return redirect(url_for('select_video'))
# @app.route('/select_video')
# def select_video():
# if not os.path.exists(VIDEO_DIR):
# return render_template('error.html', message="Video directory not found.")
# videos = [f for f in os.listdir(VIDEO_DIR) if f.endswith('.mp4')]
# video_ids = [os.path.splitext(v)[0] for v in videos]
# return render_template('select_video.html', video_ids=video_ids)
# @app.route('/player/<video_id>')
# def player(video_id):
# return render_template('player.html', video_id=video_id)
# @app.route('/videos')
# def get_videos():
# if not os.path.exists(VIDEO_DIR):
# return jsonify({'error': 'Video directory not found'}), 404
# videos = [f for f in os.listdir(VIDEO_DIR) if f.endswith(('.mp4', '.avi', '.mov'))]
# if not videos:
# return jsonify({'error': 'No videos found'}), 404
# return jsonify(videos)
# @app.route('/video/<path:filename>')
# def serve_video(filename):
# if not os.path.exists(os.path.join(VIDEO_DIR, filename)):
# return jsonify({'error': 'Video not found'}), 404
# return send_from_directory(VIDEO_DIR, filename)
# @app.route('/save_annotations', methods=['POST'])
# def save_annotations():
# data = request.json
# if not data or 'video' not in data or 'timestamps' not in data:
# return jsonify({'success': False, 'message': 'Invalid data'}), 400
# annotation_file = os.path.join(ANNOTATIONS_DIR, f"{data['video']}_annotations.json")
# annotation_data = {
# "video_name": data['video'] + ".mp4",
# "timestamps": sorted(data['timestamps']),
# "annotation_date": datetime.now().isoformat()
# }
# with open(annotation_file, 'w') as f:
# json.dump(annotation_data, f, indent=4)
# return jsonify({'success': True, 'message': 'Annotations saved successfully'})
# @app.route('/get_annotations/<path:video_name>')
# def get_annotations(video_name):
# annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_name}_annotations.json")
# if not os.path.exists(annotation_file):
# return jsonify({'error': 'No annotations found'}), 404
# with open(annotation_file, 'r') as f:
# annotations = json.load(f)
# return jsonify(annotations)
# @app.route("/alignment/<video_id>")
# def alignment_mode(video_id):
# annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_id}_annotations.json")
# if not os.path.exists(annotation_file):
# return render_template("error.html", message="No annotations found for this video. Please annotate the video first.")
# with open(annotation_file, 'r') as f:
# annotations = json.load(f)
# return render_template(
# "alignment.html",
# video_id=video_id,
# total_clips=len(annotations['timestamps']) - 1
# )
# @app.route("/api/transcript/<video_id>")
# def get_transcript(video_id):
# timestamps_file = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json")
# app.logger.info(f"Attempting to load word timestamps from: {timestamps_file}")
# if not os.path.exists(timestamps_file):
# app.logger.warning(f"Word timestamps file not found: {timestamps_file}")
# return jsonify({
# "status": "error",
# "message": "No word timestamps found for this video"
# }), 404
# try:
# with open(timestamps_file, 'r') as f:
# word_data = json.load(f)
# full_text = " ".join(item["punctuated_word"] for item in word_data)
# words_with_times = [{
# "word": item["punctuated_word"],
# "start": float(item["start_time"]),
# "end": float(item["end_time"])
# } for item in word_data]
# app.logger.info(f"Successfully created transcript ({len(full_text)} characters)")
# return jsonify({
# "status": "success",
# "text": full_text,
# "words": words_with_times
# })
# except Exception as e:
# app.logger.error(f"Error processing word timestamps: {str(e)}")
# return jsonify({
# "status": "error",
# "message": f"Error processing word timestamps: {str(e)}"
# }), 500
# @app.route("/api/word_timestamps/<video_id>")
# def get_word_timestamps(video_id):
# timestamps_file = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json")
# app.logger.info(f"Attempting to load word timestamps from: {timestamps_file}")
# if not os.path.exists(timestamps_file):
# app.logger.warning(f"Word timestamps file not found: {timestamps_file}")
# return jsonify({
# "status": "error",
# "message": "No word timestamps found for this video"
# }), 404
# try:
# with open(timestamps_file, 'r') as f:
# word_data = json.load(f)
# app.logger.info(f"Successfully loaded {len(word_data)} word timestamps")
# return jsonify({
# "status": "success",
# "words": word_data
# })
# except Exception as e:
# app.logger.error(f"Error processing word timestamps: {str(e)}")
# return jsonify({
# "status": "error",
# "message": f"Error processing word timestamps: {str(e)}"
# }), 500
# @app.route("/api/clips/<video_id>")
# def get_video_clips(video_id):
# try:
# annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_id}_annotations.json")
# if not os.path.exists(annotation_file):
# raise FileNotFoundError("Annotations not found")
# with open(annotation_file, 'r') as f:
# annotations = json.load(f)
# timestamps = annotations['timestamps']
# clips = []
# for i in range(len(timestamps)-1):
# clips.append({
# "index": i,
# "start": timestamps[i],
# "end": timestamps[i+1],
# "path": f"/clip/{video_id}/{i}"
# })
# return jsonify({
# "status": "success",
# "clips": clips
# })
# except Exception as e:
# app.logger.error(f"Error getting clips: {str(e)}")
# return jsonify({
# "status": "error",
# "message": str(e)
# }), 500
# @app.route("/clip/<video_id>/<int:clip_index>")
# def serve_clip(video_id, clip_index):
# clip_path = os.path.join(
# TEMP_DIR,
# f"{video_id}_clip_{clip_index:03d}.mp4"
# )
# app.logger.info(f"Attempting to serve clip: {clip_path}")
# if not os.path.exists(clip_path):
# app.logger.error(f"Clip not found: {clip_path}")
# return jsonify({
# "status": "error",
# "message": "Clip not found"
# }), 404
# return send_file(clip_path, mimetype="video/mp4")
# @app.route("/api/save_alignments", methods=["POST"])
# def save_alignments():
# try:
# data = request.json
# if not data or 'video_id' not in data or 'alignments' not in data:
# return jsonify({'success': False, 'message': 'Invalid data'}), 400
# output_path = os.path.join(ALIGNMENTS_DIR, f"{data['video_id']}.json")
# with open(output_path, "w") as f:
# json.dump(data['alignments'], f, indent=2)
# return jsonify({
# "success": True,
# "message": "Alignments saved successfully"
# })
# except Exception as e:
# app.logger.error(f"Error saving alignments: {str(e)}")
# return jsonify({
# "success": False,
# "message": str(e)
# }), 500
# @app.route("/api/extract_clips/<video_id>")
# def extract_clips_for_video(video_id):
# status = clip_extraction_status.get(video_id, {})
# if status.get("percent", 0) < 100:
# thread = threading.Thread(target=run_clip_extraction, args=(video_id,))
# thread.start()
# if video_id not in transcription_progress_status or transcription_progress_status.get(video_id, {}).get("percent", 0) < 100:
# thread_trans = threading.Thread(target=run_transcription, args=(video_id,))
# thread_trans.start()
# return jsonify({"status": "started"})
# @app.route("/api/clip_progress/<video_id>")
# def clip_progress(video_id):
# progress = clip_extraction_status.get(video_id, {"current": 0, "total": 0, "percent": 0})
# return jsonify(progress)
# @app.route("/api/transcription_progress/<video_id>")
# def transcription_progress(video_id):
# progress = transcription_progress_status.get(video_id, {"status": "not started", "percent": 0})
# return jsonify(progress)
# if __name__ == '__main__':
# app.run(host='0.0.0.0', port=5000, debug=True)
from flask import Flask, render_template, jsonify, request, send_from_directory, send_file, redirect, url_for, session
import os, json, threading, time
from datetime import datetime
from extract_signed_segments_from_annotations import ClipExtractor, VideoClip
import logging
from dotenv import load_dotenv
# Load environment variables
load_dotenv()

app = Flask(__name__)
# Key Flask uses to sign the session cookie. The fallback value is public
# knowledge, so it is only suitable for local testing.
app.secret_key = os.getenv('SECRET_KEY', 'dev_key_for_testing')
logging.basicConfig(level=logging.INFO)

# Directory paths (resolved against the current working directory)
VIDEO_DIR = os.path.abspath("data/videos")
ANNOTATIONS_DIR = os.path.abspath("data/annotations")
TEMP_DIR = os.path.abspath("data/temp")
WORD_TIMESTAMPS_DIR = os.path.abspath("data/word_timestamps")
ALIGNMENTS_DIR = os.path.abspath("data/alignments")
TRANSCRIPTS_DIR = os.path.abspath("data/transcripts")

# Ensure all required directories exist
for directory in [VIDEO_DIR, ANNOTATIONS_DIR, TEMP_DIR, WORD_TIMESTAMPS_DIR, ALIGNMENTS_DIR, TRANSCRIPTS_DIR]:
    os.makedirs(directory, exist_ok=True)

# Global dictionaries for progress tracking, keyed by video_id
clip_extraction_status = {}
transcription_progress_status = {}

# Check if we're running on Hugging Face Spaces
is_hf_space = os.getenv('SPACE_ID') is not None

# Sessions signed with the well-known fallback key can be forged, which would
# defeat the login checks below. Make that misconfiguration visible when deployed.
if is_hf_space and app.secret_key == 'dev_key_for_testing':
    logging.warning(
        "SECRET_KEY is not set; falling back to the insecure development key. "
        "Set SECRET_KEY in the Space secrets."
    )
# Login required decorator
def login_required(f):
    """Wrap view ``f`` so requests without a ``user`` session entry are redirected to ``login``."""
    from functools import wraps

    @wraps(f)
    def decorated_function(*args, **kwargs):
        # Authenticated requests go straight through to the wrapped view.
        if 'user' in session:
            return f(*args, **kwargs)
        return redirect(url_for('login'))

    return decorated_function
# Allow specific users (for testing)
def is_allowed_user(username):
    """Return True if ``username`` is on the allow-list, or if the app is not running on a Space."""
    allowed_users = ['Perilon']  # Add your username for testing
    if username in allowed_users:
        return True
    # Allow all users in local dev
    return not is_hf_space
def update_extraction_progress(video_id, current, total):
    """Record clip-extraction progress for ``video_id`` in ``clip_extraction_status``.

    Args:
        video_id: Key under which the progress entry is stored.
        current: Number of units processed so far.
        total: Total number of units to process.

    The stored entry has the keys ``current``, ``total`` and ``percent``
    (an integer, truncated towards zero).
    """
    if total > 0:
        percent = int((current / total) * 100)
    else:
        # Nothing to process: report the job as complete rather than
        # raising ZeroDivisionError.
        percent = 100
    clip_extraction_status[video_id] = {"current": current, "total": total, "percent": percent}
def run_clip_extraction(video_id):
    """Extract the clips for ``video_id`` and keep ``clip_extraction_status`` up to date.

    Intended to run in a background thread. Progress reported by the extractor
    is forwarded to ``update_extraction_progress``. When extraction returns
    without the status having reached 100%, the entry is forced to 100%.
    Any exception is logged and stored as ``{"error": <message>}``.
    """
    try:
        extractor = ClipExtractor(app.root_path)
        extractor.extract_clips_from_annotations(
            video_id,
            progress_callback=lambda current, total: update_extraction_progress(video_id, current, total),
        )
        status = clip_extraction_status.get(video_id, {})
        if status.get("percent", 0) < 100:
            # The entry may be missing, may be a stale {"error": ...} left by an
            # earlier run (no "total" key), or may carry a total of 0. Fall back
            # to 1/1 in those cases so completion is always recorded.
            total = status.get("total") or 1
            update_extraction_progress(video_id, total, total)
    except Exception as e:
        logging.error(f"Error during clip extraction for {video_id}: {str(e)}")
        clip_extraction_status[video_id] = {"error": str(e)}
def run_transcription(video_id):
    """Produce the word-timestamp JSON for ``video_id`` and track progress.

    Intended to run in a background thread. An existing, non-empty output file
    is treated as a cached result. Progress is published through
    ``transcription_progress_status``; any exception is logged and stored as an
    ``error`` status with the exception message.
    """
    try:
        output_path = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json")
        # Check if transcription already exists and is valid.
        if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
            app.logger.info(f"Using cached transcription for video {video_id}.")
            transcription_progress_status[video_id] = {"status": "completed", "percent": 100}
            return
        video_path = os.path.join(app.root_path, "data", "videos", f"{video_id}.mp4")
        transcription_progress_status[video_id] = {"status": "started", "percent": 10}
        # Imported locally: this file also defines a view named
        # get_word_timestamps, which would shadow a module-level import.
        from get_transcription_with_amazon import get_word_timestamps
        word_timestamps = get_word_timestamps(video_path)
        # Serialise before touching the disk and publish with a rename, so a
        # failure can never leave a partial, non-empty file that the cache
        # check above would later accept as a finished transcription.
        payload = json.dumps(word_timestamps, indent=4)
        temp_path = f"{output_path}.tmp"
        with open(temp_path, "w") as f:
            f.write(payload)
        os.replace(temp_path, output_path)
        transcription_progress_status[video_id] = {"status": "completed", "percent": 100}
    except Exception as e:
        app.logger.error(f"Error during transcription for {video_id}: {str(e)}")
        transcription_progress_status[video_id] = {"status": "error", "percent": 0, "message": str(e)}
# Authentication routes
@app.route('/login')
def login():
    """Start a login: mock user locally, redirect to ``/auth/login`` on a Space."""
    if not is_hf_space:
        # For local development, just set a mock user
        session['user'] = {'name': 'LocalDeveloper', 'is_mock': True}
        return redirect(url_for('index'))
    # For Hugging Face Spaces, redirect to the built-in OAuth
    return redirect('/auth/login')
@app.route('/auth/callback')
def auth_callback():
    """Finish a login on a Space using the ``X-Spaces-Username`` request header.

    Outside a Space the request is redirected to ``login``. On a Space, a
    missing header renders the error page; a username rejected by
    ``is_allowed_user`` renders the error page with status 403; otherwise the
    user is stored in the session and redirected to ``index``.
    """
    if not is_hf_space:
        return redirect(url_for('login'))
    username = request.headers.get('X-Spaces-Username')
    if not username:
        return render_template('error.html', message="Authentication failed. No username provided.")
    # This path is exempt from check_auth, so the allow-list must be enforced
    # here; otherwise any username would obtain a session.
    if not is_allowed_user(username):
        return render_template('error.html', message="Access denied. This account is not allowed to use this application."), 403
    session['user'] = {'name': username, 'is_hf': True}
    return redirect(url_for('index'))
@app.route('/auth')
def auth():
    """Redirect to ``index``; when not on a Space, first store a mock user in the session."""
    # This route will be handled by Hugging Face Spaces when deployed
    running_locally = not is_hf_space
    if running_locally:
        session['user'] = {'name': 'Perilon', 'is_mock': True}
    return redirect(url_for('index'))
@app.before_request
def check_auth():
    """Gate every request behind authentication.

    The login/logout/callback paths and ``/static/`` are always allowed.
    On a Space, a request carrying ``X-Spaces-Username`` is accepted only if
    ``is_allowed_user`` approves that name; the session is then synchronised
    with it. Requests without the header, and all local requests, need an
    existing ``user`` session entry or are redirected to ``login``.

    Returns:
        ``None`` to let the request proceed, otherwise the response to send.
    """
    # Skip authentication for login/logout routes
    public_paths = ('/login', '/logout', '/auth/callback')
    if request.path in public_paths or request.path.startswith('/static/'):
        return None
    # In Hugging Face Spaces, check the username header
    if is_hf_space:
        username = request.headers.get('X-Spaces-Username')
        if username:
            if not is_allowed_user(username):
                # An existing session must not let a rejected user through.
                session.pop('user', None)
                return render_template('error.html', message="Access denied. This account is not allowed to use this application."), 403
            # Update the session with the current user
            if session.get('user', {}).get('name') != username:
                session['user'] = {'name': username, 'is_hf': True}
            return None
    # For local development, the mock user is set in the login route
    if 'user' not in session:
        return redirect(url_for('login'))
    return None
@app.route('/logout')
def logout():
    """Clear the whole session, then redirect to ``/auth/logout`` on a Space or to ``login`` otherwise."""
    session.clear()
    destination = '/auth/logout' if is_hf_space else url_for('login')
    return redirect(destination)
# Main application routes
@app.route('/')
@login_required
def index():
    """Redirect the site root to the video selection page."""
    destination = url_for('select_video')
    return redirect(destination)
@app.route('/select_video')
@login_required
def select_video():
    """Render the selection page listing the ``.mp4`` files in ``VIDEO_DIR`` by name without extension."""
    if not os.path.exists(VIDEO_DIR):
        return render_template('error.html', message="Video directory not found.")
    video_ids = [
        os.path.splitext(name)[0]
        for name in os.listdir(VIDEO_DIR)
        if name.endswith('.mp4')
    ]
    return render_template('select_video.html', video_ids=video_ids, user=session.get('user'))
@app.route('/player/<video_id>')
@login_required
def player(video_id):
    """Render the player page for ``video_id``."""
    context = {'video_id': video_id, 'user': session.get('user')}
    return render_template('player.html', **context)
@app.route('/videos')
@login_required
def get_videos():
    """Return the ``.mp4``/``.avi``/``.mov`` file names in ``VIDEO_DIR`` as JSON, or a 404 error payload."""
    if not os.path.exists(VIDEO_DIR):
        return jsonify({'error': 'Video directory not found'}), 404
    extensions = ('.mp4', '.avi', '.mov')
    videos = [name for name in os.listdir(VIDEO_DIR) if name.endswith(extensions)]
    if videos:
        return jsonify(videos)
    return jsonify({'error': 'No videos found'}), 404
@app.route('/video/<path:filename>')
@login_required
def serve_video(filename):
    """Send ``filename`` from ``VIDEO_DIR``, or a JSON 404 when no such path exists."""
    candidate = os.path.join(VIDEO_DIR, filename)
    if os.path.exists(candidate):
        return send_from_directory(VIDEO_DIR, filename)
    return jsonify({'error': 'Video not found'}), 404
@app.route('/save_annotations', methods=['POST'])
@login_required
def save_annotations():
    """Store the posted timestamps for a video as ``<video>_annotations.json``.

    Expects a JSON object with ``video`` (string) and ``timestamps`` (list of
    mutually comparable values). The timestamps are saved sorted, together
    with the save time and the name of the session user.

    Returns:
        A JSON success payload, or a 400 payload when the body is malformed
        or the target path would fall outside ``ANNOTATIONS_DIR``.
    """
    invalid = (jsonify({'success': False, 'message': 'Invalid data'}), 400)
    data = request.json
    if not isinstance(data, dict) or 'video' not in data or 'timestamps' not in data:
        return invalid
    video = data['video']
    timestamps = data['timestamps']
    if not isinstance(video, str) or not isinstance(timestamps, list):
        return invalid
    # The video name comes from the client: refuse names such as "../x" or
    # absolute paths that would resolve outside the annotations directory.
    annotation_file = os.path.abspath(os.path.join(ANNOTATIONS_DIR, f"{video}_annotations.json"))
    if not annotation_file.startswith(ANNOTATIONS_DIR + os.sep):
        return invalid
    try:
        ordered_timestamps = sorted(timestamps)
    except TypeError:
        # Mixed, non-comparable entries (e.g. numbers and strings).
        return invalid
    annotation_data = {
        "video_name": video + ".mp4",
        "timestamps": ordered_timestamps,
        "annotation_date": datetime.now().isoformat(),
        "annotated_by": session.get('user', {}).get('name', 'unknown'),
    }
    with open(annotation_file, 'w') as f:
        json.dump(annotation_data, f, indent=4)
    return jsonify({'success': True, 'message': 'Annotations saved successfully'})
@app.route('/get_annotations/<path:video_name>')
@login_required
def get_annotations(video_name):
    """Return the saved annotations for ``video_name`` as JSON.

    Returns:
        The parsed contents of ``<video_name>_annotations.json``, or a 404
        payload when the file is missing or would lie outside
        ``ANNOTATIONS_DIR``.
    """
    annotation_file = os.path.abspath(os.path.join(ANNOTATIONS_DIR, f"{video_name}_annotations.json"))
    # The path converter accepts slashes, so "../" segments must not be
    # allowed to escape the annotations directory.
    inside_dir = annotation_file.startswith(ANNOTATIONS_DIR + os.sep)
    if not inside_dir or not os.path.exists(annotation_file):
        return jsonify({'error': 'No annotations found'}), 404
    with open(annotation_file, 'r') as f:
        annotations = json.load(f)
    return jsonify(annotations)
@app.route("/alignment/<video_id>")
@login_required
def alignment_mode(video_id):
    """Render the alignment page for ``video_id``, or the error page if it has no annotations file."""
    annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_id}_annotations.json")
    if os.path.exists(annotation_file):
        with open(annotation_file, 'r') as f:
            annotations = json.load(f)
        # One clip per pair of consecutive timestamps.
        clip_count = len(annotations['timestamps']) - 1
        return render_template(
            "alignment.html",
            video_id=video_id,
            total_clips=clip_count,
            user=session.get('user'),
        )
    return render_template("error.html", message="No annotations found for this video. Please annotate the video first.")
@app.route("/api/transcript/<video_id>")
@login_required
def get_transcript(video_id):
    """Return the transcript text and per-word timings for ``video_id``.

    Reads ``<video_id>_word_timestamps.json``; each entry must provide
    ``punctuated_word``, ``start_time`` and ``end_time``. Responds 404 when the
    file is missing and 500 when it cannot be processed.
    """
    timestamps_file = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json")
    app.logger.info(f"Attempting to load word timestamps from: {timestamps_file}")
    if not os.path.exists(timestamps_file):
        app.logger.warning(f"Word timestamps file not found: {timestamps_file}")
        not_found = {
            "status": "error",
            "message": "No word timestamps found for this video"
        }
        return jsonify(not_found), 404
    try:
        with open(timestamps_file, 'r') as f:
            word_data = json.load(f)
        tokens = [entry["punctuated_word"] for entry in word_data]
        full_text = " ".join(tokens)
        words_with_times = [
            {
                "word": entry["punctuated_word"],
                "start": float(entry["start_time"]),
                "end": float(entry["end_time"]),
            }
            for entry in word_data
        ]
        app.logger.info(f"Successfully created transcript ({len(full_text)} characters)")
        payload = {
            "status": "success",
            "text": full_text,
            "words": words_with_times
        }
        return jsonify(payload)
    except Exception as e:
        app.logger.error(f"Error processing word timestamps: {str(e)}")
        failure = {
            "status": "error",
            "message": f"Error processing word timestamps: {str(e)}"
        }
        return jsonify(failure), 500
@app.route("/api/word_timestamps/<video_id>")
@login_required
def get_word_timestamps(video_id):
    """Return the raw contents of ``<video_id>_word_timestamps.json``.

    Responds 404 when the file is missing and 500 when it cannot be loaded.
    """
    timestamps_file = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json")
    app.logger.info(f"Attempting to load word timestamps from: {timestamps_file}")
    if not os.path.exists(timestamps_file):
        app.logger.warning(f"Word timestamps file not found: {timestamps_file}")
        not_found = {
            "status": "error",
            "message": "No word timestamps found for this video"
        }
        return jsonify(not_found), 404
    try:
        with open(timestamps_file, 'r') as f:
            word_data = json.load(f)
        app.logger.info(f"Successfully loaded {len(word_data)} word timestamps")
        payload = {
            "status": "success",
            "words": word_data
        }
        return jsonify(payload)
    except Exception as e:
        app.logger.error(f"Error processing word timestamps: {str(e)}")
        failure = {
            "status": "error",
            "message": f"Error processing word timestamps: {str(e)}"
        }
        return jsonify(failure), 500
@app.route("/api/clips/<video_id>")
@login_required
def get_video_clips(video_id):
    """List the clips implied by the saved annotation timestamps of ``video_id``.

    Each pair of consecutive timestamps becomes one clip with ``index``,
    ``start``, ``end`` and a ``/clip/<video_id>/<index>`` path.

    Returns:
        A JSON success payload with the clip list; a 404 payload when no
        annotations file exists; a 500 payload when it cannot be processed.
    """
    annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_id}_annotations.json")
    if not os.path.exists(annotation_file):
        # A missing file is a "not found" condition, not a server error;
        # this matches the other endpoints that read per-video files.
        app.logger.warning(f"Error getting clips: Annotations not found")
        return jsonify({
            "status": "error",
            "message": "Annotations not found"
        }), 404
    try:
        with open(annotation_file, 'r') as f:
            annotations = json.load(f)
        timestamps = annotations['timestamps']
        clips = [
            {
                "index": i,
                "start": start,
                "end": end,
                "path": f"/clip/{video_id}/{i}",
            }
            for i, (start, end) in enumerate(zip(timestamps, timestamps[1:]))
        ]
        return jsonify({
            "status": "success",
            "clips": clips
        })
    except Exception as e:
        app.logger.error(f"Error getting clips: {str(e)}")
        return jsonify({
            "status": "error",
            "message": str(e)
        }), 500
@app.route("/clip/<video_id>/<int:clip_index>")
@login_required
def serve_clip(video_id, clip_index):
    """Send the clip file ``<video_id>_clip_<NNN>.mp4`` from ``TEMP_DIR``, or a JSON 404 if it is absent."""
    clip_name = f"{video_id}_clip_{clip_index:03d}.mp4"
    clip_path = os.path.join(TEMP_DIR, clip_name)
    app.logger.info(f"Attempting to serve clip: {clip_path}")
    if os.path.exists(clip_path):
        return send_file(clip_path, mimetype="video/mp4")
    app.logger.error(f"Clip not found: {clip_path}")
    missing = {
        "status": "error",
        "message": "Clip not found"
    }
    return jsonify(missing), 404
@app.route("/api/save_alignments", methods=["POST"])
@login_required
def save_alignments():
    """Store the posted alignments for a video as ``<video_id>.json``.

    Expects a JSON object with ``video_id`` (string) and ``alignments``.
    Every non-empty dict entry in ``alignments`` is tagged with the session
    user's name under ``aligned_by`` before saving.

    Returns:
        A JSON success payload; a 400 payload when the body is malformed or
        the target path would fall outside ``ALIGNMENTS_DIR``; a 500 payload
        on any other failure.
    """
    try:
        data = request.json
        if not isinstance(data, dict) or 'video_id' not in data or 'alignments' not in data:
            return jsonify({'success': False, 'message': 'Invalid data'}), 400
        video_id = data['video_id']
        # The id comes from the client: refuse values such as "../x" or
        # absolute paths that would resolve outside the alignments directory.
        output_path = os.path.abspath(os.path.join(ALIGNMENTS_DIR, f"{video_id}.json"))
        if not isinstance(video_id, str) or not output_path.startswith(ALIGNMENTS_DIR + os.sep):
            return jsonify({'success': False, 'message': 'Invalid data'}), 400
        # Add user information to the alignments
        user_name = session.get('user', {}).get('name', 'unknown')
        for alignment in data['alignments']:
            # Only dict entries can carry the tag; other values are saved as-is.
            if alignment and isinstance(alignment, dict):
                alignment['aligned_by'] = user_name
        with open(output_path, "w") as f:
            json.dump(data['alignments'], f, indent=2)
        return jsonify({
            "success": True,
            "message": "Alignments saved successfully"
        })
    except Exception as e:
        app.logger.error(f"Error saving alignments: {str(e)}")
        return jsonify({
            "success": False,
            "message": str(e)
        }), 500
@app.route("/api/extract_clips/<video_id>")
@login_required
def extract_clips_for_video(video_id):
    """Start background clip extraction and transcription for ``video_id`` unless already at 100%."""
    clip_state = clip_extraction_status.get(video_id, {})
    if clip_state.get("percent", 0) < 100:
        threading.Thread(target=run_clip_extraction, args=(video_id,)).start()
    # A missing entry reads as 0%, so it also triggers a transcription run.
    transcription_state = transcription_progress_status.get(video_id, {})
    if transcription_state.get("percent", 0) < 100:
        threading.Thread(target=run_transcription, args=(video_id,)).start()
    return jsonify({"status": "started"})
@app.route("/api/clip_progress/<video_id>")
@login_required
def clip_progress(video_id):
    """Return the stored clip-extraction status for ``video_id``, or an all-zero default."""
    not_started = {"current": 0, "total": 0, "percent": 0}
    return jsonify(clip_extraction_status.get(video_id, not_started))
@app.route("/api/transcription_progress/<video_id>")
@login_required
def transcription_progress(video_id):
    """Return the stored transcription status for ``video_id``, or a "not started" default."""
    not_started = {"status": "not started", "percent": 0}
    return jsonify(transcription_progress_status.get(video_id, not_started))
if __name__ == '__main__':
    port = int(os.getenv('PORT', 5000))
    # Debug mode enables the interactive Werkzeug debugger, which lets anyone
    # who can reach the server run code on it. Keep it on by default for local
    # work only; on a Space it must be requested explicitly via FLASK_DEBUG.
    default_debug = '0' if is_hf_space else '1'
    debug = os.getenv('FLASK_DEBUG', default_debug).lower() in ('1', 'true')
    app.run(host='0.0.0.0', port=port, debug=debug)