VSL_Boundary_Annotation_and_Alignment_Tool / get_transcription_with_amazon.py
Perilon's picture
Bug fixes
2daffd5
raw
history blame
6.7 kB
#!/usr/bin/env python3
import json
import os
import pathlib
import subprocess
import time
from decimal import Decimal
from typing import Any, Dict, List

import boto3
import requests
from botocore.exceptions import ClientError
# S3 bucket the source videos are downloaded from.
S3_BUCKET = "sorenson-ai-sb-scratch"
# Key prefix inside the bucket; a video is looked up as "<prefix><video_id>.mp4".
S3_VIDEO_PREFIX = "awilkinson/kylie_dataset_videos_for_alignment_webapp/"
# When True, main() fetches a video from S3 if it is missing locally;
# when False, only local files are used.
USE_S3_FOR_VIDEOS = True
def get_s3_client():
    """Build a boto3 S3 client configured from environment variables.

    The region is read from ``AWS_DEFAULT_REGION`` (falling back to
    ``us-west-2``). The access key pair is read from ``AWS_ACCESS_KEY_ID``
    and ``AWS_SECRET_ACCESS_KEY`` and passed as ``None`` when unset.
    """
    env = os.environ
    client_options = {
        "region_name": env.get('AWS_DEFAULT_REGION', 'us-west-2'),
        "aws_access_key_id": env.get('AWS_ACCESS_KEY_ID'),
        "aws_secret_access_key": env.get('AWS_SECRET_ACCESS_KEY'),
    }
    return boto3.client('s3', **client_options)
def download_video_from_s3(video_id, output_dir):
    """Fetch ``<video_id>.mp4`` from S3 into ``output_dir`` unless already present.

    Args:
        video_id: Identifier of the video; the file name is ``<video_id>.mp4``.
        output_dir: Local directory the file is written to.

    Returns:
        The local file path on success (or when the file already existed),
        ``None`` when the S3 download raised a ``ClientError``.
    """
    filename = f"{video_id}.mp4"
    object_key = f"{S3_VIDEO_PREFIX}{filename}"
    destination = os.path.join(output_dir, filename)

    # Nothing to do when a previous run already fetched the file.
    if os.path.exists(destination):
        print(f"Video {video_id} already exists locally.")
        return destination

    try:
        print(f"Downloading video {video_id} from S3...")
        get_s3_client().download_file(S3_BUCKET, object_key, destination)
        print(f"Video {video_id} downloaded successfully to {destination}")
    except ClientError as e:
        print(f"Error downloading video from S3: {str(e)}")
        return None
    else:
        return destination
def extract_audio(video_path: str) -> str:
    """Extract the audio track of a video into a WAV file using ffmpeg.

    ffmpeg is asked for 16-bit PCM, 16 kHz, single-channel output
    (``-acodec pcm_s16le -ar 16000 -ac 1``). The command is passed as an
    argument list rather than a shell string, so paths containing spaces or
    shell metacharacters are handed to ffmpeg verbatim and cannot be
    interpreted by a shell.

    Args:
        video_path: Path to the input video file.

    Returns:
        Path to the extracted audio file. This is a fixed location under
        ``/tmp/transcribe_temp`` that is overwritten on every call.

    Raises:
        RuntimeError: If ffmpeg cannot be started or exits with a non-zero
            status (for example because the input file is missing).
    """
    temp_dir = "/tmp/transcribe_temp"
    os.makedirs(temp_dir, exist_ok=True)
    audio_path = os.path.join(temp_dir, "temp_audio.wav")

    ffmpeg_cmd = [
        "/usr/bin/ffmpeg",
        "-i", video_path,
        "-vn",
        "-acodec", "pcm_s16le",
        "-ar", "16000",
        "-ac", "1",
        "-y",
        audio_path,
    ]
    try:
        completed = subprocess.run(ffmpeg_cmd, check=False)
    except OSError as exc:
        # Raised when the ffmpeg binary is missing or not executable.
        raise RuntimeError(f"Could not run ffmpeg: {exc}") from exc

    # Fail loudly: otherwise a stale or missing WAV would be used downstream.
    if completed.returncode != 0:
        raise RuntimeError(
            f"ffmpeg exited with status {completed.returncode} "
            f"while extracting audio from {video_path}"
        )
    return audio_path
