|
import os
import sys
import time
import json
from datetime import datetime

import numpy as np
import soundfile as sf
import torch

# Make the local sparktts package and utilities importable.
current_dir = os.path.dirname(os.path.abspath(__file__))
print('Adding current dir to sys.path:', current_dir)
sys.path.append(current_dir)

from sparktts.models.audio_tokenizer import BiCodecTokenizer
from transformers import AutoTokenizer, AutoModelForCausalLM
from utilities import generate_embeddings_batch
|
def calculate_rtf(audio_length_seconds, processing_time_seconds):
    """
    Compute the RTF (Real-Time Factor).

    RTF = processing time / audio length.
    RTF < 1 means faster than real time; RTF > 1 means processing takes
    longer than the audio it produces.
    """
    return processing_time_seconds / audio_length_seconds
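
# Example (hypothetical numbers): synthesizing 10 s of audio in 2 s of
# wall-clock time gives RTF = 2 / 10 = 0.2, i.e. roughly 5x faster than
# real time:
#   assert calculate_rtf(10.0, 2.0) == 0.2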
|
|
|
def generate_speech_batch_with_timing(model, tokenizer, texts, bicodec, prompt_text=None, prompt_audio=None,
                                      max_new_tokens=3000, do_sample=True, top_k=50, top_p=0.95,
                                      temperature=1.0, device="cuda:0"):
    """
    Batch speech generation with timing measurement.

    Returns:
        tuple: (list of audio waveforms, processing time in seconds,
                list of audio lengths in seconds)
    """
    # The model reserves the last vocabulary id as the audio EOS token.
    eos_token_id = model.config.vocab_size - 1

    embeddings, attention_mask = generate_embeddings_batch(
        model=model,
        tokenizer=tokenizer,
        texts=texts,
        bicodec=bicodec,
        prompt_text=prompt_text,
        prompt_audio=prompt_audio
    )

    batch_size = len(texts)
    global_tokens = embeddings['global_tokens']

    model.eval()

    # Wall-clock timing covers generation plus detokenization; detokenize()
    # returns CPU audio, which also forces any pending GPU work to finish.
    start_time = time.time()

    with torch.no_grad():
        generated_outputs = model.generate(
            inputs_embeds=embeddings['input_embs'],
            attention_mask=attention_mask,
            max_new_tokens=max_new_tokens,
            do_sample=do_sample,
            top_k=top_k,
            top_p=top_p,
            temperature=temperature,
            eos_token_id=eos_token_id,
            pad_token_id=tokenizer.pad_token_id if hasattr(tokenizer, 'pad_token_id') else tokenizer.eos_token_id,
            use_cache=True
        )

    wavs = []
    for i in range(batch_size):
        sample_outputs = generated_outputs[i]

        # Truncate at the first EOS token for this sample; keep the full
        # sequence if the sample hit max_new_tokens without emitting EOS.
        # (Searching per sample avoids the misalignment that a single
        # batch-wide torch.where would cause when a row has no EOS.)
        eos_positions = torch.where(sample_outputs == eos_token_id)[0]
        if len(eos_positions) > 0:
            sample_outputs = sample_outputs[:eos_positions[0]]

        with torch.no_grad():
            wav = bicodec.detokenize(global_tokens[i:i+1], sample_outputs.unsqueeze(0))

        wavs.append(wav)

    end_time = time.time()
    processing_time = end_time - start_time

    sample_rate = bicodec.config['sample_rate']
    audio_lengths = [len(wav) / sample_rate for wav in wavs]

    return wavs, processing_time, audio_lengths
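
# Usage sketch (assumes `model`, `tokenizer`, and `audio_tokenizer` are
# loaded as in the __main__ block below; texts are placeholders):
#   wavs, t, lengths = generate_speech_batch_with_timing(
#       model, tokenizer, ["text one", "text two"], audio_tokenizer)
#   print(f"batch RTF: {calculate_rtf(sum(lengths), t):.3f}")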
|
|
|
def generate_speech_with_timing(model, tokenizer, text, bicodec, prompt_text=None, prompt_audio=None,
                                max_new_tokens=3000, do_sample=True, top_k=50, top_p=0.95,
                                temperature=1.0, device="cuda:0"):
    """
    Single-text speech generation with timing measurement.

    Returns:
        tuple: (audio waveform, processing time in seconds, audio length in seconds)
    """
    eos_token_id = model.config.vocab_size - 1

    embeddings, attention_mask = generate_embeddings_batch(
        model=model,
        tokenizer=tokenizer,
        texts=[text],
        bicodec=bicodec,
        prompt_text=prompt_text,
        prompt_audio=prompt_audio
    )

    global_tokens = embeddings['global_tokens']

    model.eval()

    start_time = time.time()

    with torch.no_grad():
        generated_outputs = model.generate(
            inputs_embeds=embeddings['input_embs'],
            attention_mask=attention_mask,
            max_new_tokens=max_new_tokens,
            do_sample=do_sample,
            top_k=top_k,
            top_p=top_p,
            temperature=temperature,
            eos_token_id=eos_token_id,
            pad_token_id=tokenizer.pad_token_id if hasattr(tokenizer, 'pad_token_id') else tokenizer.eos_token_id,
            use_cache=True
        )

    # Truncate at the first EOS token; keep everything if none was generated.
    sample_outputs = generated_outputs[0]
    eos_index = torch.where(sample_outputs == eos_token_id)[0]
    if len(eos_index) > 0:
        sample_outputs = sample_outputs[:eos_index[0]]

    with torch.no_grad():
        wav = bicodec.detokenize(global_tokens, sample_outputs.unsqueeze(0))

    end_time = time.time()
    processing_time = end_time - start_time

    sample_rate = bicodec.config['sample_rate']
    audio_length = len(wav) / sample_rate

    return wav, processing_time, audio_length
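
# Usage sketch (hypothetical input text; objects loaded as in __main__):
#   wav, t, length = generate_speech_with_timing(model, tokenizer, "你好", audio_tokenizer)
#   print(f"RTF: {calculate_rtf(length, t):.3f}")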
|
|
|
def warmup_model(model, tokenizer, audio_tokenizer, prompt_audio=None, device="cuda:0", warmup_count=3):
    """
    Warm up the model with a few untimed generation passes, so that one-time
    costs (CUDA kernel compilation, memory allocation) do not skew the RTF
    measurements.

    Args:
        model: language model
        tokenizer: text tokenizer
        audio_tokenizer: BiCodecTokenizer instance
        prompt_audio: prompt audio (optional)
        device: device
        warmup_count: number of warm-up passes

    Returns:
        None
    """
    print(f"Starting model warm-up with {warmup_count} generation passes...")

    # Chinese warm-up sentences, kept verbatim: they are inputs to the model.
    warmup_texts = [
        "你好，这是一个预热测试。",
        "人工智能技术正在快速发展。",
        "语音合成技术将文本转换为自然的语音输出。"
    ]

    for i in range(warmup_count):
        print(f"  Warm-up {i+1}/{warmup_count}")

        warmup_text = warmup_texts[i % len(warmup_texts)]

        try:
            eos_token_id = model.config.vocab_size - 1

            embeddings, attention_mask = generate_embeddings_batch(
                model=model,
                tokenizer=tokenizer,
                texts=[warmup_text],
                bicodec=audio_tokenizer,
                prompt_text=None,
                prompt_audio=prompt_audio
            )

            global_tokens = embeddings['global_tokens']

            model.eval()

            with torch.no_grad():
                generated_outputs = model.generate(
                    inputs_embeds=embeddings['input_embs'],
                    attention_mask=attention_mask,
                    max_new_tokens=1000,
                    do_sample=True,
                    top_k=50,
                    top_p=0.95,
                    temperature=1.0,
                    eos_token_id=eos_token_id,
                    pad_token_id=tokenizer.pad_token_id if hasattr(tokenizer, 'pad_token_id') else tokenizer.eos_token_id,
                    use_cache=True
                )

            sample_outputs = generated_outputs[0]
            eos_index = torch.where(sample_outputs == eos_token_id)[0]
            if len(eos_index) > 0:
                sample_outputs = sample_outputs[:eos_index[0]]

            with torch.no_grad():
                wav = audio_tokenizer.detokenize(global_tokens, sample_outputs.unsqueeze(0))

            print(f"  Warm-up pass done, generated {len(wav) / audio_tokenizer.config['sample_rate']:.2f}s of audio")

        except Exception as e:
            print(f"  Warm-up error: {str(e)}")

    print("Model warm-up complete!")
    print("-" * 40)
|
|
|
def run_rtf_batch_test(texts, model, tokenizer, audio_tokenizer, prompt_audio=None,
                       device="cuda:0", output_dir="rtf_test_results", warmup_count=3, batch_size=4):
    """
    Run the batched RTF test.

    Args:
        texts: list of texts to test
        model: language model
        tokenizer: text tokenizer
        audio_tokenizer: BiCodecTokenizer instance
        prompt_audio: prompt audio (optional)
        device: device
        output_dir: output directory
        warmup_count: number of warm-up passes
        batch_size: batch size

    Returns:
        tuple: (statistics dict, list of per-text results)
    """
    os.makedirs(output_dir, exist_ok=True)

    warmup_model(model, tokenizer, audio_tokenizer, prompt_audio, device, warmup_count)

    results = []
    total_processing_time = 0
    total_audio_length = 0

    print(f"Starting batch RTF test: {len(texts)} texts, batch size {batch_size}...")
    print("=" * 80)

    # Ceiling division; computed once, outside the loop.
    total_batches = (len(texts) + batch_size - 1) // batch_size

    for batch_start in range(0, len(texts), batch_size):
        batch_end = min(batch_start + batch_size, len(texts))
        batch_texts = texts[batch_start:batch_end]
        batch_num = batch_start // batch_size + 1

        print(f"\nProcessing batch {batch_num}/{total_batches} (texts {batch_start+1}-{batch_end})")
        print(f"Texts in batch: {len(batch_texts)}")

        try:
            wavs, processing_time, audio_lengths = generate_speech_batch_with_timing(
                model=model,
                tokenizer=tokenizer,
                texts=batch_texts,
                bicodec=audio_tokenizer,
                prompt_audio=prompt_audio,
                device=device
            )

            batch_total_audio_length = sum(audio_lengths)
            batch_rtf = calculate_rtf(batch_total_audio_length, processing_time)

            for i, (wav, audio_length) in enumerate(zip(wavs, audio_lengths)):
                text_index = batch_start + i
                text = batch_texts[i]

                output_filename = os.path.join(output_dir, f"test_{text_index+1:03d}.wav")
                sf.write(output_filename, wav, audio_tokenizer.config['sample_rate'])

                result = {
                    "index": text_index + 1,
                    "batch": batch_num,
                    "text": text,
                    "batch_processing_time": processing_time,
                    "audio_length": audio_length,
                    "batch_rtf": batch_rtf,
                    "output_file": output_filename
                }
                results.append(result)

                print(f"  Text {text_index+1}: audio length {audio_length:.3f}s")

            print(f"  Batch processing time: {processing_time:.3f}s")
            print(f"  Batch total audio length: {batch_total_audio_length:.3f}s")
            print(f"  Batch RTF: {batch_rtf:.3f}")

            total_processing_time += processing_time
            total_audio_length += batch_total_audio_length

        except Exception as e:
            print(f"  Batch error: {str(e)}")
            # Record an error entry for every text in the failed batch.
            for i, text in enumerate(batch_texts):
                text_index = batch_start + i
                result = {
                    "index": text_index + 1,
                    "batch": batch_num,
                    "text": text,
                    "error": str(e)
                }
                results.append(result)

    successful_results = [r for r in results if "error" not in r]

    if successful_results:
        batch_rtfs = [r["batch_rtf"] for r in successful_results]
        batch_processing_times = [r["batch_processing_time"] for r in successful_results]
        avg_audio_length = np.mean([r["audio_length"] for r in successful_results])
        # Note: total_rtf weights each batch by its audio duration, while
        # avg_batch_rtf is an unweighted mean over per-text result entries.
        total_rtf = calculate_rtf(total_audio_length, total_processing_time)

        stats = {
            "total_tests": len(texts),
            "successful_tests": len(successful_results),
            "failed_tests": len(texts) - len(successful_results),
            "batch_size": batch_size,
            "total_batches": total_batches,
            "total_processing_time": total_processing_time,
            "total_audio_length": total_audio_length,
            "total_rtf": total_rtf,
            "avg_batch_rtf": np.mean(batch_rtfs),
            "avg_batch_processing_time": np.mean(batch_processing_times),
            "avg_audio_length": avg_audio_length,
            "min_batch_rtf": min(batch_rtfs),
            "max_batch_rtf": max(batch_rtfs),
            "std_batch_rtf": np.std(batch_rtfs)
        }
    else:
        stats = {
            "total_tests": len(texts),
            "successful_tests": 0,
            "failed_tests": len(texts),
            "batch_size": batch_size,
            "total_batches": total_batches,
            "error": "All tests failed"
        }

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    results_file = os.path.join(output_dir, f"rtf_test_results_{timestamp}.json")

    output_data = {
        "test_info": {
            "timestamp": timestamp,
            "device": device,
            "model_path": current_dir,
            "batch_size": batch_size
        },
        "statistics": stats,
        "detailed_results": results
    }

    with open(results_file, 'w', encoding='utf-8') as f:
        json.dump(output_data, f, ensure_ascii=False, indent=2)

    print("\n" + "=" * 80)
    print("RTF test statistics:")
    print("=" * 80)
    print(f"Total tests: {stats['total_tests']}")
    print(f"Successful tests: {stats['successful_tests']}")
    print(f"Failed tests: {stats['failed_tests']}")
    print(f"Batch size: {batch_size}")
    print(f"Total batches: {total_batches}")

    if successful_results:
        print(f"Total processing time: {stats['total_processing_time']:.3f}s")
        print(f"Total audio length: {stats['total_audio_length']:.3f}s")
        print(f"Overall RTF: {stats['total_rtf']:.3f}")
        print(f"Average batch RTF: {stats['avg_batch_rtf']:.3f}")
        print(f"Average batch processing time: {stats['avg_batch_processing_time']:.3f}s")
        print(f"Average audio length: {stats['avg_audio_length']:.3f}s")
        print(f"Min batch RTF: {stats['min_batch_rtf']:.3f}")
        print(f"Max batch RTF: {stats['max_batch_rtf']:.3f}")
        print(f"Batch RTF std dev: {stats['std_batch_rtf']:.3f}")

    print(f"\nDetailed results saved to: {results_file}")
    print(f"Audio files saved in: {output_dir}")

    return stats, results
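
# Usage sketch for inspecting a saved results file (the filename below is an
# example; the actual name embeds the run's timestamp):
#   with open("rtf_test_results/rtf_test_results_20240101_120000.json", encoding="utf-8") as f:
#       data = json.load(f)
#   print(data["statistics"]["total_rtf"])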
|
|
|
if __name__ == "__main__":
    device = 'cuda:2'

    print("Loading model and tokenizers...")
    audio_tokenizer = BiCodecTokenizer(model_dir=current_dir, device=device)
    tokenizer = AutoTokenizer.from_pretrained(current_dir, trust_remote_code=True)
    model = AutoModelForCausalLM.from_pretrained(current_dir, trust_remote_code=True)

    model = model.bfloat16().to(device)
    model.eval()

    # Optional voice-cloning prompt: used only if kafka.wav sits next to this script.
    prompt_audio = None
    prompt_audio_file = os.path.join(current_dir, 'kafka.wav')
    if os.path.exists(prompt_audio_file):
        print(f"Loading prompt audio: {prompt_audio_file}")
        prompt_audio, sampling_rate = sf.read(prompt_audio_file)

        target_sample_rate = audio_tokenizer.config['sample_rate']
        if sampling_rate != target_sample_rate:
            print(f"Resampling from {sampling_rate}Hz to {target_sample_rate}Hz...")
            # Lazy import: librosa is only needed when resampling.
            from librosa import resample
            prompt_audio = resample(prompt_audio, orig_sr=sampling_rate, target_sr=target_sample_rate)
            prompt_audio = np.array(prompt_audio, dtype=np.float32)

    # Chinese test sentences, kept verbatim: they are the model's input data.
    test_texts = [
        "一九五二年二月十日，志愿军大英雄张积慧击落美军双料王牌飞行员戴维斯，在自己飞机坠毁处距离戴维斯坠机处不足五百米的情况下，取得了世界空战史不可能复制的奇迹。伟大的张积慧。",
        "在数字浪潮汹涌的今天，数智技术正以前所未有的力量重塑着社会的每一个角落。",
        "为了点燃青少年对科技的热情，培养他们的创新思维与动手能力",
        "杏花岭区巨轮街道社区教育学校携手中车社区教育分校，与太原市科学技术协会联手，于暑期精心策划了一场别开生面的青少年数智技术服务港探索之旅，吸引了众多社区青少年的积极参与。",
        "一踏入数智技术服务港的大门，一股浓厚的科技气息便扑面而来。",
        "科普课堂上，“简易红绿灯”科学实验更是将抽象的电路原理与日常生活紧密相连。",
        "实验开始前，老师生动地介绍了实验物品，并引导青少年思考红绿灯的工作原理，激发了他们浓厚的探索兴趣。",
        "在老师的指导下，青少年们开始动手组装电路，将红绿灯的各个部件连接起来。",
        "他们小心翼翼地调整电路，确保每个部件都正确连接，红灯、绿灯、黄灯依次亮起，仿佛在讲述一个关于交通规则的故事。",
        "实验过程中，青少年们不仅学到了电路知识，还体验到了动手实践的乐趣。",
        "他们纷纷表示，这次实验不仅让他们对科技有了更深的理解，还培养了他们的创新思维和动手能力。",
        "数智技术服务港，让科技触手可及，让创新无处不在。",
        "人工智能技术正在快速发展，为各行各业带来了革命性的变化。",
        "深度学习模型在语音识别、图像处理、自然语言处理等领域取得了突破性进展。",
        "机器学习算法能够从大量数据中学习模式，并做出准确的预测和决策。",
        "神经网络模拟人脑的工作方式，通过多层神经元处理复杂的信息。",
        "计算机视觉技术让机器能够理解和分析图像内容。",
        "自然语言处理技术使计算机能够理解和生成人类语言。",
        "语音合成技术将文本转换为自然的语音输出。",
        "大数据分析帮助企业发现隐藏的模式和趋势。"
    ]

    stats, results = run_rtf_batch_test(
        texts=test_texts,
        model=model,
        tokenizer=tokenizer,
        audio_tokenizer=audio_tokenizer,
        prompt_audio=prompt_audio,
        device=device,
        output_dir="rtf_test_results",
        warmup_count=1,
        batch_size=8
    )