|
import boto3 |
|
import json |
|
import logging |
|
import os |
|
import platform |
|
import requests |
|
import signal |
|
import sys |
|
import tempfile |
|
import threading |
|
import time |
|
import uuid |
|
from botocore.exceptions import ClientError |
|
from datetime import datetime |
|
from dotenv import load_dotenv |
|
from extract_signed_segments_from_annotations import ClipExtractor, VideoClip |
|
from flask import Flask, jsonify, redirect, render_template, request, send_file, send_from_directory, session, url_for |
|
from typing import Any, Dict, List, Optional |
|
from urllib.parse import urlparse |
|
|
|
|
|
# Load environment variables via python-dotenv before any os.getenv() below.
load_dotenv()

# Env-driven auth-bypass flag: only the literal "true" (any case) enables it.
bypass_auth = os.getenv("BYPASS_AUTH", "false").lower() == "true"

# Configure logging at INFO with timestamp, logger name and level per record.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
|
|
|
|
|
# The app treats the presence of SPACE_ID as "running on Hugging Face Spaces".
is_hf_space = os.getenv("SPACE_ID") is not None
if is_hf_space:
    logger.info("Running in Hugging Face Spaces environment")
    # NOTE(review): the two overrides below are assumed to be HF-only
    # (part of this branch) — confirm against the deployed file.
    os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
    # The __main__ entry point reads PORT when starting the server.
    os.environ["PORT"] = "7860"

# Startup diagnostics written to stdout at import time.
print("=" * 50)
print(f"Python version: {sys.version}")
print(f"Platform: {platform.platform()}")
print(f"Current directory: {os.getcwd()}")
print(f"Directory contents: {os.listdir('.')}")
print("=" * 50)
|
|
|
# Flask application object; every route below is registered on it.
app = Flask(__name__)
# NOTE(review): falls back to a fixed, publicly visible key when SECRET_KEY is
# unset, which makes session cookies forgeable — confirm that every real
# deployment sets SECRET_KEY.
app.secret_key = os.getenv("SECRET_KEY", "dev_key_for_testing")

# Session-cookie settings applied only when running on Hugging Face Spaces.
if is_hf_space:
    app.config["SESSION_COOKIE_SECURE"] = False
    app.config["SESSION_COOKIE_HTTPONLY"] = True
    app.config["SESSION_COOKIE_SAMESITE"] = None
    # 86400 seconds = 24 hours.
    app.config["PERMANENT_SESSION_LIFETIME"] = 86400
|
|
|
|
|
# Absolute data directories, resolved against the current working directory
# at import time.
VIDEO_DIR = os.path.abspath("data/videos")
ANNOTATIONS_DIR = os.path.abspath("data/annotations")
TEMP_DIR = os.path.abspath("data/temp")
WORD_TIMESTAMPS_DIR = os.path.abspath("data/word_timestamps")
ALIGNMENTS_DIR = os.path.abspath("data/alignments")
TRANSCRIPTS_DIR = os.path.abspath("data/transcripts")

# S3 source for videos; each value can be overridden through the environment.
S3_BUCKET = os.getenv("S3_BUCKET", "sorenson-ai-sb-scratch")
S3_VIDEO_PREFIX = os.getenv("S3_VIDEO_PREFIX", "awilkinson/kylie_dataset_videos_for_alignment_webapp/")
# Enabled unless USE_S3_FOR_VIDEOS is set to something other than "true".
USE_S3_FOR_VIDEOS = os.getenv("USE_S3_FOR_VIDEOS", "true").lower() == "true"

# Create every data directory up front (no error if it already exists).
for directory in [VIDEO_DIR, ANNOTATIONS_DIR, TEMP_DIR, WORD_TIMESTAMPS_DIR, ALIGNMENTS_DIR, TRANSCRIPTS_DIR]:
    os.makedirs(directory, exist_ok=True)

# In-memory progress registries keyed by video id. They are written by the
# background worker functions and read by the progress endpoints.
clip_extraction_status = {}
transcription_progress_status = {}
|
|
|
|
|
|
|
def get_s3_client():
    """Build a boto3 S3 client from environment configuration.

    The region comes from ``AWS_DEFAULT_REGION`` (``us-west-2`` when unset).
    The key pair comes from ``AWS_ACCESS_KEY_ID`` / ``AWS_SECRET_ACCESS_KEY``
    and is passed through as-is (``None`` when unset).
    """
    env = os.environ
    client_options = {
        "region_name": env.get("AWS_DEFAULT_REGION", "us-west-2"),
        "aws_access_key_id": env.get("AWS_ACCESS_KEY_ID"),
        "aws_secret_access_key": env.get("AWS_SECRET_ACCESS_KEY"),
    }
    return boto3.client("s3", **client_options)
