|
|
|
|
|
import boto3 |
|
import json |
|
import os |
|
import pathlib |
|
import requests |
|
import time |
|
from decimal import Decimal |
|
from typing import Any, Dict, List |
|
from botocore.exceptions import ClientError |
|
|
|
# Bucket that download_video_from_s3() reads source videos from.
S3_BUCKET = "sorenson-ai-sb-scratch"
# Key prefix inside S3_BUCKET; "<video_id>.mp4" is appended to form the key.
S3_VIDEO_PREFIX = "awilkinson/kylie_dataset_videos_for_alignment_webapp/"
# When True, main() fetches a video from S3 if it is missing on local disk.
USE_S3_FOR_VIDEOS = True
|
|
|
def get_s3_client():
    """Build a boto3 S3 client configured from the process environment.

    The region comes from ``AWS_DEFAULT_REGION`` (``us-west-2`` when unset).
    The key pair comes from ``AWS_ACCESS_KEY_ID`` and
    ``AWS_SECRET_ACCESS_KEY``; each is passed as ``None`` when unset.
    """
    env = os.environ
    client_kwargs = {
        "region_name": env.get("AWS_DEFAULT_REGION", "us-west-2"),
        "aws_access_key_id": env.get("AWS_ACCESS_KEY_ID"),
        "aws_secret_access_key": env.get("AWS_SECRET_ACCESS_KEY"),
    }
    return boto3.client("s3", **client_kwargs)
|
|
|
def download_video_from_s3(video_id, output_dir):
    """Fetch ``<video_id>.mp4`` from S3 into ``output_dir`` unless present.

    Returns the local file path when the file already exists or was
    downloaded, and ``None`` when the S3 call raises ``ClientError``.
    """
    filename = f"{video_id}.mp4"
    destination = os.path.join(output_dir, filename)
    object_key = f"{S3_VIDEO_PREFIX}{filename}"

    # A file already sitting at the destination is reused as-is.
    if os.path.exists(destination):
        print(f"Video {video_id} already exists locally.")
        return destination

    print(f"Downloading video {video_id} from S3...")
    try:
        get_s3_client().download_file(S3_BUCKET, object_key, destination)
    except ClientError as e:
        print(f"Error downloading video from S3: {e}")
        return None
    else:
        print(f"Video {video_id} downloaded successfully to {destination}")
        return destination
|
|
|
def extract_audio(video_path: str) -> str:
    """Extract the audio track of a video to a WAV file using ffmpeg.

    ffmpeg is asked for audio only (``-vn``), 16-bit PCM, 16 kHz, one
    channel. The result is written to
    ``/tmp/transcribe_temp/temp_audio.wav``, overwriting any previous file
    at that path (``-y``).

    Args:
        video_path: Path of the video file to read.

    Returns:
        Path to the extracted audio file.

    Raises:
        RuntimeError: If ffmpeg cannot be started or exits with a non-zero
            status.
    """
    # Function-scope import: the module's import block has no subprocess.
    import subprocess

    temp_dir = "/tmp/transcribe_temp"
    os.makedirs(temp_dir, exist_ok=True)
    audio_path = os.path.join(temp_dir, "temp_audio.wav")

    # An argument list (no shell) hands the path to ffmpeg verbatim, so
    # spaces or shell metacharacters in video_path are never interpreted.
    ffmpeg_cmd = [
        "/usr/bin/ffmpeg",
        "-i", video_path,
        "-vn",
        "-acodec", "pcm_s16le",
        "-ar", "16000",
        "-ac", "1",
        "-y",
        audio_path,
    ]
    try:
        subprocess.run(ffmpeg_cmd, check=True)
    except (OSError, subprocess.CalledProcessError) as err:
        # Fail here rather than letting a missing/garbage WAV surface later
        # as an unrelated upload or transcription error.
        raise RuntimeError(
            f"ffmpeg failed to extract audio from {video_path!r}"
        ) from err
    return audio_path
