import gc
import os
import re

from audio_separator.separator import Separator

os.environ["MODELSCOPE_CACHE"] = "./.cache/funasr"
os.environ["UVR5_CACHE"] = "./.cache/uvr5-models"

import json
import subprocess
from pathlib import Path

import click
import torch
from loguru import logger
from pydub import AudioSegment
from silero_vad import get_speech_timestamps, load_silero_vad, read_audio
from tqdm import tqdm

from tools.file import AUDIO_EXTENSIONS, VIDEO_EXTENSIONS, list_files
from tools.sensevoice.auto_model import AutoModel
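
# Pipeline overview (summarized from the code below): (1) extract mono audio
# from any videos under the input directory, (2) optionally separate vocals
# with a UVR5 Roformer model, (3) segment speech with FSMN or Silero VAD,
# (4) transcribe each segment with SenseVoiceSmall, writing the sliced audio
# next to a .lab transcript (and an optional .emo emotion label).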


def uvr5_cli(
    audio_dir: Path,
    output_folder: Path,
    audio_files: list[Path] | None = None,
    output_format: str = "flac",
    model: str = "BS-Roformer-Viperx-1297.ckpt",
):
    # Available models: ["BS-Roformer-Viperx-1297.ckpt", "BS-Roformer-Viperx-1296.ckpt",
    # "BS-Roformer-Viperx-1053.ckpt", "Mel-Roformer-Viperx-1143.ckpt"]
    sepr = Separator(
        model_file_dir=os.environ["UVR5_CACHE"],
        output_dir=output_folder,
        output_format=output_format,
    )
    # Map the friendly model names to the checkpoint files audio-separator expects.
    dictmodel = {
        "BS-Roformer-Viperx-1297.ckpt": "model_bs_roformer_ep_317_sdr_12.9755.ckpt",
        "BS-Roformer-Viperx-1296.ckpt": "model_bs_roformer_ep_368_sdr_12.9628.ckpt",
        "BS-Roformer-Viperx-1053.ckpt": "model_bs_roformer_ep_937_sdr_10.5309.ckpt",
        "Mel-Roformer-Viperx-1143.ckpt": "model_mel_band_roformer_ep_3005_sdr_11.4360.ckpt",
    }
    roformer_model = dictmodel[model]
    sepr.load_model(roformer_model)

    if audio_files is None:
        audio_files = list_files(
            path=audio_dir, extensions=AUDIO_EXTENSIONS, recursive=True
        )
    total_files = len(audio_files)
    print(f"{total_files} audio files found")

    res = []
    for audio in tqdm(audio_files, desc="Denoising: "):
        file_path = str(audio_dir / audio)
        sep_out = sepr.separate(file_path)
        # separate() may return a single output path or a list of paths.
        if isinstance(sep_out, str):
            res.append(sep_out)
        elif isinstance(sep_out, list):
            res.extend(sep_out)

    # Free the separator and GPU memory before the ASR model is loaded.
    del sepr
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    return res, roformer_model
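
# Example call (a sketch; the paths and result names are hypothetical):
#   stems, model_used = uvr5_cli(
#       audio_dir=Path("data/raw"), output_folder=Path("data/raw")
#   )
# With audio_files left as None, every audio file under audio_dir is separated.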


def get_sample_rate(media_path: Path):
    # Probe the file with ffprobe and return the first audio stream's sample
    # rate. It stays a string so it can be passed straight to ffmpeg's -ar flag.
    result = subprocess.run(
        [
            "ffprobe",
            "-v",
            "quiet",
            "-print_format",
            "json",
            "-show_streams",
            str(media_path),
        ],
        capture_output=True,
        text=True,
        check=True,
    )
    media_info = json.loads(result.stdout)
    for stream in media_info.get("streams", []):
        if stream.get("codec_type") == "audio":
            return stream.get("sample_rate")
    return "44100"  # Default sample rate if not found


def convert_to_mono(src_path: Path, out_path: Path, out_fmt: str = "wav"):
    sr = get_sample_rate(src_path)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    # ffmpeg cannot write in place, so suffix the stem when src and dst collide.
    if src_path.resolve() == out_path.resolve():
        output = str(out_path.with_stem(out_path.stem + f"_{sr}"))
    else:
        output = str(out_path)
    subprocess.run(
        [
            "ffmpeg",
            "-loglevel",
            "error",
            "-i",
            str(src_path),
            "-acodec",
            "pcm_s16le" if out_fmt == "wav" else "flac",
            "-ar",
            sr,
            "-ac",
            "1",
            "-y",
            output,
        ],
        check=True,
    )
    return Path(output)  # the file actually written, not necessarily out_path


def convert_video_to_audio(video_path: Path, audio_dir: Path):
    cur_dir = audio_dir / video_path.relative_to(audio_dir).parent
    # Reuse an existing vocal track if UVR5 already produced one for this video.
    vocals = [
        p
        for p in cur_dir.glob(f"{video_path.stem}_(Vocals)*.*")
        if p.suffix in AUDIO_EXTENSIONS
    ]
    if len(vocals) > 0:
        return vocals[0]
    audio_path = cur_dir / f"{video_path.stem}.wav"
    convert_to_mono(video_path, audio_path)
    return audio_path


# NOTE: the click decorators were lost in extraction; `import click` and the bare
# `main()` call under __main__ imply a click command. The option names mirror the
# parameters below, but the defaults here are assumptions.
@click.command()
@click.option("--audio-dir", required=True)
@click.option("--save-dir", required=True)
@click.option("--device", default="cuda")
@click.option("--language", default="auto")
@click.option("--max-single-segment-time", default=20000, type=int)
@click.option("--fsmn-vad/--silero-vad", default=False)
@click.option("--punc/--no-punc", default=False)
@click.option("--denoise/--no-denoise", default=False)
@click.option("--save-emo/--no-save-emo", default=False)
def main(
    audio_dir: str,
    save_dir: str,
    device: str,
    language: str,
    max_single_segment_time: int,
    fsmn_vad: bool,
    punc: bool,
    denoise: bool,
    save_emo: bool,
):
    audios_path = Path(audio_dir)
    save_path = Path(save_dir)
    save_path.mkdir(parents=True, exist_ok=True)

    # Extract mono audio from any videos found under audio_dir.
    video_files = list_files(
        path=audio_dir, extensions=VIDEO_EXTENSIONS, recursive=True
    )
    v2a_files = [convert_video_to_audio(p, audio_dir) for p in video_files]

    if denoise:
        VOCAL = "_(Vocals)"
        original_files = [
            p
            for p in audios_path.glob("**/*")
            if p.suffix in AUDIO_EXTENSIONS and VOCAL not in p.stem
        ]
        _, cur_model = uvr5_cli(
            audio_dir=audio_dir, output_folder=audio_dir, audio_files=original_files
        )
        # Drop the instrumental stems and the originals; keep only the vocals.
        need_remove = [p for p in audios_path.glob("**/*(Instrumental)*")]
        need_remove.extend(original_files)
        for p in need_remove:
            p.unlink()

        # Strip the model name that audio-separator appends to each vocal stem,
        # then re-encode the renamed vocals as mono flac.
        vocal_files = [
            p
            for p in audios_path.glob("**/*")
            if p.suffix in AUDIO_EXTENSIONS and VOCAL in p.stem
        ]
        for f in vocal_files:
            fn, ext = f.stem, f.suffix
            v_pos = fn.find(VOCAL + "_" + cur_model.split(".")[0])
            if v_pos != -1:
                new_fn = fn[: v_pos + len(VOCAL)]
                new_f = f.with_name(new_fn + ext)
                f = f.rename(new_f)
                convert_to_mono(f, f, "flac")
                f.unlink()

    audio_files = list_files(
        path=audio_dir, extensions=AUDIO_EXTENSIONS, recursive=True
    )
| logger.info("Loading / Downloading Funasr model...") | |
| model_dir = "iic/SenseVoiceSmall" | |
| vad_model = "fsmn-vad" if fsmn_vad else None | |
| vad_kwargs = {"max_single_segment_time": max_single_segment_time} | |
| punc_model = "ct-punc" if punc else None | |
| manager = AutoModel( | |
| model=model_dir, | |
| trust_remote_code=False, | |
| vad_model=vad_model, | |
| vad_kwargs=vad_kwargs, | |
| punc_model=punc_model, | |
| device=device, | |
| ) | |
| if not fsmn_vad and vad_model is None: | |
| vad_model = load_silero_vad() | |
| logger.info("Model loaded.") | |
    # Slices are named <stem>_NNN.<ext>; skip files matching that pattern so
    # reruns do not re-slice already-processed output.
    pattern = re.compile(r"_\d{3}\.")

    for file_path in tqdm(audio_files, desc="Processing audio file"):
        if pattern.search(file_path.name):
            # logger.info(f"Skipping {file_path} as it has already been processed.")
            continue

        file_stem = file_path.stem
        file_suffix = file_path.suffix

        rel_path = Path(file_path).relative_to(audio_dir)
        (save_path / rel_path.parent).mkdir(parents=True, exist_ok=True)

        audio = AudioSegment.from_file(file_path)

        cfg = dict(
            cache={},
            language=language,  # "zh", "en", "yue", "ja", "ko", "nospeech"
            use_itn=False,
            batch_size_s=60,
        )

        if fsmn_vad:
            elapsed, vad_res = manager.vad(input=str(file_path), **cfg)
        else:
            wav = read_audio(
                str(file_path)
            )  # backend (sox, soundfile, or ffmpeg) required!
            audio_key = file_path.stem
            speech_timestamps = get_speech_timestamps(
                wav,
                vad_model,
                max_speech_duration_s=max_single_segment_time // 1000,
                return_seconds=True,
            )
            # Convert silero's second-based timestamps into the millisecond
            # [start, end] pairs that inference_with_vadres expects.
            audio_val = [
                [int(timestamp["start"] * 1000), int(timestamp["end"] * 1000)]
                for timestamp in speech_timestamps
            ]
            vad_res = [dict(key=audio_key, value=audio_val)]

        res = manager.inference_with_vadres(
            input=str(file_path), vad_res=vad_res, **cfg
        )
        for i, info in enumerate(res):
            [start_ms, end_ms] = info["interval"]
            text = info["text"]
            emo = info["emo"]
            sliced_audio = audio[start_ms:end_ms]

            audio_save_path = (
                save_path / rel_path.parent / f"{file_stem}_{i:03d}{file_suffix}"
            )
            sliced_audio.export(audio_save_path, format=file_suffix[1:])
            print(f"Exported {audio_save_path}: {text}")

            transcript_save_path = (
                save_path / rel_path.parent / f"{file_stem}_{i:03d}.lab"
            )
            with open(
                transcript_save_path,
                "w",
                encoding="utf-8",
            ) as f:
                f.write(text)

            if save_emo:
                emo_save_path = save_path / rel_path.parent / f"{file_stem}_{i:03d}.emo"
                with open(
                    emo_save_path,
                    "w",
                    encoding="utf-8",
                ) as f:
                    f.write(emo)

        # When slicing in place, remove the source file after exporting its slices.
        if audios_path.resolve() == save_path.resolve():
            file_path.unlink()


if __name__ == "__main__":
    main()
    exit(0)

    # Unreachable manual-test snippet, kept for reference; it never runs because
    # of exit(0) above. The SenseVoiceSmall import is an assumption (the original
    # omitted it); in FunASR the class lives in funasr.models.sense_voice.model.
    from funasr.models.sense_voice.model import SenseVoiceSmall
    from funasr.utils.postprocess_utils import rich_transcription_postprocess

    # Load the audio file
    audio_path = Path(r"D:\PythonProject\ok\1_output_(Vocals).wav")
    model_dir = "iic/SenseVoiceSmall"
    m, kwargs = SenseVoiceSmall.from_pretrained(model=model_dir, device="cuda:0")
    m.eval()

    res = m.inference(
        data_in=f"{kwargs['model_path']}/example/zh.mp3",
        language="auto",  # "zh", "en", "yue", "ja", "ko", "nospeech"
        use_itn=False,
        ban_emo_unk=False,
        **kwargs,
    )
    print(res)

    text = rich_transcription_postprocess(res[0][0]["text"])
    print(text)
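
# Minimal usage sketch (assumes this file lives at tools/sensevoice/fun_asr.py in
# the fish-speech repo and uses the reconstructed click options above; the flag
# names and defaults are assumptions, not confirmed by the source):
#   python tools/sensevoice/fun_asr.py \
#       --audio-dir data/raw \
#       --save-dir data/sliced \
#       --device cuda --language auto \
#       --denoise --save-emo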