respark / epoch2 /tts_streaming.py
yueyulin's picture
Upload folder using huggingface_hub
b3c4c5d verified
import os
import sys
current_dir = os.path.dirname(os.path.abspath(__file__))
print('add current dir to sys.path', current_dir)
sys.path.append(current_dir)
from sparktts.models.audio_tokenizer import BiCodecTokenizer
from transformers import AutoTokenizer, AutoModelForCausalLM
import soundfile as sf
import numpy as np
import torch
from utilities import generate_embeddings
def generate_speech(model, tokenizer, text, bicodec, prompt_text=None, prompt_audio=None,
max_new_tokens=3000, do_sample=True, top_k=50, top_p=0.95,
temperature=1.0, device="cuda:0"):
"""
生成语音的函数
Args:
model: 语言模型
tokenizer: 文本分词器
text: 要生成语音的文本
bicodec: BiCodecTokenizer 实例
prompt_text: 提示文本(可选)
prompt_audio: 提示音频数组(可选)
max_new_tokens: 最大生成token数
do_sample: 是否使用采样
top_k: top-k采样参数
top_p: top-p采样参数
temperature: 温度参数
device: 设备
Returns:
wav: 生成的音频波形
"""
# 设置eos_token_id - 根据训练代码,eos_token_id = model.config.vocab_size - 1
eos_token_id = model.config.vocab_size - 1
print(f"EOS token ID: {eos_token_id}")
# 生成输入嵌入
embeddings = generate_embeddings(
model=model,
tokenizer=tokenizer,
text=text,
bicodec=bicodec,
prompt_text=prompt_text,
prompt_audio=prompt_audio
)
print("开始生成语音...")
print(f"输入嵌入形状: {embeddings['input_embs'].shape}")
global_tokens = embeddings['global_tokens'].unsqueeze(0)
# 设置模型为评估模式
print(f'embeddings dtype: {embeddings["input_embs"].dtype}')
model.eval()
with torch.no_grad():
# 使用模型的generate方法
generated_outputs = model.generate(
inputs_embeds=embeddings['input_embs'],
attention_mask=torch.ones((1, embeddings['input_embs'].shape[1]),dtype=torch.long,device=device),
max_new_tokens=max_new_tokens,
do_sample=do_sample,
top_k=top_k,
top_p=top_p,
temperature=temperature,
eos_token_id=eos_token_id,
pad_token_id=tokenizer.pad_token_id if hasattr(tokenizer, 'pad_token_id') else tokenizer.eos_token_id,
use_cache=True
)
print(f"generated_outputs: {generated_outputs}")
print(f"生成的token数量: {generated_outputs.shape}")
print(f"生成的token IDs: {generated_outputs.tolist()}")
# 直接使用生成的token ID作为semantic tokens
# 注意:这里生成的token ID是模型词表中的ID,不是原始tokenizer的词表
semantic_tokens_tensor = generated_outputs[:,:-1]
print(f"Semantic tokens shape: {semantic_tokens_tensor.shape}")
#simulate streaming
target_sample_rate = bicodec.config['sample_rate']
print(f"Global tokens shape: {global_tokens.shape}")
BUF_SIZE = 25 # since 50 tokens per second, 25 tokens is 0.5 second
chunk_size = 125 # start to generate audio after 125 tokens
buffered_semantic_tokens = torch.zeros((1, 0), dtype=torch.long, device=device)
whole_wav = np.array([], dtype=np.float32)
for i in range(0, semantic_tokens_tensor.shape[1], chunk_size):
buffered_size = buffered_semantic_tokens.shape[1]
current_semantic_tokens = semantic_tokens_tensor[:, i:i+chunk_size]
print(f"generate segmant [{i}:{i+chunk_size}]: shape {current_semantic_tokens.shape}")
current_semantic_tokens = torch.cat([buffered_semantic_tokens, current_semantic_tokens], dim=1)
print(f"After concat: shape {current_semantic_tokens.shape} with buffered shape {buffered_semantic_tokens.shape}")
buffered_semantic_tokens = current_semantic_tokens[:, -BUF_SIZE:]
with torch.no_grad():
wav = bicodec.detokenize(global_tokens, current_semantic_tokens)
print(f"Generated audio shape: {wav.shape}")
wav = wav[int(target_sample_rate * buffered_size/50):]
print(f"After cut: shape {wav.shape}")
whole_wav = np.concatenate([whole_wav, wav])
print(f"Whole wav shape: {whole_wav.shape}")
return whole_wav
device = 'cuda:2'
audio_tokenizer = BiCodecTokenizer(model_dir=current_dir, device=device)
print(audio_tokenizer)
tokenizer = AutoTokenizer.from_pretrained(current_dir, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(current_dir, trust_remote_code=True)
print(tokenizer)
print(model)
model = model.bfloat16().to(device)
model.eval()
prompt_text = "我们并不是通过物理移动手段找到星河的。"
prompt_audio_file = os.path.join(current_dir, 'kafka.wav')
prompt_audio, sampling_rate = sf.read(prompt_audio_file)
print(f"Loaded prompt audio from {prompt_audio_file}")
print(f"Original sampling rate: {sampling_rate}Hz")
print(f"Audio shape: {prompt_audio.shape}")
target_sample_rate = audio_tokenizer.config['sample_rate']
if sampling_rate != target_sample_rate:
print(f"Resampling from {sampling_rate}Hz to {target_sample_rate}Hz...")
from librosa import resample
prompt_audio = resample(prompt_audio, orig_sr=sampling_rate, target_sr=target_sample_rate)
prompt_audio = np.array(prompt_audio, dtype=np.float32)
print(f"Resampled audio shape: {prompt_audio.shape}")
else:
print(f"Audio sampling rate already matches target ({target_sample_rate}Hz)")
text = "二房他们已经接受了老爷子安排的:大房拿企业、二房拿钱的设定。富贵闲人他们也做了。在嫡长女和国资抢股权期间不出来搅局,就连老爷子的葬礼都没有露面,安安静静坐实老爷子一辈子的完美人设。"
wav = generate_speech(model, tokenizer, text, audio_tokenizer, prompt_audio=prompt_audio, device=device)
sf.write('output_streaming.wav', wav, target_sample_rate)