Spaces:
Sleeping
Sleeping
# -*- coding: utf-8 -*- | |
import os | |
import uuid | |
import tempfile | |
import numpy as np | |
import scipy.io.wavfile | |
from fastapi import FastAPI, UploadFile, File, HTTPException, Form | |
from fastapi.responses import FileResponse | |
from fastapi.middleware.cors import CORSMiddleware | |
from starlette.background import BackgroundTask | |
from google import genai | |
from google.genai import types | |
from silero_vad import ( | |
load_silero_vad, | |
read_audio, | |
get_speech_timestamps, | |
save_audio, | |
collect_chunks, | |
) | |
import torch | |
import torchaudio | |
from transformers import ( | |
pipeline, | |
VitsModel, | |
VitsTokenizer, | |
) | |
import time | |
import gradio as gr | |
import logging | |
# Emit INFO-and-above records as "<timestamp> - <level> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Run on the first CUDA GPU when one is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = "cuda:0"
else:
    device = "cpu"

# Speech-recognition pipeline; audio is processed in 30-second chunks.
model_id = "rbcurzon/whisper-medium-ph"
pipe = pipeline(
    "automatic-speech-recognition",
    model=model_id,
    device=device,
    chunk_length_s=30,
)

# Silero voice-activity-detection model used by remove_silence().
model = load_silero_vad()

# Gemini client; the API key comes from the environment so that it is never
# committed with the source.
client = genai.Client(api_key=os.environ.get("GENAI_API_KEY"))
"""**FastAPI**""" | |
# FastAPI application object; the title and description below are the
# metadata passed to the FastAPI constructor.
app = FastAPI(
    description="Process and transcribe audio in real-time using Whisper",
    title="Real-Time Audio Processor",
)
def remove_silence(filename, sampling_rate=16000):
    """Write a copy of an audio file that contains only its speech segments.

    The audio is loaded with ``read_audio``, speech regions are located with
    ``get_speech_timestamps`` using the module-level VAD ``model``, and the
    concatenated speech chunks are saved to a new temporary WAV file.

    Args:
        filename: Path of the audio file to process.
        sampling_rate: Rate in Hz used to load, analyse and save the audio.
            Defaults to 16000, the value that used to be hard-coded.

    Returns:
        Path of the newly written speech-only WAV file. The caller is
        responsible for deleting it.

    Raises:
        ValueError: If no speech segment is detected in the audio.
    """
    wav = read_audio(filename, sampling_rate=sampling_rate)
    speech_timestamps = get_speech_timestamps(
        wav,
        model,
        sampling_rate=sampling_rate,
    )
    if not speech_timestamps:
        # With no segments there is nothing for collect_chunks to
        # concatenate; fail with an explicit message instead of an obscure
        # tensor error from deeper in the stack.
        raise ValueError(f"No speech detected in {filename}")

    temp_file = create_temp_filename()
    save_audio(
        temp_file,
        collect_chunks(speech_timestamps, wav),
        sampling_rate=sampling_rate,
    )
    return temp_file
def create_temp_filename():
    """Return a unique ``.wav`` path inside the system temporary directory.

    Only the path is built; no file is created on disk. Uniqueness comes
    from a random UUID4 used as the file stem.
    """
    wav_name = f"{uuid.uuid4()}.wav"
    return os.path.join(tempfile.gettempdir(), wav_name)
def translate(text, srcLang, tgtLang):
    """Translate ``text`` from ``srcLang`` to ``tgtLang`` using Gemini.

    Args:
        text: Text to translate.
        srcLang: Name of the source language, inserted into the prompt.
        tgtLang: Name of the target language, inserted into the prompt.

    Returns:
        The ``text`` attribute of the model response.
    """
    sys_instruct = "You are a professional translator. Generate a translation of the text and return only the result. Return only the translated text."
    generation_config = types.GenerateContentConfig(
        system_instruction=sys_instruct,
    )
    prompt = f"Translate the text from {srcLang} to {tgtLang}: {text} "

    response = client.models.generate_content(
        model="gemini-2.0-flash",
        config=generation_config,
        contents=prompt,
    )
    return response.text
def remove_file(file, delay=600):
    """Delete ``file`` after waiting ``delay`` seconds.

    ``synthesize`` schedules this as a background task to clean up the
    generated audio file.

    Args:
        file: Path of the file to delete.
        delay: Seconds to wait before deleting. Defaults to 600 (10 minutes),
            the value that used to be hard-coded. Values <= 0 skip the wait.
    """
    if delay > 0:
        time.sleep(delay)
    try:
        os.remove(file)
    except FileNotFoundError:
        # The file is already gone (for example, removed by another cleanup
        # path); the goal of this function is met, so do not fail.
        pass
def read_root():
    """Return a small payload that identifies the service."""
    payload = {"detail": "Philippine Regional Language Translator"}
    return payload
