Upload rwkv7-0.4B-g1-respark-voice-tunable_ipa/ref_audio_utilities.py with huggingface_hub
rwkv7-0.4B-g1-respark-voice-tunable_ipa/ref_audio_utilities.py
ADDED
@@ -0,0 +1,306 @@
import onnxruntime as ort
import numpy as np
import librosa
import soundfile as sf
import soxr
from pathlib import Path
from typing import Tuple, Union, Optional

class RefAudioUtilities:
    """Audio processing utility class; generates tokens with ONNX models."""

    def __init__(self, onnx_model_path: str, wav2vec2_path: Optional[str],
                 ref_segment_duration: float = 6.0, latent_hop_length: int = 320):
        """
        Initialize the ONNX models.

        Args:
            onnx_model_path: Path to the ONNX model file.
            wav2vec2_path: Path to the wav2vec2 ONNX model file; if None, no wav2vec2 model is loaded.
            ref_segment_duration: Reference audio duration in seconds.
            latent_hop_length: Hop length of the latent features.
        """
        self.ort_session = ort.InferenceSession(
            onnx_model_path,
            providers=['CUDAExecutionProvider', 'CPUExecutionProvider'])
        print(f"🖥️ONNX Session actual providers: {self.ort_session.get_providers()}")
        self.sample_rate = 16000
        self.ref_segment_duration = ref_segment_duration
        self.latent_hop_length = latent_hop_length

        # Collect the model's input/output names
        self.input_names = [input_info.name for input_info in self.ort_session.get_inputs()]
        self.output_names = [output_info.name for output_info in self.ort_session.get_outputs()]

        print(f"Model inputs: {self.input_names}")
        print(f"Model outputs: {self.output_names}")

        # Initialize the wav2vec2 model (skipped when no path is given, as documented above)
        self.wav2vec2_session = None
        if wav2vec2_path is not None:
            self.wav2vec2_session = ort.InferenceSession(
                wav2vec2_path,
                providers=['CUDAExecutionProvider', 'CPUExecutionProvider'])
            print(f"🖥️Wav2Vec2 Session actual providers: {self.wav2vec2_session.get_providers()}")
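
    # A minimal usage sketch (paths here are hypothetical, for illustration only):
    #   utils = RefAudioUtilities("BiCodecTokenize.onnx", "wav2vec2-large-xlsr-53.onnx")
    #   global_tokens, semantic_tokens = utils.tokenize("reference.wav")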

    def load_audio(self, audio_path: Union[str, Path], target_sr: int = 16000,
                   volume_normalize: bool = False) -> np.ndarray:
        """
        Load an audio file, consistent with BiCodecTokenizer.

        Args:
            audio_path: Path to the audio file.
            target_sr: Target sample rate.
            volume_normalize: Whether to apply volume normalization.

        Returns:
            Audio data array.
        """
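        # Illustrative call (the file name is hypothetical): a 44.1 kHz stereo
        # file comes back as a mono float array resampled to 16 kHz:
        #   audio = self.load_audio("reference.wav", target_sr=16000)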
        if isinstance(audio_path, str):
            audio_path = Path(audio_path)

        # Load with soundfile, consistent with BiCodecTokenizer
        audio, sr = sf.read(audio_path)
        if len(audio.shape) > 1:
            audio = audio[:, 0]  # For stereo input, take the first channel

        # Resample to the target sample rate
        if sr != target_sr:
            audio = soxr.resample(audio, sr, target_sr, quality="VHQ")
            sr = target_sr

        # Volume normalization
        if volume_normalize:
            audio = self._audio_volume_normalize(audio)

        return audio

    def _audio_volume_normalize(self, audio: np.ndarray, coeff: float = 0.2) -> np.ndarray:
        """Normalize the audio volume."""
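        # Worked example with illustrative numbers: if the mean of the
        # 90th-99th percentile magnitudes is 0.05, the gain below is
        # clip(0.2 / 0.05, 0.1, 10) = 4.0; the result is then re-checked so
        # that no sample exceeds 1.0.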
        # Sort the absolute values of the audio signal
        temp = np.sort(np.abs(audio))

        # If the maximum value is less than 0.1, scale the array to have a maximum of 0.1
        if temp[-1] < 0.1:
            scaling_factor = max(
                temp[-1], 1e-3
            )  # Prevent division by zero with a small constant
            audio = audio / scaling_factor * 0.1

        # Filter out values less than 0.01 from temp
        temp = temp[temp > 0.01]
        L = temp.shape[0]  # Length of the filtered array

        # If there are fewer than or equal to 10 significant values, return the audio without further processing
        if L <= 10:
            return audio

        # Compute the average of the top 10% to 1% of values in temp
        volume = np.mean(temp[int(0.9 * L) : int(0.99 * L)])

        # Normalize the audio to the target coefficient level, clamping the scale factor between 0.1 and 10
        audio = audio * np.clip(coeff / volume, a_min=0.1, a_max=10)

        # Ensure the maximum absolute value in the audio does not exceed 1
        max_value = np.max(np.abs(audio))
        if max_value > 1:
            audio = audio / max_value

        return audio

    def extract_mel_spectrogram(self, wav: np.ndarray, n_mels: int = 128,
                                n_fft: int = 1024, hop_length: int = 320,
                                win_length: int = 640) -> np.ndarray:
        """
        Extract a mel spectrogram.

        Args:
            wav: Audio data.
            n_mels: Number of mel filterbanks.
            n_fft: FFT window size.
            hop_length: Hop length between frames.
            win_length: Window length.

        Returns:
            Mel spectrogram.
        """
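        # Frame-count check: a 6 s reference clip at 16 kHz is 96000 samples;
        # with hop_length=320 and librosa's default center=True padding this
        # gives 96000/320 + 1 = 301 frames, i.e. a (128, 301) spectrogram,
        # matching the [1, 128, 301] shape noted in tokenize().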
        mel_spec = librosa.feature.melspectrogram(
            y=wav,
            sr=self.sample_rate,
            n_mels=n_mels,
            n_fft=n_fft,
            hop_length=hop_length,
            win_length=win_length,
            power=1,
            norm="slaney",
            fmin=10,
        )

        return mel_spec

    def extract_wav2vec2_features(self, wav: np.ndarray) -> np.ndarray:
        """
        Extract features with the ONNX wav2vec2 model, mimicking the behavior of BiCodecTokenizer.

        Args:
            wav: Audio data.

        Returns:
            Feature array.
        """
        # Make sure the wav2vec2 model has been loaded
        if self.wav2vec2_session is None:
            raise RuntimeError("wav2vec2 model not loaded; pass wav2vec2_path when constructing this class")

        # Add a batch dimension
        input_data = wav[np.newaxis, :].astype(np.float32)  # [1, sequence_length]

        # Run wav2vec2 inference
        # Note: this ONNX model already bundles the feature-extractor preprocessing
        # and the combination of several hidden layers
        inputs = {'input': input_data}
        outputs = self.wav2vec2_session.run(None, inputs)

        # The output shape should be [1, time_steps, 1024]; it is already the
        # average of hidden layers 11, 14 and 16
        print(f'wav2vec2 output shape: {outputs[0].shape}')
        features = outputs[0][0]  # Drop the batch dimension, giving [time_steps, 1024]

        return features.astype(np.float32)

    def get_ref_clip(self, wav: np.ndarray) -> np.ndarray:
        """
        Get the reference audio clip, consistent with BiCodecTokenizer.

        Args:
            wav: Raw audio data.

        Returns:
            Reference audio clip.
        """
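        # With the defaults: ref_segment_length = int(16000 * 6.0) // 320 * 320
        # = 96000 samples, exactly 300 latent hops, so the clip length is
        # always a whole multiple of latent_hop_length.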
        # Use the same computation as BiCodecTokenizer
        ref_segment_length = (
            int(self.sample_rate * self.ref_segment_duration)
            // self.latent_hop_length
            * self.latent_hop_length
        )
        wav_length = len(wav)

        if ref_segment_length > wav_length:
            # If the audio is shorter than required, repeat it until it is long enough
            repeat_times = ref_segment_length // wav_length + 1
            wav = np.tile(wav, repeat_times)

        # Truncate to the required length
        return wav[:ref_segment_length]

    def process_audio(self, audio_path: Union[str, Path], volume_normalize: bool = False) -> Tuple[np.ndarray, np.ndarray]:
        """
        Process an audio file and return the full audio plus the reference clip, consistent with BiCodecTokenizer.

        Args:
            audio_path: Path to the audio file.
            volume_normalize: Whether to apply volume normalization.

        Returns:
            (full audio, reference audio)
        """
        wav = self.load_audio(audio_path, volume_normalize=volume_normalize)
        ref_wav = self.get_ref_clip(wav)

        return wav, ref_wav

    def tokenize(self, audio_path: Union[str, Path]) -> Tuple[np.ndarray, np.ndarray]:
        """
        Generate tokens with the ONNX model.

        Args:
            audio_path: Path to the audio file.

        Returns:
            (global_tokens, semantic_tokens)
        """
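        # Pipeline sketch: wav2vec2 features of the full clip plus a mel
        # spectrogram of the 6 s reference clip go into the BiCodec ONNX
        # graph, whose two outputs are read back as semantic and global tokens.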
        # Process the audio
        wav, ref_wav = self.process_audio(audio_path)

        # Extract features
        feat = self.extract_wav2vec2_features(wav)
        ref_mel = self.extract_mel_spectrogram(ref_wav)

        # Add batch dimensions
        ref_mel_input = ref_mel[np.newaxis, :, :].astype(np.float32)  # [1, 128, 301]
        feat_input = feat[np.newaxis, :, :].astype(np.float32)  # [1, feat_len, 1024]

        # Run the ONNX model
        inputs = {
            'ref_wav_mel': ref_mel_input,
            'feat': feat_input
        }

        outputs = self.ort_session.run(self.output_names, inputs)

        # Unpack the outputs
        semantic_tokens = outputs[0]  # First output
        global_tokens = outputs[1]  # Second output

        return global_tokens, semantic_tokens

    def tokenize_batch(self, audio_paths: list) -> Tuple[list, list]:
        """
        Process a batch of audio files.

        Args:
            audio_paths: List of audio file paths.

        Returns:
            (global_tokens_list, semantic_tokens_list)
        """
        global_tokens_list = []
        semantic_tokens_list = []

        for audio_path in audio_paths:
            global_tokens, semantic_tokens = self.tokenize(audio_path)
            global_tokens_list.append(global_tokens)
            semantic_tokens_list.append(semantic_tokens)

        return global_tokens_list, semantic_tokens_list
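
# Batch usage sketch (file names are hypothetical):
#   g_list, s_list = utilities.tokenize_batch(["a.wav", "b.wav"])
# where each list pairs one file's global tokens with its semantic tokens.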

# Test function
def test_ref_audio_utilities():
    """Test the RefAudioUtilities class."""
    # Initialize the utility class
    onnx_model_path = '/Volumes/bigdata/models/RWKVTTS_WebRWKV/BiCodecTokenize.onnx'
    wav2vec2_path = "/Volumes/bigdata/models/RWKVTTS_WebRWKV/wav2vec2-large-xlsr-53.onnx"
    # Use the same parameters as BiCodecTokenizer
    utilities = RefAudioUtilities(
        onnx_model_path,
        wav2vec2_path,
        ref_segment_duration=6.0,  # 6-second reference audio
        latent_hop_length=320  # Hop length of the latent features
    )

    # Test audio file (a sample audio from the project)
    test_audio_path = "demos/刘德华/dehua_zh.wav"

    if Path(test_audio_path).exists():
        print(f"Test audio file: {test_audio_path}")

        try:
            # Generate tokens
            global_tokens, semantic_tokens = utilities.tokenize(test_audio_path)

            print(f"Global tokens shape: {global_tokens.shape}")
            print(f"Semantic tokens shape: {semantic_tokens.shape}")
            print(f"Global tokens: {global_tokens.flatten().tolist()}")
            print(f"Semantic tokens: {semantic_tokens.flatten().tolist()}")

        except Exception as e:
            print(f"Error while processing audio: {e}")
    else:
        print(f"Test audio file does not exist: {test_audio_path}")
        print("Please make sure the test audio file exists")


if __name__ == "__main__":
    test_ref_audio_utilities()