File size: 5,248 Bytes
df66a57
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#!/usr/bin/env python3

import json
import os
import pathlib
import subprocess
import time
from decimal import Decimal
from typing import Any, Dict, List

import boto3
import requests

def extract_audio(video_path: str) -> str:
    """Extract the audio track of a video into a 16 kHz mono WAV file.

    Runs ``/usr/bin/ffmpeg`` with its arguments passed as a list (no shell),
    so paths containing spaces or shell metacharacters are taken literally.

    Args:
        video_path: Path to the source video file.

    Returns:
        Path to the extracted audio file
        (``/tmp/transcribe_temp/temp_audio.wav``).

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
        FileNotFoundError: If ``/usr/bin/ffmpeg`` does not exist.
    """
    temp_dir = "/tmp/transcribe_temp"
    os.makedirs(temp_dir, exist_ok=True)
    audio_path = os.path.join(temp_dir, "temp_audio.wav")
    ffmpeg_cmd = [
        "/usr/bin/ffmpeg",
        "-i", video_path,
        "-vn",                   # drop the video stream
        "-acodec", "pcm_s16le",  # 16-bit PCM samples
        "-ar", "16000",          # 16 kHz sample rate
        "-ac", "1",              # single (mono) channel
        "-y",                    # overwrite an existing output file
        audio_path,
    ]
    # check=True: a failed extraction must not fall through silently, because
    # the output name is fixed and a leftover file from an earlier run could
    # otherwise be picked up in its place.
    subprocess.run(ffmpeg_cmd, check=True)
    return audio_path

def upload_to_s3(audio_path: str, timestamp: int) -> str:
    """Send the local audio file to the scratch S3 bucket.

    The object key embeds ``timestamp``; that key (not a full ``s3://`` URI)
    is what gets returned.
    """
    object_key = f"awilkinson/temp_audio/temp_audio_{timestamp}.wav"
    client = boto3.client("s3", region_name="us-west-2")
    with open(audio_path, "rb") as stream:
        client.upload_fileobj(
            Fileobj=stream,
            Bucket="sorenson-ai-sb-scratch",
            Key=object_key,
        )
    return object_key

def start_transcription(s3_path: str, timestamp: int) -> str:
    """Submit an Amazon Transcribe job for the uploaded audio.

    The job is named ``transcribe_<timestamp>`` and reads the WAV object at
    ``s3_path`` in the scratch bucket as US English, with speaker labels and
    alternatives turned off. The job name is returned.
    """
    job_name = f"transcribe_{timestamp}"
    request = {
        "TranscriptionJobName": job_name,
        "Media": {"MediaFileUri": f"s3://sorenson-ai-sb-scratch/{s3_path}"},
        "MediaFormat": "wav",
        "LanguageCode": "en-US",
        "Settings": {
            "ShowSpeakerLabels": False,
            "ShowAlternatives": False,
        },
    }
    client = boto3.client("transcribe", region_name="us-west-2")
    client.start_transcription_job(**request)
    return job_name

def wait_for_transcription(job_name: str) -> Dict[str, Any]:
    """Block until the named transcription job has finished.

    Polls ``get_transcription_job`` every 5 seconds until the job status is
    ``COMPLETED`` or ``FAILED`` and returns that last response unchanged.
    There is no upper bound on how long this waits.
    """
    client = boto3.client("transcribe", region_name="us-west-2")
    terminal_states = ("COMPLETED", "FAILED")
    response = client.get_transcription_job(TranscriptionJobName=job_name)
    while response["TranscriptionJob"]["TranscriptionJobStatus"] not in terminal_states:
        time.sleep(5)
        response = client.get_transcription_job(TranscriptionJobName=job_name)
    return response