|
|
|
|
|
def list_s3_videos() -> List[str]:
    """List the ids of all ``.mp4`` objects under the configured S3 prefix.

    The listing is paginated, so prefixes holding more objects than fit in a
    single ``list_objects_v2`` response are returned in full.

    Returns:
        Video ids (object basename without the ``.mp4`` extension). An empty
        list is returned when AWS credentials are missing from the
        environment, when nothing exists under the prefix, or when the S3
        call fails (the failure is logged, not raised).
    """
    env = os.environ
    if not env.get("AWS_ACCESS_KEY_ID") or not env.get("AWS_SECRET_ACCESS_KEY"):
        logger.warning("AWS credentials not found. Returning empty video list.")
        return []

    try:
        paginator = get_s3_client().get_paginator("list_objects_v2")
        pages = paginator.paginate(Bucket=S3_BUCKET, Prefix=S3_VIDEO_PREFIX)
        # A page has no "Contents" entry when it holds no objects.
        keys = [
            item["Key"]
            for page in pages
            for item in page.get("Contents", [])
        ]
    except Exception as e:
        # Best effort by design: callers treat an empty list as "no videos".
        logger.error(f"Error listing S3 videos: {str(e)}")
        return []

    if not keys:
        logger.warning(f"No videos found in S3 bucket {S3_BUCKET} with prefix {S3_VIDEO_PREFIX}")
        return []

    return [
        os.path.splitext(os.path.basename(key))[0]
        for key in keys
        if key.endswith(".mp4")
    ]
|
|
|
|
|
def download_video_from_s3(video_id: str) -> Optional[str]:
    """Ensure ``<video_id>.mp4`` exists in ``VIDEO_DIR`` and return its path.

    A file already present locally is reused without contacting S3. Otherwise
    the object ``S3_VIDEO_PREFIX + <video_id>.mp4`` is downloaded from
    ``S3_BUCKET``.

    Returns:
        The local file path, or ``None`` when the download raised
        ``ClientError`` (the error is logged).
    """
    file_name = f"{video_id}.mp4"
    destination = os.path.join(VIDEO_DIR, file_name)

    if os.path.exists(destination):
        logger.info(f"Video {video_id} already exists locally.")
        return destination

    object_key = f"{S3_VIDEO_PREFIX}{file_name}"
    try:
        logger.info(f"Downloading video {video_id} from S3...")
        client = get_s3_client()
        client.download_file(S3_BUCKET, object_key, destination)
        logger.info(f"Video {video_id} downloaded successfully to {destination}")
    except ClientError as e:
        logger.error(f"Error downloading video from S3: {str(e)}")
        return None
    return destination
|
|
|
|
|
def generate_presigned_url(video_id: str, expiration: int = 3600) -> Optional[str]:
    """Create a time-limited ``get_object`` URL for a video stored in S3.

    Args:
        video_id: Video identifier; the object key is
            ``S3_VIDEO_PREFIX + <video_id>.mp4``.
        expiration: Value passed to boto3 as ``ExpiresIn``.

    Returns:
        The presigned URL, or ``None`` when boto3 raised ``ClientError``.
    """
    object_key = f"{S3_VIDEO_PREFIX}{video_id}.mp4"
    request_params = {"Bucket": S3_BUCKET, "Key": object_key}

    try:
        return get_s3_client().generate_presigned_url(
            "get_object",
            Params=request_params,
            ExpiresIn=expiration,
        )
    except ClientError as e:
        logger.error(f"Error generating presigned URL: {str(e)}")
        return None
|
|
|
|
|
|
|
def graceful_shutdown(signum, frame):
    """Signal handler: log the received signal and exit with status 0.

    Args:
        signum: Number of the signal being handled.
        frame: Current stack frame supplied by the signal machinery (unused).
    """
    logger.info(f"Received signal {signum}, shutting down gracefully...")
    raise SystemExit(0)


# Route both termination signals through the same handler.
signal.signal(signal.SIGTERM, graceful_shutdown)
signal.signal(signal.SIGINT, graceful_shutdown)
|
|
|
|
|
|
|
def login_required(f):
    """Decorator that only runs the view when the session holds a ``user``.

    Requests without a ``user`` entry in the session are redirected to the
    ``login`` endpoint instead of reaching the wrapped view.
    """
    from functools import wraps

    @wraps(f)
    def wrapper(*args, **kwargs):
        if "user" in session:
            return f(*args, **kwargs)
        logger.info("User not in session, redirecting to login")
        return redirect(url_for("login"))

    return wrapper
|
|
|
|
|
|
|
def is_allowed_user(username: str) -> bool:
    """Return whether ``username`` may use the app.

    Outside Hugging Face Spaces every user is allowed. On Spaces the name must
    appear in the comma-separated ``ALLOWED_USERS`` environment variable
    (default ``"Perilon"``); surrounding whitespace of each entry is ignored.
    """
    if not is_hf_space:
        return True
    raw_list = os.getenv("ALLOWED_USERS", "Perilon")
    permitted = [entry.strip() for entry in raw_list.split(",")]
    return username in permitted
