#!/usr/bin/env python3
"""Get word-level timestamps for a video using Amazon Transcribe.

Pipeline: extract a mono 16 kHz WAV track with ffmpeg, upload it to S3, run
an Amazon Transcribe job on it, download the transcript JSON, and write one
entry per spoken word (start/end time plus attached punctuation) to a JSON
file next to the source videos.
"""

import argparse
import json
import os
import pathlib
import subprocess
import time
from decimal import Decimal
from typing import Any, Dict, List, Optional, Sequence

import boto3
import requests

# Deployment-specific locations, kept in one place so they stay in sync
# across the upload, transcription, and cleanup steps.
AWS_REGION = "us-west-2"
S3_BUCKET = "sorenson-ai-sb-scratch"
S3_PREFIX = "awilkinson/temp_audio"
TEMP_DIR = "/tmp/transcribe_temp"
FFMPEG_BIN = "/usr/bin/ffmpeg"

# Seconds between Transcribe status polls, and the HTTP timeout used when
# downloading the finished transcript.
POLL_INTERVAL_SECONDS = 5
TRANSCRIPT_DOWNLOAD_TIMEOUT_SECONDS = 60

# Job states after which polling stops.
_TERMINAL_JOB_STATES = ("COMPLETED", "FAILED")


def extract_audio(video_path: str) -> str:
    """Extract the audio track of a video into a WAV file using ffmpeg.

    Args:
        video_path: Path to the source video file.

    Returns:
        Path to the extracted audio file (a fixed name inside ``TEMP_DIR``,
        overwritten on every call).

    Raises:
        FileNotFoundError: If the ffmpeg binary is not at ``FFMPEG_BIN``.
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    os.makedirs(TEMP_DIR, exist_ok=True)
    audio_path = os.path.join(TEMP_DIR, "temp_audio.wav")

    # Arguments are passed as a list (no shell), so paths containing spaces
    # or shell metacharacters are handled safely.
    ffmpeg_cmd = [
        FFMPEG_BIN,
        "-i", video_path,
        "-vn",                   # drop the video stream
        "-acodec", "pcm_s16le",  # 16-bit PCM
        "-ar", "16000",          # 16 kHz sample rate
        "-ac", "1",              # mono
        "-y",                    # overwrite any previous output
        audio_path,
    ]
    # check=True: fail loudly instead of letting a stale temp_audio.wav from
    # an earlier run be uploaded and transcribed in place of this video.
    subprocess.run(ffmpeg_cmd, check=True)
    return audio_path


def upload_to_s3(audio_path: str, timestamp: int) -> str:
    """Upload the audio file to S3.

    Args:
        audio_path: Local path of the audio file to upload.
        timestamp: Value embedded in the object key to distinguish runs.

    Returns:
        The S3 object key (within ``S3_BUCKET``) the file was uploaded to.
    """
    s3 = boto3.client("s3", region_name=AWS_REGION)
    s3_path = f"{S3_PREFIX}/temp_audio_{timestamp}.wav"
    with open(audio_path, "rb") as audio_file:
        s3.upload_fileobj(audio_file, S3_BUCKET, s3_path)
    return s3_path


def start_transcription(s3_path: str, timestamp: int) -> str:
    """Start an Amazon Transcribe job for an uploaded audio file.

    Args:
        s3_path: S3 object key of the audio file within ``S3_BUCKET``.
        timestamp: Value embedded in the job name to distinguish runs.

    Returns:
        The name of the transcription job that was started.
    """
    transcribe = boto3.client("transcribe", region_name=AWS_REGION)
    job_name = f"transcribe_{timestamp}"
    transcribe.start_transcription_job(
        TranscriptionJobName=job_name,
        Media={"MediaFileUri": f"s3://{S3_BUCKET}/{s3_path}"},
        MediaFormat="wav",
        LanguageCode="en-US",
        Settings={
            "ShowSpeakerLabels": False,
            "ShowAlternatives": False,
        },
    )
    return job_name


def wait_for_transcription(
    job_name: str,
    poll_interval: float = POLL_INTERVAL_SECONDS,
    timeout: Optional[float] = None,
) -> Dict[str, Any]:
    """Poll a transcription job until it completes or fails.

    Args:
        job_name: Name of the transcription job to wait for.
        poll_interval: Seconds to sleep between status checks.
        timeout: Maximum number of seconds to wait, or ``None`` (the
            default) to wait indefinitely.

    Returns:
        The last ``get_transcription_job`` response, whose job status is
        either ``COMPLETED`` or ``FAILED``.

    Raises:
        TimeoutError: If ``timeout`` is set and elapses before the job
            reaches a terminal state.
    """
    transcribe = boto3.client("transcribe", region_name=AWS_REGION)
    deadline = None if timeout is None else time.monotonic() + timeout

    while True:
        status = transcribe.get_transcription_job(TranscriptionJobName=job_name)
        job_state = status["TranscriptionJob"]["TranscriptionJobStatus"]
        if job_state in _TERMINAL_JOB_STATES:
            return status
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Transcription job {job_name} did not finish within "
                f"{timeout} seconds (last status: {job_state})"
            )
        time.sleep(poll_interval)


def _words_from_items(items: List[Dict[str, Any]]) -> List[Dict[str, str]]:
    """Convert transcript items into word entries with attached punctuation."""
    words: List[Dict[str, str]] = []
    current_word: Optional[Dict[str, str]] = None

    for item in items:
        if item["type"] == "pronunciation":
            # A new spoken word begins: flush the previous one first.
            if current_word is not None:
                words.append(current_word)
            content = item["alternatives"][0]["content"]
            current_word = {
                "word": content,
                "start_time": str(round(Decimal(item["start_time"]), 3)),
                "end_time": str(round(Decimal(item["end_time"]), 3)),
                "punctuated_word": content,
            }
        elif item["type"] == "punctuation" and current_word is not None:
            # Punctuation is glued onto the preceding word; punctuation that
            # appears before any word has nothing to attach to and is dropped.
            current_word["punctuated_word"] += item["alternatives"][0]["content"]

    if current_word is not None:
        words.append(current_word)
    return words


def process_transcription_results(transcript_uri: str) -> List[Dict[str, str]]:
    """Download a transcript and extract word timestamps with punctuation.

    Args:
        transcript_uri: URL of the transcript JSON produced by the job.

    Returns:
        One dict per spoken word with the keys ``word``, ``start_time``,
        ``end_time`` (times as strings rounded to 3 decimal places) and
        ``punctuated_word`` (the word plus any punctuation that follows it).

    Raises:
        requests.HTTPError: If the transcript download returns an HTTP error
            status.
        requests.Timeout: If the download exceeds
            ``TRANSCRIPT_DOWNLOAD_TIMEOUT_SECONDS``.
    """
    # Without a timeout requests can block forever; without raise_for_status
    # an HTTP error body would surface as an opaque JSON/KeyError below.
    response = requests.get(
        transcript_uri, timeout=TRANSCRIPT_DOWNLOAD_TIMEOUT_SECONDS
    )
    response.raise_for_status()
    data = response.json()
    return _words_from_items(data["results"]["items"])


def cleanup_files(audio_path: str, s3_path: str) -> None:
    """Remove the temporary local audio file and the uploaded S3 object.

    Cleanup is best-effort: failures are reported as warnings and never
    raised, so they cannot mask an error from the main pipeline.

    Args:
        audio_path: Local audio file to delete; an empty string means no
            file was created.
        s3_path: S3 object key to delete; an empty string means nothing was
            uploaded.
    """
    if audio_path and os.path.exists(audio_path):
        try:
            os.remove(audio_path)
        except OSError as e:
            print(f"Warning: Failed to delete temporary file. Error: {str(e)}")

    if not s3_path:
        # Nothing was uploaded, so there is no object to delete.
        return

    try:
        s3 = boto3.client("s3", region_name=AWS_REGION)
        s3.delete_object(Bucket=S3_BUCKET, Key=s3_path)
    except Exception as e:  # deliberate catch-all: cleanup must not raise
        print(f"Warning: Failed to delete S3 object. Error: {str(e)}")


def get_word_timestamps(video_path: str) -> List[Dict[str, str]]:
    """Get word-level timestamps for a video using Amazon Transcribe.

    Args:
        video_path: Path to the source video file.

    Returns:
        The word entries produced by ``process_transcription_results``, or
        an empty list if the transcription job failed.
    """
    timestamp = int(time.time())
    # Start empty so cleanup knows which artifacts actually exist if an
    # early step raises.
    audio_path = ""
    s3_path = ""

    try:
        audio_path = extract_audio(video_path)
        s3_path = upload_to_s3(audio_path, timestamp)
        job_name = start_transcription(s3_path, timestamp)
        status = wait_for_transcription(job_name)

        job = status["TranscriptionJob"]
        if job["TranscriptionJobStatus"] != "COMPLETED":
            reason = job.get("FailureReason", "unknown")
            print(f"Transcription failed. Reason: {reason}")
            return []

        transcript_uri = job["Transcript"]["TranscriptFileUri"]
        return process_transcription_results(transcript_uri)
    finally:
        cleanup_files(audio_path, s3_path)


def main(argv: Optional[Sequence[str]] = None) -> None:
    """Transcribe one video and save its word timestamps as JSON.

    Args:
        argv: Command-line arguments (excluding the program name). Defaults
            to ``sys.argv[1:]`` when ``None``.
    """
    parser = argparse.ArgumentParser(
        description="Get word timestamps for a given video file ID."
    )
    parser.add_argument("video_id", help="Video file ID (without extension)")
    args = parser.parse_args(argv)

    base_dir = os.path.join(
        str(pathlib.Path.home()),
        "andrew_messaround",
        "vsl_speech_to_signing_alignment",
        "boundary_annotation_webapp",
    )

    video_filename = args.video_id + ".mp4"  # Source video file (with .mp4)
    video_path = os.path.join(base_dir, "data", "videos", video_filename)

    word_timestamps = get_word_timestamps(video_path)

    output_dir = os.path.join(base_dir, "data", "word_timestamps")
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(
        output_dir, args.video_id + "_word_timestamps.json"
    )

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(word_timestamps, f, indent=4)

    print(f"Word timestamps saved to: {output_path}")


if __name__ == "__main__":
    main()