|
|
|
def upload_to_s3(audio_path: str, timestamp: int) -> str:
    """Store the audio file in the scratch bucket under a timestamped key.

    Returns the object key (not a full ``s3://`` URI).
    """
    object_key = f"awilkinson/temp_audio/temp_audio_{timestamp}.wav"
    client = boto3.client("s3", region_name="us-west-2")
    with open(audio_path, "rb") as stream:
        client.upload_fileobj(stream, "sorenson-ai-sb-scratch", object_key)
    return object_key
|
|
|
def start_transcription(s3_path: str, timestamp: int) -> str:
    """Submit an en-US WAV transcription job for the uploaded audio.

    The job is named ``transcribe_<timestamp>`` and points at ``s3_path``
    inside the scratch bucket; speaker labels and alternatives are disabled.
    Returns the job name.
    """
    job_name = f"transcribe_{timestamp}"
    request = {
        "TranscriptionJobName": job_name,
        "Media": {"MediaFileUri": f"s3://sorenson-ai-sb-scratch/{s3_path}"},
        "MediaFormat": "wav",
        "LanguageCode": "en-US",
        "Settings": {
            "ShowSpeakerLabels": False,
            "ShowAlternatives": False,
        },
    }
    client = boto3.client("transcribe", region_name="us-west-2")
    client.start_transcription_job(**request)
    return job_name
|
|
|
def wait_for_transcription(
    job_name: str,
    *,
    poll_interval: float = 5.0,
    timeout: float = float("inf"),
) -> Dict[str, Any]:
    """Poll Amazon Transcribe until the job is COMPLETED or FAILED.

    Args:
        job_name: Name of the transcription job to poll.
        poll_interval: Seconds to sleep between polls.
        timeout: Maximum number of seconds to keep polling. The default
            (infinity) waits for as long as the job takes.

    Returns:
        The ``get_transcription_job`` response whose
        ``TranscriptionJobStatus`` is ``COMPLETED`` or ``FAILED``.

    Raises:
        TimeoutError: If ``timeout`` seconds pass without the job reaching
            one of those two states.
    """
    transcribe = boto3.client("transcribe", region_name="us-west-2")
    # Monotonic clock so wall-clock adjustments cannot shorten/extend the wait.
    deadline = time.monotonic() + timeout
    while True:
        status = transcribe.get_transcription_job(TranscriptionJobName=job_name)
        state = status["TranscriptionJob"]["TranscriptionJobStatus"]
        if state in ("COMPLETED", "FAILED"):
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Transcription job {job_name} did not finish within "
                f"{timeout} seconds (last status: {state})"
            )
        time.sleep(poll_interval)
|
|
|
def process_transcription_results(transcript_uri: str) -> List[Dict[str, str]]:
    """Download a transcript JSON and extract per-word timestamps.

    Args:
        transcript_uri: URL of the transcript JSON document.

    Returns:
        One dict per ``"pronunciation"`` item, in transcript order, with keys
        ``word``, ``start_time``, ``end_time`` (times rounded to 3 decimal
        places and rendered as strings) and ``punctuated_word`` (the word
        followed by every ``"punctuation"`` item that came after it and
        before the next word). Punctuation seen before the first word is
        dropped.

    Raises:
        requests.RequestException: If the download times out, fails, or
            returns an HTTP error status.
    """
    # Explicit timeout: requests waits forever by default on a stalled server.
    response = requests.get(transcript_uri, timeout=30)
    # Surface HTTP errors directly instead of as a JSON/KeyError further down.
    response.raise_for_status()
    data = response.json()

    words: List[Dict[str, str]] = []
    for item in data["results"]["items"]:
        if item["type"] == "pronunciation":
            content = item["alternatives"][0]["content"]
            words.append(
                {
                    "word": content,
                    "start_time": str(round(Decimal(item["start_time"]), 3)),
                    "end_time": str(round(Decimal(item["end_time"]), 3)),
                    "punctuated_word": content,
                }
            )
        elif item["type"] == "punctuation" and words:
            # Attach punctuation to the most recent word.
            words[-1]["punctuated_word"] += item["alternatives"][0]["content"]
    return words
