Spaces:
Runtime error
Runtime error
import os | |
import torch | |
from transformers import pipeline | |
# Loading the TTS and Vocoder ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
import sys | |
import re | |
import json | |
import inflect | |
import random | |
import uroman as ur | |
import numpy as np | |
import torchaudio | |
import subprocess | |
import requests | |
from transformers import AutoModelForCausalLM, AutoTokenizer | |
from outetts.wav_tokenizer.decoder import WavTokenizer | |
# Function to execute shell commands safely | |
def run_command(command): | |
try: | |
process = subprocess.Popen( | |
command, | |
stdout=subprocess.PIPE, | |
stderr=subprocess.PIPE, | |
shell=True, | |
universal_newlines=True | |
) | |
stdout, stderr = process.communicate() | |
if process.returncode != 0: | |
print(f"Error executing: {command}") | |
print(stderr) | |
return False | |
else: | |
print(stdout) | |
return True | |
except Exception as e: | |
print(f"Exception during execution of {command}: {e}") | |
return False | |
# Clone the YarnGPT repository | |
if not os.path.exists('yarngpt'): | |
print("Cloning YarnGPT repository...") | |
run_command("git clone https://github.com/saheedniyi02/yarngpt.git") | |
else: | |
print("YarnGPT repository already exists") | |
# # Install required packages with specific versions | |
# print("Installing required packages with specific versions...") | |
# run_command("pip install -q outetts==0.3.3 uroman==1.3.1.1") | |
# Add the yarngpt directory to Python path instead of installing it | |
yarngpt_path = os.path.join(os.getcwd(), 'yarngpt') | |
if os.path.exists(yarngpt_path) and yarngpt_path not in sys.path: | |
sys.path.append(yarngpt_path) | |
print(f"Added {yarngpt_path} to Python path") | |
# Now you should be able to import from yarngpt | |
# Import this after adding to path | |
try: | |
from yarngpt.audiotokenizer import AudioTokenizerV2 | |
print("Successfully imported AudioTokenizerV2 from yarngpt") | |
except ImportError as e: | |
print(f"Error importing from yarngpt: {e}") | |
# Check the content of the directory to debug | |
if os.path.exists(yarngpt_path): | |
print("Contents of yarngpt directory:") | |
print(os.listdir(yarngpt_path)) | |
# Download files using Python's requests library instead of !wget | |
def download_file(url, save_path): | |
response = requests.get(url, stream=True) | |
if response.status_code == 200: | |
with open(save_path, 'wb') as f: | |
f.write(response.content) | |
print(f"Downloaded {save_path}") | |
else: | |
print(f"Failed to download {url}") | |
# Download required files | |
download_file( | |
"https://huggingface.co/novateur/WavTokenizer-medium-speech-75token/resolve/main/wavtokenizer_mediumdata_frame75_3s_nq1_code4096_dim512_kmeans200_attn.yaml", | |
"wavtokenizer_mediumdata_frame75_3s_nq1_code4096_dim512_kmeans200_attn.yaml" | |
) | |
download_file( | |
"https://huggingface.co/novateur/WavTokenizer-large-speech-75token/resolve/main/wavtokenizer_large_speech_320_24k.ckpt", | |
"wavtokenizer_large_speech_320_24k.ckpt" | |
) | |
from yarngpt.audiotokenizer import AudioTokenizerV2 | |
device = "cuda:0" if torch.cuda.is_available() else "cpu" | |
tokenizer_path="saheedniyi/YarnGPT2" | |
wav_tokenizer_config_path = "./wavtokenizer_mediumdata_frame75_3s_nq1_code4096_dim512_kmeans200_attn.yaml" | |
wav_tokenizer_model_path = "./wavtokenizer_large_speech_320_24k.ckpt" | |
audio_tokenizer=AudioTokenizerV2(tokenizer_path,wav_tokenizer_model_path,wav_tokenizer_config_path) | |
tts_model = AutoModelForCausalLM.from_pretrained(tokenizer_path,torch_dtype="auto").to(audio_tokenizer.device) | |
# The LLM Model ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
from huggingface_hub import HfFolder | |
from openai import OpenAI | |
api_key = os.getenv("API_KEY") | |
if api_key is None: | |
raise ValueError("API_KEY is not set in the environment variables.") | |
print("API key successfully loaded.") | |
# Initialize OpenAI client for Hugging Face Inference Endpoint | |
client = OpenAI( | |
base_url="https://y1ztgv8tu09nay6u.us-east-1.aws.endpoints.huggingface.cloud/v1/", #https://f2iozzwigntrzkve.us-east-1.aws.endpoints.huggingface.cloud/v1/", | |
api_key=api_key | |
) | |
def generate_llm_response(text, model_id="ccibeekeoc42/Llama3.1-8b-base-SFT-2024-11-09"): | |
full_response = [] | |
try: | |
chat_completion = client.chat.completions.create( | |
model="tgi", | |
messages=[ | |
{"role": "system", "content": "You are HypaAI a very BRIEF AND DIRECT assistant. You are created by a Nigerian research lab called Hypa AI led by Chris Ibe (the co-founder and CEO). As part of a speech pipeline so keep your responses short (under 60 words), fluent, and straight to the point. Avoid markdown or digits in responses."}, | |
{"role": "user", "content": text} | |
], | |
top_p=0.3, | |
temperature=1, | |
max_tokens=150, | |
stream=True, | |
seed=None, | |
stop=None, | |
frequency_penalty=None, | |
presence_penalty=None | |
) | |
for chunk in chat_completion: | |
if chunk.choices[0].delta.content: | |
full_response.append(chunk.choices[0].delta.content) | |
return "".join(full_response) | |
except Exception as e: | |
# If the error has a response with status code 503, assume the GPU is booting up. | |
if hasattr(e, 'response') and e.response is not None and e.response.status_code == 503: | |
return "The GPU is currently booting up. Please wait about 10 minutes and try again." | |
else: | |
raise e | |
# generate_llm_response("Explain Deep Learning in Igbo") | |
# Loading the ST Model (Whisper) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
pipe = pipeline("automatic-speech-recognition", model="okezieowen/whisper-small-multilingual-naija-11-03-2024", device=device) | |
# Take audio and return translated text | |
def transcribe(audio): | |
outputs = pipe(audio, max_new_tokens=256, generate_kwargs={"task": "transcribe"}) | |
return outputs["text"] | |
# putting the ST and TTS system together ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
import numpy as np | |
def synthesise_yarn2(text): | |
# change the language and voice | |
prompt=audio_tokenizer.create_prompt(text,lang="english",speaker_name="idera") | |
input_ids=audio_tokenizer.tokenize_prompt(prompt) | |
output = tts_model.generate( | |
input_ids=input_ids, | |
temperature=0.1, | |
repetition_penalty=1.1, | |
max_length=4000, | |
num_beams=5,# using a beam size helps for the local languages but not english | |
) | |
codes=audio_tokenizer.get_codes(output) | |
audio=audio_tokenizer.get_audio(codes) | |
return audio.cpu() | |
target_dtype = np.int16 | |
max_range = np.iinfo(target_dtype).max # Maximum value for 16-bit PCM audio conversion | |
def speech_to_speech_translation(audio, language="english"): | |
# Speech to Text | |
transcribed_text = transcribe(audio) | |
print(f"Transcribed: {transcribed_text}") | |
# Generate LLM Response | |
print("Now making LLM Call ~~~~~~~~~~~~~~~~~~~~~~~~") | |
llm_response = generate_llm_response(transcribed_text) | |
print(f"LLM Response: {llm_response}") | |
# Select a random voice based on the chosen language | |
voice_mapping = { | |
"english": ["idera", "chinenye", "jude", "emma", "umar", "joke", "zainab", "osagie", "remi", "tayo"], | |
"yoruba": ["yoruba_male2", "yoruba_female2", "yoruba_feamle1"], | |
"igbo": ["igbo_female2", "igbo_male2", "igbo_female1"], | |
"hausa": ["hausa_feamle1", "hausa_female2", "hausa_male2", "hausa_male1"] | |
} | |
selected_voice = random.choice(voice_mapping.get(language.lower(), voice_mapping["english"])) | |
print(f"Selected {language} voice: {selected_voice}") | |
# Text to Speech | |
print("Synthesizing Speech ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~") | |
# Use the selected language and voice | |
prompt = audio_tokenizer.create_prompt(llm_response, lang=language.lower(), speaker_name=selected_voice) | |
input_ids = audio_tokenizer.tokenize_prompt(prompt) | |
output = tts_model.generate( | |
input_ids=input_ids, | |
temperature=0.1, | |
repetition_penalty=1.1, | |
max_length=4000, | |
) | |
codes = audio_tokenizer.get_codes(output) | |
synthesised_speech = audio_tokenizer.get_audio(codes) | |
# Make sure we have a NumPy array, not a tensor | |
if hasattr(synthesised_speech, 'numpy'): | |
audio_np = synthesised_speech.numpy() | |
else: | |
audio_np = synthesised_speech | |
# Handle NaN and Inf values | |
audio_np = np.nan_to_num(audio_np) | |
# Ensure audio is in [-1, 1] range | |
if np.max(np.abs(audio_np)) > 0: | |
audio_np = audio_np / np.max(np.abs(audio_np)) | |
# Convert to signed int16 (-32768 to 32767) | |
int16_max = 32767 # Max value for signed 16-bit | |
audio_int16 = np.clip(audio_np * int16_max, -int16_max, int16_max).astype(np.int16) | |
# Ensure the audio is mono channel if needed | |
if len(audio_int16.shape) > 1 and audio_int16.shape[0] == 1: | |
audio_int16 = audio_int16[0] # Convert from [1, samples] to [samples] | |
# Debug info | |
print(f"Audio stats - Min: {np.min(audio_int16)}, Max: {np.max(audio_int16)}, Shape: {audio_int16.shape}") | |
# Ensure sample rate is within valid range (1-192000) | |
sample_rate = min(max(24000, 1), 192000) | |
print("Speech Synthesis Completed~~~~~~~~~~~~~~~~~~~") | |
return transcribed_text, llm_response, (sample_rate, audio_int16) | |
# Gradio Demo | |
import gradio as gr | |
demo = gr.Blocks() | |
with demo: | |
gr.Markdown("# Aware Speech-to-Speech Demo") | |
with gr.Tab("Microphone"): | |
with gr.Row(): | |
mic_input = gr.Audio(sources="microphone", type="filepath", label="Speak") | |
lang_dropdown_mic = gr.Dropdown( | |
choices=["English", "Yoruba", "Igbo", "Hausa"], | |
value="English", | |
label="Select Language" | |
) | |
mic_submit = gr.Button("Submit") | |
with gr.Row(): | |
mic_transcribed = gr.Textbox(label="Transcribed Text", interactive=False) | |
mic_response = gr.Textbox(label="HypaAI's Response", interactive=False) | |
mic_audio_output = gr.Audio(label="Generated Speech", type="numpy") | |
mic_submit.click( | |
fn=speech_to_speech_translation, | |
inputs=[mic_input, lang_dropdown_mic], | |
outputs=[mic_transcribed, mic_response, mic_audio_output] | |
) | |
with gr.Tab("Audio File"): | |
with gr.Row(): | |
file_input = gr.Audio(sources="upload", type="filepath", label="Upload Audio") | |
lang_dropdown_file = gr.Dropdown( | |
choices=["English", "Yoruba", "Igbo", "Hausa"], | |
value="English", | |
label="Select Language" | |
) | |
file_submit = gr.Button("Submit") | |
with gr.Row(): | |
file_transcribed = gr.Textbox(label="Transcribed Text", interactive=False) | |
file_response = gr.Textbox(label="HypaAI's Response", interactive=False) | |
file_audio_output = gr.Audio(label="Generated Speech", type="numpy") | |
file_submit.click( | |
fn=speech_to_speech_translation, | |
inputs=[file_input, lang_dropdown_file], | |
outputs=[file_transcribed, file_response, file_audio_output] | |
) | |
demo.launch(share=True) |