# Source: Hugging Face Space page (status shown at capture time: Sleeping)
| import torch, os, traceback, sys, warnings, shutil, numpy as np | |
| import gradio as gr | |
| import librosa | |
| import asyncio | |
| import rarfile | |
| import edge_tts | |
| import yt_dlp | |
| import ffmpeg | |
| import gdown | |
| import subprocess | |
| import wave | |
| import soundfile as sf | |
| from scipy.io import wavfile | |
| from datetime import datetime | |
| from urllib.parse import urlparse | |
| from mega import Mega | |
| from flask import Flask, request, jsonify, send_file | |
| import base64 | |
| import tempfile | |
| import os | |
| import werkzeug | |
| from pydub import AudioSegment | |
# Flask application object used by the script entry point at the bottom of the file.
app = Flask(__name__)

# Recreate an empty scratch directory under the working directory on every
# start and publish its location through the TEMP environment variable.
now_dir = os.getcwd()
tmp = os.path.join(now_dir, "TEMP")
shutil.rmtree(tmp, ignore_errors=True)
os.makedirs(tmp, exist_ok=True)
os.environ["TEMP"] = tmp

# Model name handed to the `demucs` command line via its -n option.
split_model = "htdemucs"
| from lib.infer_pack.models import ( | |
| SynthesizerTrnMs256NSFsid, | |
| SynthesizerTrnMs256NSFsid_nono, | |
| SynthesizerTrnMs768NSFsid, | |
| SynthesizerTrnMs768NSFsid_nono, | |
| ) | |
| from fairseq import checkpoint_utils | |
| from vc_infer_pipeline import VC | |
| from config import Config | |
# Project configuration object (supplies `device` and `is_half` used below in this file).
config = Config()

# Fetch the edge-tts voice catalogue once at import time and build
# "<ShortName>-<Gender>" labels from it.
tts_voice_list = asyncio.get_event_loop().run_until_complete(
    edge_tts.list_voices()
)
voices = [
    f"{voice['ShortName']}-{voice['Gender']}" for voice in tts_voice_list
]

# Filled in lazily by load_hubert().
hubert_model = None

# Pitch-extraction methods offered; "rmvpe" is listed only when its weights
# file is present in the working directory.
if os.path.isfile("rmvpe.pt"):
    f0method_mode = ["pm", "harvest", "rmvpe", "crepe"]
    f0method_info = "PM is fast, Harvest is good but extremely slow, Rvmpe is alternative to harvest (might be better), and Crepe effect is good but requires GPU (Default: PM)"
else:
    f0method_mode = ["pm", "harvest", "crepe"]
    f0method_info = "PM is fast, Harvest is good but extremely slow, and Crepe effect is good but requires GPU (Default: PM)"
def load_hubert():
    """Load ``hubert_base.pt`` into the module-level ``hubert_model``.

    The first model of the ensemble returned by fairseq is moved to
    ``config.device``, cast to half or full precision according to
    ``config.is_half``, and switched to eval mode.
    """
    global hubert_model
    ensemble, _, _ = checkpoint_utils.load_model_ensemble_and_task(
        ["hubert_base.pt"],
        suffix="",
    )
    model = ensemble[0].to(config.device)
    model = model.half() if config.is_half else model.float()
    model.eval()
    hubert_model = model
# Load the HuBERT feature extractor once at import time.
load_hubert()

# Directories scanned for voice models (*.pth) and their feature indexes.
weight_root = "weights"
index_root = "weights/index"

# File names of every .pth found anywhere below weight_root.
weights_model = [
    name
    for _, _, names in os.walk(weight_root)
    for name in names
    if name.endswith(".pth")
]

# index_root-prefixed paths of every .index file whose name does not contain "trained".
weights_index = [
    os.path.join(index_root, name)
    for _, _, names in os.walk(index_root)
    for name in names
    if name.endswith('.index') and "trained" not in name
]
def check_models():
    """Rescan the model and index folders and build dropdown updates.

    Returns:
        A pair of Gradio dropdown updates: the model dropdown (sorted
        choices, first discovered model preselected, or an empty value when
        no model exists) and the index dropdown (sorted choices).
    """
    weights_model = []
    weights_index = []
    for _, _, model_files in os.walk(weight_root):
        for file in model_files:
            if file.endswith(".pth"):
                weights_model.append(file)
    for _, _, index_files in os.walk(index_root):
        for file in index_files:
            if file.endswith('.index') and "trained" not in file:
                weights_index.append(os.path.join(index_root, file))
    # An empty weights folder used to raise IndexError here; fall back to the
    # empty value this file already uses for "nothing selected".
    default_model = weights_model[0] if weights_model else ""
    return (
        gr.Dropdown.update(choices=sorted(weights_model), value=default_model),
        gr.Dropdown.update(choices=sorted(weights_index)),
    )
