# Source of a Hugging Face Space (the Space was reported as "Sleeping" when this copy was captured).
import os
import torch
from transformers import pipeline

# Loading the TTS and Vocoder ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Use the first CUDA device when torch reports one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda:0"
else:
    device = "cpu"

from transformers import SpeechT5Processor, SpeechT5ForTextToSpeech, SpeechT5HifiGan
from datasets import load_dataset

processor = SpeechT5Processor.from_pretrained("microsoft/speecht5_tts")

# Load each network and move it to the selected device straight away.
model_default = SpeechT5ForTextToSpeech.from_pretrained("microsoft/speecht5_tts").to(device)
vocoder = SpeechT5HifiGan.from_pretrained("microsoft/speecht5_hifigan").to(device)

# Loading speaker embeddings: row 7306 of the x-vector dataset, with a leading
# axis added by unsqueeze(0). It is moved to `device` later, at synthesis time.
embeddings_dataset = load_dataset("Matthijs/cmu-arctic-xvectors", split="validation")
speaker_embeddings = torch.tensor(embeddings_dataset[7306]["xvector"]).unsqueeze(0)
# The LLM Model ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
from huggingface_hub import HfFolder
from openai import OpenAI

# The endpoint token comes from the environment; start-up aborts when it is missing.
api_key = os.environ.get("API_KEY")
if api_key is None:
    raise ValueError("API_KEY is not set in the environment variables.")
print("API key successfully loaded.")

# OpenAI-compatible client pointed at a Hugging Face Inference Endpoint.
# A second endpoint URL was left commented out in the original source:
#   https://f2iozzwigntrzkve.us-east-1.aws.endpoints.huggingface.cloud/v1/
client = OpenAI(
    api_key=api_key,
    base_url="https://y1ztgv8tu09nay6u.us-east-1.aws.endpoints.huggingface.cloud/v1/",
)
def generate_llm_response(text, model_id="ccibeekeoc42/Llama3.1-8b-base-SFT-2024-11-09"):
    """Send ``text`` to the chat endpoint and return the streamed reply as one string.

    Args:
        text: The user message placed after the fixed system prompt.
        model_id: Kept for backward compatibility. It is not used by the
            request, which always sends the model name ``"tgi"``.

    Returns:
        The concatenated content of all streamed chunks, or a fixed
        "GPU is booting up" message when the request fails with HTTP 503.

    Raises:
        Exception: Any failure other than an HTTP 503 response is re-raised
            unchanged.
    """
    try:
        stream = client.chat.completions.create(
            model="tgi",
            messages=[
                {"role": "system", "content": "You are a very BRIEF AND DIRECT assistant. As part of a speech pipeline so keep your responses short (under 60 words), fluent, and straight to the point. Avoid markdown or digits in responses."},
                {"role": "user", "content": text}
            ],
            top_p=0.3,
            temperature=1,
            max_tokens=80,
            stream=True,
            seed=None,
            stop=None,
            frequency_penalty=None,
            presence_penalty=None
        )
        pieces = []
        for chunk in stream:
            # A streamed chunk can arrive with an empty `choices` list;
            # skip it instead of failing with IndexError.
            if not chunk.choices:
                continue
            piece = chunk.choices[0].delta.content
            if piece:
                pieces.append(piece)
        return "".join(pieces)
    except Exception as e:
        # If the error has a response with status code 503, assume the GPU is booting up.
        response = getattr(e, "response", None)
        if response is not None and getattr(response, "status_code", None) == 503:
            return "The GPU is currently booting up. Please wait about 10 minutes and try again."
        # Bare `raise` re-raises the active exception with its original traceback.
        raise
# generate_llm_response("Explain Deep Learning in Igbo") | |
# Loading the ST Model (Whisper) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
pipe = pipeline(
    "automatic-speech-recognition",
    model="okezieowen/whisper-small-multilingual-naija-11-03-2024",
    device=device,
)


def transcribe(audio):
    """Run the speech-recognition pipeline on ``audio`` and return its text.

    The pipeline is called with the "transcribe" task and at most 256 new
    tokens; only the ``"text"`` field of its output is returned.
    """
    return pipe(
        audio,
        max_new_tokens=256,
        generate_kwargs={"task": "transcribe"},
    )["text"]
# Helper Functions to Cleanup LLM Texts ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Replacement rules
import re

