|
import os |
|
import sys |
|
current_dir = os.path.dirname(os.path.abspath(__file__)) |
|
print('add current dir to sys.path', current_dir) |
|
sys.path.append(current_dir) |
|
from sparktts.models.audio_tokenizer import BiCodecTokenizer |
|
from transformers import AutoTokenizer, AutoModelForCausalLM |
|
import soundfile as sf |
|
import numpy as np |
|
import torch |
|
from utilities import generate_embeddings |
|
def generate_speech(model, tokenizer, text, bicodec, prompt_text=None, prompt_audio=None, |
|
max_new_tokens=3000, do_sample=True, top_k=50, top_p=0.95, |
|
temperature=1.0, device="cuda:0"): |
|
""" |
|
生成语音的函数 |
|
|
|
Args: |
|
model: 语言模型 |
|
tokenizer: 文本分词器 |
|
text: 要生成语音的文本 |
|
bicodec: BiCodecTokenizer 实例 |
|
prompt_text: 提示文本(可选) |
|
prompt_audio: 提示音频数组(可选) |
|
max_new_tokens: 最大生成token数 |
|
do_sample: 是否使用采样 |
|
top_k: top-k采样参数 |
|
top_p: top-p采样参数 |
|
temperature: 温度参数 |
|
device: 设备 |
|
|
|
Returns: |
|
wav: 生成的音频波形 |
|
""" |
|
|
|
eos_token_id = model.config.vocab_size - 1 |
|
print(f"EOS token ID: {eos_token_id}") |
|
|
|
|
|
embeddings = generate_embeddings( |
|
model=model, |
|
tokenizer=tokenizer, |
|
text=text, |
|
bicodec=bicodec, |
|
prompt_text=prompt_text, |
|
prompt_audio=prompt_audio |
|
) |
|
|
|
print("开始生成语音...") |
|
print(f"输入嵌入形状: {embeddings['input_embs'].shape}") |
|
global_tokens = embeddings['global_tokens'].unsqueeze(0) |
|
|
|
print(f'embeddings dtype: {embeddings["input_embs"].dtype}') |
|
model.eval() |
|
|
|
with torch.no_grad(): |
|
|
|
generated_outputs = model.generate( |
|
inputs_embeds=embeddings['input_embs'], |
|
attention_mask=torch.ones((1, embeddings['input_embs'].shape[1]),dtype=torch.long,device=device), |
|
max_new_tokens=max_new_tokens, |
|
do_sample=do_sample, |
|
top_k=top_k, |
|
top_p=top_p, |
|
temperature=temperature, |
|
eos_token_id=eos_token_id, |
|
pad_token_id=tokenizer.pad_token_id if hasattr(tokenizer, 'pad_token_id') else tokenizer.eos_token_id, |
|
use_cache=True |
|
) |
|
print(f"generated_outputs: {generated_outputs}") |
|
|
|
print(f"生成的token数量: {generated_outputs.shape}") |
|
print(f"生成的token IDs: {generated_outputs.tolist()}") |
|
|
|
|
|
|
|
semantic_tokens_tensor = generated_outputs[:,:-1] |
|
|
|
print(f"Semantic tokens shape: {semantic_tokens_tensor.shape}") |
|
|
|
|
|
target_sample_rate = bicodec.config['sample_rate'] |
|
print(f"Global tokens shape: {global_tokens.shape}") |
|
BUF_SIZE = 25 |
|
chunk_size = 125 |
|
buffered_semantic_tokens = torch.zeros((1, 0), dtype=torch.long, device=device) |
|
whole_wav = np.array([], dtype=np.float32) |
|
for i in range(0, semantic_tokens_tensor.shape[1], chunk_size): |
|
buffered_size = buffered_semantic_tokens.shape[1] |
|
current_semantic_tokens = semantic_tokens_tensor[:, i:i+chunk_size] |
|
print(f"generate segmant [{i}:{i+chunk_size}]: shape {current_semantic_tokens.shape}") |
|
current_semantic_tokens = torch.cat([buffered_semantic_tokens, current_semantic_tokens], dim=1) |
|
print(f"After concat: shape {current_semantic_tokens.shape} with buffered shape {buffered_semantic_tokens.shape}") |
|
buffered_semantic_tokens = current_semantic_tokens[:, -BUF_SIZE:] |
|
with torch.no_grad(): |
|
wav = bicodec.detokenize(global_tokens, current_semantic_tokens) |
|
print(f"Generated audio shape: {wav.shape}") |
|
wav = wav[int(target_sample_rate * buffered_size/50):] |
|
print(f"After cut: shape {wav.shape}") |
|
whole_wav = np.concatenate([whole_wav, wav]) |
|
print(f"Whole wav shape: {whole_wav.shape}") |
|
return whole_wav |
|
|
|
device = 'cuda:2' |
|
|
|
audio_tokenizer = BiCodecTokenizer(model_dir=current_dir, device=device) |
|
|
|
print(audio_tokenizer) |
|
|
|
tokenizer = AutoTokenizer.from_pretrained(current_dir, trust_remote_code=True) |
|
model = AutoModelForCausalLM.from_pretrained(current_dir, trust_remote_code=True) |
|
print(tokenizer) |
|
print(model) |
|
|
|
model = model.bfloat16().to(device) |
|
model.eval() |
|
|
|
prompt_text = "我们并不是通过物理移动手段找到星河的。" |
|
prompt_audio_file = os.path.join(current_dir, 'kafka.wav') |
|
prompt_audio, sampling_rate = sf.read(prompt_audio_file) |
|
|
|
print(f"Loaded prompt audio from {prompt_audio_file}") |
|
print(f"Original sampling rate: {sampling_rate}Hz") |
|
print(f"Audio shape: {prompt_audio.shape}") |
|
|
|
target_sample_rate = audio_tokenizer.config['sample_rate'] |
|
if sampling_rate != target_sample_rate: |
|
print(f"Resampling from {sampling_rate}Hz to {target_sample_rate}Hz...") |
|
from librosa import resample |
|
prompt_audio = resample(prompt_audio, orig_sr=sampling_rate, target_sr=target_sample_rate) |
|
prompt_audio = np.array(prompt_audio, dtype=np.float32) |
|
print(f"Resampled audio shape: {prompt_audio.shape}") |
|
else: |
|
print(f"Audio sampling rate already matches target ({target_sample_rate}Hz)") |
|
|
|
text = "二房他们已经接受了老爷子安排的:大房拿企业、二房拿钱的设定。富贵闲人他们也做了。在嫡长女和国资抢股权期间不出来搅局,就连老爷子的葬礼都没有露面,安安静静坐实老爷子一辈子的完美人设。" |
|
wav = generate_speech(model, tokenizer, text, audio_tokenizer, prompt_audio=prompt_audio, device=device) |
|
sf.write('output_streaming.wav', wav, target_sample_rate) |
|
|
|
|
|
|
|
|
|
|