import gradio as gr
import os
from moviepy.editor import VideoFileClip
from pydub import AudioSegment
import torch
from nemo.collections.asr.models import ASRModel
import wget

MODEL_URL = "https://huggingface.co/Mohammadp/Persian-ASR/resolve/main/conformer_transducer_persian.nemo"
MODEL_PATH = "conformer_transducer_persian.nemo"

# Download the checkpoint on the first run; later runs reuse the local copy.
if not os.path.exists(MODEL_PATH):
    print("Downloading model...")
    wget.download(MODEL_URL, MODEL_PATH)
    print("\nModel downloaded successfully.")

# The checkpoint name indicates a Conformer-Transducer model, so it is restored
# through the generic ASRModel class, which re-instantiates the concrete model
# class recorded inside the .nemo file (the CTC-specific EncDecCTCModelBPE
# would not match a transducer checkpoint).
model = ASRModel.restore_from(MODEL_PATH)
print("Model loaded successfully!")

SAMPLE_RATE = 16000  # NeMo Conformer models expect 16 kHz mono input
MAX_CHUNK_LENGTH_MS = 10 * 1000  # transcribe in 10-second chunks to bound memory use


def extract_audio_from_video(video_path):
    """Extracts the audio track from a video file and saves it as a WAV file."""
    video = VideoFileClip(video_path)
    audio_path = "extracted_audio.wav"
    video.audio.write_audiofile(audio_path)
    return audio_path
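
# Note: write_audiofile() also accepts an fps argument, so fps=SAMPLE_RATE
# could resample during extraction; the separate resample_audio() step below
# keeps the flow uniform for every input type.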


def resample_audio(audio_path, target_sample_rate=SAMPLE_RATE):
    """Resamples an audio file to 16 kHz mono WAV."""
    audio = AudioSegment.from_file(audio_path)
    # Downmix to mono as well: video and microphone recordings are often
    # stereo, while the ASR model expects a single channel.
    audio = audio.set_frame_rate(target_sample_rate).set_channels(1)
    resampled_path = "resampled_audio.wav"
    audio.export(resampled_path, format="wav")
    return resampled_path
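
# Caveat: the fixed temp filenames ("resampled_audio.wav", "chunk_*.wav") are
# overwritten on every request, so concurrent Gradio users would clobber each
# other's files; the tempfile module would be the safer choice in a multi-user
# deployment.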


def split_audio(audio_path, max_length_ms=MAX_CHUNK_LENGTH_MS):
    """Splits audio into consecutive chunks of at most max_length_ms each."""
    audio = AudioSegment.from_file(audio_path)
    chunks = []
    for i in range(0, len(audio), max_length_ms):
        chunk = audio[i:i + max_length_ms]  # pydub slices by milliseconds
        chunk_path = f"chunk_{i // max_length_ms}.wav"
        chunk.export(chunk_path, format="wav")
        chunks.append(chunk_path)
    return chunks
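
# Fixed-length splitting can cut a word in half at chunk boundaries;
# pydub.silence.split_on_silence() is an alternative that breaks on pauses
# instead, at the cost of less predictable chunk lengths.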


def transcribe_audio(audio_path):
    """Transcribes a single audio file and returns the text."""
    # transcribe() returns one result per input file; depending on the NeMo
    # version, the entry may be a Hypothesis object rather than a plain string.
    result = model.transcribe([audio_path])[0]
    return result if isinstance(result, str) else result.text
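
# Passing all chunk paths in a single model.transcribe(chunk_paths) call would
# let NeMo batch them and typically runs faster than one call per chunk.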


def process_audio(audio_path):
    """Processes an audio file: resamples, splits into chunks, and transcribes."""
    resampled_path = resample_audio(audio_path)
    chunks = split_audio(resampled_path)
    transcriptions = [transcribe_audio(chunk) for chunk in chunks]
    return " ".join(transcriptions)


def process_video(video_path):
    """Extracts and processes the audio track of a video file."""
    audio_path = extract_audio_from_video(video_path)
    return process_audio(audio_path)


def process_microphone(audio_path):
    """Processes live-recorded microphone audio."""
    return process_audio(audio_path)


def process_input(video=None, audio=None, microphone=None):
    """Transcribes whichever of the three inputs was provided."""
    if video is not None:
        return f"Transcription: {process_video(video)}"
    elif audio is not None:
        return f"Transcription: {process_audio(audio)}"
    elif microphone is not None:
        return f"Transcription: {process_microphone(microphone)}"
    else:
        return "No input provided."


# Example file shown in the Gradio UI; the path must exist in the app's
# working directory.
example_wav_files = [
    "FEmEC4QBSwA_285%20(4).wav",
]

iface = gr.Interface(
    fn=process_input,
    inputs=[
        gr.Video(label="Upload Video"),
        gr.Audio(label="Upload Audio File", type="filepath"),
        gr.Microphone(label="Record from Microphone", type="filepath"),
    ],
    outputs="text",
    title="NeMo ASR Transcription Interface",
    description=(
        "Upload a video, an audio file, or record from the microphone to "
        "transcribe Persian speech with a trained NeMo model."
    ),
    examples=[[None, wav, None] for wav in example_wav_files],
)

iface.launch()
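
# launch() blocks and serves the UI (by default at http://127.0.0.1:7860);
# share=True creates a temporary public link when running locally rather than
# on a hosted platform such as Hugging Face Spaces.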