# Each table is a list of (source, replacement) pairs that cleanup_text applies
# one after another with str.replace, so the ORDER of the entries matters:
# an earlier rule's output can be rewritten by a later rule.

# Language-specific replacements
ig_replacements = [('a', 'ah'), ('e', 'eh'), ('i', 'ee'), ('ị', 'ih'), ('ṅ', 'nn'), ('o','oh'), ('ọ','aw'), ('u','oo'), ('ụ','uh')]
yo_replacements = [('á', 'ah'), ('é', 'eh'), ('ẹ', 'e'), ('ó', 'oh'), ('ọ', 'aw'), ('ṣ', 'sh')]

# Overall Replacements Rules
# NOTE(review): some entries look duplicated; they are kept verbatim because
# visually identical characters may differ in Unicode composition.
replacements = [
    # Vulgar fractions are spoken as fractions (previously mapped to
    # 'square-root', 'one quarter' for three quarters, and 'cubeed-root').
    ('²','squared'), ('½','one half'), ('¾','three quarters'), ('¼','one quarter'),
    ('ā','a'), ('â', 'a'), ('å','a'), ('á', 'a'), ('à', 'a'), ('ả', 'a'), ('ã', 'a'),
    ('č', 'c'), ('ç', 'c'),
    ('ë','e'), ('ẹ̀','e'), ('ẹ́','e'), ('é', 'e'), ('è', 'e'), ('ẻ', 'e'), ('ẽ', 'e'), ('ẹ', 'e'), ('ė', 'e'), ('ē', 'e'), ('ę', 'e'),
    ('ï', 'i'), ('ì', 'i'), ('ị', 'i'), ('ỉ', 'i'), ('ĩ', 'i'), ('í', 'i'), ('ī', 'i'),
    ('ń', 'n'), ('ň', 'n'), ('ń', 'n'), ('ṅ', 'n'), ('ñ', 'n'), ('ǹ', 'n'),
    ('ö','o'), ('ọ̀','o'), ('ò', 'o'), ('ó', 'o'), ('ô', 'o'), ('ọ', 'o'), ('ò','o'), ('ó','o'), ('ò','o'), ('õ','o'), ('ō','o'),
    ('ṣ', 's'), ('š', 's'),
    ('ụ', 'u'), ('ü', 'u'), ('ú', 'u'), ('ǔ', 'u'), ('ù', 'u'), ('ū', 'u'), ('ũ', 'u'),
    ('ω','omega'), ('θ','theta'), ('ł','w'),
    ('α','alpha'), ('β','beta'), ('γ','gamma'), ('δ','delta'), ('ε','epsilon'), ('ζ','zeta'), ('η','eta'), ('θ','theta'),
    ('ι','iota'), ('κ','kappa'), ('λ','lambda'), ('μ','mu'), ('ν','nu'), ('ξ','xi'), ('ο','omicron'), ('π','pi'),
    ('ρ','rho'),
    ('_',' '),
]
# Function to clean up text
def cleanup_text(example, lng="en"):
    """Lowercase ``example`` and apply the character replacement tables.

    For ``lng == "ig"`` or ``lng == "yo"`` the language-specific table runs
    first; the general ``replacements`` table always runs last. Rules are
    applied sequentially, each on the output of the previous one.
    """
    if lng == "ig":
        rule_tables = (ig_replacements, replacements)
    elif lng == "yo":
        rule_tables = (yo_replacements, replacements)
    else:
        rule_tables = (replacements,)

    cleaned = example.lower()
    for table in rule_tables:
        for old, new in table:
            cleaned = cleaned.replace(old, new)
    return cleaned
# Normalizing the text
def normalize_text(text):
    """Return ``text`` lowercased, without punctuation, and single-spaced.

    Apostrophes, word characters (including ``_``) and whitespace survive
    the punctuation filter; runs of whitespace collapse to one space and
    leading/trailing whitespace is dropped.
    """
    lowered = text.lower()
    # Delete everything that is not a word character, whitespace or apostrophe.
    without_punctuation = re.sub(r'[^\w\s\']', '', lowered)
    return ' '.join(without_punctuation.split())
