Spaces:
Running
Running
# Dataloader based on https://github.com/jeonchangbin49/LimitAug | |
import os | |
from glob import glob | |
import random | |
from typing import Optional, Callable | |
import numpy as np | |
import torch | |
import librosa | |
from torch.utils.data import Dataset | |
import pyloudnorm as pyln | |
from pedalboard import Pedalboard, Limiter, Gain, Compressor, Clipping | |
from utils import load_wav_arbitrary_position_stereo, db2linear | |
# based on https://github.com/sigsep/open-unmix-pytorch | |
def aug_from_str(list_of_function_names: list): | |
if list_of_function_names: | |
return Compose([globals()["_augment_" + aug] for aug in list_of_function_names]) | |
else: | |
return lambda audio: audio | |
class Compose(object): | |
"""Composes several augmentation transforms. | |
Args: | |
augmentations: list of augmentations to compose. | |
""" | |
def __init__(self, transforms): | |
self.transforms = transforms | |
def __call__(self, audio: torch.Tensor) -> torch.Tensor: | |
for t in self.transforms: | |
audio = t(audio) | |
return audio | |
# numpy based augmentation | |
# based on https://github.com/sigsep/open-unmix-pytorch | |
def _augment_gain(audio, low=0.25, high=1.25): | |
"""Applies a random gain between `low` and `high`""" | |
g = low + random.random() * (high - low) | |
return audio * g | |
def _augment_channelswap(audio): | |
"""Swap channels of stereo signals with a probability of p=0.5""" | |
if audio.shape[0] == 2 and random.random() < 0.5: | |
return np.flip(audio, axis=0) # axis=0 must be given | |
else: | |
return audio | |
# Linear gain increasing implementation for Method (1) | |
def apply_linear_gain_increase(mixture, target, board, meter, samplerate, target_lufs): | |
mixture, target = mixture.T, target.T | |
loudness = meter.integrated_loudness(mixture) | |
if np.isinf(loudness): | |
augmented_gain = 0.0 | |
board[0].gain_db = augmented_gain | |
else: | |
augmented_gain = target_lufs - loudness | |
board[0].gain_db = augmented_gain | |
mixture = board(mixture.T, samplerate) | |
target = board(target.T, samplerate) | |
return mixture, target | |
# LimitAug implementation for Method (2) and | |
# implementation of LimitAug then Loudness normalization for Method (4) | |
def apply_limitaug( | |
audio, | |
board, | |
meter, | |
samplerate, | |
target_lufs, | |
target_loudnorm_lufs=None, | |
loudness=None, | |
): | |
audio = audio.T | |
if loudness is None: | |
loudness = meter.integrated_loudness(audio) | |
if np.isinf(loudness): | |
augmented_gain = 0.0 | |
board[0].gain_db = augmented_gain | |
else: | |
augmented_gain = target_lufs - loudness | |
board[0].gain_db = augmented_gain | |
audio = board(audio.T, samplerate) | |
if target_loudnorm_lufs: | |
after_loudness = meter.integrated_loudness(audio.T) | |
if np.isinf(after_loudness): | |
pass | |
else: | |
target_gain = target_loudnorm_lufs - after_loudness | |
audio = audio * db2linear(target_gain) | |
return audio, loudness | |
""" | |
This dataloader implementation is based on https://github.com/sigsep/open-unmix-pytorch | |
""" | |
class MusdbTrainDataset(Dataset): | |
def __init__( | |
self, | |
target: str = "vocals", | |
root: str = None, | |
seq_duration: Optional[float] = 6.0, | |
samples_per_track: int = 64, | |
source_augmentations: Optional[Callable] = lambda audio: audio, | |
sample_rate: int = 44100, | |
seed: int = 42, | |
limitaug_method: str = "limitaug_then_loudnorm", | |
limitaug_mode: str = "normal_L", | |
limitaug_custom_target_lufs: float = None, | |
limitaug_custom_target_lufs_std: float = None, | |
target_loudnorm_lufs: float = -14.0, | |
custom_limiter_attack_range: list = [2.0, 2.0], | |
custom_limiter_release_range: list = [200.0, 200.0], | |
*args, | |
**kwargs, | |
) -> None: | |
""" | |
Parameters | |
---------- | |
limitaug_method : str | |
choose from ["linear_gain_increase", "limitaug", "limitaug_then_loudnorm", "only_loudnorm"] | |
limitaug_mode : str | |
choose from ["uniform", "normal", "normal_L", "normal_XL", "normal_short_term", "normal_L_short_term", "normal_XL_short_term", "custom"] | |
limitaug_custom_target_lufs : float | |
valid only when | |
limitaug_mode == "custom" | |
limitaug_custom_target_lufs_std : float | |
also valid only when | |
limitaug_mode == "custom | |
target_loudnorm_lufs : float | |
valid only when | |
limitaug_method == 'limitaug_then_loudnorm' or 'only_loudnorm' | |
default is -14. | |
To the best of my knowledge, Spotify and Youtube music is using -14 as a reference loudness normalization level. | |
No special reason for the choice of -14 as target_loudnorm_lufs. | |
target : str | |
target name of the source to be separated, defaults to ``vocals``. | |
root : str | |
root path of MUSDB | |
seq_duration : float | |
training is performed in chunks of ``seq_duration`` (in seconds, | |
defaults to ``None`` which loads the full audio track | |
samples_per_track : int | |
sets the number of samples, yielded from each track per epoch. | |
Defaults to 64 | |
source_augmentations : list[callables] | |
provide list of augmentation function that take a multi-channel | |
audio file of shape (src, samples) as input and output. Defaults to | |
no-augmentations (input = output) | |
seed : int | |
control randomness of dataset iterations | |
args, kwargs : additional keyword arguments | |
used to add further control for the musdb dataset | |
initialization function. | |
""" | |
self.seed = seed | |
random.seed(seed) | |
self.seq_duration = seq_duration | |
self.target = target | |
self.samples_per_track = samples_per_track | |
self.source_augmentations = source_augmentations | |
self.sample_rate = sample_rate | |
self.root = root | |
self.sources = ["vocals", "bass", "drums", "other"] | |
self.train_list = glob(f"{self.root}/train/*") | |
self.valid_list = [ | |
"ANiMAL - Rockshow", | |
"Actions - One Minute Smile", | |
"Alexander Ross - Goodbye Bolero", | |
"Clara Berry And Wooldog - Waltz For My Victims", | |
"Fergessen - Nos Palpitants", | |
"James May - On The Line", | |
"Johnny Lokke - Promises & Lies", | |
"Leaf - Summerghost", | |
"Meaxic - Take A Step", | |
"Patrick Talbot - A Reason To Leave", | |
"Skelpolu - Human Mistakes", | |
"Traffic Experiment - Sirens", | |
"Triviul - Angelsaint", | |
"Young Griffo - Pennies", | |
] | |
self.train_list = [ | |
x for x in self.train_list if os.path.basename(x) not in self.valid_list | |
] | |
# limitaug related | |
self.limitaug_method = limitaug_method | |
self.limitaug_mode = limitaug_mode | |
self.limitaug_custom_target_lufs = limitaug_custom_target_lufs | |
self.limitaug_custom_target_lufs_std = limitaug_custom_target_lufs_std | |
self.target_loudnorm_lufs = target_loudnorm_lufs | |
self.meter = pyln.Meter(self.sample_rate) | |
# Method (1) in our paper's Results section and Table 5 | |
if self.limitaug_method == "linear_gain_increase": | |
print("using linear gain increasing!") | |
self.board = Pedalboard([Gain(gain_db=0.0)]) | |
# Method (2) in our paper's Results section and Table 5 | |
elif self.limitaug_method == "limitaug": | |
print("using limitaug!") | |
self.board = Pedalboard( | |
[Gain(gain_db=0.0), Limiter(threshold_db=0.0, release_ms=100.0)] | |
) | |
# Method (3) in our paper's Results section and Table 5 | |
elif self.limitaug_method == "only_loudnorm": | |
print("using only loudness normalized inputs") | |
# Method (4) in our paper's Results section and Table 5 | |
elif self.limitaug_method == "limitaug_then_loudnorm": | |
print("using limitaug then loudness normalize!") | |
self.board = Pedalboard( | |
[Gain(gain_db=0.0), Limiter(threshold_db=0.0, release_ms=100.0)] | |
) | |
elif self.limitaug_method == "custom_limiter_limitaug": | |
print("using Custom limiter limitaug!") | |
self.custom_limiter_attack_range = custom_limiter_attack_range | |
self.custom_limiter_release_range = custom_limiter_release_range | |
self.board = Pedalboard( | |
[ | |
Gain(gain_db=0.0), | |
Compressor( | |
threshold_db=-10.0, ratio=4.0, attack_ms=2.0, release_ms=200.0 | |
), # attack_ms and release_ms will be changed later. | |
Compressor( | |
threshold_db=0.0, | |
ratio=1000.0, | |
attack_ms=0.001, | |
release_ms=100.0, | |
), | |
Gain(gain_db=3.75), | |
Clipping(threshold_db=0.0), | |
] | |
) # This implementation is the same as JUCE Limiter. | |
# However, we want the first compressor to have a variable attack and release time. | |
# Therefore, we use the Custom Limiter instead of the JUCE Limiter. | |
self.limitaug_mode_statistics = { | |
"normal": [ | |
-15.954, | |
1.264, | |
], # -15.954 is mean LUFS of musdb-hq and 1.264 is standard deviation | |
"normal_L": [ | |
-10.887, | |
1.191, | |
], # -10.887 is mean LUFS of musdb-L and 1.191 is standard deviation | |
"normal_XL": [ | |
-8.608, | |
1.165, | |
], # -8.608 is mean LUFS of musdb-L and 1.165 is standard deviation | |
"normal_short_term": [ | |
-17.317, | |
5.036, | |
], # In our experiments, short-term statistics were not helpful. | |
"normal_L_short_term": [-12.303, 5.233], | |
"normal_XL_short_term": [-9.988, 5.518], | |
"custom": [limitaug_custom_target_lufs, limitaug_custom_target_lufs_std], | |
} | |
def sample_target_lufs(self): | |
if ( | |
self.limitaug_mode == "uniform" | |
): # if limitaug_mode is uniform, then choose target_lufs from uniform distribution | |
target_lufs = random.uniform(-20, -5) | |
else: # else, choose target_lufs from gaussian distribution | |
target_lufs = random.gauss( | |
self.limitaug_mode_statistics[self.limitaug_mode][0], | |
self.limitaug_mode_statistics[self.limitaug_mode][1], | |
) | |
return target_lufs | |
def get_limitaug_results(self, mixture, target): | |
# Apply linear gain increasing (Method (1)) | |
if self.limitaug_method == "linear_gain_increase": | |
target_lufs = self.sample_target_lufs() | |
mixture, target = apply_linear_gain_increase( | |
mixture, | |
target, | |
self.board, | |
self.meter, | |
self.sample_rate, | |
target_lufs=target_lufs, | |
) | |
# Apply LimitAug (Method (2)) | |
elif self.limitaug_method == "limitaug": | |
self.board[1].release_ms = random.uniform(30.0, 200.0) | |
mixture_orig = mixture.copy() | |
target_lufs = self.sample_target_lufs() | |
mixture, _ = apply_limitaug( | |
mixture, | |
self.board, | |
self.meter, | |
self.sample_rate, | |
target_lufs=target_lufs, | |
) | |
print("mixture shape:", mixture.shape) | |
print("target shape:", target.shape) | |
target *= mixture / (mixture_orig + 1e-8) | |
# Apply only loudness normalization (Method(3)) | |
elif self.limitaug_method == "only_loudnorm": | |
mixture_loudness = self.meter.integrated_loudness(mixture.T) | |
if np.isinf( | |
mixture_loudness | |
): # if the source is silence, then mixture_loudness is -inf. | |
pass | |
else: | |
augmented_gain = ( | |
self.target_loudnorm_lufs - mixture_loudness | |
) # default target_loudnorm_lufs is -14. | |
mixture = mixture * db2linear(augmented_gain) | |
target = target * db2linear(augmented_gain) | |
# Apply LimitAug then loudness normalization (Method (4)) | |
elif self.limitaug_method == "limitaug_then_loudnorm": | |
self.board[1].release_ms = random.uniform(30.0, 200.0) | |
mixture_orig = mixture.copy() | |
target_lufs = self.sample_target_lufs() | |
mixture, _ = apply_limitaug( | |
mixture, | |
self.board, | |
self.meter, | |
self.sample_rate, | |
target_lufs=target_lufs, | |
target_loudnorm_lufs=self.target_loudnorm_lufs, | |
) | |
target *= mixture / (mixture_orig + 1e-8) | |
# Apply LimitAug using Custom Limiter | |
elif self.limitaug_method == "custom_limiter_limitaug": | |
# Change attack time of First compressor of the Limiter | |
self.board[1].attack_ms = random.uniform( | |
self.custom_limiter_attack_range[0], self.custom_limiter_attack_range[1] | |
) | |
# Change release time of First compressor of the Limiter | |
self.board[1].release_ms = random.uniform( | |
self.custom_limiter_release_range[0], | |
self.custom_limiter_release_range[1], | |
) | |
# Change release time of Second compressor of the Limiter | |
self.board[2].release_ms = random.uniform(30.0, 200.0) | |
mixture_orig = mixture.copy() | |
target_lufs = self.sample_target_lufs() | |
mixture, _ = apply_limitaug( | |
mixture, | |
self.board, | |
self.meter, | |
self.sample_rate, | |
target_lufs=target_lufs, | |
target_loudnorm_lufs=self.target_loudnorm_lufs, | |
) | |
target *= mixture / (mixture_orig + 1e-8) | |
return mixture, target | |
def __getitem__(self, index): | |
audio_sources = [] | |
target_ind = None | |
for k, source in enumerate(self.sources): | |
# memorize index of target source | |
if source == self.target: # if source is 'vocals' | |
target_ind = k | |
track_path = self.train_list[ | |
index // self.samples_per_track | |
] # we want to use # training samples per each track. | |
audio_path = f"{track_path}/{source}.wav" | |
audio = load_wav_arbitrary_position_stereo( | |
audio_path, self.sample_rate, self.seq_duration | |
) | |
else: | |
track_path = random.choice(self.train_list) | |
audio_path = f"{track_path}/{source}.wav" | |
audio = load_wav_arbitrary_position_stereo( | |
audio_path, self.sample_rate, self.seq_duration | |
) | |
audio = self.source_augmentations(audio) | |
audio_sources.append(audio) | |
stems = np.stack(audio_sources, axis=0) | |
# # apply linear mix over source index=0 | |
x = stems.sum(0) | |
# get the target stem | |
y = stems[target_ind] | |
# Apply the limitaug, | |
x, y = self.get_limitaug_results(x, y) | |
x = torch.as_tensor(x, dtype=torch.float32) | |
y = torch.as_tensor(y, dtype=torch.float32) | |
return x, y | |
def __len__(self): | |
return len(self.train_list) * self.samples_per_track | |
class MusdbValidDataset(Dataset): | |
def __init__( | |
self, | |
target: str = "vocals", | |
root: str = None, | |
*args, | |
**kwargs, | |
) -> None: | |
"""MUSDB18 torch.data.Dataset that samples from the MUSDB tracks | |
using track and excerpts with replacement. | |
Parameters | |
---------- | |
target : str | |
target name of the source to be separated, defaults to ``vocals``. | |
root : str | |
root path of MUSDB18HQ dataset, defaults to ``None``. | |
args, kwargs : additional keyword arguments | |
used to add further control for the musdb dataset | |
initialization function. | |
""" | |
self.target = target | |
self.sample_rate = 44100.0 # musdb is fixed sample rate | |
self.root = root | |
self.sources = ["vocals", "bass", "drums", "other"] | |
self.train_list = glob(f"{self.root}/train/*") | |
self.valid_list = [ | |
"ANiMAL - Rockshow", | |
"Actions - One Minute Smile", | |
"Alexander Ross - Goodbye Bolero", | |
"Clara Berry And Wooldog - Waltz For My Victims", | |
"Fergessen - Nos Palpitants", | |
"James May - On The Line", | |
"Johnny Lokke - Promises & Lies", | |
"Leaf - Summerghost", | |
"Meaxic - Take A Step", | |
"Patrick Talbot - A Reason To Leave", | |
"Skelpolu - Human Mistakes", | |
"Traffic Experiment - Sirens", | |
"Triviul - Angelsaint", | |
"Young Griffo - Pennies", | |
] | |
self.valid_list = [ | |
x for x in self.train_list if os.path.basename(x) in self.valid_list | |
] | |
def __getitem__(self, index): | |
audio_sources = [] | |
target_ind = None | |
for k, source in enumerate(self.sources): | |
# memorize index of target source | |
if source == self.target: # if source is 'vocals' | |
target_ind = k | |
track_path = self.valid_list[index] | |
song_name = os.path.basename(track_path) | |
audio_path = f"{track_path}/{source}.wav" | |
# audio = utils.load_wav_stereo(audio_path, self.sample_rate) | |
audio = librosa.load(audio_path, mono=False, sr=self.sample_rate)[0] | |
else: | |
track_path = self.valid_list[index] | |
song_name = os.path.basename(track_path) | |
audio_path = f"{track_path}/{source}.wav" | |
# audio = utils.load_wav_stereo(audio_path, self.sample_rate) | |
audio = librosa.load(audio_path, mono=False, sr=self.sample_rate)[0] | |
audio = torch.as_tensor(audio, dtype=torch.float32) | |
audio_sources.append(audio) | |
stems = torch.stack(audio_sources, dim=0) | |
# # apply linear mix over source index=0 | |
x = stems.sum(0) | |
# get the target stem | |
y = stems[target_ind] | |
return x, y, song_name | |
def __len__(self): | |
return len(self.valid_list) | |
# If you want to check the LUFS values of training examples, run this. | |
if __name__ == "__main__": | |
import argparse | |
parser = argparse.ArgumentParser( | |
description="Make musdb-L and musdb-XL dataset from its ratio data" | |
) | |
parser.add_argument( | |
"--musdb_root", | |
type=str, | |
default="/path/to/musdb", | |
help="root path of musdb-hq dataset", | |
) | |
parser.add_argument( | |
"--limitaug_method", | |
type=str, | |
default="limitaug", | |
choices=[ | |
"linear_gain_increase", | |
"limitaug", | |
"limitaug_then_loudnorm", | |
"only_loudnorm", | |
None, | |
], | |
help="choose limitaug method", | |
) | |
parser.add_argument( | |
"--limitaug_mode", | |
type=str, | |
default="normal_L", | |
choices=[ | |
"uniform", | |
"normal", | |
"normal_L", | |
"normal_XL", | |
"normal_short_term", | |
"normal_L_short_term", | |
"normal_XL_short_term", | |
"custom", | |
], | |
help="if you use LimitAug, what lufs distribution to target", | |
) | |
parser.add_argument( | |
"--limitaug_custom_target_lufs", | |
type=float, | |
default=None, | |
help="if limitaug_mode is custom, set custom target lufs for LimitAug", | |
) | |
args, _ = parser.parse_known_args() | |
source_augmentations_ = aug_from_str(["gain", "channelswap"]) | |
train_dataset = MusdbTrainDataset( | |
target="vocals", | |
root=args.musdb_root, | |
seq_duration=6.0, | |
source_augmentations=source_augmentations_, | |
limitaug_method=args.limitaug_method, | |
limitaug_mode=args.limitaug_mode, | |
limitaug_custom_target_lufs=args.limitaug_custom_target_lufs, | |
) | |
dataloader = torch.utils.data.DataLoader( | |
train_dataset, | |
batch_size=1, | |
shuffle=True, | |
num_workers=4, | |
pin_memory=True, | |
drop_last=False, | |
) | |
meter = pyln.Meter(44100) | |
for i in range(5): | |
for x, y in dataloader: | |
loudness = meter.integrated_loudness(x[0].numpy().T) | |
print(f"mixture loudness : {loudness} LUFS") | |