def upload_to_s3(audio_path: str, timestamp: int) -> str:
    """Upload the audio file to the scratch bucket and return its object key.

    Args:
        audio_path: Local path of the audio file to upload.
        timestamp: Value embedded in the object key to distinguish uploads.

    Returns:
        The S3 key (not a full ``s3://`` URI) the file was stored under.
    """
    object_key = f"awilkinson/temp_audio/temp_audio_{timestamp}.wav"
    client = boto3.client("s3", region_name="us-west-2")
    with open(audio_path, "rb") as stream:
        client.upload_fileobj(
            Fileobj=stream,
            Bucket="sorenson-ai-sb-scratch",
            Key=object_key,
        )
    return object_key
def start_transcription(s3_path: str, timestamp: int) -> str:
    """Submit an Amazon Transcribe job for an uploaded WAV file.

    Args:
        s3_path: Object key of the audio inside the scratch bucket.
        timestamp: Value used to build the job name ``transcribe_<timestamp>``.

    Returns:
        The name of the job that was started.
    """
    job_name = f"transcribe_{timestamp}"
    request = {
        "TranscriptionJobName": job_name,
        "Media": {"MediaFileUri": f"s3://sorenson-ai-sb-scratch/{s3_path}"},
        "MediaFormat": "wav",
        "LanguageCode": "en-US",
        "Settings": {
            "ShowSpeakerLabels": False,
            "ShowAlternatives": False,
        },
    }
    client = boto3.client("transcribe", region_name="us-west-2")
    client.start_transcription_job(**request)
    return job_name
def wait_for_transcription(job_name: str) -> Dict[str, Any]:
    """Poll a transcription job until it reaches a terminal state.

    The job is queried every 5 seconds, with no upper bound on the wait,
    until its status is ``COMPLETED`` or ``FAILED``.

    Args:
        job_name: Name of the job to wait for.

    Returns:
        The last ``get_transcription_job`` response.
    """
    client = boto3.client("transcribe", region_name="us-west-2")
    terminal_states = ("COMPLETED", "FAILED")

    response = client.get_transcription_job(TranscriptionJobName=job_name)
    while response["TranscriptionJob"]["TranscriptionJobStatus"] not in terminal_states:
        time.sleep(5)
        response = client.get_transcription_job(TranscriptionJobName=job_name)
    return response
def process_transcription_results(transcript_uri: str) -> List[Dict[str, str]]:
    """Download a transcript JSON and extract word timestamps with punctuation.

    Every ``pronunciation`` item becomes one entry. A ``punctuation`` item is
    appended to the ``punctuated_word`` of the word that precedes it;
    punctuation that appears before the first word is dropped.

    Args:
        transcript_uri: URL of the transcript JSON produced by the job.

    Returns:
        A list of dicts with the keys ``word``, ``start_time``, ``end_time``
        and ``punctuated_word``. The times are the transcript's values
        rounded to three decimal places and rendered as strings.

    Raises:
        requests.exceptions.RequestException: If the download times out or
            the server answers with an HTTP error status.
    """
    # (connect, read) timeouts: without them a stalled connection would
    # block this call forever.
    response = requests.get(transcript_uri, timeout=(10, 60))
    # Surface HTTP errors directly instead of failing later while parsing
    # an error body as a transcript.
    response.raise_for_status()
    data = response.json()

    words: List[Dict[str, str]] = []
    current_word = None
    for item in data["results"]["items"]:
        content = item["alternatives"][0]["content"]
        if item["type"] == "pronunciation":
            # A new word starts: flush the one being assembled.
            if current_word is not None:
                words.append(current_word)
            current_word = {
                "word": content,
                "start_time": str(round(Decimal(item["start_time"]), 3)),
                "end_time": str(round(Decimal(item["end_time"]), 3)),
                "punctuated_word": content,
            }
        elif item["type"] == "punctuation" and current_word is not None:
            current_word["punctuated_word"] += content

    # Flush the trailing word, if any.
    if current_word is not None:
        words.append(current_word)
    return words
