|
import numpy as np |
|
import matplotlib |
|
matplotlib.use('Agg') |
|
import matplotlib.pyplot as plt |
|
import soundfile as sf |
|
from collections import defaultdict |
|
from dtw import dtw |
|
from sklearn_extra.cluster import KMedoids |
|
from copy import deepcopy |
|
import os
import json
import librosa
|
|
def z_score(x, mean, std):
    """Standardise a value given a mean and standard deviation."""
    return (x - mean) / std
|
|
def get_word_aligns(norm_sent, aln_paths):
    """
    Returns a dictionary mapping speaker ID to a list of word alignments
    (word, start_time, end_time) for a given normalised sentence.
    """
    word_aligns = defaultdict(list)
    slist = norm_sent.split(" ")

    for spk, aln_path in aln_paths:
        with open(aln_path) as f:
            lines = f.read().splitlines()
        lines = [l.split('\t') for l in lines]
        # The alignment file must have exactly one line per word in the sentence.
        if len(lines) == len(slist):
            word_aligns[spk] = [(w, float(s), float(e)) for w, s, e in lines]
        else:
            print(f"Alignment mismatch for {spk}: {slist} vs {lines}")
    return word_aligns
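# Each alignment file is assumed to be tab-separated with one word per line,
# e.g. (illustrative values only):
#   hello   0.00    0.21
#   world   0.21    0.58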
|
|
def get_pitches(start_time, end_time, fpath):
    """
    Returns a list of z-scored pitch values for a given time span of one speech file.
    Reads from a .f0 file with one frame per line: Time, F0, IsVoiced.
    """
    with open(fpath) as f:
        lines = f.read().splitlines()
    lines = [[float(x) for x in line.split()] for line in lines]
    pitches = []

    # Normalise pitch per file: compute stats over voiced frames only.
    # Assumes voiced frames are flagged with a positive IsVoiced value.
    voiced = [line[1] for line in lines if line[2] > 0]
    mean = np.mean(voiced)
    std = np.std(voiced)

    for time, pitch, is_voiced in lines:
        if start_time <= time <= end_time:
            if is_voiced > 0:
                pitches.append(z_score(pitch, mean, std))
            else:
                # Placeholder value for unvoiced frames.
                pitches.append(-0.99)

    return pitches
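# Expected .f0 file format, one frame per line (illustrative values only):
#   0.005  121.3  1      <- voiced frame: Time, F0, IsVoiced
#   0.010    0.0  0      <- unvoiced frame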
|
|
def get_rmse(start_time, end_time, wpath):
    """
    Returns an array of RMS energy values for a given time span of one speech file.
    """
    audio, sr = librosa.load(wpath, sr=16000)
    segment = audio[int(np.floor(start_time * sr)):int(np.ceil(end_time * sr))]
    # 30 ms frames with a 5 ms hop at 16 kHz.
    rmse = librosa.feature.rms(y=segment, frame_length=480, hop_length=80)
    return rmse[0]
|
|
def downsample_rmse2pitch(rmse, pitch_len):
    """Downsample an RMSE array to match the length of the pitch array."""
    idx = np.round(np.linspace(0, len(rmse) - 1, pitch_len)).astype(int)
    return rmse[idx]
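# After this step the pitch and energy tracks are frame-aligned,
# e.g. (hypothetical shapes): downsample_rmse2pitch(np.arange(400.0), 150)
# has length 150.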
|
|
def parse_word_indices(start_end_word_index):
    """
    Parse a 1-based word index or index range such as '3' or '2-4'
    into 0-based (start, end) indices.
    """
    ixs = start_end_word_index.split('-')
    s = int(ixs[0])
    e = int(ixs[-1])
    return s - 1, e - 1
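# Examples: parse_word_indices('3') -> (2, 2); parse_word_indices('2-4') -> (1, 3).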
|
|
def get_data(norm_sent, path_key, start_end_word_index):
    """
    Returns a dictionary of (pitch, rmse) value pairs for a given
    sentence/word-span combination, keyed by '{words}**{speaker}'.
    """
    s_ix, e_ix = parse_word_indices(start_end_word_index)
    words = '_'.join(norm_sent.split(' ')[s_ix:e_ix+1])

    align_paths = [(spk, pdict['aln']) for spk, pdict in path_key]
    word_aligns = get_word_aligns(norm_sent, align_paths)

    data = defaultdict(list)
    align_data = defaultdict(list)

    for spk, pdict in path_key:
        word_al = word_aligns[spk]
        start_time = word_al[s_ix][1]
        end_time = word_al[e_ix][2]

        # Word alignments shifted to be relative to the start of the selected span.
        seg_aligns = word_al[s_ix:e_ix+1]
        seg_aligns = [(w, round(s-start_time, 2), round(e-start_time, 2)) for w, s, e in seg_aligns]

        pitches = get_pitches(start_time, end_time, pdict['f0'])

        rmses = get_rmse(start_time, end_time, pdict['wav'])
        rmses = downsample_rmse2pitch(rmses, len(pitches))

        # Pair each pitch frame with its energy frame.
        d = [[p, r] for p, r in zip(pitches, rmses)]

        data[f"{words}**{spk}"] = d
        align_data[f"{words}**{spk}"] = seg_aligns

    return words, data, align_data
|
|
def dtw_distance(x, y):
    """
    Returns the normalised DTW distance between two feature sequences.
    """
    alignment = dtw(x, y, keep_internals=True)
    return alignment.normalizedDistance
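# dtw-python's normalizedDistance divides the alignment cost by the step
# pattern's normalisation factor (reference + query length for the default
# symmetric2 pattern), so sequences of different durations stay comparable.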
|
|
def pair_dists(data, words, recs):
    """
    Returns pairwise DTW distances between all recordings of the same words,
    as a flat list of ('{rec1}**{rec2}', distance) tuples.
    """
    dtw_dists = []

    for rec1 in recs:
        key1 = f'{words}**{rec1}'
        val1 = data[key1]
        for rec2 in recs:
            key2 = f'{words}**{rec2}'
            val2 = data[key2]
            dtw_dists.append((f"{rec1}**{rec2}", dtw_distance(val1, val2)))

    return dtw_dists
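# For N recordings this yields N*N entries in row-major order, including
# zero self-distances; cluster() below reshapes them into an N x N matrix.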
|
|
def kmedoids_clustering(X):
    # X is a square matrix of pairwise DTW distances, so cluster on it
    # directly rather than treating its rows as feature vectors.
    kmedoids = KMedoids(n_clusters=3, random_state=0, metric='precomputed').fit(X)
    y_km = kmedoids.labels_
    return y_km, kmedoids
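# The fixed n_clusters=3 matters downstream: match_tts() indexes the best,
# middle, and worst cluster directly.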
|
|
def match_tts(clusters, speech_data, tts_data, tts_align, words, seg_aligns, voice):
    """
    Score a TTS rendition against clusters of human recordings and
    plot pitch and energy for each cluster.
    """
    # Mean DTW distance from the TTS sample to each cluster of human recordings.
    tts_info = []
    for label in set(c for r, c in clusters):
        recs = [r for r, c in clusters if c == label]
        dists = []
        for rec in recs:
            key = f'{words}**{rec}'
            dists.append(dtw_distance(tts_data, speech_data[key]))
        tts_info.append((label, np.nanmean(dists)))

    # Rank clusters by closeness to the TTS sample; assumes exactly 3 clusters.
    tts_info = sorted(tts_info, key=lambda x: x[1])
    best_cluster = tts_info[0][0]
    best_cluster_score = tts_info[0][1]

    matched_data = {f'{words}**{r}': speech_data[f'{words}**{r}'] for r, c in clusters if c == best_cluster}

    mid_cluster = tts_info[1][0]
    mid_data = {f'{words}**{r}': speech_data[f'{words}**{r}'] for r, c in clusters if c == mid_cluster}
    bad_cluster = tts_info[2][0]
    bad_data = {f'{words}**{r}': speech_data[f'{words}**{r}'] for r, c in clusters if c == bad_cluster}

    # Pitch figures: TTS overlaid on its best-matching cluster, plus the other two clusters.
    tts_fig_p = plot_one_cluster(words, 'pitch', matched_data, seg_aligns, best_cluster, tts_data=tts_data, tts_align=tts_align, voice=voice)
    fig_mid_p = plot_one_cluster(words, 'pitch', mid_data, seg_aligns, mid_cluster)
    fig_bad_p = plot_one_cluster(words, 'pitch', bad_data, seg_aligns, bad_cluster)

    # Energy (RMSE) figures for the same three clusters.
    tts_fig_e = plot_one_cluster(words, 'rmse', matched_data, seg_aligns, best_cluster, tts_data=tts_data, tts_align=tts_align, voice=voice)
    fig_mid_e = plot_one_cluster(words, 'rmse', mid_data, seg_aligns, mid_cluster)
    fig_bad_e = plot_one_cluster(words, 'rmse', bad_data, seg_aligns, bad_cluster)

    return best_cluster_score, tts_fig_p, fig_mid_p, fig_bad_p, tts_fig_e, fig_mid_e, fig_bad_e
|
|
def gp(d, s, x):
    """Build the path '{d}/{s}.{x}'."""
    return os.path.join(d, f'{s}.{x}')


def gen_tts_paths(tdir, voices):
    """Paths to the wav, alignment (.tsv), and pitch (.f0) files for each TTS voice."""
    return [(v, {'wav': gp(tdir, v, 'wav'), 'aln': gp(tdir, v, 'tsv'), 'f0': gp(tdir, v, 'f0')}) for v in voices]


def gen_h_paths(wdir, adir, f0dir, spks):
    """Paths to the wav, alignment (.tsv), and pitch (.f0) files for each human speaker."""
    return [(s, {'wav': gp(wdir, s, 'wav'), 'aln': gp(adir, s, 'tsv'), 'f0': gp(f0dir, s, 'f0')}) for s in spks]
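# Hypothetical layout, for illustration only: human files sit in parallel
# directories keyed by speaker ID (wavs/spk1.wav, aligns/spk1.tsv, f0/spk1.f0),
# while each TTS voice's files share a single per-sentence directory.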
|
|
def cluster(norm_sent, orig_sent, h_spk_ids, h_align_dir, h_f0_dir, h_wav_dir, tts_sent_dir, voices, start_end_word_index):
    """
    Cluster human recordings of a word span by pitch+energy DTW distance,
    then match each TTS voice's rendition to its closest cluster.
    """
    h_spk_ids = sorted(h_spk_ids)
    nsents = len(h_spk_ids)

    h_all_paths = gen_h_paths(h_wav_dir, h_align_dir, h_f0_dir, h_spk_ids)

    words, h_data, h_seg_aligns = get_data(norm_sent, h_all_paths, start_end_word_index)

    dtw_dists = pair_dists(h_data, words, h_spk_ids)

    # Reshape the flat pairwise-distance list into an N x N distance matrix.
    X = [d[1] for d in dtw_dists]
    X = [X[i:i+nsents] for i in range(0, len(X), nsents)]
    X = np.array(X)

    y_km, kmedoids = kmedoids_clustering(X)

    # Pair each speaker with its cluster label (speaker order matches the matrix rows).
    groups = [[r, c] for r, c in zip(h_spk_ids, kmedoids.labels_)]

    tts_all_paths = gen_tts_paths(tts_sent_dir, voices)
    _, tts_data, tts_seg_aligns = get_data(norm_sent, tts_all_paths, start_end_word_index)

    # Note: only the figures for the final voice in the list are returned.
    for v in voices:
        voice_data = tts_data[f"{words}**{v}"]
        voice_align = tts_seg_aligns[f"{words}**{v}"]

        best_cluster_score, tts_fig_p, fig_mid_p, fig_bad_p, tts_fig_e, fig_mid_e, fig_bad_e = match_tts(groups, h_data, voice_data, voice_align, words, h_seg_aligns, v)

    return best_cluster_score, tts_fig_p, fig_mid_p, fig_bad_p, tts_fig_e, fig_mid_e, fig_bad_e
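# Minimal usage sketch (all paths, IDs, and the voice name are hypothetical):
#   score, *figs = cluster(
#       norm_sent="the weather is nice today",
#       orig_sent="The weather is nice today.",
#       h_spk_ids=["spk1", "spk2", "spk3", "spk4"],
#       h_align_dir="aligns", h_f0_dir="f0", h_wav_dir="wavs",
#       tts_sent_dir="tts/sent_001", voices=["voiceA"],
#       start_end_word_index="2-3")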
|
|
def spks_all_cdist(data, tts_data):
    """
    DTW distance between every human recording and every TTS recording of the
    same words, keyed by the word string.
    Legacy helper: assumes '-'-delimited keys of the form 'word1-...-id1-id2',
    not the '{words}**{speaker}' keys used elsewhere in this module.
    """
    speaker_to_tts_dtw_dists = defaultdict(list)

    for key1, value1 in data.items():
        d = key1.split("-")
        words1 = d[:-2]
        id1, id2 = d[-2], d[-1]
        for key2, value2 in tts_data.items():
            d = key2.split("-")
            words2 = d[:-2]
            id3, id4 = d[-2], d[-1]
            if all([w1 == w2 for w1, w2 in zip(words1, words2)]):
                speaker_to_tts_dtw_dists[f"{'-'.join(words1)}"].append((f"{id1}-{id2}_{id3}-{id4}", dtw_distance(value1, value2)))
    return speaker_to_tts_dtw_dists
|
|
def tts_cdist(kmedoids_cluster_dists, speaker_to_tts_dtw_dists):
    """
    Mean DTW distance from the TTS sample to each cluster,
    keyed by '{words}-{cluster}'.
    Legacy helper: expects the outputs of the clustering step and spks_all_cdist().
    """
    tts_dist_to_cluster = defaultdict(list)

    for words1, datas1 in kmedoids_cluster_dists.items():
        for d1 in datas1:
            cluster, sp_id1, arr = d1
            for words2, datas2 in speaker_to_tts_dtw_dists.items():
                for d2 in datas2:
                    ids, dist = d2
                    sp_id2, tts_id = ids.split("_")
                    if sp_id1 == sp_id2 and words1 == words2:
                        tts_dist_to_cluster[f"{words1}-{cluster}"].append(dist)

    tts_mean_dist_to_cluster = {
        key: np.mean(value) for key, value in tts_dist_to_cluster.items()
    }
    return tts_mean_dist_to_cluster
|
|
def get_audio_part(start_time, end_time, id, path):
    """
    Returns the audio samples for a given time span of one speech file.
    """
    f = os.path.join(path, id + ".wav")
    audio, sr = librosa.load(f, sr=16000)
    segment = audio[int(np.floor(start_time * sr)):int(np.ceil(end_time * sr))]
    return segment
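# e.g. (hypothetical): get_audio_part(0.5, 1.2, "spk1", "wavs") returns the
# 16 kHz samples between 0.5 s and 1.2 s of wavs/spk1.wav.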
|
|
def plot_one_cluster(words, feature, speech_data, seg_aligns, cluster_id, tts_data=None, tts_align=None, voice=None):

    colors = ["red", "green", "blue", "orange", "purple", "pink", "brown", "gray", "cyan"]
    cc = 0
    fig = plt.figure(figsize=(10, 5))

    # Select which half of each (pitch, energy) pair to plot, and how to plot it.
    if feature.lower() in ['pitch', 'f0']:
        fname = 'Pitch'
        ffunc = lambda x: [p for p, e in x]
        pfunc = plt.scatter
    elif feature.lower() in ['energy', 'rmse']:
        fname = 'Energy'
        ffunc = lambda x: [e for p, e in x]
        pfunc = plt.plot
    else:
        print(f"Unknown feature '{feature}': expected 'pitch'/'f0' or 'energy'/'rmse'")
        return fig

    plt.title(f"{words} - {fname} - Cluster {cluster_id}")
    for k, v in speech_data.items():

        spk = k.split('**')[1]

        word_times = seg_aligns[k]

        feats = ffunc(v)

        # One frame every 5 ms, matching the pitch/RMSE hop.
        feat_xvals = [x * 0.005 for x in range(len(feats))]

        # Centre the x-axis on the first word boundary so speakers line up.
        if len(word_times) > 1:
            realign = np.mean([word_times[0][2], word_times[1][1]])
            feat_xvals = [x - realign for x in feat_xvals]
            word_times = [(w, s - realign, e - realign) for w, s, e in word_times]
            plt.axvline(x=0, color="gray", linestyle='--', linewidth=1, label=f"{word_times[0][0]} -> {word_times[1][0]} boundary")

        # Mark the remaining word boundaries per speaker.
        if len(word_times) > 2:
            for i in range(1, len(word_times) - 1):
                bound_line = np.mean([word_times[i][2], word_times[i+1][1]])
                plt.axvline(x=bound_line, color=colors[cc], linestyle='--', linewidth=1, label=f"Speaker {spk} -> {word_times[i+1][0]}")

        pfunc(feat_xvals, feats, color=colors[cc], label=f"Speaker {spk}")
        cc += 1
        if cc >= len(colors):
            cc = 0

    # Overlay the TTS sample, realigned the same way, in black.
    if voice:
        tfeats = ffunc(tts_data)
        t_xvals = [x * 0.005 for x in range(len(tfeats))]

        if len(tts_align) > 1:
            realign = np.mean([tts_align[0][2], tts_align[1][1]])
            t_xvals = [x - realign for x in t_xvals]
            tts_align = [(w, s - realign, e - realign) for w, s, e in tts_align]

        if len(tts_align) > 2:
            for i in range(1, len(tts_align) - 1):
                bound_line = np.mean([tts_align[i][2], tts_align[i+1][1]])
                plt.axvline(x=bound_line, color="black", linestyle='--', linewidth=1, label=f"TTS -> {tts_align[i+1][0]}")
        pfunc(t_xvals, tfeats, color="black", label=f"TTS {voice}")

    # Artists carry labels; call plt.legend() here if a legend is wanted.
    return fig
|