# app.py — Gradio demo: chunked audio transcription via Google Web Speech API.
import concurrent.futures
import os
import tempfile

import gradio as gr
import speech_recognition as sr
from pydub import AudioSegment
def split_audio(audio_path, chunk_length_ms=60000, overlap_ms=2000):
    """Split an audio file into overlapping chunks.

    Parameters
    ----------
    audio_path : str
        Path to any audio file pydub/ffmpeg can decode.
    chunk_length_ms : int
        Length of each chunk in milliseconds.
    overlap_ms : int
        Overlap between consecutive chunks in milliseconds; must be
        smaller than ``chunk_length_ms``.

    Returns
    -------
    list
        Consecutive ``AudioSegment`` slices; the last one may be shorter.

    Raises
    ------
    ValueError
        If ``chunk_length_ms - overlap_ms`` is not positive. Previously a
        zero step crashed inside ``range`` and a negative step silently
        returned an empty list, dropping all audio.
    """
    step = chunk_length_ms - overlap_ms
    if step <= 0:
        raise ValueError("chunk_length_ms must be greater than overlap_ms")
    audio = AudioSegment.from_file(audio_path)
    # Slicing an AudioSegment past its end truncates, so no end-guard needed.
    return [audio[i:i + chunk_length_ms] for i in range(0, len(audio), step)]
def convert_audio_to_wav(input_path, output_path):
    """Decode ``input_path`` and write it to ``output_path`` as 16 kHz mono WAV.

    16 kHz mono is the sample format the speech recognizer works best with.
    """
    source = AudioSegment.from_file(input_path)
    mono_16k = source.set_frame_rate(16000).set_channels(1)
    mono_16k.export(output_path, format="wav")
def transcribe_chunk_indexed(indexed_chunk_language):
    """Transcribe one audio chunk with the Google Web Speech API.

    Parameters
    ----------
    indexed_chunk_language : tuple
        ``(index, chunk, language)`` — the chunk's position in the original
        audio, the ``AudioSegment`` itself, and a BCP-47 language code
        such as ``"en-US"``.

    Returns
    -------
    tuple[int, str]
        The original index paired with either the recognized text or an
        ``"[Error: ...]"`` placeholder, so the caller can reassemble
        chunks in order even when some fail.
    """
    index, chunk, language = indexed_chunk_language
    recognizer = sr.Recognizer()
    # Unique temp file instead of a predictable "chunk_{i}.wav" in the CWD:
    # predictable names collide when several sessions transcribe at once.
    # We only need the path — close the handle and let pydub write to it.
    tmp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
    tmp.close()
    try:
        chunk.export(tmp.name, format="wav")
        with sr.AudioFile(tmp.name) as source:
            audio_data = recognizer.record(source)
        text = recognizer.recognize_google(audio_data, language=language)
        return index, text
    except sr.RequestError:
        return index, "[Error: API unavailable or unresponsive]"
    except sr.UnknownValueError:
        return index, "[Error: Unable to recognize speech]"
    except Exception as e:
        return index, f"[Error: {str(e)}]"
    finally:
        # The original removed the file only on success, leaking one WAV
        # per failed chunk; always clean up.
        try:
            os.remove(tmp.name)
        except OSError:
            pass
def transcribe_audio_with_google_parallel(audio_path, chunk_length_ms=60000, overlap_ms=2000, language="en-US"):
    """Transcribe an audio file by fanning chunk recognition out to threads.

    The audio is cut into overlapping chunks, each chunk is recognized on
    its own worker thread, and the per-chunk texts are stitched back
    together in their original order, separated by spaces.
    """
    pieces = split_audio(audio_path, chunk_length_ms, overlap_ms)
    jobs = [(pos, piece, language) for pos, piece in enumerate(pieces)]
    ordered = [""] * len(jobs)
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
        # Executor.map yields results in submission order, so the index
        # returned by each worker lands back in its original slot.
        for pos, text in pool.map(transcribe_chunk_indexed, jobs):
            ordered[pos] = text
    return " ".join(ordered)
def transcribe(audio_file_path, language):
    """Convert an uploaded audio file to WAV and return its transcription.

    Parameters
    ----------
    audio_file_path : str | None
        Path to the uploaded file, or None when nothing was uploaded.
    language : str
        BCP-47 language code passed through to the recognizer.

    Returns
    -------
    str
        The transcription, or a human-readable error message.
    """
    if audio_file_path is None:
        return "Please upload an audio file."
    temp_path = audio_file_path + "_converted.wav"
    try:
        convert_audio_to_wav(audio_file_path, temp_path)
    except Exception as e:
        return f"Error processing audio: {e}"
    try:
        # try/finally guarantees the intermediate WAV is removed even if
        # recognition raises — the original only cleaned up on success.
        return transcribe_audio_with_google_parallel(
            temp_path, chunk_length_ms=60000, overlap_ms=2000, language=language
        )
    finally:
        try:
            os.remove(temp_path)
        except OSError:
            # Best-effort cleanup; a missing/locked file is not an error.
            pass
# Maps the human-readable dropdown label to the BCP-47 code the Google
# speech API expects. Insertion order drives the dropdown ordering.
language_options = dict([
    ("English (US)", "en-US"),
    ("Dutch", "nl-NL"),
    ("English (UK)", "en-GB"),
    ("Spanish", "es-ES"),
    ("French", "fr-FR"),
    ("German", "de-DE"),
    ("Hindi", "hi-IN"),
    ("Chinese (Mandarin)", "zh-CN"),
    ("Arabic", "ar-SA"),
    ("Turkish", "tr-TR"),
])
# --- Gradio UI -------------------------------------------------------------
# Component creation order inside the Blocks context determines the layout
# order on the page, so these statements must not be reordered.
with gr.Blocks() as demo:
    gr.Markdown("# Audio to Text Transcription")
    gr.Markdown("Upload an audio file, and we'll transcribe it into text using chunk processing.")
    with gr.Row():
        # type="filepath" makes Gradio hand the callback a temp-file path
        # rather than raw audio data.
        audio_input = gr.Audio(type="filepath", label="Upload audio file (mp3, wav, m4a, ogg)")
        language_dropdown = gr.Dropdown(list(language_options.keys()), label="Select language", value="English (US)")
    transcribe_btn = gr.Button("Transcribe")
    output_text = gr.Textbox(label="Transcription Output", lines=15)
    def on_transcribe(audio_path, lang_name):
        """Map the dropdown's display label to its language code, then transcribe."""
        lang_code = language_options[lang_name]
        return transcribe(audio_path, lang_code)
    transcribe_btn.click(on_transcribe, inputs=[audio_input, language_dropdown], outputs=output_text)
# Blocking call: starts the web server when the module is run.
demo.launch()