|
|
|
|
|
def update_extraction_progress(video_id: str, current: int, total: int) -> None:
    """Record clip-extraction progress for ``video_id``.

    Stores ``current``, ``total`` and the truncated integer percentage in
    ``clip_extraction_status[video_id]``.

    Args:
        video_id: Video the progress belongs to.
        current: Number of clips processed so far.
        total: Total number of clips to process.
    """
    # A non-positive total means there is nothing to extract; report that as
    # complete instead of raising ZeroDivisionError.
    percent = 100 if total <= 0 else int((current / total) * 100)
    clip_extraction_status[video_id] = {"current": current, "total": total, "percent": percent}
|
|
|
|
|
def run_clip_extraction(video_id: str) -> None:
    """Extract all annotated clips for ``video_id`` (background-thread target).

    Progress is published through ``update_extraction_progress``. Once the
    extractor returns, the status is forced to 100% if it is not there yet.
    Any exception is logged and stored as ``{"error": <message>}`` in
    ``clip_extraction_status``.
    """
    def report(current, total):
        update_extraction_progress(video_id, current, total)

    try:
        extractor = ClipExtractor(app.root_path)
        extractor.extract_clips_from_annotations(video_id, progress_callback=report)

        if video_id not in clip_extraction_status:
            # The extractor never reported progress: mark the job complete.
            update_extraction_progress(video_id, 1, 1)
        else:
            latest = clip_extraction_status[video_id]
            if latest.get("percent", 0) < 100:
                update_extraction_progress(video_id, latest["total"], latest["total"])
    except Exception as e:
        logger.error(f"Error during clip extraction for {video_id}: {str(e)}")
        clip_extraction_status[video_id] = {"error": str(e)}
|
|
|
|
|
def run_transcription(video_id: str) -> None:
    """Produce the word-timestamp file for ``video_id`` (background-thread target).

    Steps: reuse a non-empty cached result if one exists; otherwise locate the
    video (downloading it from S3 when enabled), require AWS credentials, run
    ``get_word_timestamps`` and write its result as JSON to
    ``WORD_TIMESTAMPS_DIR``. Progress and failures are reported through
    ``transcription_progress_status[video_id]``; exceptions are logged and
    recorded there rather than raised.
    """
    def publish(state, percent, message=None):
        # Keys are inserted in the order status, percent, message.
        entry = {"status": state, "percent": percent}
        if message is not None:
            entry["message"] = message
        transcription_progress_status[video_id] = entry

    try:
        base_dir = app.root_path
        output_path = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json")

        # A non-empty result file counts as a finished transcription.
        if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
            logger.info(f"Using cached transcription for video {video_id}.")
            publish("completed", 100)
            return

        if not USE_S3_FOR_VIDEOS:
            video_path = os.path.join(base_dir, "data", "videos", f"{video_id}.mp4")
        else:
            video_path = download_video_from_s3(video_id)
            if not video_path:
                publish("error", 0, f"Failed to download video {video_id} from S3")
                return

        publish("started", 10)

        env = os.environ
        if not env.get("AWS_ACCESS_KEY_ID") or not env.get("AWS_SECRET_ACCESS_KEY"):
            logger.warning("AWS credentials not found. Transcription will not work properly.")
            publish("error", 0, "AWS credentials missing")
            return

        # Imported here rather than at module level, as in the original code.
        from get_transcription_with_amazon import get_word_timestamps
        word_timestamps = get_word_timestamps(video_path)

        with open(output_path, "w") as out_file:
            json.dump(word_timestamps, out_file, indent=4)

        publish("completed", 100)
    except Exception as e:
        logger.error(f"Error during transcription for {video_id}: {str(e)}")
        publish("error", 0, str(e))
|
|
|
|
|
|
|
@app.route("/login")
def login():
    """Log the visitor in for both local and Hugging Face environments.

    Locally a mock ``LocalDeveloper`` user is stored in the session. On
    Hugging Face Spaces the ``X-Spaces-Username`` header must name an allowed
    user; otherwise the request is redirected to ``/auth``.
    """
    logger.info(f"Login route called. Headers: {dict(request.headers)}")

    if not is_hf_space:
        session["user"] = {"name": "LocalDeveloper", "is_mock": True}
        return redirect(url_for("index"))

    username = request.headers.get("X-Spaces-Username")
    logger.info(f"Username from headers in login: {username}")

    if not (username and is_allowed_user(username)):
        return redirect("/auth")

    session["user"] = {"name": username, "is_hf": True}
    return redirect(url_for("index"))