|
|
|
def cleanup_files(audio_path: str, s3_path: str) -> None:
    """Best-effort removal of the temporary audio file and its S3 copy.

    Failures are reported as printed warnings rather than raised, so that a
    cleanup problem cannot replace the exception of the step that failed
    when this runs from a ``finally`` block.

    Args:
        audio_path: Local audio file to delete. An empty string or a path
            that does not exist is ignored.
        s3_path: Key of the uploaded object in the scratch bucket. An empty
            string means nothing was uploaded, so S3 is not contacted.
    """
    if audio_path:
        try:
            os.remove(audio_path)
        except FileNotFoundError:
            # Already gone: nothing to clean up.
            pass
        except OSError as e:
            print(f"Warning: Failed to delete local file {audio_path}. Error: {e}")

    if not s3_path:
        # No upload happened; an empty Key would only produce a spurious
        # "failed to delete" warning.
        return

    try:
        s3 = boto3.client("s3", region_name="us-west-2")
        s3.delete_object(Bucket="sorenson-ai-sb-scratch", Key=s3_path)
    except Exception as e:  # deliberate best-effort: warn, never raise
        print(f"Warning: Failed to delete S3 object. Error: {str(e)}")
|
|
|
def get_word_timestamps(video_path: str) -> List[Dict[str, str]]:
    """Run the full Amazon Transcribe pipeline for one video.

    Extracts the audio, uploads it, starts a transcription job and waits for
    it. Returns the parsed word timestamps when the job completes, or an
    empty list (after printing the failure reason) otherwise. Temporary
    files are cleaned up in every case.
    """
    run_id = int(time.time())
    # Start empty so the finally clause has values even if an early step raises.
    audio_path = ""
    s3_path = ""
    try:
        audio_path = extract_audio(video_path)
        s3_path = upload_to_s3(audio_path, run_id)
        status = wait_for_transcription(start_transcription(s3_path, run_id))
        job = status["TranscriptionJob"]

        if job["TranscriptionJobStatus"] != "COMPLETED":
            print(f"Transcription failed. Reason: {job['FailureReason']}")
            return []

        return process_transcription_results(job["Transcript"]["TranscriptFileUri"])
    finally:
        cleanup_files(audio_path, s3_path)
|
|
|
def main() -> None:
    """Write the word-timestamp JSON for the video named by ``args.video_id``.

    Looks for ``<video_id>.mp4`` under the web app's ``data/videos``
    directory, downloading it from S3 first when ``USE_S3_FOR_VIDEOS`` is set
    and the file is missing. The timestamps are saved to
    ``data/word_timestamps/<video_id>_word_timestamps.json``.
    """
    base_dir = os.path.join(
        str(pathlib.Path.home()),
        "andrew_messaround",
        "vsl_speech_to_signing_alignment",
        "boundary_annotation_webapp",
    )
    # NOTE(review): `args` is not defined in the visible part of this file;
    # presumably a module-level parsed-arguments object -- confirm.
    video_id = args.video_id
    videos_dir = os.path.join(base_dir, "data", "videos")
    video_path = os.path.join(videos_dir, video_id + ".mp4")

    video_missing = not os.path.exists(video_path)
    if USE_S3_FOR_VIDEOS and video_missing:
        os.makedirs(videos_dir, exist_ok=True)
        download_video_from_s3(video_id, videos_dir)

    # Re-check on disk: the download result is not inspected directly.
    if not os.path.exists(video_path):
        print(f"Error: Video file not found: {video_path}")
        return

    word_timestamps = get_word_timestamps(video_path)

    output_dir = os.path.join(base_dir, "data", "word_timestamps")
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, video_id + "_word_timestamps.json")
    with open(output_path, "w") as out_file:
        json.dump(word_timestamps, out_file, indent=4)
    print(f"Word timestamps saved to: {output_path}")