Spaces:
Running
Running
import concurrent.futures
import os
import tempfile

import gradio as gr
import speech_recognition as sr
from pydub import AudioSegment
def split_audio(audio_path, chunk_length_ms=60000, overlap_ms=2000):
    """Split an audio file into consecutive, overlapping chunks.

    Args:
        audio_path: Path of the audio file, loaded with
            ``AudioSegment.from_file``.
        chunk_length_ms: Maximum length of each chunk, in milliseconds.
        overlap_ms: How much each chunk overlaps the previous one, in
            milliseconds. Must be smaller than ``chunk_length_ms``.

    Returns:
        A list of audio slices in playback order. Empty if the audio has
        zero length.

    Raises:
        ValueError: If ``chunk_length_ms`` is not positive, or ``overlap_ms``
            is negative or not smaller than ``chunk_length_ms``.
    """
    # Validate before touching the file: a zero step makes range() raise a
    # cryptic error, a negative step silently yields no chunks at all, and a
    # negative overlap would leave untranscribed gaps between chunks.
    if chunk_length_ms <= 0:
        raise ValueError("chunk_length_ms must be a positive number of milliseconds")
    if not 0 <= overlap_ms < chunk_length_ms:
        raise ValueError("overlap_ms must satisfy 0 <= overlap_ms < chunk_length_ms")

    audio = AudioSegment.from_file(audio_path)
    total_ms = len(audio)
    step_ms = chunk_length_ms - overlap_ms

    chunks = []
    for start_ms in range(0, total_ms, step_ms):
        end_ms = start_ms + chunk_length_ms
        chunks.append(audio[start_ms:end_ms])
        # Once a chunk reaches the end of the audio, any further chunk would
        # lie entirely inside it (pure overlap), so stop here.
        if end_ms >= total_ms:
            break
    return chunks
def convert_audio_to_wav(input_path, output_path):
    """Re-encode an audio file as a 16 kHz mono WAV file.

    Args:
        input_path: Path of the source audio file, loaded with
            ``AudioSegment.from_file``.
        output_path: Path where the WAV file is written.
    """
    audio = AudioSegment.from_file(input_path)
    audio = audio.set_frame_rate(16000).set_channels(1)
    # export() returns the output file handle still open; close it explicitly
    # so the file is released before callers read or delete it.
    audio.export(output_path, format="wav").close()
def transcribe_chunk_indexed(indexed_chunk_language):
    """Transcribe one audio chunk with the Google recognizer.

    Args:
        indexed_chunk_language: An ``(index, chunk, language)`` tuple. The
            chunk must provide ``export(path, format="wav")``; ``language``
            is passed to ``recognize_google`` as-is.

    Returns:
        ``(index, text)``. On failure ``text`` is a bracketed error marker
        (``"[Error: ...]"``) instead of a transcription, so this function
        never raises for recognition or I/O problems.
    """
    index, chunk, language = indexed_chunk_language
    recognizer = sr.Recognizer()
    wav_path = None
    try:
        # Use a uniquely named temp file: a fixed "chunk_<index>.wav" in the
        # working directory collides when two transcriptions run at once.
        fd, wav_path = tempfile.mkstemp(prefix=f"chunk_{index}_", suffix=".wav")
        os.close(fd)
        # export() hands back an open handle; close it before re-reading.
        chunk.export(wav_path, format="wav").close()
        with sr.AudioFile(wav_path) as source:
            audio_data = recognizer.record(source)
        text = recognizer.recognize_google(audio_data, language=language)
        return index, text
    except sr.RequestError:
        return index, "[Error: API unavailable or unresponsive]"
    except sr.UnknownValueError:
        return index, "[Error: Unable to recognize speech]"
    except Exception as e:
        return index, f"[Error: {str(e)}]"
    finally:
        # Remove the temp file on every path, not only on success.
        if wav_path is not None:
            try:
                os.remove(wav_path)
            except OSError:
                # Best-effort cleanup; a leftover temp file must not mask the result.
                pass
