File size: 11,748 Bytes
5e57552
99be103
 
 
0019f2d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
99be103
 
 
5e57552
 
 
 
 
99be103
 
 
0e30cd1
99be103
 
 
 
23976f4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1111f2b
 
 
 
 
 
99be103
9734753
99be103
23976f4
0019f2d
23976f4
 
 
 
 
 
 
 
 
0019f2d
99be103
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9734753
 
99be103
9734753
 
99be103
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9734753
99be103
 
 
 
 
 
9734753
99be103
 
9734753
99be103
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
59c6107
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
import os
import torch
from transformers import pipeline

# Loading the TTS and Vocoder ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
from transformers import SpeechT5Processor, SpeechT5ForTextToSpeech, SpeechT5HifiGan
from datasets import load_dataset

processor = SpeechT5Processor.from_pretrained("microsoft/speecht5_tts")
model_default = SpeechT5ForTextToSpeech.from_pretrained("microsoft/speecht5_tts")
vocoder = SpeechT5HifiGan.from_pretrained("microsoft/speecht5_hifigan")

# sending the model to device
model_default.to(device)
vocoder.to(device)

# Loading speaker embedings
embeddings_dataset = load_dataset("Matthijs/cmu-arctic-xvectors", split="validation")
speaker_embeddings = torch.tensor(embeddings_dataset[7306]["xvector"]).unsqueeze(0)

# The LLM Model ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
from huggingface_hub import HfFolder
from openai import OpenAI

api_key = os.getenv("API_KEY")
if api_key is None:
    raise ValueError("API_KEY is not set in the environment variables.")
print("API key successfully loaded.")

# Initialize OpenAI client for Hugging Face Inference Endpoint
client = OpenAI(
	base_url="https://f2iozzwigntrzkve.us-east-1.aws.endpoints.huggingface.cloud/v1/",
	api_key=api_key
)

def generate_llm_response(text, model_id="ccibeekeoc42/Llama3.1-8b-base-SFT-2024-11-09"):
    full_response = []
    try:
        chat_completion = client.chat.completions.create(
            model="tgi",
            messages=[
                {"role": "system", "content": "You are a BRIEF AND DIRECT assistant. A part of a speech pipeline so keep your responses short, fluent, and straight to the point. Avoid markdown in responses."},
                {"role": "user", "content": text}
            ],
            top_p=None,
            temperature=None,
            max_tokens=75,
            stream=True,
            seed=None,
            stop=None,
            frequency_penalty=None,
            presence_penalty=None
        )
        for chunk in chat_completion:
            if chunk.choices[0].delta.content:
                full_response.append(chunk.choices[0].delta.content)
        return "".join(full_response)
    except Exception as e:
        # If the error has a response with status code 503, assume the GPU is booting up.
        if hasattr(e, 'response') and e.response is not None and e.response.status_code == 503:
            return "The GPU is currently booting up. Please wait about 10 minutes and try again."
        else:
            raise e

# generate_llm_response("Explain Deep Learning in Igbo")


# Loading the ST Model (Whisper) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
device = "cuda:0" if torch.cuda.is_available() else "cpu"
pipe = pipeline("automatic-speech-recognition", model="okezieowen/whisper-small-multilingual-naija-11-03-2024", device=device)

# Take audio and return translated text
def transcribe(audio):
    outputs = pipe(audio, max_new_tokens=256, generate_kwargs={"task": "transcribe"})
    return outputs["text"]


# Helper Functions to Cleanup LLM Texts ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Replacement rules
import re
# Language-specific replacements
ig_replacements = [('a', 'ah'), ('e', 'eh'), ('i', 'ee'), ('ị', 'ih'), ('ṅ', 'nn'), ('o','oh'), ('ọ','aw'), ('u','oo'), ('ụ','uh')]
yo_replacements = [('á', 'ah'), ('é', 'eh'), ('ẹ', 'e'), ('ó', 'oh'), ('ọ', 'aw'), ('ṣ', 'sh')]

