|
import argparse
import logging
import multiprocessing as mp
import os
import queue
import subprocess as sp
import sys
import time
import typing as tp
from pathlib import Path
from tempfile import NamedTemporaryFile

import gradio as gr
import torch
from einops import rearrange

from audiocraft.data.audio import audio_write
from audiocraft.data.audio_utils import convert_audio
from audiocraft.models import MusicGen, MultiBandDiffusion
from audiocraft.models.encodec import InterleaveStereoCompressionModel
|
|
|
os.putenv("HF_HUB_ENABLE_HF_TRANSFER","1") |
|
os.environ["SAFETENSORS_FAST_GPU"] = "1" |
|
|
|
# Disable TF32 and reduced-precision reductions so matmuls and cuDNN kernels
# run in full float32: slower, but keeps generation numerically consistent.
torch.backends.cuda.matmul.allow_tf32 = False
torch.backends.cuda.matmul.allow_bf16_reduced_precision_reduction = False
torch.backends.cuda.matmul.allow_fp16_reduced_precision_reduction = False
torch.backends.cudnn.allow_tf32 = False
torch.backends.cudnn.deterministic = False
torch.backends.cudnn.benchmark = False

torch.set_float32_matmul_precision("highest")
|
|
|
class FileCleaner:
    """Deletes tracked files once they are older than `file_lifetime` seconds."""

    def __init__(self, file_lifetime: float = 3600):
        self.file_lifetime = file_lifetime
        self.files: tp.List[tp.Tuple[float, Path]] = []
|
def add(self, path: tp.Union[str, Path]): |
|
self._cleanup() |
|
self.files.append((time.time(), Path(path))) |
|
    def _cleanup(self):
        now = time.time()
        # Entries are stored oldest-first, so we can stop at the first fresh file.
        for time_added, path in list(self.files):
            if now - time_added > self.file_lifetime:
                if path.exists():
                    path.unlink()
                self.files.pop(0)
            else:
                break
|
|
|
file_cleaner = FileCleaner()  # generated outputs are removed after ~1 hour
|
|
|
def convert_wav_to_mp4(wav_path, output_path=None): |
|
"""Converts a WAV file to a waveform MP4 video using ffmpeg.""" |
|
if output_path is None: |
|
|
|
output_path = Path(wav_path).with_suffix(".mp4") |
|
    try:
        command = [
            "ffmpeg",
            "-y",  # overwrite the output file if it already exists
            "-i", str(wav_path),
            # Draw the audio as a 1280x202 line waveform and use it as the video stream.
            "-filter_complex",
            "[0:a]showwaves=s=1280x202:mode=line,format=yuv420p[v]",
            "-map", "[v]",
            "-map", "0:a",
            "-c:v", "libx264",
            "-c:a", "aac",
            "-preset", "fast",
            str(output_path),
        ]
        sp.run(command, capture_output=True, text=True, check=True)
        return str(output_path)
|
except sp.CalledProcessError as e: |
|
print(f"Error in ffmpeg conversion: {e}") |
|
print(f"ffmpeg stdout: {e.stdout}") |
|
print(f"ffmpeg stderr: {e.stderr}") |
|
raise |
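# Example (hypothetical path): convert_wav_to_mp4("/tmp/gen.wav") renders and
# returns "/tmp/gen.mp4" next to the input file.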
|
|
|
def model_worker(model_name: str, depth: str, task_queue: mp.Queue, result_queue: mp.Queue):
|
""" |
|
Persistent worker process (used when NOT running as a daemon). |
|
""" |
|
try: |
|
device = 'cuda' if torch.cuda.is_available() else 'cpu' |
|
        model = MusicGen.get_pretrained(model_name, device=device, depth=depth)
|
mbd = MultiBandDiffusion.get_mbd_musicgen(device=device) |
|
while True: |
|
task = task_queue.get() |
|
if task is None: |
|
break |
|
task_id, text, melody, duration, use_diffusion, gen_params = task |
|
try: |
|
model.set_generation_params(duration=duration, **gen_params) |
|
target_sr = model.sample_rate |
|
target_ac = 1 |
|
processed_melody = None |
|
if melody: |
|
sr, melody_data = melody |
|
melody_tensor = torch.from_numpy(melody_data).to(device).float().t() |
|
if melody_tensor.ndim == 1: |
|
melody_tensor = melody_tensor.unsqueeze(0) |
|
melody_tensor = melody_tensor[..., :int(sr * duration)] |
|
processed_melody = convert_audio(melody_tensor, sr, target_sr, target_ac) |
|
if processed_melody is not None: |
|
output, tokens = model.generate_with_chroma( |
|
descriptions=[text], |
|
melody_wavs=[processed_melody], |
|
melody_sample_rate=target_sr, |
|
progress=True, |
|
return_tokens=True |
|
) |
|
else: |
|
output, tokens = model.generate([text], progress=True, return_tokens=True) |
|
output = output.detach().cpu() |
|
if use_diffusion: |
|
if isinstance(model.compression_model, InterleaveStereoCompressionModel): |
|
left, right = model.compression_model.get_left_right_codes(tokens) |
|
tokens = torch.cat([left, right]) |
|
outputs_diffusion = mbd.tokens_to_wav(tokens) |
|
if isinstance(model.compression_model, InterleaveStereoCompressionModel): |
|
assert outputs_diffusion.shape[1] == 1 |
|
outputs_diffusion = rearrange(outputs_diffusion, '(s b) c t -> b (s c) t', s=2) |
|
outputs_diffusion = outputs_diffusion.detach().cpu() |
|
result_queue.put((task_id, (output, outputs_diffusion))) |
|
else: |
|
result_queue.put((task_id, (output, None))) |
|
except Exception as e: |
|
result_queue.put((task_id, e)) |
|
except Exception as e: |
|
result_queue.put((-1, e)) |
|
|
|
class Predictor:
    """Runs MusicGen in-process (daemon mode) or in a persistent worker process."""

    def __init__(self, model_name: str, depth: str):
|
self.model_name = model_name |
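        # Daemon processes cannot spawn children, so when running inside one
        # (e.g. some Gradio/Spaces execution modes, an assumption about the
        # hosting environment) we load and run the model in-process instead.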
|
self.is_daemon = mp.current_process().daemon |
|
if self.is_daemon: |
|
|
|
self.device = 'cuda' if torch.cuda.is_available() else 'cpu' |
|
self.model = MusicGen.get_pretrained(self.model_name, device=self.device, depth=depth) |
|
self.mbd = MultiBandDiffusion.get_mbd_musicgen(device=self.device) |
|
self.current_task_id = 0 |
|
else: |
|
|
|
self.task_queue = mp.Queue() |
|
self.result_queue = mp.Queue() |
|
            self.process = mp.Process(
                target=model_worker,
                args=(self.model_name, depth, self.task_queue, self.result_queue),
            )
|
self.process.start() |
|
self.current_task_id = 0 |
|
self._check_initialization() |
|
|
|
    def _check_initialization(self):
        """Check if the worker process initialized successfully (only in non-daemon mode)."""
        if not self.is_daemon:
            time.sleep(2)
            try:
                task_id, result = self.result_queue.get(timeout=3)
            except queue.Empty:
                # No message within the window: assume the worker is still
                # loading the model and is otherwise healthy.
                return
            if task_id == -1 and isinstance(result, Exception):
                raise RuntimeError("Model loading failed in worker process.") from result
|
|
|
def predict(self, text, melody, duration, use_diffusion, **gen_params): |
|
"""Submits a prediction task.""" |
|
if self.is_daemon: |
|
|
|
            self.current_task_id += 1
|
task_id = self.current_task_id |
|
try: |
|
self.model.set_generation_params(duration=duration, **gen_params) |
|
target_sr = self.model.sample_rate |
|
target_ac = 1 |
|
processed_melody = None |
|
if melody: |
|
sr, melody_data = melody |
|
melody_tensor = torch.from_numpy(melody_data).to(self.device).float().t() |
|
if melody_tensor.ndim == 1: |
|
melody_tensor = melody_tensor.unsqueeze(0) |
|
melody_tensor = melody_tensor[..., :int(sr * duration)] |
|
processed_melody = convert_audio(melody_tensor, sr, target_sr, target_ac) |
|
if processed_melody is not None: |
|
output, tokens = self.model.generate_with_chroma( |
|
descriptions=[text], |
|
melody_wavs=[processed_melody], |
|
melody_sample_rate=target_sr, |
|
progress=True, |
|
return_tokens=True |
|
) |
|
else: |
|
output, tokens = self.model.generate([text], progress=True, return_tokens=True) |
|
output = output.detach().cpu() |
|
if use_diffusion: |
|
if isinstance(self.model.compression_model, InterleaveStereoCompressionModel): |
|
left, right = self.model.compression_model.get_left_right_codes(tokens) |
|
tokens = torch.cat([left, right]) |
|
outputs_diffusion = self.mbd.tokens_to_wav(tokens) |
|
if isinstance(self.model.compression_model, InterleaveStereoCompressionModel): |
|
assert outputs_diffusion.shape[1] == 1 |
|
outputs_diffusion = rearrange(outputs_diffusion, '(s b) c t -> b (s c) t', s=2) |
|
outputs_diffusion = outputs_diffusion.detach().cpu() |
|
return task_id, (output, outputs_diffusion) |
|
else: |
|
return task_id, (output, None) |
|
except Exception as e: |
|
return task_id, e |
|
else: |
|
|
|
self.current_task_id += 1 |
|
task = (self.current_task_id, text, melody, duration, use_diffusion, gen_params) |
|
self.task_queue.put(task) |
|
return self.current_task_id, (None, None) |
|
|
|
    def get_result(self, task_id):
        """Retrieves the result of a prediction task."""
        if self.is_daemon:
            # In daemon mode predict() runs synchronously and has already
            # returned the result, so there is nothing to fetch here.
            result = task_id
        else:
            # Drain the result queue until this task's result arrives.
            while True:
                result_task_id, result = self.result_queue.get()
                if result_task_id == task_id:
                    break
        if isinstance(result, Exception):
            raise result
        return result
|
|
|
def shutdown(self): |
|
"""Shuts down the worker process (if running).""" |
|
if not self.is_daemon and self.process.is_alive(): |
|
self.task_queue.put(None) |
|
self.process.join() |
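    # Illustrative usage sketch (prompt and parameters are made up; this
    # mirrors how predict_full below drives the class):
    #   predictor = Predictor("facebook/musicgen-melody", depth="float32")
    #   task_id, result = predictor.predict("lofi beat", None, 10, False, top_k=250)
    #   if not predictor.is_daemon:
    #       result = predictor.get_result(task_id)
    #   wav, diffusion_wav = result
    #   predictor.shutdown()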
|
|
|
_default_model_name = "facebook/musicgen-melody" |
|
|
|
def predict_full(model, model_path, depth, decoder, text, melody, duration, topk, topp, temperature, cfg_coef):
    # `model_path` is reserved for custom checkpoints and is currently unused.
    # Compare against the radio value explicitly: the raw string "Default"
    # would otherwise be truthy and always enable MBD.
    use_mbd = decoder == "MultiBand_Diffusion"

    predictor = Predictor(model, depth)
    task_id, result = predictor.predict(
        text=text,
        melody=melody,
        duration=duration,
        use_diffusion=use_mbd,
        top_k=topk,
        top_p=topp,
        temperature=temperature,
        cfg_coef=cfg_coef,
    )
    if not predictor.is_daemon:
        # In worker mode predict() only enqueued the task; block for the result.
        result = predictor.get_result(task_id)
    if isinstance(result, Exception):
        raise result
    wav, diffusion_wav = result
|
|
|
wav_paths = [] |
|
video_paths = [] |
|
|
|
    with NamedTemporaryFile("wb", suffix=".wav", delete=False) as file:
        # MusicGen decodes at 32 kHz; writing at a different rate would pitch-shift the audio.
        audio_write(
            file.name, wav[0], 32000, strategy="loudness",
            loudness_headroom_db=16, loudness_compressor=True, add_suffix=False
        )
|
wav_paths.append(file.name) |
|
|
|
video_path = convert_wav_to_mp4(file.name) |
|
video_paths.append(video_path) |
|
file_cleaner.add(file.name) |
|
file_cleaner.add(video_path) |
|
|
|
if diffusion_wav is not None: |
|
        with NamedTemporaryFile("wb", suffix=".wav", delete=False) as file:
            audio_write(
                file.name, diffusion_wav[0], 32000, strategy="loudness",  # MBD output is also 32 kHz
                loudness_headroom_db=16, loudness_compressor=True, add_suffix=False
            )
|
wav_paths.append(file.name) |
|
|
|
video_path = convert_wav_to_mp4(file.name) |
|
video_paths.append(video_path) |
|
file_cleaner.add(file.name) |
|
file_cleaner.add(video_path) |
|
|
|
    predictor.shutdown()  # no-op when running in daemon mode
|
if use_mbd: |
|
return video_paths[0], wav_paths[0], video_paths[1], wav_paths[1] |
|
return video_paths[0], wav_paths[0], None, None |
|
|
|
def toggle_audio_src(choice):
    if choice == "mic":
        return gr.update(sources=["microphone"], value=None, label="Microphone")
    else:
        return gr.update(sources=["upload"], value=None, label="File")
|
|
|
def toggle_diffusion(choice): |
|
if choice == "MultiBand_Diffusion": |
|
return [gr.update(visible=True)] * 2 |
|
else: |
|
return [gr.update(visible=False)] * 2 |
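# The two updates map onto [diffusion_output, audio_diffusion] in the click
# handler wired up inside ui_full.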
|
|
|
def ui_full(launch_kwargs): |
|
with gr.Blocks() as interface: |
|
gr.Markdown( |
|
""" |
|
# MusicGen |
|
This is your private demo for [MusicGen](https://github.com/facebookresearch/audiocraft), |
|
a simple and controllable model for music generation |
|
presented at: ["Simple and Controllable Music Generation"](https://huggingface.co/papers/2306.05284) |
|
""" |
|
) |
|
with gr.Row(): |
|
with gr.Column(): |
|
with gr.Row(): |
|
text = gr.Text(label="Input Text", interactive=True) |
|
with gr.Column(): |
|
radio = gr.Radio(["file", "mic"], value="file", |
|
label="Condition on a melody (optional) File or Mic") |
|
melody = gr.Audio(sources="upload", type="numpy", label="File", |
|
interactive=True, elem_id="melody-input") |
|
with gr.Row(): |
|
submit = gr.Button("Submit") |
|
|
|
with gr.Row(): |
|
model = gr.Radio(["facebook/musicgen-melody", "facebook/musicgen-medium", "facebook/musicgen-small", |
|
"facebook/musicgen-large", "facebook/musicgen-melody-large", |
|
"facebook/musicgen-stereo-small", "facebook/musicgen-stereo-medium", |
|
"facebook/musicgen-stereo-melody", "facebook/musicgen-stereo-large", |
|
"facebook/musicgen-stereo-melody-large"], |
|
label="Model", value="facebook/musicgen-melody", interactive=True) |
|
model_path = gr.Text(label="Model Path (custom models)", interactive=False, visible=False) |
|
depth = gr.Radio(["float32", "bfloat16", "float16"], |
|
label="Model Precision", value="float32", interactive=True) |
|
with gr.Row(): |
|
decoder = gr.Radio(["Default", "MultiBand_Diffusion"], |
|
label="Decoder", value="Default", interactive=True) |
|
with gr.Row(): |
|
                duration = gr.Slider(minimum=1, maximum=120, value=10, label="Duration (seconds)", interactive=True)
|
with gr.Row(): |
|
topk = gr.Number(label="Top-k", value=250, interactive=True) |
|
topp = gr.Number(label="Top-p", value=0, interactive=True) |
|
temperature = gr.Number(label="Temperature", value=1.0, interactive=True) |
|
cfg_coef = gr.Number(label="Classifier Free Guidance", value=3.0, interactive=True) |
|
with gr.Column(): |
|
output = gr.Video(label="Generated Music") |
|
audio_output = gr.Audio(label="Generated Music (wav)", type='filepath') |
|
diffusion_output = gr.Video(label="MultiBand Diffusion Decoder", visible=False) |
|
audio_diffusion = gr.Audio(label="MultiBand Diffusion Decoder (wav)", type='filepath', visible=False) |
|
|
|
submit.click( |
|
toggle_diffusion, decoder, [diffusion_output, audio_diffusion], queue=False |
|
).then( |
|
predict_full, |
|
inputs=[model, model_path, depth, decoder, text, melody, duration, topk, topp, temperature, cfg_coef], |
|
outputs=[output, audio_output, diffusion_output, audio_diffusion] |
|
) |
|
radio.change(toggle_audio_src, radio, [melody], queue=False, show_progress=False) |
|
|
|
gr.Markdown( |
|
""" |
|
### More details |
|
|
|
The model will generate a short music extract based on the description you provided. |
|
The model can generate up to 30 seconds of audio in one pass. |
|
|
|
            The model was trained on descriptions from a stock music catalog. Descriptions that work best
            include some detail about the instruments present, along with an intended use case
            (e.g. adding "perfect for a commercial" can help).
|
|
|
            Using one of the `melody` models (e.g. `musicgen-melody-*`), you can optionally provide a reference audio
            from which a broad melody will be extracted.
            The model will then try to follow both the description and the melody provided.
            For best results, the melody should be 30 seconds long (I know, the samples we provide are not...).
|
|
|
It is now possible to extend the generation by feeding back the end of the previous chunk of audio. |
|
This can take a long time, and the model might lose consistency. The model might also |
|
decide at arbitrary positions that the song ends. |
|
|
|
**WARNING:** Choosing long durations will take a long time to generate (2min might take ~10min). |
|
An overlap of 12 seconds is kept with the previously generated chunk, and 18 "new" seconds |
|
are generated each time. |
|
|
|
We present 10 model variations: |
|
            1. facebook/musicgen-melody -- a music generation model capable of generating music conditioned
            on text and melody inputs. **Note**, you can also use text only.
|
2. facebook/musicgen-small -- a 300M transformer decoder conditioned on text only. |
|
3. facebook/musicgen-medium -- a 1.5B transformer decoder conditioned on text only. |
|
4. facebook/musicgen-large -- a 3.3B transformer decoder conditioned on text only. |
|
5. facebook/musicgen-melody-large -- a 3.3B transformer decoder conditioned on text and melody. |
|
            6. facebook/musicgen-stereo-small -- a 300M transformer decoder conditioned on text only, fine-tuned for stereo output.
            7. facebook/musicgen-stereo-medium -- a 1.5B transformer decoder conditioned on text only, fine-tuned for stereo output.
            8. facebook/musicgen-stereo-melody -- a 1.5B transformer decoder conditioned on text and melody, fine-tuned for stereo output.
            9. facebook/musicgen-stereo-large -- a 3.3B transformer decoder conditioned on text only, fine-tuned for stereo output.
            10. facebook/musicgen-stereo-melody-large -- a 3.3B transformer decoder conditioned on text and melody, fine-tuned for stereo output.
|
|
|
            We also present two ways of decoding the audio tokens:
            1. Use the default GAN-based compression model. It can suffer from artifacts, especially
            on crashes, snares, etc.
|
            2. Use [MultiBand Diffusion](https://arxiv.org/abs/2308.02560). It should improve the audio quality
            at an extra computational cost. When this is selected, we provide both the GAN-decoded
            audio and the one obtained with MBD.
|
|
|
See [github.com/facebookresearch/audiocraft](https://github.com/facebookresearch/audiocraft/blob/main/docs/MUSICGEN.md) |
|
for more details. |
|
""" |
|
) |
|
|
|
interface.queue().launch(**launch_kwargs) |
|
|
|
if __name__ == '__main__': |
|
parser = argparse.ArgumentParser() |
|
parser.add_argument( |
|
'--listen', |
|
type=str, |
|
default='0.0.0.0' if 'SPACE_ID' in os.environ else '127.0.0.1', |
|
help='IP to listen on for connections to Gradio', |
|
) |
|
parser.add_argument( |
|
'--username', type=str, default='', help='Username for authentication' |
|
) |
|
parser.add_argument( |
|
'--password', type=str, default='', help='Password for authentication' |
|
) |
|
parser.add_argument( |
|
'--server_port', |
|
type=int, |
|
default=0, |
|
help='Port to run the server listener on', |
|
) |
|
parser.add_argument( |
|
'--inbrowser', action='store_true', help='Open in browser' |
|
) |
|
parser.add_argument( |
|
'--share', action='store_true', help='Share the gradio UI' |
|
) |
|
args = parser.parse_args() |
|
launch_kwargs = {} |
|
launch_kwargs['server_name'] = args.listen |
|
if args.username and args.password: |
|
launch_kwargs['auth'] = (args.username, args.password) |
|
if args.server_port: |
|
launch_kwargs['server_port'] = args.server_port |
|
    if args.inbrowser:
        launch_kwargs['inbrowser'] = args.inbrowser
    if args.share:
        launch_kwargs['share'] = args.share
|
logging.basicConfig(level=logging.INFO, stream=sys.stderr) |
|
|
|
    # Each request constructs and shuts down its own Predictor inside
    # predict_full, so there is no global predictor left to clean up here.
    ui_full(launch_kwargs)