|
|
|
|
|
@app.route("/auth/callback")
def auth_callback():
    """Finish Hugging Face authentication and open a session.

    On Hugging Face Spaces the username is read from the
    ``X-Spaces-Username`` header. A session is created only when the header
    is present AND the user passes ``is_allowed_user`` — the same allow-list
    check that ``login``, ``auth`` and ``check_auth`` apply. Outside Spaces
    the request is redirected to ``login``.

    Returns:
        A redirect to ``index`` on success, the error page when the username
        is missing or not allowed, or a redirect to ``login`` outside Spaces.
    """
    logger.info(f"Auth callback called. Headers: {dict(request.headers)}")

    if not is_hf_space:
        return redirect(url_for("login"))

    username = request.headers.get("X-Spaces-Username")
    if not username:
        return render_template("error.html", message="Authentication failed. No username provided.")

    # This route is exempt from check_auth, so the allow-list has to be
    # enforced here; otherwise any Hugging Face user would obtain a session.
    if not is_allowed_user(username):
        logger.warning(f"Rejected auth callback for user not in allow-list: {username}")
        return render_template("error.html", message="Authentication failed. User is not allowed.")

    session["user"] = {"name": username, "is_hf": True}
    return redirect(url_for("index"))
|
|
|
|
|
@app.route("/health")
def health_check():
    """Health check endpoint for container verification.

    Returns a JSON document with a fixed ``"healthy"`` status, selected
    environment settings, the keys currently stored in the session, the
    ``is_hf_space`` / ``bypass_auth`` flags and whether the main data
    directories exist.

    This endpoint is reachable without authentication, so it reports only
    whether ``SECRET_KEY`` is set, never any part of its value.
    """
    env = os.environ
    env_vars = {
        "FLASK_ENV": env.get("FLASK_ENV", "production"),
        "DEBUG": env.get("DEBUG", "Not set"),
        "SPACE_ID": env.get("SPACE_ID", "Not set"),
        "BYPASS_AUTH": env.get("BYPASS_AUTH", "Not set"),
        # Presence only: even a prefix of the key weakens session signing,
        # and this dict is both logged and returned to the caller.
        "SECRET_KEY": "Set" if env.get("SECRET_KEY") else "Not set",
        "S3_BUCKET": env.get("S3_BUCKET", "Not set"),
        "S3_VIDEO_PREFIX": env.get("S3_VIDEO_PREFIX", "Not set"),
        "USE_S3_FOR_VIDEOS": env.get("USE_S3_FOR_VIDEOS", "Not set"),
    }

    logger.info(f"Health check called. Environment: {env_vars}")

    # Only key names are exposed, not the session contents.
    session_keys = list(session.keys()) if session else []

    return jsonify({
        "status": "healthy",
        "environment": env_vars,
        "session_keys": session_keys,
        "is_hf_space": is_hf_space,
        "bypass_auth": bypass_auth,
        "directories": {
            "videos": os.path.exists(VIDEO_DIR),
            "annotations": os.path.exists(ANNOTATIONS_DIR),
            "temp": os.path.exists(TEMP_DIR),
        },
    })
|
|
|
|
|
@app.route("/auth")
def auth():
    """Handle Hugging Face authentication.

    Behaviour depends on the module-level ``bypass_auth`` flag, which is
    derived from the ``BYPASS_AUTH`` environment variable:

    * bypass enabled: a default ``Perilon`` user is stored in the session.
    * on Hugging Face Spaces: the ``X-Spaces-Username`` header must name an
      allowed user.
    * locally: a mock ``LocalDeveloper`` user is stored in the session.

    Previously a local ``bypass_auth = True`` forced the bypass on every
    request, which disabled the allow-list and made the rest of this function
    unreachable. Set ``BYPASS_AUTH=true`` to keep the always-bypass behaviour.

    Returns:
        A redirect to ``index`` on success, otherwise the error page.
    """
    logger.info(f"Auth route called. Headers: {dict(request.headers)}")

    if bypass_auth:
        logger.info("Auth bypass enabled, setting default user")
        session["user"] = {"name": "Perilon", "is_hf": True}
        return redirect(url_for("index"))

    username = request.headers.get("X-Spaces-Username")
    logger.info(f"Username from headers in auth: {username}")

    if is_hf_space and username and is_allowed_user(username):
        logger.info(f"Setting user in session: {username}")
        session["user"] = {"name": username, "is_hf": True}
        return redirect(url_for("index"))

    if not is_hf_space:
        session["user"] = {"name": "LocalDeveloper", "is_mock": True}
        return redirect(url_for("index"))

    # On Spaces without a usable username header.
    return render_template(
        "error.html",
        message="Waiting for Hugging Face authentication. If you continue to see this message, "
                "please make sure you're logged into Hugging Face and your username is allowed.",
    )