# Language-specific number words
# Every language table has the same keys: 0-20, the tens 30-90, 100 and 1000.
number_words = {
    "en": {  # English
        0: "zero", 1: "one", 2: "two", 3: "three", 4: "four", 5: "five", 6: "six", 7: "seven", 8: "eight", 9: "nine",
        10: "ten", 11: "eleven", 12: "twelve", 13: "thirteen", 14: "fourteen", 15: "fifteen", 16: "sixteen",
        17: "seventeen", 18: "eighteen", 19: "nineteen", 20: "twenty", 30: "thirty", 40: "forty", 50: "fifty",
        60: "sixty", 70: "seventy", 80: "eighty", 90: "ninety", 100: "hundred", 1000: "thousand"
    },
    "yo": {  # Yoruba
        0: "ódo", 1: "ọ̀kan", 2: "méjì", 3: "mẹ́ta", 4: "mẹ́rin", 5: "márùn", 6: "mẹ́fà", 7: "mẹ̀je", 8: "mẹ̀jọ", 9: "mẹ́sàn",
        # 17, 18 and 19 used to repeat the word for 16; they now use their own
        # "three / two / one less than twenty" forms.
        # NOTE(review): the remaining Yoruba spellings are unchanged and were
        # not verified — confirm with a native speaker.
        10: "ẹ́wa", 11: "ọọkànlá", 12: "méjìlá", 13: "mẹ́tàlá", 14: "mẹ́rìnlá", 15: "árundínlógún", 16: "ẹ́rindínlógún", 17: "mẹ́tàdínlógún",
        18: "méjìdínlógún", 19: "mọ́kàndínlógún", 20: "ogún", 30: "ọgbọ̀n", 40: "ogójì", 50: "àádọ́ta", 60: "ọgọ́ta", 70: "àádọ́rin",
        80: "ọgọ́rin", 90: "àádọ́run", 100: "ọgọ́run", 1000: "ẹgbẹ̀rún"
    },
    "ig": {  # Igbo
        0: "nọọ", 1: "otu", 2: "abụọ", 3: "atọ", 4: "anọ", 5: "ise", 6: "isii", 7: "asaa", 8: "asatọ", 9: "itoolu",
        10: "iri", 11: "iri na otu", 12: "iri na abụọ", 13: "iri na atọ", 14: "iri na anọ", 15: "iri na ise",
        16: "iri na isii", 17: "iri na asaa", 18: "iri na asatọ", 19: "iri na itoolu", 20: "iri abụọ",
        30: "iri atọ", 40: "iri anọ", 50: "iri ise", 60: "iri isii", 70: "iri asaa", 80: "iri asatọ", 90: "iri itoolu",
        100: "nari", 1000: "puku"
    }
}
# Number to words function
def number_to_words(number, lang="en"):
    """Spell out ``number`` using the ``number_words`` table for ``lang``.

    Values below 20 are looked up directly; larger values are split into
    tens, hundreds, thousands, millions and billions and joined with single
    spaces. An exact single hundred is rendered without a leading "one"
    (for example 100 -> "hundred" in English). Values of 10**12 and above
    are returned as their decimal string.

    Scale words are picked per language: English for "en", Yoruba for "yo",
    and the Igbo words for any other value of ``lang``.
    """
    vocabulary = number_words[lang]

    if lang == "en":
        hundred, thousand, million, billion = "hundred", "thousand", "million", "billion"
    elif lang == "yo":
        hundred, thousand, million, billion = "ọgọ́rùn", "ẹgbẹ̀rún", "mílíọ̀nù", "bílíọ̀nù"
    else:
        hundred, thousand, million, billion = "nari", "puku", "nde", "ijeri"

    def _with_rest(head, rest):
        # Append the spelled-out remainder only when it is non-zero.
        if rest:
            return head + " " + number_to_words(rest, lang)
        return head

    if number < 20:
        return vocabulary[number]

    if number < 100:
        tens, unit = divmod(number, 10)
        spoken = vocabulary[tens * 10]
        if unit:
            spoken = spoken + " " + vocabulary[unit]
        return spoken

    if number < 1000:
        count, rest = divmod(number, 100)
        if count > 1:
            head = vocabulary[count] + " " + hundred
        else:
            head = hundred
        return _with_rest(head, rest)

    # (exclusive upper bound, size of one unit, scale word)
    scales = (
        (1000000, 1000, thousand),
        (1000000000, 1000000, million),
        (1000000000000, 1000000000, billion),
    )
    for upper_bound, unit_size, scale_word in scales:
        if number < upper_bound:
            count, rest = divmod(number, unit_size)
            return _with_rest(number_to_words(count, lang) + " " + scale_word, rest)

    return str(number)