def clean():
    """Return updates that blank a dropdown and hide a slider."""
    cleared_dropdown = gr.Dropdown.update(value="")
    hidden_slider = gr.Slider.update(visible=False)
    return cleared_dropdown, hidden_slider
def api_convert_voice():
    """Split an uploaded track, convert its vocals, remix, and return the result.

    Reads ``spk_id`` and ``voice_transform`` from the submitted form and the
    audio from the ``file`` upload part. ``spk_id`` is forwarded to
    ``convert_voice`` and is also embedded in the on-disk names of the
    uploaded file and of the splitter output folder.

    NOTE(review): no ``@app.route`` registration for this view is visible in
    this file — confirm how it is exposed.

    Returns:
        The combined audio file as an attachment on success, otherwise a
        ``(json_body, status_code)`` pair describing the failure.
    """
    spk_id = request.form.get('spk_id', '')
    voice_transform = request.form.get('voice_transform', '')

    # spk_id ends up inside file-system paths, so accept only a bare file name.
    if (
        not spk_id
        or spk_id in ('.', '..')
        or spk_id != os.path.basename(spk_id)
        or '\\' in spk_id
    ):
        return jsonify({"error": "Invalid spk_id"}), 400
    # The conversion step applies int() to this value; reject bad input up front.
    try:
        int(voice_transform)
    except (TypeError, ValueError):
        return jsonify({"error": "voice_transform must be an integer"}), 400
    # Fail early, before the expensive split, when the requested model is absent.
    if not os.path.isfile(os.path.join(weight_root, spk_id)):
        return jsonify({"error": "Model not found."}), 404

    # The file part
    if 'file' not in request.files:
        return jsonify({"error": "No file part"}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({"error": "No selected file"}), 400

    # Save the file to a temporary path. Only the extension of the uploaded
    # name is kept; a name without one gets a placeholder suffix so the saved
    # file's stem is always "<spk_id>_input_audio".
    filename = werkzeug.utils.secure_filename(file.filename)
    extension = os.path.splitext(filename)[1].lstrip('.') or 'wav'
    input_audio_path = os.path.join(tmp, f"{spk_id}_input_audio.{extension}")
    file.save(input_audio_path)

    # Split audio into vocals and accompaniment.
    cut_vocal_and_inst(input_audio_path, spk_id)
    print("audio splitting performed")
    vocal_path = f"output/{split_model}/{spk_id}_input_audio/vocals.wav"
    inst = f"output/{split_model}/{spk_id}_input_audio/no_vocals.wav"
    if not (os.path.exists(vocal_path) and os.path.exists(inst)):
        return jsonify({"error": "Audio splitting failed."}), 500

    # vc_single reports failures by returning a non-path value, so verify the
    # result before handing it to the mixer.
    output_path = convert_voice(spk_id, vocal_path, voice_transform)
    if not isinstance(output_path, str) or not os.path.exists(output_path):
        return jsonify({"error": "Voice conversion failed."}), 500

    output_path1 = combine_vocal_and_inst(output_path, inst)
    print(output_path1)
    if os.path.exists(output_path1):
        # Send the absolute path so the file served is the one just checked,
        # whatever directory send_file resolves relative paths against.
        return send_file(os.path.abspath(output_path1), as_attachment=True)
    return jsonify({"error": "File not found."}), 404
def convert_voice(spk_id, input_audio_path, voice_transform):
    """Load the model named ``spk_id`` and convert one audio file with it.

    Args:
        spk_id: Model file name inside ``weight_root`` (passed to ``get_vc``).
        input_audio_path: Path of the audio file to convert.
        voice_transform: Pitch shift forwarded as ``f0_up_key``.

    Returns:
        Whatever ``vc_single`` returns: the output file path on success, or
        its error value on failure.
    """
    get_vc(spk_id, 0.5)

    # "rmvpe" is only listed in f0method_mode when rmvpe.pt was found at
    # start-up; fall back to the documented default method otherwise.
    f0_method = "rmvpe" if "rmvpe" in f0method_mode else "pm"

    # The model file name itself was previously passed as the index path, so
    # no index could ever be found. Pick the index whose file name contains
    # the model's stem (get_vc has just refreshed weights_index); "" means
    # "no index".
    model_stem = os.path.splitext(spk_id)[0]
    file_index = next(
        (
            index_path
            for index_path in weights_index
            if model_stem and model_stem in os.path.basename(index_path)
        ),
        "",
    )

    output_audio_path = vc_single(
        sid=0,
        input_audio_path=input_audio_path,
        f0_up_key=voice_transform,
        f0_file=None,
        f0_method=f0_method,
        file_index=file_index,
        index_rate=0.75,
        filter_radius=3,
        resample_sr=0,
        rms_mix_rate=0.25,
        protect=0.33,
    )
    print(output_audio_path)
    return output_audio_path
def vc_single(
    sid,
    input_audio_path,
    f0_up_key,
    f0_file,
    f0_method,
    file_index,
    index_rate,
    filter_radius,
    resample_sr,
    rms_mix_rate,
    protect
):
    """Convert one audio file with the currently loaded model.

    Relies on the module globals set by ``get_vc`` (``net_g``, ``vc``,
    ``cpt``, ``tgt_sr``, ``version``) and on ``hubert_model``.

    Args:
        sid: Speaker id handed to the pipeline; also used in the output file name.
        input_audio_path: Audio file to convert (loaded as 16 kHz mono).
        f0_up_key: Pitch shift; converted with ``int()``.
        f0_file: Optional f0 file forwarded to the pipeline.
        f0_method: Pitch-extraction method name.
        file_index: Index file path forwarded to the pipeline.
        index_rate, filter_radius, resample_sr, rms_mix_rate, protect:
            Forwarded unchanged to ``vc.pipeline``.

    Returns:
        The path of the written WAV file on success. On failure, the tuple
        ``(traceback_text, (None, None))``.
    """
    try:
        print("Converting...")
        audio, _ = librosa.load(input_audio_path, sr=16000, mono=True)
        print("found audio ")
        f0_up_key = int(f0_up_key)
        times = [0, 0, 0]
        if hubert_model is None:
            load_hubert()
        print("loaded hubert")
        # Take the f0 flag from the loaded checkpoint, as vc_multi does,
        # instead of assuming every model is an f0 model.
        if_f0 = cpt.get("f0", 1)
        audio_opt = vc.pipeline(
            hubert_model,
            net_g,
            sid,
            audio,
            input_audio_path,
            times,
            f0_up_key,
            f0_method,
            file_index,
            index_rate,
            if_f0,
            filter_radius,
            tgt_sr,
            resample_sr,
            rms_mix_rate,
            version,
            protect,
            f0_file=f0_file
        )
        # Choose the sample rate for the written file in a local variable.
        # Assigning to the global tgt_sr here would change the loaded
        # model's rate for every later call.
        if resample_sr >= 16000 and tgt_sr != resample_sr:
            output_sr = resample_sr
        else:
            output_sr = tgt_sr
        print("writing to FS")
        output_file_path = os.path.join("output", f"converted_audio_{sid}.wav")
        os.makedirs(os.path.dirname(output_file_path), exist_ok=True)
        print("create dir")
        sf.write(output_file_path, audio_opt, output_sr)
        print("wrote to FS")
        return output_file_path
    except Exception:
        # Keep the historical error shape so existing callers still work.
        info = traceback.format_exc()
        return info, (None, None)
def _build_synthesizer(checkpoint):
    """Instantiate the synthesizer class matching a checkpoint's version and f0 flag."""
    model_version = checkpoint.get("version", "v1")
    if model_version == "v1":
        f0_class = SynthesizerTrnMs256NSFsid
        plain_class = SynthesizerTrnMs256NSFsid_nono
    elif model_version == "v2":
        f0_class = SynthesizerTrnMs768NSFsid
        plain_class = SynthesizerTrnMs768NSFsid_nono
    else:
        # Without this, an unknown version would silently reuse whatever
        # network was loaded before.
        raise ValueError(f"Unsupported model version: {model_version!r}")
    if checkpoint.get("f0", 1) == 1:
        return f0_class(*checkpoint["config"], is_half=config.is_half)
    return plain_class(*checkpoint["config"])


def get_vc(sid, to_return_protect0):
    """Load the voice model ``sid`` into the module globals, or unload it.

    Args:
        sid: Model file name inside ``weight_root``. ``""`` or ``[]`` unloads
            the current model.
        to_return_protect0: Value placed in the "protect" update when the
            model is an f0 model.

    Returns:
        A 4-tuple of Gradio updates: speaker slider, protect control, index
        dropdown, and a Markdown header.

    Raises:
        ValueError: If the checkpoint declares a version other than v1 or v2.
    """
    global n_spk, tgt_sr, net_g, vc, cpt, version, weights_index, hubert_model
    if sid == "" or sid == []:
        # Because of polling, only clean up when something is actually loaded.
        if hubert_model is not None:
            print("clean_empty_cache")
            # Some of these globals do not exist until a model has been
            # loaded, so read the checkpoint defensively and reset by assignment.
            previous_cpt = globals().get("cpt")
            hubert_model = net_g = n_spk = vc = tgt_sr = None
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            if previous_cpt is not None:
                # Rebuilding the network and dropping it again is kept from
                # the original code, which states that cleanup is otherwise
                # incomplete.
                version = previous_cpt.get("version", "v1")
                net_g = _build_synthesizer(previous_cpt)
                net_g = None
                del previous_cpt
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()
            cpt = None
        return (
            gr.Slider.update(maximum=2333, visible=False),
            gr.Slider.update(visible=True),
            gr.Dropdown.update(choices=sorted(weights_index), value=""),
            gr.Markdown.update(value="# <center> No model selected")
        )

    print(f"Loading {sid} model...")
    selected_model = sid[:-4]
    # NOTE(review): torch.load unpickles the file; model files can arrive via
    # download_and_extract_models, so only load checkpoints from trusted sources.
    checkpoint = torch.load(os.path.join(weight_root, sid), map_location="cpu")
    sample_rate = checkpoint["config"][-1]
    checkpoint["config"][-3] = checkpoint["weight"]["emb_g.weight"].shape[0]
    if_f0 = checkpoint.get("f0", 1)
    if if_f0 == 0:
        to_return_protect0 = {
            "visible": False,
            "value": 0.5,
            "__type__": "update",
        }
    else:
        to_return_protect0 = {
            "visible": True,
            "value": to_return_protect0,
            "__type__": "update",
        }

    model = _build_synthesizer(checkpoint)
    del model.enc_q
    print(model.load_state_dict(checkpoint["weight"], strict=False))
    model.eval().to(config.device)
    model = model.half() if config.is_half else model.float()
    pipeline = VC(sample_rate, config)

    # Publish the new state only once everything above succeeded, so a
    # failed load never leaves the globals half-updated.
    cpt = checkpoint
    tgt_sr = sample_rate
    version = checkpoint.get("version", "v1")
    net_g = model
    vc = pipeline
    n_spk = cpt["config"][-3]

    weights_index = []
    for _, _, index_files in os.walk(index_root):
        for file in index_files:
            if file.endswith('.index') and "trained" not in file:
                weights_index.append(os.path.join(index_root, file))
    if weights_index == []:
        selected_index = gr.Dropdown.update(value="")
    else:
        # Default to the first index, but prefer one whose path contains the model name.
        selected_index = gr.Dropdown.update(value=weights_index[0])
    for index, model_index in enumerate(weights_index):
        if selected_model in model_index:
            selected_index = gr.Dropdown.update(value=weights_index[index])
            break
    return (
        gr.Slider.update(maximum=n_spk, visible=True),
        to_return_protect0,
        selected_index,
        gr.Markdown.update(
            f'## <center> {selected_model}\n'+
            f'### <center> RVC {version} Model'
        )
    )
def find_audio_files(folder_path, extensions):
    """Collect audio files found anywhere below ``folder_path``.

    Args:
        folder_path: Directory to walk recursively.
        extensions: Iterable of file suffixes (e.g. ``".wav"``); matching is
            case-insensitive.

    Returns:
        A list of paths relative to ``folder_path``. Files directly inside
        the folder are returned as bare names; files in sub-folders keep
        their sub-folder prefix so that joining with ``folder_path`` resolves
        to the real file.
    """
    suffixes = tuple(ext.lower() for ext in extensions)
    audio_files = []
    for root, _dirs, files in os.walk(folder_path):
        for file in files:
            if file.lower().endswith(suffixes):
                audio_files.append(
                    os.path.relpath(os.path.join(root, file), folder_path)
                )
    return audio_files
def vc_multi(
    spk_item,
    vc_input,
    vc_output,
    vc_transform0,
    f0method0,
    file_index,
    index_rate,
    filter_radius,
    resample_sr,
    rms_mix_rate,
    protect,
):
    """Convert every audio file under ``vc_input`` and write results to ``vc_output``.

    This is a generator: after each step it yields the accumulated log text
    (one entry per line) so a UI can show progress.

    Args:
        spk_item: Speaker id handed to the pipeline.
        vc_input: Folder scanned for .mp3/.wav/.flac/.ogg files.
        vc_output: Folder that receives the converted files under their original names.
        vc_transform0: Pitch shift; converted with ``int()``.
        f0method0: Pitch-extraction method name.
        file_index, index_rate, filter_radius, resample_sr, rms_mix_rate, protect:
            Forwarded unchanged to ``vc.pipeline``.

    Yields:
        The log so far, joined with newlines. Errors are reported as a
        traceback appended to the log instead of being raised.
    """
    logs = []
    logs.append("Converting...")
    yield "\n".join(logs)
    print()
    try:
        if os.path.exists(vc_input):
            folder_path = vc_input
            extensions = [".mp3", ".wav", ".flac", ".ogg"]
            audio_files = find_audio_files(folder_path, extensions)
            f0_up_key = int(vc_transform0)
            # Pick the output sample rate in a local variable; assigning to
            # the global tgt_sr would change the loaded model's rate for
            # every later conversion.
            if resample_sr >= 16000 and tgt_sr != resample_sr:
                output_sr = resample_sr
            else:
                output_sr = tgt_sr
            for index, file in enumerate(audio_files, start=1):
                # A real path string; this used to be a (folder, file) tuple.
                input_audio_path = os.path.join(folder_path, file)
                audio, _ = librosa.load(input_audio_path, sr=16000, mono=True)
                times = [0, 0, 0]
                if hubert_model is None:
                    load_hubert()
                if_f0 = cpt.get("f0", 1)
                audio_opt = vc.pipeline(
                    hubert_model,
                    net_g,
                    spk_item,
                    audio,
                    input_audio_path,
                    times,
                    f0_up_key,
                    f0method0,
                    file_index,
                    index_rate,
                    if_f0,
                    filter_radius,
                    tgt_sr,
                    resample_sr,
                    rms_mix_rate,
                    version,
                    protect,
                    f0_file=None
                )
                output_path = os.path.join(vc_output, file)
                # Also covers entries that carry a sub-folder prefix.
                os.makedirs(os.path.dirname(output_path) or vc_output, exist_ok=True)
                sf.write(
                    output_path,
                    audio_opt,
                    output_sr,
                )
                info = f"{index} / {len(audio_files)} | {file}"
                print(info)
                logs.append(info)
                yield "\n".join(logs)
        else:
            logs.append("Folder not found or path doesn't exist.")
            yield "\n".join(logs)
    except Exception:
        info = traceback.format_exc()
        print(info)
        logs.append(info)
        yield "\n".join(logs)
def download_audio(url, audio_provider):
    """Download the audio track of ``url`` as ``result/dl_audio/audio.wav``.

    This is a generator yielding ``(audio_path_or_None, log_text)`` pairs so
    a UI can show progress.

    Args:
        url: Address of the media to download; must not be empty.
        audio_provider: Only ``"Youtube"`` triggers a download.

    Yields:
        ``(None, log)`` while working or on a usage error, and finally
        ``("result/dl_audio/audio.wav", log)`` once the download finished.
    """
    logs = []
    os.makedirs("dl_audio", exist_ok=True)
    # The download is written below result/dl_audio, so make sure that
    # folder exists as well.
    os.makedirs(os.path.join("result", "dl_audio"), exist_ok=True)
    if url == "":
        logs.append("URL required!")
        yield None, "\n".join(logs)
        return
    if audio_provider == "Youtube":
        logs.append("Downloading the audio...")
        yield None, "\n".join(logs)
        ydl_opts = {
            'noplaylist': True,
            'format': 'bestaudio/best',
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'wav',
            }],
            "outtmpl": 'result/dl_audio/audio',
        }
        audio_path = "result/dl_audio/audio.wav"
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])
        logs.append("Download Complete.")
        yield audio_path, "\n".join(logs)
    else:
        # Report unsupported providers instead of ending without any output.
        logs.append(f"Unsupported audio provider: {audio_provider}")
        yield None, "\n".join(logs)