|
|
|
|
|
@app.before_request
def check_auth():
    """Check authentication before each request is processed.

    The auth/diagnostic routes and everything under ``/static/`` are always
    let through. For all other paths:

    * when the module-level ``bypass_auth`` flag (``BYPASS_AUTH`` environment
      variable) is on, a default ``Perilon`` user is placed in the session;
    * on Hugging Face Spaces an existing session user is accepted, otherwise
      the ``X-Spaces-Username`` header must name an allowed user, else the
      request is redirected to ``/auth``;
    * locally a request without a session user is redirected to ``login``.

    Previously a local ``bypass_auth = True`` forced the bypass on every
    request, so none of the real checks ever ran. Set ``BYPASS_AUTH=true`` to
    keep the always-bypass behaviour.

    Returns:
        ``None`` to let the request proceed, or a redirect response.
    """
    open_paths = ("/login", "/logout", "/auth", "/auth/callback", "/debug", "/health")
    if request.path in open_paths or request.path.startswith("/static/"):
        return None

    logger.debug(f"Request path: {request.path}, User in session: {'user' in session}")

    if bypass_auth:
        if "user" not in session:
            session["user"] = {"name": "Perilon", "is_hf": True}
        return None

    if is_hf_space:
        username = request.headers.get("X-Spaces-Username")

        if "user" in session:
            logger.debug(f"User in session: {session['user']}")
            return None

        if username and is_allowed_user(username):
            logger.info(f"Setting user from headers: {username}")
            session["user"] = {"name": username, "is_hf": True}
            return None

        logger.info("No authenticated user, redirecting to /auth")
        return redirect("/auth")

    if "user" not in session:
        return redirect(url_for("login"))
    return None
|
|
|
|
|
@app.route("/logout")
def logout():
    """Drop the whole session, then send the visitor to the login flow.

    On Hugging Face Spaces the redirect goes to ``/auth/logout``; otherwise it
    goes to the ``login`` endpoint.
    """
    session.clear()
    target = "/auth/logout" if is_hf_space else url_for("login")
    return redirect(target)
|
|
|
|
|
@app.route("/debug")
def debug_info():
    """Return session, request and configuration details as JSON.

    NOTE(review): ``check_auth`` exempts ``/debug`` and this view has no
    ``login_required``, so session contents, headers and cookies are returned
    to unauthenticated callers — confirm this is intended outside development.
    """
    reported_settings = (
        "SESSION_COOKIE_SECURE",
        "SESSION_COOKIE_HTTPONLY",
        "SESSION_COOKIE_SAMESITE",
        "PERMANENT_SESSION_LIFETIME",
    )
    cookie_values = {name: request.cookies.get(name) for name in request.cookies.keys()}
    session_dump = dict(session) if session else None
    cookie_settings = {
        key: str(value)
        for key, value in app.config.items()
        if key in reported_settings
    }
    s3_settings = {
        "S3_BUCKET": S3_BUCKET,
        "S3_VIDEO_PREFIX": S3_VIDEO_PREFIX,
        "USE_S3_FOR_VIDEOS": USE_S3_FOR_VIDEOS,
    }

    return jsonify({
        "session": session_dump,
        "headers": dict(request.headers),
        "cookies": cookie_values,
        "is_hf_space": is_hf_space,
        "allowed_users": os.getenv("ALLOWED_USERS", "Perilon"),
        "app_config": cookie_settings,
        "s3_config": s3_settings,
    })
|
|
|
|
|
|
|
@app.route("/")
@login_required
def index():
    """Application root: forward the visitor to the video selection page."""
    selection_url = url_for("select_video")
    return redirect(selection_url)
|
|
|
|
|
@app.route("/select_video")
@login_required
def select_video():
    """Render the page on which a video is chosen for annotation.

    Video ids come from S3 when ``USE_S3_FOR_VIDEOS`` is on, otherwise from
    the ``.mp4`` files in ``VIDEO_DIR``. A missing local directory renders the
    error page instead.
    """
    if USE_S3_FOR_VIDEOS:
        video_ids = list_s3_videos()
    elif os.path.exists(VIDEO_DIR):
        video_ids = [
            os.path.splitext(name)[0]
            for name in os.listdir(VIDEO_DIR)
            if name.endswith(".mp4")
        ]
    else:
        return render_template("error.html", message="Video directory not found.")

    return render_template("select_video.html", video_ids=video_ids, user=session.get("user"))
|
|
|
|
|
@app.route("/player/<video_id>")
@login_required
def player(video_id):
    """Render the annotation player for ``video_id``."""
    current_user = session.get("user")
    return render_template("player.html", video_id=video_id, user=current_user)
|
|
|
|
|
@app.route("/videos")
@login_required
def get_videos():
    """API endpoint returning the available video file names as a JSON list.

    With S3 enabled the names are ``<id>.mp4`` for every listed id. Otherwise
    they are the ``.mp4`` / ``.avi`` / ``.mov`` files in ``VIDEO_DIR``. A 404
    with an ``error`` message is returned when nothing is available.
    """
    if not USE_S3_FOR_VIDEOS:
        if not os.path.exists(VIDEO_DIR):
            return jsonify({"error": "Video directory not found"}), 404
        local_files = [
            name for name in os.listdir(VIDEO_DIR)
            if name.endswith((".mp4", ".avi", ".mov"))
        ]
        if not local_files:
            return jsonify({"error": "No videos found"}), 404
        return jsonify(local_files)

    s3_ids = list_s3_videos()
    if not s3_ids:
        return jsonify({"error": "No videos found in S3"}), 404
    return jsonify([f"{vid}.mp4" for vid in s3_ids])