def cleanup_files(audio_path: str, s3_path: str) -> None:
    """Best-effort removal of the temporary audio file and its S3 copy.

    Either argument may be an empty string, meaning the corresponding
    artifact was never created; that step is then skipped entirely, so no
    S3 client is created and no request with an empty key is sent.

    Failures are reported as warnings and never raised: this function is
    invoked from a ``finally`` block, and an exception here would mask the
    error that triggered the cleanup.

    Args:
        audio_path: Local path of the temporary audio file, or ``""``.
        s3_path: Object key of the uploaded audio, or ``""``.
    """
    if audio_path:
        try:
            os.remove(audio_path)
        except FileNotFoundError:
            # Already gone (or never written): nothing to clean up.
            pass
        except OSError as e:
            print(f"Warning: Failed to delete local audio file. Error: {str(e)}")

    if not s3_path:
        return

    try:
        # Client creation is inside the try so that configuration problems
        # are reported as a warning as well.
        s3 = boto3.client("s3", region_name="us-west-2")
        s3.delete_object(Bucket="sorenson-ai-sb-scratch", Key=s3_path)
    except Exception as e:  # deliberate best-effort: never fail the cleanup
        print(f"Warning: Failed to delete S3 object. Error: {str(e)}")
def get_word_timestamps(video_path: str) -> List[Dict[str, str]]:
    """Run the full pipeline: extract audio, upload, transcribe, parse.

    Args:
        video_path: Path to the video whose speech should be transcribed.

    Returns:
        The word entries produced by ``process_transcription_results``, or
        an empty list when the transcription job did not complete.
    """
    run_id = int(time.time())
    # Empty placeholders so the cleanup can run even if an early step fails.
    audio_path = ""
    s3_path = ""
    try:
        audio_path = extract_audio(video_path)
        s3_path = upload_to_s3(audio_path, run_id)
        result = wait_for_transcription(start_transcription(s3_path, run_id))

        job = result["TranscriptionJob"]
        if job["TranscriptionJobStatus"] != "COMPLETED":
            print(f"Transcription failed. Reason: {job['FailureReason']}")
            return []
        return process_transcription_results(job["Transcript"]["TranscriptFileUri"])
    finally:
        cleanup_files(audio_path, s3_path)
def main() -> None:
    """Transcribe one video and write its word timestamps to a JSON file.

    The video is expected at ``<base>/data/videos/<video_id>.mp4``. When
    ``USE_S3_FOR_VIDEOS`` is set and the file is missing, it is downloaded
    from S3 first. Results are written to
    ``<base>/data/word_timestamps/<video_id>_word_timestamps.json``.

    NOTE(review): ``args`` is read as a global and is not defined in the
    visible part of this file. It is presumably a module-level argparse
    namespace with a ``video_id`` attribute; confirm.
    """
    data_dir = os.path.join(
        str(pathlib.Path.home()),
        "andrew_messaround",
        "vsl_speech_to_signing_alignment",
        "boundary_annotation_webapp",
        "data",
    )
    video_id = args.video_id
    videos_dir = os.path.join(data_dir, "videos")
    video_path = os.path.join(videos_dir, video_id + ".mp4")

    # Fetch the video from S3 only when it is not available locally.
    if USE_S3_FOR_VIDEOS and not os.path.exists(video_path):
        os.makedirs(videos_dir, exist_ok=True)
        download_video_from_s3(video_id, videos_dir)

    # The download may have failed (or been disabled), so check again.
    if not os.path.exists(video_path):
        print(f"Error: Video file not found: {video_path}")
        return

    timestamps = get_word_timestamps(video_path)

    results_dir = os.path.join(data_dir, "word_timestamps")
    os.makedirs(results_dir, exist_ok=True)
    output_path = os.path.join(results_dir, video_id + "_word_timestamps.json")
    with open(output_path, "w") as out_file:
        json.dump(timestamps, out_file, indent=4)
    print(f"Word timestamps saved to: {output_path}")