# Replace numbers in text
def replace_numbers_with_words(text, lang="en"):
    """Replace each word-bounded run of digits in ``text`` with its spelling.

    Every match is converted to ``int`` and passed to ``number_to_words``
    together with ``lang``.
    """
    return re.sub(
        r'\b\d+\b',
        lambda found: number_to_words(int(found.group()), lang),
        text,
    )
# llm_response = generate_llm_response("Explain Deep Learning in Igbo") | |
# llm_response_cleaned = normalize_text(cleanup_text(replace_numbers_with_words(llm_response, "yo"), "yo")) | |
# print(f"LLM Response: {llm_response}") | |
# print(f"LLM Response Cleaned: {llm_response_cleaned}") | |
# Returning speech from text (and bringing it back to the CPU)
def synthesise(text):
    """Generate speech for ``text`` and return the result as a CPU tensor.

    The token ids and the speaker embedding are moved to ``device`` before
    generation; the vocoder is passed to ``generate_speech`` directly.
    """
    token_ids = processor(text=text, return_tensors="pt")["input_ids"]
    waveform = model_default.generate_speech(
        token_ids.to(device),
        speaker_embeddings.to(device),
        vocoder=vocoder,
    )
    return waveform.cpu()
# putting the ST and TTS system together
import numpy as np

target_dtype = np.int16
max_range = np.iinfo(target_dtype).max  # Maximum value for 16-bit PCM audio conversion


# Modified speech-to-speech translation with textbox
def speech_to_speech_translation(audio):
    """Transcribe ``audio``, ask the LLM for a reply and speak that reply.

    The LLM reply is prepared for synthesis with the "yo" settings of
    ``replace_numbers_with_words`` and ``cleanup_text``, then normalised.

    Args:
        audio: Input forwarded unchanged to ``transcribe``.

    Returns:
        A 3-tuple ``(transcribed_text, llm_response, (16000, samples))`` in the
        same order as the Gradio outputs that consume it (two text boxes, then
        the audio player). ``samples`` is an int16 NumPy array and 16000 is
        passed as its sample rate.
    """
    # Speech to Text
    transcribed_text = transcribe(audio)
    print(f"Transcribed: {transcribed_text}")

    # Generate LLM Response
    print("Now making LLM Call ~~~~~~~~~~~~~~~~~~~~~~~~")
    llm_response = generate_llm_response(transcribed_text)
    llm_response_cleaned = normalize_text(cleanup_text(replace_numbers_with_words(llm_response, "yo"), "yo"))
    print(f"LLM Response: {llm_response}")
    print(f"LLM Response Cleaned: {llm_response_cleaned}")

    # Text to Speech
    print("Synthesizing Speech ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
    waveform = synthesise(llm_response_cleaned).numpy()
    # Scale to the int16 range and clip first, so that samples outside
    # [-1, 1] saturate instead of wrapping around during the integer cast.
    synthesised_speech = np.clip(waveform * max_range, -max_range, max_range).astype(target_dtype)
    print("Speech Synthesis Completed~~~~~~~~~~~~~~~~~~~")

    # Order matters: it must line up with the interface outputs
    # (transcription text box, LLM response text box, audio player).
    return transcribed_text, llm_response, (16000, synthesised_speech)
# Gradio Demo
import gradio as gr


def _build_interface(source):
    """Create one speech-to-speech interface whose audio input uses ``source``.

    Fresh output components are created on every call, so the two tabs do
    not share component instances.
    """
    return gr.Interface(
        fn=speech_to_speech_translation,
        inputs=gr.Audio(sources=source, type="filepath"),
        outputs=[
            gr.Textbox(label="Transcribed Text", interactive=True),
            gr.Textbox(label="LLM Enhanced Response", interactive=False),
            gr.Audio(label="Generated Speech", type="numpy"),
        ],
    )


demo = gr.Blocks()
mic_translate = _build_interface("microphone")
file_translate = _build_interface("upload")

# One tab per input method inside the shared Blocks container.
with demo:
    gr.TabbedInterface([mic_translate, file_translate], ["Microphone", "Audio File"])

demo.launch(share=True)