"""Interactive text-to-speech streamer using the XTTS v2 model.

Text typed at the prompt is split into small word chunks. One thread
synthesizes the chunks and pushes 16-bit PCM buffers onto a queue while a
second thread plays them back, so playback of the first chunk can start
before the whole sentence has been synthesized.
"""

import os
import queue
import sys
import threading

import numpy as np
import simpleaudio as sa
import torch
from TTS.api import TTS

# Folder that holds the reference recordings offered for voice cloning.
CLONE_FOLDER = "clone"

# Run on the GPU when one is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

# The model is loaded once at module level; both workers read this global.
tts = TTS(model_name="tts_models/multilingual/multi-dataset/xtts_v2", progress_bar=True)
tts.to(device)

# Hand-off between the synthesis thread and the playback thread.
# A ``None`` item marks the end of one utterance.
audio_queue = queue.Queue()


def _split_into_chunks(text, chunk_size):
    """Return *text* as a list of strings of at most *chunk_size* words."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be a positive integer")
    words = text.split()
    return [
        " ".join(words[start:start + chunk_size])
        for start in range(0, len(words), chunk_size)
    ]


def _to_pcm16(samples):
    """Convert float samples to a 16-bit PCM numpy array."""
    audio = np.asarray(samples, dtype=np.float32)
    # Clip before scaling: a sample outside [-1, 1] would otherwise
    # overflow the int16 range on the cast.
    return (np.clip(audio, -1.0, 1.0) * 32767).astype(np.int16)


def tts_worker(text, speaker=None, language="en", reference_audio=None, chunk_size=10):
    """Synthesize *text* chunk by chunk and queue the audio for playback.

    Args:
        text: The text to speak.
        speaker: Name of a built-in speaker; used when *reference_audio*
            is not given.
        language: Language code passed to the model.
        reference_audio: Path to a .wav file to clone the voice from.
            Takes precedence over *speaker* when truthy.
        chunk_size: Maximum number of words synthesized per chunk.

    The end-of-utterance sentinel (``None``) is always queued, even when
    synthesis raises, so the playback thread can never block forever.
    """
    try:
        for chunk in _split_into_chunks(text, chunk_size):
            print(f"Processing chunk: {chunk}")
            if reference_audio:
                # Voice cloning from a reference recording.
                audio = tts.tts(
                    text=chunk,
                    speaker_wav=reference_audio,
                    language=language,
                )
            else:
                # One of the model's existing speakers.
                audio = tts.tts(
                    text=chunk,
                    speaker=speaker,
                    language=language,
                )
            audio_queue.put(_to_pcm16(audio))
    finally:
        audio_queue.put(None)


def audio_worker():
    """Play queued PCM chunks in order until the ``None`` sentinel arrives."""
    sample_rate = tts.synthesizer.output_sample_rate
    while True:
        audio_data = audio_queue.get()
        if audio_data is None:
            break
        # 1 channel, 2 bytes per sample (int16).
        play_obj = sa.play_buffer(audio_data, 1, 2, sample_rate)
        # Block until this chunk is done so chunks never overlap.
        play_obj.wait_done()


def list_wav_files():
    """Return the sorted .wav file names found in the clone folder.

    Prints a message and returns an empty list when the folder is missing
    or contains no .wav files.
    """
    # isdir rather than exists: a regular file named "clone" must not
    # reach os.listdir, which would raise on it.
    if not os.path.isdir(CLONE_FOLDER):
        print(f"Error: Folder '{CLONE_FOLDER}' not found.")
        return []

    # Sorted so the numbered menu is stable between runs (os.listdir
    # returns entries in arbitrary order); extension match ignores case.
    wav_files = sorted(
        name for name in os.listdir(CLONE_FOLDER) if name.lower().endswith(".wav")
    )
    if not wav_files:
        print(f"No .wav files found in '{CLONE_FOLDER}'.")
    return wav_files


def select_voice():
    """Ask the user which voice to use.

    Returns:
        A ``(speaker, reference_audio)`` tuple. Exactly one element is set
        on success; both are ``None`` when the selection failed.
    """
    print("Select voice type:")
    print("1. Use an existing speaker (e.g., 'Ana Florence')")
    print("2. Use a reference voice (voice cloning)")
    choice = input("Enter your choice (1 or 2): ").strip()

    if choice == "1":
        speaker = input("Enter the speaker name (e.g., 'Ana Florence'): ").strip()
        if not speaker:
            # An empty name is not None, so it would pass the caller's
            # failure check and then break every synthesis call.
            print("Invalid input. Speaker name cannot be empty.")
            return None, None
        return speaker, None

    if choice == "2":
        wav_files = list_wav_files()
        if not wav_files:
            return None, None

        print("Available .wav files for cloning:")
        for number, name in enumerate(wav_files, start=1):
            print(f"{number}. {name}")

        raw_choice = input("Enter the number of the .wav file to use: ").strip()
        try:
            index = int(raw_choice) - 1
        except ValueError:
            print("Invalid input. Please enter a number.")
            return None, None

        if not 0 <= index < len(wav_files):
            print("Invalid choice. Please try again.")
            return None, None
        return None, os.path.join(CLONE_FOLDER, wav_files[index])

    print("Invalid choice. Please try again.")
    return None, None


def main():
    """Run the interactive prompt: pick a voice, then speak each line typed."""
    print("Welcome to the TTS streaming system!")
    speaker, reference_audio = select_voice()
    if speaker is None and reference_audio is None:
        # sys.exit rather than the site-provided exit() helper, which is
        # meant for the interactive shell.
        sys.exit("Voice selection failed. Exiting.")

    print("Enter text to generate speech. Type 'exit' to quit.")
    while True:
        text = input("Enter text: ")
        if text.lower() == "exit":
            print("Exiting...")
            break

        # Producer synthesizes, consumer plays; both end with the sentinel.
        tts_thread = threading.Thread(
            target=tts_worker, args=(text, speaker, "en", reference_audio)
        )
        audio_thread = threading.Thread(target=audio_worker)
        tts_thread.start()
        audio_thread.start()

        tts_thread.join()
        audio_thread.join()
        print("Streaming finished.")


if __name__ == "__main__":
    main()