# Overall Replacements Rules
replacements = [
    ('²','squared'), ('½','square-root'), ('¾','one quarter'), ('¼','cubeed-root'),
    ('ā','a'), ('â', 'a'), ('å','a'), ('á', 'a'), ('à', 'a'), ('ả', 'a'), ('ã', 'a'),
    ('č', 'c'), ('ç', 'c'),
    ('ë','e'), ('ẹ̀','e'), ('ẹ́','e'), ('é', 'e'), ('è', 'e'), ('ẻ', 'e'), ('ẽ', 'e'), ('ẹ', 'e'), ('ė', 'e'), ('ē', 'e'), ('ę', 'e'),
    ('ï', 'i'), ('ì', 'i'), ('ị', 'i'), ('ỉ', 'i'), ('ĩ', 'i'), ('í', 'i'), ('ī', 'i'),
    ('ń', 'n'), ('ň', 'n'), ('ń', 'n'), ('ṅ', 'n'), ('ñ', 'n'), ('ǹ', 'n'),
    ('ö','o'), ('ọ̀','o'), ('ò', 'o'), ('ó', 'o'), ('ô', 'o'), ('ọ', 'o'), ('ò','o'), ('ó','o'), ('ò','o'), ('õ','o'), ('ō','o'),
    ('ṣ', 's'), ('š', 's'),
    ('ụ', 'u'), ('ü', 'u'), ('ú', 'u'), ('ǔ', 'u'), ('ù', 'u'), ('ū', 'u'), ('ũ', 'u'),
    ('ω','omega'), ('θ','theta'), ('ł','w'),
    ('α','alpha'), ('β','beta'), ('γ','gamma'), ('δ','delta'), ('ε','epsilon'), ('ζ','zeta'), ('η','eta'), ('θ','theta'),
    ('ι','iota'), ('κ','kappa'), ('λ','lambda'), ('μ','mu'), ('ν','nu'), ('ξ','xi'), ('ο','omicron'), ('π','pi'),
    ('ρ','rho'),
    ('_',' '),
]

# Function to clean up text
def cleanup_text(example, lng="en"):
    example = example.lower()
    if lng == "ig":
        for src, dst in ig_replacements:
            example = example.replace(src, dst)
    elif lng == "yo":
        for src, dst in yo_replacements:
            example = example.replace(src, dst)
    for src, dst in replacements:
        example = example.replace(src, dst)  # Update text directly
    return example

# Normalizing the text
def normalize_text(text):
    text = text.lower() # Convert to lowercase
    text = re.sub(r'[^\w\s\']', '', text) # Remove punctuation (except apostrophes)
    text = ' '.join(text.split()) # Remove extra whitespace
    return text


# Language-specific number words
number_words = {
    "en": {  # English
        0: "zero", 1: "one", 2: "two", 3: "three", 4: "four", 5: "five", 6: "six", 7: "seven", 8: "eight", 9: "nine",
        10: "ten", 11: "eleven", 12: "twelve", 13: "thirteen", 14: "fourteen", 15: "fifteen", 16: "sixteen",
        17: "seventeen", 18: "eighteen", 19: "nineteen", 20: "twenty", 30: "thirty", 40: "forty", 50: "fifty",
        60: "sixty", 70: "seventy", 80: "eighty", 90: "ninety", 100: "hundred", 1000: "thousand"
    },
    "yo": {  # Yoruba
        0: "ódo", 1: "ọ̀kan", 2: "méjì", 3: "mẹ́ta", 4: "mẹ́rin", 5: "márùn", 6: "mẹ́fà", 7: "mẹ̀je", 8: "mẹ̀jọ", 9: "mẹ́sàn",
        10: "ẹ́wa", 11: "ọọkànlá", 12: "méjìlá", 13: "mẹ́tàlá", 14: "mẹ́rìnlá", 15: "árundínlógún", 16: "ẹ́rindínlógún", 17: "ẹ́rindínlógún",
        18: "ẹ́rindínlógún", 19: "ẹ́rindínlógún", 20: "ogún", 30: "ọgbọ̀n", 40: "ogójì", 50: "àádọ́ta", 60: "ọgọ́ta", 70: "àádọ́rin",
        80: "ọgọ́rin", 90: "àádọ́run", 100: "ọgọ́run", 1000: "ẹgbẹ̀rún"
    },
    "ig": {  # Igbo
        0: "nọọ", 1: "otu", 2: "abụọ", 3: "atọ", 4: "anọ", 5: "ise", 6: "isii", 7: "asaa", 8: "asatọ", 9: "itoolu",
        10: "iri", 11: "iri na otu", 12: "iri na abụọ", 13: "iri na atọ", 14: "iri na anọ", 15: "iri na ise",
        16: "iri na isii", 17: "iri na asaa", 18: "iri na asatọ", 19: "iri na itoolu", 20: "iri abụọ",
        30: "iri atọ", 40: "iri anọ", 50: "iri ise", 60: "iri isii", 70: "iri asaa", 80: "iri asatọ", 90: "iri itoolu",
        100: "nari", 1000: "puku"
    }
}

