šļø Vibe Podcasting
Generating Long-form Multi-speaker AI Podcast with VibeVoice
""" VibeVoice Gradio Demo - High-Quality Dialogue Generation Interface with Streaming Support """ import argparse import json import os import sys import tempfile import time from pathlib import Path from typing import List, Dict, Any, Iterator from datetime import datetime import threading import numpy as np import gradio as gr import librosa import soundfile as sf import torch import os import traceback import spaces from vibevoice.modular.configuration_vibevoice import VibeVoiceConfig from vibevoice.modular.modeling_vibevoice_inference import VibeVoiceForConditionalGenerationInference from vibevoice.processor.vibevoice_processor import VibeVoiceProcessor from vibevoice.modular.streamer import AudioStreamer from transformers.utils import logging from transformers import set_seed logging.set_verbosity_info() logger = logging.get_logger(__name__) # import os # os.environ["FLASH_ATTENTION_2"] = "0" class VibeVoiceDemo: def __init__(self, model_path: str, device: str = "cuda", inference_steps: int = 5): """Initialize the VibeVoice demo with model loading.""" self.model_path = model_path self.device = device self.inference_steps = inference_steps self.is_generating = False # Track generation state self.stop_generation = False # Flag to stop generation self.current_streamer = None # Track current audio streamer self.load_model() self.setup_voice_presets() self.load_example_scripts() # Load example scripts def load_model(self): """Load the VibeVoice model and processor.""" print(f"Loading processor & model from {self.model_path}") # Load processor self.processor = VibeVoiceProcessor.from_pretrained( self.model_path, ) # Load model self.model = VibeVoiceForConditionalGenerationInference.from_pretrained( self.model_path, torch_dtype=torch.bfloat16, device_map='cuda' ) self.model.eval() # Use SDE solver by default self.model.model.noise_scheduler = self.model.model.noise_scheduler.from_config( self.model.model.noise_scheduler.config, algorithm_type='sde-dpmsolver++', beta_schedule='squaredcos_cap_v2' ) self.model.set_ddpm_inference_steps(num_steps=self.inference_steps) if hasattr(self.model.model, 'language_model'): print(f"Language model attention: {self.model.model.language_model.config._attn_implementation}") def setup_voice_presets(self): """Setup voice presets by scanning the voices directory.""" voices_dir = os.path.join(os.path.dirname(__file__), "voices") # Check if voices directory exists if not os.path.exists(voices_dir): print(f"Warning: Voices directory not found at {voices_dir}") self.voice_presets = {} self.available_voices = {} return # Scan for all WAV files in the voices directory self.voice_presets = {} # Get all .wav files in the voices directory wav_files = [f for f in os.listdir(voices_dir) if f.lower().endswith(('.wav', '.mp3', '.flac', '.ogg', '.m4a', '.aac')) and os.path.isfile(os.path.join(voices_dir, f))] # Create dictionary with filename (without extension) as key for wav_file in wav_files: # Remove .wav extension to get the name name = os.path.splitext(wav_file)[0] # Create full path full_path = os.path.join(voices_dir, wav_file) self.voice_presets[name] = full_path # Sort the voice presets alphabetically by name for better UI self.voice_presets = dict(sorted(self.voice_presets.items())) # Filter out voices that don't exist (this is now redundant but kept for safety) self.available_voices = { name: path for name, path in self.voice_presets.items() if os.path.exists(path) } if not self.available_voices: raise gr.Error("No voice presets found. Please add .wav files to the demo/voices directory.") print(f"Found {len(self.available_voices)} voice files in {voices_dir}") print(f"Available voices: {', '.join(self.available_voices.keys())}") def read_audio(self, audio_path: str, target_sr: int = 24000) -> np.ndarray: """Read and preprocess audio file.""" try: wav, sr = sf.read(audio_path) if len(wav.shape) > 1: wav = np.mean(wav, axis=1) if sr != target_sr: wav = librosa.resample(wav, orig_sr=sr, target_sr=target_sr) return wav except Exception as e: print(f"Error reading audio {audio_path}: {e}") return np.array([]) @spaces.GPU def generate_podcast(self, num_speakers: int, script: str, speaker_1: str = None, speaker_2: str = None, speaker_3: str = None, speaker_4: str = None, cfg_scale: float = 1.3): """Single GPU function for full generation (streaming + final).""" self.stop_generation = False self.is_generating = True if not script.strip(): raise gr.Error("Please provide a script.") if num_speakers < 1 or num_speakers > 4: raise gr.Error("Number of speakers must be 1ā4.") selected = [speaker_1, speaker_2, speaker_3, speaker_4][:num_speakers] for i, sp in enumerate(selected): if not sp or sp not in self.available_voices: raise gr.Error(f"Invalid speaker {i+1} selection.") # load voices voice_samples = [self.read_audio(self.available_voices[sp]) for sp in selected] if any(len(v) == 0 for v in voice_samples): raise gr.Error("Failed to load one or more voice samples.") # format script lines = script.strip().split("\n") formatted = [] for i, line in enumerate(lines): line = line.strip() if not line: continue if line.startswith("Speaker "): formatted.append(line) else: sp_id = i % num_speakers formatted.append(f"Speaker {sp_id}: {line}") formatted_script = "\n".join(formatted) # processor input inputs = self.processor( text=[formatted_script], voice_samples=[voice_samples], padding=True, return_tensors="pt" ) # === direct generation with streamer === from vibevoice import AudioStreamer, convert_to_16_bit_wav audio_streamer = AudioStreamer(batch_size=1) start = time.time() outputs = self.model.generate( **inputs, cfg_scale=cfg_scale, tokenizer=self.processor.tokenizer, audio_streamer=audio_streamer, verbose=False ) sample_rate = 24000 audio_stream = audio_streamer.get_stream(0) all_chunks, pending = [], [] min_chunk_size = sample_rate * 2 last_yield = time.time() for chunk in audio_stream: if torch.is_tensor(chunk): chunk = chunk.float().cpu().numpy() if chunk.ndim > 1: chunk = chunk.squeeze() chunk16 = convert_to_16_bit_wav(chunk) all_chunks.append(chunk16) pending.append(chunk16) if sum(len(c) for c in pending) >= min_chunk_size or (time.time() - last_yield) > 5: new_audio = np.concatenate(pending) yield (sample_rate, new_audio), None, f"Streaming {len(all_chunks)} chunks..." pending = [] last_yield = time.time() if all_chunks: complete = np.concatenate(all_chunks) total_dur = len(complete) / sample_rate log = f"ā Generation complete in {time.time()-start:.1f}s, {total_dur:.1f}s audio" yield None, (sample_rate, complete), log else: yield None, None, "ā No audio generated." self.is_generating = False def stop_audio_generation(self): """Stop the current audio generation process.""" self.stop_generation = True if self.current_streamer is not None: try: self.current_streamer.end() except Exception as e: print(f"Error stopping streamer: {e}") print("š Audio generation stop requested") def load_example_scripts(self): """Load example scripts from the text_examples directory.""" examples_dir = os.path.join(os.path.dirname(__file__), "text_examples") self.example_scripts = [] # Check if text_examples directory exists if not os.path.exists(examples_dir): print(f"Warning: text_examples directory not found at {examples_dir}") return # Get all .txt files in the text_examples directory txt_files = sorted([f for f in os.listdir(examples_dir) if f.lower().endswith('.txt') and os.path.isfile(os.path.join(examples_dir, f))]) for txt_file in txt_files: file_path = os.path.join(examples_dir, txt_file) import re # Check if filename contains a time pattern like "45min", "90min", etc. time_pattern = re.search(r'(\d+)min', txt_file.lower()) if time_pattern: minutes = int(time_pattern.group(1)) if minutes > 15: print(f"Skipping {txt_file}: duration {minutes} minutes exceeds 15-minute limit") continue try: with open(file_path, 'r', encoding='utf-8') as f: script_content = f.read().strip() # Remove empty lines and lines with only whitespace script_content = '\n'.join(line for line in script_content.split('\n') if line.strip()) if not script_content: continue # Parse the script to determine number of speakers num_speakers = self._get_num_speakers_from_script(script_content) # Add to examples list as [num_speakers, script_content] self.example_scripts.append([num_speakers, script_content]) print(f"Loaded example: {txt_file} with {num_speakers} speakers") except Exception as e: print(f"Error loading example script {txt_file}: {e}") if self.example_scripts: print(f"Successfully loaded {len(self.example_scripts)} example scripts") else: print("No example scripts were loaded") def _get_num_speakers_from_script(self, script: str) -> int: """Determine the number of unique speakers in a script.""" import re speakers = set() lines = script.strip().split('\n') for line in lines: # Use regex to find speaker patterns match = re.match(r'^Speaker\s+(\d+)\s*:', line.strip(), re.IGNORECASE) if match: speaker_id = int(match.group(1)) speakers.add(speaker_id) # If no speakers found, default to 1 if not speakers: return 1 # Return the maximum speaker ID + 1 (assuming 0-based indexing) # or the count of unique speakers if they're 1-based max_speaker = max(speakers) min_speaker = min(speakers) if min_speaker == 0: return max_speaker + 1 else: # Assume 1-based indexing, return the count return len(speakers) def create_demo_interface(demo_instance: VibeVoiceDemo): """Create the Gradio interface with streaming support.""" # Custom CSS for high-end aesthetics with lighter theme custom_css = """ /* Modern light theme with gradients */ .gradio-container { background: linear-gradient(135deg, #f8fafc 0%, #e2e8f0 100%); font-family: 'SF Pro Display', -apple-system, BlinkMacSystemFont, sans-serif; } /* Header styling */ .main-header { background: linear-gradient(90deg, #667eea 0%, #764ba2 100%); padding: 2rem; border-radius: 20px; margin-bottom: 2rem; text-align: center; box-shadow: 0 10px 40px rgba(102, 126, 234, 0.3); } .main-header h1 { color: white; font-size: 2.5rem; font-weight: 700; margin: 0; text-shadow: 0 2px 4px rgba(0,0,0,0.3); } .main-header p { color: rgba(255,255,255,0.9); font-size: 1.1rem; margin: 0.5rem 0 0 0; } /* Card styling */ .settings-card, .generation-card { background: rgba(255, 255, 255, 0.8); backdrop-filter: blur(10px); border: 1px solid rgba(226, 232, 240, 0.8); border-radius: 16px; padding: 1.5rem; margin-bottom: 1rem; box-shadow: 0 8px 32px rgba(0, 0, 0, 0.1); } /* Speaker selection styling */ .speaker-grid { display: grid; gap: 1rem; margin-bottom: 1rem; } .speaker-item { background: linear-gradient(135deg, #e2e8f0 0%, #cbd5e1 100%); border: 1px solid rgba(148, 163, 184, 0.4); border-radius: 12px; padding: 1rem; color: #374151; font-weight: 500; } /* Streaming indicator */ .streaming-indicator { display: inline-block; width: 10px; height: 10px; background: #22c55e; border-radius: 50%; margin-right: 8px; animation: pulse 1.5s infinite; } @keyframes pulse { 0% { opacity: 1; transform: scale(1); } 50% { opacity: 0.5; transform: scale(1.1); } 100% { opacity: 1; transform: scale(1); } } /* Queue status styling */ .queue-status { background: linear-gradient(135deg, #f0f9ff 0%, #e0f2fe 100%); border: 1px solid rgba(14, 165, 233, 0.3); border-radius: 8px; padding: 0.75rem; margin: 0.5rem 0; text-align: center; font-size: 0.9rem; color: #0369a1; } .generate-btn { background: linear-gradient(135deg, #059669 0%, #0d9488 100%); border: none; border-radius: 12px; padding: 1rem 2rem; color: white; font-weight: 600; font-size: 1.1rem; box-shadow: 0 4px 20px rgba(5, 150, 105, 0.4); transition: all 0.3s ease; } .generate-btn:hover { transform: translateY(-2px); box-shadow: 0 6px 25px rgba(5, 150, 105, 0.6); } .stop-btn { background: linear-gradient(135deg, #ef4444 0%, #dc2626 100%); border: none; border-radius: 12px; padding: 1rem 2rem; color: white; font-weight: 600; font-size: 1.1rem; box-shadow: 0 4px 20px rgba(239, 68, 68, 0.4); transition: all 0.3s ease; } .stop-btn:hover { transform: translateY(-2px); box-shadow: 0 6px 25px rgba(239, 68, 68, 0.6); } /* Audio player styling */ .audio-output { background: linear-gradient(135deg, #f1f5f9 0%, #e2e8f0 100%); border-radius: 16px; padding: 1.5rem; border: 1px solid rgba(148, 163, 184, 0.3); } .complete-audio-section { margin-top: 1rem; padding: 1rem; background: linear-gradient(135deg, #f0fdf4 0%, #dcfce7 100%); border: 1px solid rgba(34, 197, 94, 0.3); border-radius: 12px; } /* Text areas */ .script-input, .log-output { background: rgba(255, 255, 255, 0.9) !important; border: 1px solid rgba(148, 163, 184, 0.4) !important; border-radius: 12px !important; color: #1e293b !important; font-family: 'JetBrains Mono', monospace !important; } .script-input::placeholder { color: #64748b !important; } /* Sliders */ .slider-container { background: rgba(248, 250, 252, 0.8); border: 1px solid rgba(226, 232, 240, 0.6); border-radius: 8px; padding: 1rem; margin: 0.5rem 0; } /* Labels and text */ .gradio-container label { color: #374151 !important; font-weight: 600 !important; } .gradio-container .markdown { color: #1f2937 !important; } /* Responsive design */ @media (max-width: 768px) { .main-header h1 { font-size: 2rem; } .settings-card, .generation-card { padding: 1rem; } } /* Random example button styling - more subtle professional color */ .random-btn { background: linear-gradient(135deg, #64748b 0%, #475569 100%); border: none; border-radius: 12px; padding: 1rem 1.5rem; color: white; font-weight: 600; font-size: 1rem; box-shadow: 0 4px 20px rgba(100, 116, 139, 0.3); transition: all 0.3s ease; display: inline-flex; align-items: center; gap: 0.5rem; } .random-btn:hover { transform: translateY(-2px); box-shadow: 0 6px 25px rgba(100, 116, 139, 0.4); background: linear-gradient(135deg, #475569 0%, #334155 100%); } """ with gr.Blocks( title="VibeVoice - AI Podcast Generator", css=custom_css, theme=gr.themes.Soft( primary_hue="blue", secondary_hue="purple", neutral_hue="slate", ) ) as interface: # Header gr.HTML("""
Generating Long-form Multi-speaker AI Podcast with VibeVoice