import os
import queue
import sys
import threading

import numpy as np
import simpleaudio as sa
import torch
from TTS.api import TTS


# Run synthesis on the GPU when CUDA is usable, otherwise stay on the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Load the XTTS v2 model once at start-up and move it to the chosen device.
tts = TTS(
    model_name="tts_models/multilingual/multi-dataset/xtts_v2",
    progress_bar=True,
)
tts.to(device)

# Hand-off channel: tts_worker() puts int16 buffers (then a None sentinel),
# audio_worker() takes them off and plays them.
audio_queue = queue.Queue()


def tts_worker(text, speaker=None, language="en", reference_audio=None, chunk_size=10):
    """Synthesize *text* chunk by chunk and queue the audio for playback.

    The text is split on whitespace into groups of ``chunk_size`` words. Each
    group is synthesized with the module-level ``tts`` model, converted to
    16-bit PCM and put on ``audio_queue``. A ``None`` sentinel is always
    queued last, even when synthesis fails, so the consumer thread can stop
    instead of blocking forever.

    Args:
        text: The text to speak.
        speaker: Name of a built-in speaker; used when ``reference_audio``
            is falsy.
        language: Language code passed through to the model.
        reference_audio: Path to a reference recording for voice cloning;
            takes precedence over ``speaker`` when set.
        chunk_size: Number of words per synthesized chunk.
    """
    try:
        words = text.split()
        for start in range(0, len(words), chunk_size):
            chunk = " ".join(words[start:start + chunk_size])
            print(f"Processing chunk: {chunk}")

            # Cloning from a reference recording wins over a named speaker.
            if reference_audio:
                voice_kwargs = {"speaker_wav": reference_audio}
            else:
                voice_kwargs = {"speaker": speaker}
            audio = tts.tts(text=chunk, language=language, **voice_kwargs)

            # Assumes float samples nominally in [-1, 1] (the code scales by
            # 32767). Clip first: anything outside that range would wrap
            # around when cast to int16 and produce loud clicks.
            samples = np.asarray(audio, dtype=np.float32)
            pcm = (np.clip(samples, -1.0, 1.0) * 32767).astype(np.int16)
            audio_queue.put(pcm)
    finally:
        # Always signal end-of-stream; without this a synthesis error leaves
        # audio_worker() waiting on the queue indefinitely.
        audio_queue.put(None)


def audio_worker():
    """Play buffers from ``audio_queue`` in order until a ``None`` sentinel arrives.

    Each buffer is played to completion before the next one is fetched.
    """
    channels = 1          # mono
    bytes_per_sample = 2  # the producer queues int16 samples
    while (samples := audio_queue.get()) is not None:
        playback = sa.play_buffer(
            samples,
            channels,
            bytes_per_sample,
            tts.synthesizer.output_sample_rate,
        )
        playback.wait_done()


def list_wav_files(clone_folder="clone"):
    """Return the names of the ``.wav`` files in *clone_folder*, sorted.

    Args:
        clone_folder: Directory holding reference recordings. Defaults to
            ``"clone"`` relative to the current working directory.

    Returns:
        A sorted list of file names (not full paths). The list is empty, and
        a message is printed, when the folder is missing, is not a directory,
        or holds no ``.wav`` files.
    """
    # isdir (not exists): a regular file with this name would otherwise make
    # os.listdir raise NotADirectoryError.
    if not os.path.isdir(clone_folder):
        print(f"Error: Folder '{clone_folder}' not found.")
        return []

    # Sorted so the numbered menu is stable between runs; the extension match
    # is case-insensitive and sub-directories are skipped.
    wav_files = sorted(
        name
        for name in os.listdir(clone_folder)
        if name.lower().endswith(".wav")
        and os.path.isfile(os.path.join(clone_folder, name))
    )
    if not wav_files:
        print(f"No .wav files found in '{clone_folder}'.")
    return wav_files


def select_voice():
    """Ask the user which voice to use.

    The user picks either a named built-in speaker or one of the ``.wav``
    files in the ``clone`` folder as a reference for voice cloning.

    Returns:
        A ``(speaker, reference_audio)`` tuple. On success exactly one of the
        two is set and the other is ``None``. On any invalid input, or when
        no reference files are available, ``(None, None)`` is returned after
        printing a message.
    """
    print("Select voice type:")
    print("1. Use an existing speaker (e.g., 'Ana Florence')")
    print("2. Use a reference voice (voice cloning)")
    choice = input("Enter your choice (1 or 2): ").strip()

    if choice == "1":
        speaker = input("Enter the speaker name (e.g., 'Ana Florence'): ").strip()
        # An empty name is not None, so it would pass the caller's failure
        # check and only break later during synthesis.
        if not speaker:
            print("Invalid input. Speaker name cannot be empty.")
            return None, None
        return speaker, None

    if choice == "2":
        wav_files = list_wav_files()
        if not wav_files:
            return None, None

        print("Available .wav files for cloning:")
        for number, file_name in enumerate(wav_files, start=1):
            print(f"{number}. {file_name}")

        raw_choice = input("Enter the number of the .wav file to use: ").strip()
        try:
            index = int(raw_choice) - 1  # menu is 1-based
        except ValueError:
            print("Invalid input. Please enter a number.")
            return None, None

        if not 0 <= index < len(wav_files):
            print("Invalid choice. Please try again.")
            return None, None
        return None, os.path.join("clone", wav_files[index])

    print("Invalid choice. Please try again.")
    return None, None


# Interactive entry point: pick a voice once, then speak each line the user types.
print("Welcome to the TTS streaming system!")
speaker, reference_audio = select_voice()

# select_voice() reports failure as (None, None); an empty speaker string is
# treated as a failure too, because it cannot be synthesized.
if not speaker and not reference_audio:
    # sys.exit rather than the site-provided exit(), which is meant for the
    # interactive interpreter and is not guaranteed to exist.
    sys.exit("Voice selection failed. Exiting.")

print("Enter text to generate speech. Type 'exit' to quit.")
while True:
    try:
        text = input("Enter text: ").strip()
    except (EOFError, KeyboardInterrupt):
        # Closed stdin or Ctrl-C at the prompt: leave without a traceback.
        print()
        print("Exiting...")
        break

    if text.lower() == "exit":
        print("Exiting...")
        break
    if not text:
        # Nothing to speak; prompt again without starting any threads.
        continue

    # The synthesis thread fills audio_queue while the playback thread drains
    # it, so playback can begin before the whole text has been synthesized.
    tts_thread = threading.Thread(
        target=tts_worker,
        args=(text, speaker, "en", reference_audio),
    )
    audio_thread = threading.Thread(target=audio_worker)
    tts_thread.start()
    audio_thread.start()

    # Wait for this utterance to finish before prompting again.
    tts_thread.join()
    audio_thread.join()

    print("Streaming finished.")