# Number to words function
def number_to_words(number, lang="en"):
    words = number_words[lang]

    if number < 20:
        return words[number]
    elif number < 100:
        tens, unit = divmod(number, 10)
        return words[tens * 10] + (" " + words[unit] if unit else "")
    elif number < 1000:
        hundreds, remainder = divmod(number, 100)
        return (words[hundreds] + " " + ("hundred" if lang == "en" else
                                         "ọgọ́rùn" if lang == "yo" else "nari") if hundreds > 1 else
                                         "hundred" if lang == "en" else "ọgọ́rùn" if lang == "yo" else "nari") + \
               (" " + number_to_words(remainder, lang) if remainder else "")
    elif number < 1000000:
        thousands, remainder = divmod(number, 1000)
        return (number_to_words(thousands, lang) + " " + ("thousand" if lang == "en" else
                                                          "ẹgbẹ̀rún" if lang == "yo" else "puku")) + \
               (" " + number_to_words(remainder, lang) if remainder else "")
    elif number < 1000000000:
        millions, remainder = divmod(number, 1000000)
        return number_to_words(millions, lang) + " " + ("million" if lang == "en" else
                                                        "mílíọ̀nù" if lang == "yo" else "nde") + \
               (" " + number_to_words(remainder, lang) if remainder else "")
    elif number < 1000000000000:
        billions, remainder = divmod(number, 1000000000)
        return number_to_words(billions, lang) + " " + ("billion" if lang == "en" else
                                                        "bílíọ̀nù" if lang == "yo" else "ijeri") + \
               (" " + number_to_words(remainder, lang) if remainder else "")
    else:
        return str(number)

# Replace numbers in text
def replace_numbers_with_words(text, lang="en"):
    def replace(match):
        number = int(match.group())
        return number_to_words(number, lang)

    # Replace all numbers in the text
    return re.sub(r'\b\d+\b', replace, text)

# llm_response = generate_llm_response("Explain Deep Learning in Igbo")
# llm_response_cleaned = normalize_text(cleanup_text(replace_numbers_with_words(llm_response, "yo"), "yo"))

# print(f"LLM Response: {llm_response}")
# print(f"LLM Response Cleaned: {llm_response_cleaned}")

# returning spech from text (and bringing to CPU)
def synthesise(text):
    inputs = processor(text=text, return_tensors="pt")
    speech = model_default.generate_speech(
        inputs["input_ids"].to(device), speaker_embeddings.to(device), vocoder=vocoder
    )
    return speech.cpu()


# putting the ST and TTS system together
import numpy as np

target_dtype = np.int16
max_range = np.iinfo(target_dtype).max # Maximum value for 16-bit PCM audio conversion

# Modified speech-to-speech translation with textbox
def speech_to_speech_translation(audio):
    # Speech to Text
    transcribed_text = transcribe(audio)
    print(f"Transcribed: {transcribed_text}")

    # Generate LLM Response
    print("Now making LLM Call ~~~~~~~~~~~~~~~~~~~~~~~~")
    llm_response = generate_llm_response(transcribed_text)
    llm_response_cleaned = normalize_text(cleanup_text(replace_numbers_with_words(llm_response, "yo"), "yo"))
    print(f"LLM Response: {llm_response}")
    print(f"LLM Response Cleaned: {llm_response_cleaned}")

    # Text to Speech
    print("Synthesizing Speech ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
    synthesised_speech = synthesise(llm_response_cleaned)
    synthesised_speech = (synthesised_speech.numpy() * max_range).astype(np.int16)
    print("Speech Synthesis Completed~~~~~~~~~~~~~~~~~~~")

    return transcribed_text, (16000, synthesised_speech), llm_response


# Gradio Demo
import gradio as gr

demo = gr.Blocks()

mic_translate = gr.Interface(
    fn=speech_to_speech_translation,
    inputs=gr.Audio(sources="microphone", type="filepath"),
    outputs=[
        gr.Textbox(label="Transcribed Text", interactive=False),
        gr.Audio(label="Generated Speech", type="numpy"),
        gr.Markdown(label="LLM Enhanced Response")  # New Markdown output
        ]
)

file_translate = gr.Interface(
    fn=speech_to_speech_translation,
    inputs=gr.Audio(sources="upload", type="filepath"),
    outputs=[
        gr.Textbox(label="Transcribed Text", interactive=False),
        gr.Audio(label="Generated Speech", type="numpy"),
        gr.Markdown(label="LLM Enhanced Response")  # New Markdown output
        ]
)

with demo:
    gr.TabbedInterface([mic_translate, file_translate], ["Microphone", "Audio File"])

demo.launch(share=True)