def process_transcription_results(transcript_uri: str) -> List[Dict[str, str]]:
    """Download a transcript and extract word timestamps with punctuation.

    Args:
        transcript_uri: URL of the transcript JSON document.

    Returns:
        One dict per "pronunciation" item, in transcript order, with keys
        ``word``, ``start_time``, ``end_time`` (times rounded to three
        decimal places and rendered as strings) and ``punctuated_word``
        (the word followed by any punctuation items that come directly
        after it). Punctuation that precedes the first word is dropped.

    Raises:
        requests.HTTPError: If the download returns a 4xx/5xx status.
        requests.Timeout: If the server does not respond in time.
    """
    # (connect, read) timeouts: without them a stalled connection would
    # block this call forever.
    response = requests.get(transcript_uri, timeout=(10, 60))
    # Fail here with a clear HTTP error rather than later with a confusing
    # JSON/KeyError when the body is an error page.
    response.raise_for_status()
    data = response.json()

    words: List[Dict[str, str]] = []
    for item in data["results"]["items"]:
        content = item["alternatives"][0]["content"]
        if item["type"] == "pronunciation":
            words.append({
                "word": content,
                "start_time": str(round(Decimal(item["start_time"]), 3)),
                "end_time": str(round(Decimal(item["end_time"]), 3)),
                "punctuated_word": content,
            })
        elif item["type"] == "punctuation" and words:
            # Attach punctuation to the most recent word.
            words[-1]["punctuated_word"] += content
    return words

def cleanup_files(audio_path: str, s3_path: str) -> None:
    """Remove the temporary audio file and its S3 object, best-effort.

    Written to be safe inside a ``finally`` block: an empty argument (the
    corresponding step never ran) is skipped, and failures are printed as
    warnings instead of raised, so cleanup cannot mask the error that
    triggered it.

    Args:
        audio_path: Local file to delete, or ``""`` if none was created.
        s3_path: Key of the object to delete from the scratch bucket, or
            ``""`` if nothing was uploaded.
    """
    if audio_path:
        try:
            os.remove(audio_path)
        except FileNotFoundError:
            pass  # already gone; nothing to clean up
        except OSError as e:
            print(f"Warning: Failed to delete local file. Error: {str(e)}")

    if not s3_path:
        # Nothing was uploaded, so there is no object to delete.
        return
    try:
        s3 = boto3.client("s3", region_name="us-west-2")
        s3.delete_object(Bucket="sorenson-ai-sb-scratch", Key=s3_path)
    except Exception as e:
        print(f"Warning: Failed to delete S3 object. Error: {str(e)}")

def get_word_timestamps(video_path: str) -> List[Dict[str, str]]:
    """Run the full pipeline for one video and return its word timings.

    Steps: extract audio, upload it to S3, start a transcription job, wait
    for it, then parse the transcript. Returns an empty list (after printing
    the failure reason) when the job does not complete. Temporary artifacts
    are cleaned up whether or not the pipeline succeeds.
    """
    run_id = int(time.time())
    local_audio, remote_key = "", ""
    try:
        local_audio = extract_audio(video_path)
        remote_key = upload_to_s3(local_audio, run_id)
        job_name = start_transcription(remote_key, run_id)
        job = wait_for_transcription(job_name)["TranscriptionJob"]
        if job["TranscriptionJobStatus"] != "COMPLETED":
            print(f"Transcription failed. Reason: {job['FailureReason']}")
            return []
        return process_transcription_results(job["Transcript"]["TranscriptFileUri"])
    finally:
        cleanup_files(local_audio, remote_key)

def main() -> None:
    """Command-line entry point: write word timestamps for one video.

    Reads a single positional ``video_id`` argument from the command line,
    transcribes ``<base_dir>/data/videos/<video_id>.mp4`` and writes the
    result to ``<base_dir>/data/word_timestamps/<video_id>_word_timestamps.json``,
    where ``base_dir`` is a fixed directory under the user's home.
    """
    # Parse arguments here rather than relying on a module-level ``args``
    # global, so main() also works when called after an import.
    import argparse

    parser = argparse.ArgumentParser(description="Get word timestamps for a given video file ID.")
    parser.add_argument("video_id", help="Video file ID (without extension)")
    args = parser.parse_args()

    base_dir = os.path.join(
        str(pathlib.Path.home()),
        "andrew_messaround",
        "vsl_speech_to_signing_alignment",
        "boundary_annotation_webapp"
    )
    video_filename = args.video_id + ".mp4"  # Source video file (with .mp4)
    video_path = os.path.join(base_dir, "data", "videos", video_filename)
    word_timestamps = get_word_timestamps(video_path)
    output_dir = os.path.join(base_dir, "data", "word_timestamps")
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, args.video_id + "_word_timestamps.json")
    with open(output_path, "w") as f:
        json.dump(word_timestamps, f, indent=4)
    print(f"Word timestamps saved to: {output_path}")

if __name__ == "__main__":
    main()