def cut_vocal_and_inst_yt(split_model, spk_id):
    """Split ``result/dl_audio/audio.wav`` into vocals and accompaniment with demucs.

    This is a generator yielding ``(log_text, vocal, inst, vocal)`` tuples;
    the three path slots are ``None`` until the split has finished.

    Args:
        split_model: Model name passed to demucs via ``-n``.
        spk_id: Unused; kept so existing callers keep working.
    """
    logs = []
    logs.append("Starting the audio splitting process...")
    yield "\n".join(logs), None, None, None
    # Argument list instead of a split string, so no value is re-tokenised.
    command = [
        "demucs", "--two-stems=vocals", "-n", split_model,
        "result/dl_audio/audio.wav", "-o", "output",
    ]
    # The context manager waits for the process and closes its pipe.
    with subprocess.Popen(command, stdout=subprocess.PIPE, text=True) as process:
        for line in process.stdout:
            logs.append(line)
            yield "\n".join(logs), None, None, None
    if process.returncode != 0:
        print("Demucs process failed with exit code", process.returncode)
    # The input file is always audio.wav, so the stems are expected under
    # "audio" — the same input-stem naming the upload flow in this file
    # relies on — and not under a speaker-specific folder.
    vocal = f"output/{split_model}/audio/vocals.wav"
    inst = f"output/{split_model}/audio/no_vocals.wav"
    logs.append("Audio splitting complete.")
    yield "\n".join(logs), vocal, inst, vocal