|
|
|
|
|
@app.route("/video/<path:filename>")
@login_required
def serve_video(filename):
    """Serve a video either from S3 or from local storage.

    With S3 enabled the client is redirected to a presigned URL. If none can
    be generated, the video is downloaded and served from ``VIDEO_DIR``.
    Without S3 the file is served straight from ``VIDEO_DIR``. A 404 JSON
    error is returned when the video cannot be provided.
    """
    video_id = os.path.splitext(filename)[0]

    if not USE_S3_FOR_VIDEOS:
        if os.path.exists(os.path.join(VIDEO_DIR, filename)):
            return send_from_directory(VIDEO_DIR, filename)
        return jsonify({"error": "Video not found"}), 404

    signed_url = generate_presigned_url(video_id)
    if signed_url:
        return redirect(signed_url)

    # Fallback: fetch the object to disk and serve the local copy.
    downloaded = download_video_from_s3(video_id)
    if downloaded and os.path.exists(downloaded):
        return send_from_directory(VIDEO_DIR, filename)

    return jsonify({"error": "Video not found in S3"}), 404
|
|
|
|
|
@app.route("/save_annotations", methods=["POST"])
@login_required
def save_annotations():
    """Save annotation data posted as JSON.

    Expected body: ``{"video": <video id>, "timestamps": [<sortable values>]}``.
    The timestamps are stored sorted in
    ``ANNOTATIONS_DIR/<video>_annotations.json`` together with the video file
    name, the current time and the annotating user's name.

    Returns:
        ``{"success": True, ...}`` on success, or ``{"success": False, ...}``
        with status 400 when the body is missing, is not valid JSON, lacks a
        required field, names a video containing path components, or carries
        timestamps that are not a sortable list.
    """
    invalid = (jsonify({"success": False, "message": "Invalid data"}), 400)

    # silent=True turns a malformed or non-JSON body into None, so it gets the
    # same 400 answer as any other bad payload.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or "video" not in data or "timestamps" not in data:
        return invalid

    video = data["video"]
    # The id becomes part of a file name: refuse anything that could address a
    # path outside ANNOTATIONS_DIR.
    if not isinstance(video, str) or not video or os.path.basename(video) != video:
        return invalid

    if not isinstance(data["timestamps"], list):
        return invalid
    try:
        timestamps = sorted(data["timestamps"])
    except TypeError:
        # Values that cannot be compared with each other (e.g. str and float).
        return invalid

    annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video}_annotations.json")
    annotation_data = {
        "video_name": video + ".mp4",
        "timestamps": timestamps,
        "annotation_date": datetime.now().isoformat(),
        "annotated_by": session.get("user", {}).get("name", "unknown"),
    }
    with open(annotation_file, "w") as f:
        json.dump(annotation_data, f, indent=4)
    return jsonify({"success": True, "message": "Annotations saved successfully"})
|
|
|
|
|
@app.route("/get_annotations/<path:video_name>")
@login_required
def get_annotations(video_name):
    """Return the stored annotations for ``video_name`` as JSON.

    Responds with a 404 JSON error when no annotation file exists.
    """
    source_path = os.path.join(ANNOTATIONS_DIR, f"{video_name}_annotations.json")
    if os.path.exists(source_path):
        with open(source_path, "r") as handle:
            return jsonify(json.load(handle))
    return jsonify({"error": "No annotations found"}), 404
|
|
|
|
|
@app.route("/alignment/<video_id>")
@login_required
def alignment_mode(video_id):
    """Render the page for aligning sign language with transcribed text.

    Requires an existing annotation file for ``video_id``; otherwise the error
    page is rendered. ``total_clips`` passed to the template is the number of
    intervals between consecutive annotation timestamps.
    """
    annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_id}_annotations.json")
    if not os.path.exists(annotation_file):
        return render_template("error.html", message="No annotations found for this video. Please annotate the video first.")

    with open(annotation_file, "r") as f:
        annotations = json.load(f)

    # N timestamps delimit N-1 clips. Clamp at 0 so an empty timestamp list
    # does not produce a clip count of -1.
    total_clips = max(len(annotations["timestamps"]) - 1, 0)

    return render_template(
        "alignment.html",
        video_id=video_id,
        total_clips=total_clips,
        user=session.get("user"),
    )