async def translate_audio(
    file: UploadFile = File(...),
    srcLang: str = Form("Tagalog"),
    tgtLang: str = Form("Cebuano")
):
    """Transcribe an uploaded audio file and translate the transcript.

    The upload is stored in a server-generated temporary file, silence is
    stripped with ``remove_silence``, the result is transcribed with the
    module-level ``pipe`` and the transcript is passed to ``translate``.

    Args:
        file: Uploaded audio file.
        srcLang: Source language name handed to ``translate``.
        tgtLang: Target language name handed to ``translate``.

    Returns:
        A dict with the keys ``transcribed_text``, ``translated_text``,
        ``srcLang`` and ``tgtLang``.

    Raises:
        HTTPException: With status 500 if any processing step fails.
    """
    # Initialised up front so the cleanup in ``finally`` never touches an
    # unbound name when a failure happens early.
    upload_path = None
    temp_file = None
    try:
        content = await file.read()

        # Never write to the client-supplied name: it is untrusted (path
        # traversal, collisions between concurrent requests) and may be
        # missing. Only its extension is kept for the temporary file.
        suffix = os.path.splitext(os.path.basename(file.filename or ""))[1]
        with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as f:
            upload_path = f.name
            f.write(content)
        logging.info("Successfully uploaded %s", file.filename)

        generate_kwargs = {
            "language": "tagalog",
            "return_timestamps": True,
        }

        temp_file = remove_silence(upload_path)

        result = pipe(
            temp_file,
            batch_size=8,
            return_timestamps=True,
            generate_kwargs=generate_kwargs
        )

        result_dict = {
            "transcribed_text": result['text'],
            "translated_text": translate(result['text'], srcLang=srcLang, tgtLang=tgtLang),
            "srcLang": srcLang,
            "tgtLang": tgtLang
        }
        return result_dict
    except Exception as error:
        logging.exception("Error translating audio %s: %s", file.filename, error)
        raise HTTPException(status_code=500, detail=str(error)) from error
    finally:
        if file.file:
            file.file.close()
        # Remove whichever intermediate files were actually created.
        for path in (upload_path, temp_file):
            if path and os.path.exists(path):
                os.remove(path)
async def translate_text(text: str,
                         srcLang: str = Form(...),
                         tgtLang: str = Form(...)):
    """Translate ``text`` and return it together with the request details.

    Args:
        text: Text to translate.
        srcLang: Source language name handed to ``translate``.
        tgtLang: Target language name handed to ``translate``.

    Returns:
        A dict with the keys ``text``, ``translated_text``, ``srcLang`` and
        ``tgtLang``.

    Raises:
        HTTPException: With status 500 when ``translate`` returns a falsy
            result.
    """
    translated = translate(text, srcLang, tgtLang)

    if translated:
        return {
            "text": text,
            "translated_text": translated,
            "srcLang": srcLang,
            "tgtLang": tgtLang
        }

    logging.error("Translation failed for text: %s", text)
    raise HTTPException(status_code=500, detail="Translation failed")
# Checkpoint used for speech synthesis and a cache for its loaded objects.
_TTS_MODEL_ID = "facebook/mms-tts-tgl"
_tts_cache = {}


def _load_tts(device):
    """Return the cached ``(model, tokenizer)`` pair, loading it on first use."""
    if not _tts_cache:
        tts_model = VitsModel.from_pretrained(_TTS_MODEL_ID)
        tts_model.to(device)
        _tts_cache["tokenizer"] = VitsTokenizer.from_pretrained(_TTS_MODEL_ID)
        # Stored last so a failed load leaves the cache empty for a retry.
        _tts_cache["model"] = tts_model
    return _tts_cache["model"], _tts_cache["tokenizer"]


async def synthesize(text: str):
    """Synthesize speech for ``text`` and return it as a WAV file response.

    Args:
        text: Text to convert to speech.

    Returns:
        A ``FileResponse`` with media type ``audio/wav``. The underlying
        temporary file is deleted by ``remove_file`` in a background task.

    Raises:
        HTTPException: With status 400 when ``text`` is empty or blank.
    """
    if not text or not text.strip():
        raise HTTPException(status_code=400, detail="Text must not be empty")

    device = "cuda" if torch.cuda.is_available() else "cpu"
    # Reuse the model across requests instead of reloading it on every call.
    tts_model, tokenizer = _load_tts(device)

    inputs = tokenizer(text, return_tensors="pt")
    input_ids = inputs["input_ids"].to(device)

    with torch.no_grad():
        outputs = tts_model(input_ids)

    speech = outputs["waveform"]
    temp_file = create_temp_filename()
    # Take the output rate from the model configuration rather than
    # hard-coding it, so the WAV header always matches the checkpoint.
    torchaudio.save(temp_file, speech.cpu(), tts_model.config.sampling_rate)

    logging.info(f"Synthesizing completed for text: {text}")
    return FileResponse(
        temp_file,
        media_type="audio/wav",
        background=BackgroundTask(remove_file, temp_file)
    )