def cut_vocal_and_inst(audio_path, spk_id):
    """Run demucs on ``audio_path`` to separate vocals from accompaniment.

    Output is requested below ``output/`` using the module-level
    ``split_model``. A failing demucs run is reported on stdout but does not
    raise.

    Args:
        audio_path: Audio file to split.
        spk_id: Identifier used to build the returned stem paths.

    Returns:
        ``(vocal, inst)``: the paths where this file expects the vocal and
        no-vocal stems. Their existence is not checked here.
    """
    os.makedirs("output/result", exist_ok=True)
    print("before executing splitter")
    # Argument list instead of a split string, so a path containing spaces
    # reaches demucs as one argument.
    command = [
        "demucs", "--two-stems=vocals", "-n", split_model,
        audio_path, "-o", "output",
    ]
    result = subprocess.run(
        command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
    )
    if result.returncode != 0:
        print("Demucs process failed:", result.stderr)
    else:
        print("Demucs process completed successfully.")
    print("after executing splitter")
    print(result.stdout)
    vocal = f"output/{split_model}/{spk_id}_input_audio/vocals.wav"
    inst = f"output/{split_model}/{spk_id}_input_audio/no_vocals.wav"
    # These used to be computed and discarded; returning them lets callers
    # stop rebuilding the same paths.
    return vocal, inst