|
|
|
|
|
@app.route("/api/transcript/<video_id>")
@login_required
def get_transcript(video_id):
    """Return the transcript of ``video_id`` built from its word timestamps.

    The JSON response holds the space-joined ``punctuated_word`` values as
    ``text`` and, per word, its text plus float ``start`` / ``end`` times.
    A missing timestamps file yields 404; a processing failure yields 500.
    """
    timestamps_file = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json")
    logger.info(f"Attempting to load word timestamps from: {timestamps_file}")

    if not os.path.exists(timestamps_file):
        logger.warning(f"Word timestamps file not found: {timestamps_file}")
        not_found = {"status": "error", "message": "No word timestamps found for this video"}
        return jsonify(not_found), 404

    try:
        with open(timestamps_file, "r") as handle:
            entries = json.load(handle)

        full_text = " ".join(entry["punctuated_word"] for entry in entries)

        words_with_times = []
        for entry in entries:
            words_with_times.append({
                "word": entry["punctuated_word"],
                "start": float(entry["start_time"]),
                "end": float(entry["end_time"]),
            })

        logger.info(f"Successfully created transcript ({len(full_text)} characters)")
        return jsonify({"status": "success", "text": full_text, "words": words_with_times})
    except Exception as e:
        logger.error(f"Error processing word timestamps: {str(e)}")
        failure = {"status": "error", "message": f"Error processing word timestamps: {str(e)}"}
        return jsonify(failure), 500
|
|
|
|
|
@app.route("/api/word_timestamps/<video_id>")
@login_required
def get_word_timestamps(video_id):
    """Return the raw word-level timestamp records stored for ``video_id``.

    A missing timestamps file yields 404; a failure while reading or parsing
    it yields 500.
    """
    timestamps_file = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json")
    logger.info(f"Attempting to load word timestamps from: {timestamps_file}")

    if not os.path.exists(timestamps_file):
        logger.warning(f"Word timestamps file not found: {timestamps_file}")
        not_found = {"status": "error", "message": "No word timestamps found for this video"}
        return jsonify(not_found), 404

    try:
        with open(timestamps_file, "r") as handle:
            records = json.load(handle)
        logger.info(f"Successfully loaded {len(records)} word timestamps")
        return jsonify({"status": "success", "words": records})
    except Exception as e:
        logger.error(f"Error processing word timestamps: {str(e)}")
        failure = {"status": "error", "message": f"Error processing word timestamps: {str(e)}"}
        return jsonify(failure), 500
|
|
|
|
|
@app.route("/api/clips/<video_id>")
@login_required
def get_video_clips(video_id):
    """Return the clip list derived from the annotations of ``video_id``.

    Each pair of consecutive annotation timestamps becomes one clip with its
    ``index``, ``start``, ``end`` and the ``/clip/<video_id>/<index>`` path
    that serves it.

    Returns:
        ``{"status": "success", "clips": [...]}``; a 404 error when the video
        has no annotation file (as the other lookup endpoints do); a 500
        error when the file cannot be read or parsed.
    """
    annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_id}_annotations.json")
    if not os.path.exists(annotation_file):
        # A missing resource is a client-side condition, not a server failure.
        logger.warning(f"Annotations not found: {annotation_file}")
        return jsonify({
            "status": "error",
            "message": "Annotations not found"
        }), 404

    try:
        with open(annotation_file, "r") as f:
            annotations = json.load(f)
        timestamps = annotations["timestamps"]

        # Pair every timestamp with its successor; fewer than two timestamps
        # produce an empty clip list.
        clips = [
            {
                "index": i,
                "start": start,
                "end": end,
                "path": f"/clip/{video_id}/{i}",
            }
            for i, (start, end) in enumerate(zip(timestamps, timestamps[1:]))
        ]
        return jsonify({
            "status": "success",
            "clips": clips
        })
    except Exception as e:
        logger.error(f"Error getting clips: {str(e)}")
        return jsonify({
            "status": "error",
            "message": str(e)
        }), 500
|
|
|
|
|
@app.route("/clip/<video_id>/<int:clip_index>")
@login_required
def serve_clip(video_id, clip_index):
    """Send one extracted clip as ``video/mp4``.

    Clips are looked up in ``TEMP_DIR`` as ``<video_id>_clip_<NNN>.mp4`` with
    a zero-padded three-digit index. A missing file yields a 404 JSON error.
    """
    clip_name = f"{video_id}_clip_{clip_index:03d}.mp4"
    clip_path = os.path.join(TEMP_DIR, clip_name)
    logger.info(f"Attempting to serve clip: {clip_path}")

    if os.path.exists(clip_path):
        return send_file(clip_path, mimetype="video/mp4")

    logger.error(f"Clip not found: {clip_path}")
    return jsonify({"status": "error", "message": "Clip not found"}), 404