def transcribe_audio_with_google_parallel(audio_path, chunk_length_ms=60000, overlap_ms=2000, language="en-US"):
    """Transcribe an audio file by recognizing its chunks in parallel.

    The file is cut up by ``split_audio``, each chunk is handed to
    ``transcribe_chunk_indexed`` on a pool of four worker threads, and the
    per-chunk texts are joined with single spaces in chunk order.

    Args:
        audio_path: Path of the audio file to transcribe.
        chunk_length_ms: Chunk length forwarded to ``split_audio``.
        overlap_ms: Chunk overlap forwarded to ``split_audio``.
        language: Language code forwarded to each chunk transcription.

    Returns:
        The combined transcription as one string.
    """
    segments = split_audio(audio_path, chunk_length_ms, overlap_ms)
    jobs = [(position, segment, language) for position, segment in enumerate(segments)]

    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
        # map() yields results in submission order, which is chunk order.
        ordered_texts = [text for _, text in pool.map(transcribe_chunk_indexed, jobs)]

    return " ".join(ordered_texts)
def transcribe(audio_file_path, language):
    """Convert an uploaded audio file to WAV and return its transcription.

    Args:
        audio_file_path: Path of the uploaded audio file, or ``None``/empty
            when nothing was uploaded.
        language: Language code forwarded to the transcription step.

    Returns:
        The transcription text, ``"Please upload an audio file."`` when no
        file was given, or ``"Error processing audio: ..."`` when the WAV
        conversion fails.

    Raises:
        Any exception raised by ``transcribe_audio_with_google_parallel``
        propagates unchanged; the temporary WAV file is still removed.
    """
    if not audio_file_path:
        return "Please upload an audio file."

    temp_path = None
    try:
        try:
            # Write to a unique temp file instead of "<upload>_converted.wav"
            # so concurrent requests on the same upload cannot clash.
            fd, temp_path = tempfile.mkstemp(suffix="_converted.wav")
            os.close(fd)
            convert_audio_to_wav(audio_file_path, temp_path)
        except Exception as e:
            return f"Error processing audio: {e}"

        return transcribe_audio_with_google_parallel(
            temp_path, chunk_length_ms=60000, overlap_ms=2000, language=language
        )
    finally:
        # Runs on success, on conversion failure and on transcription errors.
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except OSError:
                # Best-effort cleanup; never hide the real result or error.
                pass
# Display name shown in the language dropdown -> language code handed to the
# recognizer. Insertion order is the order in which the choices are listed.
language_options = dict(
    [
        ("English (US)", "en-US"),
        ("Dutch", "nl-NL"),
        ("English (UK)", "en-GB"),
        ("Spanish", "es-ES"),
        ("French", "fr-FR"),
        ("German", "de-DE"),
        ("Hindi", "hi-IN"),
        ("Chinese (Mandarin)", "zh-CN"),
        ("Arabic", "ar-SA"),
        ("Turkish", "tr-TR"),
    ]
)
# Gradio UI: an audio upload and a language picker feed the "Transcribe"
# button, whose result is shown in a multi-line textbox.
# NOTE(review): the original indentation was lost; this assumes the two input
# widgets sit inside the Row and the rest directly inside Blocks — confirm.
with gr.Blocks() as demo:
    gr.Markdown("# Audio to Text Transcription")
    gr.Markdown("Upload an audio file, and we'll transcribe it into text using chunk processing.")

    with gr.Row():
        audio_input = gr.Audio(type="filepath", label="Upload audio file (mp3, wav, m4a, ogg)")
        language_dropdown = gr.Dropdown(
            list(language_options.keys()),
            label="Select language",
            value="English (US)",
        )

    transcribe_btn = gr.Button("Transcribe")
    output_text = gr.Textbox(label="Transcription Output", lines=15)

    def on_transcribe(audio_path, lang_name):
        """Map the selected display name to its language code and transcribe."""
        return transcribe(audio_path, language_options[lang_name])

    transcribe_btn.click(
        on_transcribe,
        inputs=[audio_input, language_dropdown],
        outputs=output_text,
    )

demo.launch()