def combine_vocal_and_inst(vocal_path, inst_path):
    """Mix a vocal track with an instrumental track into one MP3.

    Args:
        vocal_path: Audio file containing the vocals.
        inst_path: Audio file containing the accompaniment.

    Returns:
        The path of the exported file, always ``output/result/combine.mp3``.
    """
    os.makedirs("output/result", exist_ok=True)
    output_path = "output/result/combine.mp3"

    vocal = AudioSegment.from_file(vocal_path)
    instrumental = AudioSegment.from_file(inst_path)

    # overlay() returns a segment as long as the one it is called on, so use
    # the longer track as the base. This keeps either track from being cut
    # off when the two lengths differ.
    if len(instrumental) >= len(vocal):
        combined = instrumental.overlay(vocal)
    else:
        combined = vocal.overlay(instrumental)

    combined.export(output_path, format="mp3")
    return output_path
| #def combine_vocal_and_inst(audio_data, vocal_volume, inst_volume): | |
| # os.makedirs("output/result", exist_ok=True) | |
| ## output_path = "output/result/combine.mp3" | |
| # inst_path = f"output/{split_model}/audio/no_vocals.wav" | |
| #wavfile.write(vocal_path, audio_data[0], audio_data[1]) | |
| #command = f'ffmpeg -y -i {inst_path} -i {vocal_path} -filter_complex [0:a]volume={inst_volume}[i];[1:a]volume={vocal_volume}[v];[i][v]amix=inputs=2:duration=longest[a] -map [a] -b:a 320k -c:a libmp3lame {output_path}' | |
| #result = subprocess.run(command.split(), stdout=subprocess.PIPE) | |
| #print(result.stdout.decode()) | |
| #return output_path | |
def download_and_extract_models(urls):
    """Download model archives and install their model and index files.

    This is a generator yielding the accumulated log text after each step.
    Archives are fetched into ``zips/``, unpacked into ``zips/extract/``,
    and every ``.pth`` file is moved to ``weight_root`` while every
    ``.index`` file whose name does not contain "trained" is moved to
    ``index_root``.

    Args:
        urls: One URL per line; blank lines are ignored.

    Raises:
        gr.Error: If ``urls`` contains no URL at all.
    """
    logs = []
    extract_dir = os.path.join("zips", "extract")
    os.makedirs("zips", exist_ok=True)
    os.makedirs(extract_dir, exist_ok=True)
    os.makedirs(os.path.join(weight_root), exist_ok=True)
    os.makedirs(os.path.join(index_root), exist_ok=True)

    links = [line.strip() for line in urls.splitlines() if line.strip()]
    if not links:
        raise gr.Error("URL Required!")

    for url in links:
        # The archive is named after the second-to-last path segment; guard
        # against URLs whose path is too short to have one.
        path_parts = urlparse(url).path.split('/')
        archive_stem = path_parts[-2] if len(path_parts) >= 2 else ""
        model_zip = (archive_stem or "model") + '.zip'
        model_zip_path = os.path.join('zips', model_zip)
        logs.append(f"Downloading...")
        yield "\n".join(logs)
        if "drive.google.com" in url:
            gdown.download(url, extract_dir, quiet=False)
        elif "mega.nz" in url:
            m = Mega()
            m.download_url(url, 'zips')
        else:
            # The URL is user input: pass it as a single argument so it is
            # never interpreted by a shell, and put it after "--" so it
            # cannot be read as a wget option.
            fetch = subprocess.run(["wget", "-O", model_zip_path, "--", url])
            if fetch.returncode != 0:
                logs.append(f"wget exited with code {fetch.returncode}.")
                yield "\n".join(logs)
        logs.append(f"Extracting...")
        yield "\n".join(logs)
        for filename in os.listdir("zips"):
            archived_file = os.path.join("zips", filename)
            if filename.endswith(".zip"):
                shutil.unpack_archive(archived_file, extract_dir, 'zip')
            elif filename.endswith(".rar"):
                with rarfile.RarFile(archived_file, 'r') as rar:
                    rar.extractall(extract_dir)
        for root, dirs, files in os.walk(extract_dir):
            logs.append(f"Searching Model and Index...")
            yield "\n".join(logs)
            index = False
            if files:
                for file in files:
                    # Join with the directory being walked; at the top level
                    # this is zips/extract, as before.
                    if file.endswith(".pth"):
                        shutil.move(os.path.join(root, file), os.path.join(weight_root, file))
                    if file.endswith('.index') and "trained" not in file:
                        shutil.move(os.path.join(root, file), os.path.join(index_root, file))
                        index = True
            else:
                logs.append("No model in main folder.")
                yield "\n".join(logs)
                logs.append("Searching in subfolders...")
                yield "\n".join(logs)
                for sub_dir in dirs:
                    sub_dir_path = os.path.join(root, sub_dir)
                    for _, _, sub_files in os.walk(sub_dir_path):
                        for file in sub_files:
                            if file.endswith(".pth"):
                                shutil.move(os.path.join(sub_dir_path, file), os.path.join(weight_root, file))
                            if file.endswith('.index') and "trained" not in file:
                                shutil.move(os.path.join(sub_dir_path, file), os.path.join(index_root, file))
                                index = True
                    shutil.rmtree(sub_dir_path)
            if index is False:
                logs.append("Model only file, no Index file detected.")
                yield "\n".join(logs)
        logs.append("Download Completed!")
        yield "\n".join(logs)
    logs.append("Successfully download all models! Refresh your model list to load the model")
    yield "\n".join(logs)
if __name__ == '__main__':
    # Start the Flask server on all interfaces, port 5000, with debug mode off.
    app.run(host='0.0.0.0', port=5000, debug=False)