|
|
|
|
|
@app.route("/api/save_alignments", methods=["POST"])
@login_required
def save_alignments():
    """Save alignment data posted as JSON.

    Expected body: ``{"video_id": <video id>, "alignments": [<dict or empty>]}``.
    Every non-empty alignment dict is stamped with the current user's name
    under ``aligned_by``, then the list is written to
    ``ALIGNMENTS_DIR/<video_id>.json``.

    Returns:
        ``{"success": True, ...}`` on success; status 400 when the body is
        missing, is not valid JSON, lacks a required field, names a video id
        containing path components, or ``alignments`` is not a list; status
        500 when writing fails.
    """
    invalid = (jsonify({"success": False, "message": "Invalid data"}), 400)

    # silent=True turns a malformed or non-JSON body into None so it is
    # answered with 400 instead of surfacing as a 500 below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or "video_id" not in data or "alignments" not in data:
        return invalid

    video_id = data["video_id"]
    # The id becomes a file name: refuse anything that could address a path
    # outside ALIGNMENTS_DIR.
    if not isinstance(video_id, str) or not video_id or os.path.basename(video_id) != video_id:
        return invalid

    alignments = data["alignments"]
    if not isinstance(alignments, list):
        return invalid

    try:
        author = session.get("user", {}).get("name", "unknown")
        for alignment in alignments:
            # Empty placeholders (None, {}) and non-dict entries are stored
            # untouched.
            if alignment and isinstance(alignment, dict):
                alignment["aligned_by"] = author

        output_path = os.path.join(ALIGNMENTS_DIR, f"{video_id}.json")
        with open(output_path, "w") as f:
            json.dump(alignments, f, indent=2)
        return jsonify({
            "success": True,
            "message": "Alignments saved successfully"
        })
    except Exception as e:
        logger.error(f"Error saving alignments: {str(e)}")
        return jsonify({
            "success": False,
            "message": str(e)
        }), 500
|
|
|
|
|
@app.route("/api/extract_clips/<video_id>")
@login_required
def extract_clips_for_video(video_id):
    """Start clip extraction and transcription for ``video_id`` in the background.

    With S3 enabled the video is downloaded first; a failed download yields a
    404 JSON error. A worker thread is started for each task whose recorded
    progress is below 100%. The response is always ``{"status": "started"}``
    once the video is available.
    """
    if USE_S3_FOR_VIDEOS and not download_video_from_s3(video_id):
        return jsonify({
            "status": "error",
            "message": f"Failed to download video {video_id} from S3"
        }), 404

    def below_full(registry):
        # An absent entry, or one without "percent", counts as 0%.
        return registry.get(video_id, {}).get("percent", 0) < 100

    if below_full(clip_extraction_status):
        threading.Thread(target=run_clip_extraction, args=(video_id,)).start()

    if below_full(transcription_progress_status):
        threading.Thread(target=run_transcription, args=(video_id,)).start()

    return jsonify({"status": "started"})
|
|
|
|
|
@app.route("/api/clip_progress/<video_id>")
@login_required
def clip_progress(video_id):
    """Report the recorded clip-extraction status for ``video_id`` as JSON.

    A zeroed status is returned when nothing has been recorded yet.
    """
    not_started = {"current": 0, "total": 0, "percent": 0}
    return jsonify(clip_extraction_status.get(video_id, not_started))
|
|
|
|
|
@app.route("/api/transcription_progress/<video_id>")
@login_required
def transcription_progress(video_id):
    """Report the recorded transcription status for ``video_id`` as JSON.

    A ``"not started"`` status at 0% is returned when nothing is recorded yet.
    """
    not_started = {"status": "not started", "percent": 0}
    return jsonify(transcription_progress_status.get(video_id, not_started))
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: print the effective configuration, then start the
    # Flask development server on all interfaces.
    try:
        print("=" * 50)
        print("Starting app with configuration:")
        print(f"- Running in HF Space: {is_hf_space}")
        print(f"- Auth bypass: {bypass_auth}")
        print(f"- Port: {os.getenv('PORT', 7860)}")
        print(f"- S3 for videos: {USE_S3_FOR_VIDEOS}")
        print(f"- S3 bucket: {S3_BUCKET}")
        print(f"- S3 prefix: {S3_VIDEO_PREFIX}")
        print(f"- Available videos: {os.listdir(VIDEO_DIR) if os.path.exists(VIDEO_DIR) else 'None'}")
        if USE_S3_FOR_VIDEOS:
            try:
                s3_videos = list_s3_videos()
                print(f"- Available S3 videos: {s3_videos if s3_videos else 'None'}")
            except Exception as e:
                print(f"- Error listing S3 videos: {str(e)}")
        print("=" * 50)

        port = int(os.getenv("PORT", 7860))
        # Debug mode enables the interactive Werkzeug debugger. Because the
        # server binds to 0.0.0.0 it must be opt-in (FLASK_DEBUG or DEBUG set
        # to "1"/"true") rather than always on.
        debug_flag = os.getenv("FLASK_DEBUG", os.getenv("DEBUG", "false"))
        debug_enabled = debug_flag.strip().lower() in ("1", "true")
        print(f"- Debug mode: {debug_enabled}")
        app.run(host="0.0.0.0", port=port, debug=debug_enabled)
    except Exception as e:
        print(f"Error starting the application: {e}")
        import traceback
        traceback.print_exc()
        # Signal the failure to the process supervisor instead of exiting 0.
        sys.exit(1)