import json
import os
from collections import defaultdict

import librosa
import matplotlib

matplotlib.use('Agg')  # select the non-interactive backend before importing pyplot

import matplotlib.pyplot as plt
import numpy as np
import soundfile as sf
from dtw import dtw
from sklearn_extra.cluster import KMedoids


def z_score(x, mean, std):
    """Standardize a value against a precomputed mean and standard deviation."""
    return (x - mean) / std


def get_word_aligns(rec_ids, norm_sent, aln_dir):
    """
    Return a dict mapping each recording id to its word alignments,
    read as (word, start_time, end_time) tuples from <rec>.tsv.
    """
    word_aligns = defaultdict(list)
    slist = norm_sent.split(" ")

    for rec in rec_ids:
        aln_path = os.path.join(aln_dir, f'{rec}.tsv')
        with open(aln_path) as f:
            lines = [l.split('\t') for l in f.read().splitlines()]
        if len(lines) == len(slist):
            word_aligns[rec] = [(w, float(s), float(e)) for w, s, e in lines]
        else:
            print(slist, lines, "<---- alignment did not match the sentence")

    return word_aligns
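
# Hedged usage sketch; the recording id, path, and times below are
# hypothetical. Each <rec>.tsv is assumed to hold one "word<TAB>start<TAB>end"
# row per word of norm_sent:
#   aligns = get_word_aligns(["rec_001"], "hello world", "/data/aligns/")
#   aligns["rec_001"]  ->  [("hello", 0.0, 0.35), ("world", 0.38, 0.8)]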


def get_pitches(start_time, end_time, rec_id, path):
    """
    Return z-scored pitch values between start_time and end_time.
    Reads a .f0 file with one "time F0 is_voiced" row per frame.
    """
    f0_path = os.path.join(path, rec_id + ".f0")
    with open(f0_path) as f:
        lines = [[float(x) for x in line.split()] for line in f.read().splitlines()]

    # Normalize against this recording's voiced frames only
    # (assumes the voicing flag is 0 for unvoiced frames).
    voiced = [line[1] for line in lines if line[2]]
    mean = np.mean(voiced)
    std = np.std(voiced)

    pitches = []
    for time, pitch, is_voiced in lines:
        if start_time <= time <= end_time:
            if is_voiced:
                pitches.append(z_score(pitch, mean, std))
            else:
                # Placeholder value for unvoiced frames.
                pitches.append(-0.99)

    return pitches
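
# Illustrative .f0 rows ("time F0 is_voiced", whitespace-separated), assuming
# a 5 ms frame step and a 0/1 voicing flag; the exact layout depends on the
# pitch extractor that produced the files:
#   0.000  120.3  1
#   0.005  121.0  1
#   0.010    0.0  0    <- unvoiced frame, mapped to the -0.99 placeholder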


def get_rmse(start_time, end_time, rec_id, path):
    """
    Return an array of RMS energy values for a segment of a recording.
    """
    wav_path = os.path.join(path, rec_id + ".wav")
    audio, sr = librosa.load(wav_path, sr=16000)
    segment = audio[int(np.floor(start_time * sr)):int(np.ceil(end_time * sr))]
    # librosa.feature.rms returns shape (1, n_frames); keep the single channel.
    return librosa.feature.rms(y=segment)[0]


def downsample_rmse2pitch(rmse, pitch_len):
    """Resample the RMSE track to the length of the pitch track."""
    idx = np.round(np.linspace(0, len(rmse) - 1, pitch_len)).astype(int)
    return rmse[idx]
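
# Worked example: with len(rmse) == 10 and pitch_len == 4,
# np.linspace(0, 9, 4) gives [0., 3., 6., 9.], so frames 0, 3, 6 and 9 are kept.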


def parse_word_indices(start_end_word_index):
    """
    Parse a 1-based "start-end" word-index string (or a single index)
    into 0-based (start, end) indices.
    """
    ixs = start_end_word_index.split('-')
    return int(ixs[0]) - 1, int(ixs[-1]) - 1
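
# Illustrative behavior: "3-5" -> (2, 4) and "3" -> (2, 2); human-readable
# 1-based word indices become 0-based inclusive span bounds.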


def get_data(norm_sent, h_spk_ids, h_align_dir, h_f0_dir, h_wav_dir, start_end_word_index):
    """
    Return the target word span plus dictionaries of (pitch, RMSE) feature
    tracks and word alignments for that span in each recording.
    """
    s_ix, e_ix = parse_word_indices(start_end_word_index)
    words = '_'.join(norm_sent.split(' ')[s_ix:e_ix + 1])

    word_aligns = get_word_aligns(h_spk_ids, norm_sent, h_align_dir)
    data = defaultdict(list)
    align_data = defaultdict(list)

    for rec_id, word_al in word_aligns.items():
        start_time = word_al[s_ix][1]
        end_time = word_al[e_ix][2]

        # Re-zero the word alignments to the start of the selected span.
        seg_aligns = [(w, round(s - start_time, 2), round(e - start_time, 2))
                      for w, s, e in word_al[s_ix:e_ix + 1]]

        pitches = get_pitches(start_time, end_time, rec_id, h_f0_dir)
        rmses = get_rmse(start_time, end_time, rec_id, h_wav_dir)
        rmses = downsample_rmse2pitch(rmses, len(pitches))

        # Pair the two tracks frame by frame: one [pitch, rmse] point per frame.
        data[f"{words}**{rec_id}"] = [[p, r] for p, r in zip(pitches, rmses)]
        align_data[f"{words}**{rec_id}"] = seg_aligns

    return words, data, align_data
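
# Shape of the returned values, with hypothetical ids, for words 3-4 of a
# sentence ("... brown fox ..."):
#   words == "brown_fox"
#   data["brown_fox**rec_001"]       == [[pitch_z, rmse], ...]   # one pair per frame
#   align_data["brown_fox**rec_001"] == [("brown", 0.0, 0.31), ("fox", 0.33, 0.72)]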


def dtw_distance(x, y):
    """
    Return the length-normalized DTW distance between two feature sequences.
    """
    alignment = dtw(x, y, keep_internals=True)
    return alignment.normalizedDistance
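
# Minimal sketch of the distance on two short (pitch, energy) contours; the
# numbers are made up, and the two inputs only need the same feature width.
def _example_dtw_distance():
    a = np.array([[0.1, 0.02], [0.4, 0.05], [0.2, 0.03]])
    b = np.array([[0.0, 0.01], [0.5, 0.06], [0.2, 0.03], [0.1, 0.02]])
    return dtw_distance(a, b)  # a small non-negative float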


def pair_dists(data, words, recs):
    """Compute DTW distances between every ordered pair of recordings."""
    dtw_dists = []
    for rec1 in recs:
        val1 = data[f'{words}**{rec1}']
        for rec2 in recs:
            val2 = data[f'{words}**{rec2}']
            dtw_dists.append((f"{rec1}**{rec2}", dtw_distance(val1, val2)))
    return dtw_dists


def kmedoids_clustering(X):
    """
    Cluster recordings into three groups. X is a square matrix of pairwise
    DTW distances, so the precomputed metric is used rather than treating
    its rows as feature vectors.
    """
    kmedoids = KMedoids(n_clusters=3, metric="precomputed", random_state=0).fit(X)
    y_km = kmedoids.labels_
    return y_km, kmedoids
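
# Hedged usage sketch: a hypothetical symmetric 4x4 DTW distance matrix.
def _example_kmedoids():
    D = np.array([[0.0, 0.1, 0.9, 0.8],
                  [0.1, 0.0, 0.8, 0.9],
                  [0.9, 0.8, 0.0, 0.2],
                  [0.8, 0.9, 0.2, 0.0]])
    labels, model = kmedoids_clustering(D)
    return labels  # length-4 array of cluster ids in {0, 1, 2}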


def get_tts_data(tdir, voice, start_end_word_index):
    """Return (pitch, RMSE) features and word alignments for one TTS voice."""
    with open(f'{tdir}{voice}.json') as f:
        speechmarks = json.load(f)['alignments']

    sr = 16000
    tts_audio, _ = librosa.load(f'{tdir}{voice}.wav', sr=sr)

    s_ix, e_ix = parse_word_indices(start_end_word_index)

    # Speechmark times are in milliseconds. The span ends at the start of the
    # next word, or at the end of the audio if the last word was selected.
    s_tts = speechmarks[s_ix]["time"] / 1000
    if e_ix + 1 < len(speechmarks):
        e_tts = speechmarks[e_ix + 1]["time"] / 1000
    else:
        e_tts = len(tts_audio) / sr

    # Word onsets in seconds, re-zeroed to the start of the span.
    tts_align = [(speechmarks[ix]["value"], round(speechmarks[ix]["time"] / 1000 - s_tts, 3))
                 for ix in range(s_ix, e_ix + 1)]

    tts_f0 = get_pitches(s_tts, e_tts, voice, tdir)
    tts_rmse = get_rmse(s_tts, e_tts, voice, tdir)
    tts_rmse = downsample_rmse2pitch(tts_rmse, len(tts_f0))
    tts_data = [[p, r] for p, r in zip(tts_f0, tts_rmse)]
    return tts_data, tts_align
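
# Assumed shape of the <voice>.json speechmarks file (times in milliseconds),
# in the style of Polly-like word speechmarks; this is an assumption about
# the project's data, not a documented format:
#   {"alignments": [{"value": "hello", "time": 0},
#                   {"value": "world", "time": 410}]}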


def match_tts(clusters, speech_data, tts_data, tts_align, words, seg_aligns, voice):
    """Rank the clusters by mean DTW distance to the TTS rendition and plot them."""
    tts_info = []
    for label in set(c for r, c in clusters):
        recs = [r for r, c in clusters if c == label]
        dists = [dtw_distance(tts_data, speech_data[f'{words}**{rec}']) for rec in recs]
        tts_info.append((label, np.nanmean(dists)))

    # Sort clusters from closest to farthest from the TTS voice.
    tts_info = sorted(tts_info, key=lambda x: x[1])
    best_cluster, best_cluster_score = tts_info[0]
    mid_cluster = tts_info[1][0]
    bad_cluster = tts_info[2][0]

    def cluster_data(label):
        return {f'{words}**{r}': speech_data[f'{words}**{r}'] for r, c in clusters if c == label}

    matched_data = cluster_data(best_cluster)
    mid_data = cluster_data(mid_cluster)
    bad_data = cluster_data(bad_cluster)

    tts_fig_p = plot_pitch_tts(matched_data, tts_data, tts_align, words, seg_aligns, best_cluster, voice)
    fig_mid_p = plot_pitch_cluster(mid_data, words, seg_aligns, mid_cluster)
    fig_bad_p = plot_pitch_cluster(bad_data, words, seg_aligns, bad_cluster)

    tts_fig_e = plot_rmse_tts(matched_data, tts_data, tts_align, words, seg_aligns, best_cluster, voice)
    fig_mid_e = plot_rmse_cluster(mid_data, words, seg_aligns, mid_cluster)
    fig_bad_e = plot_rmse_cluster(bad_data, words, seg_aligns, bad_cluster)

    return best_cluster_score, tts_fig_p, fig_mid_p, fig_bad_p, tts_fig_e, fig_mid_e, fig_bad_e


def cluster(norm_sent, orig_sent, h_spk_ids, h_align_dir, h_f0_dir, h_wav_dir, tts_dir, voices, start_end_word_index):
    """Cluster human recordings of a word span and score TTS voices against them."""
    h_spk_ids = sorted(h_spk_ids)
    nsents = len(h_spk_ids)

    words, data, seg_aligns = get_data(norm_sent, h_spk_ids, h_align_dir, h_f0_dir, h_wav_dir, start_end_word_index)

    dtw_dists = pair_dists(data, words, h_spk_ids)

    # Reshape the flat list of pairwise distances into an nsents x nsents matrix.
    X = np.array([d[1] for d in dtw_dists]).reshape(nsents, nsents)

    y_km, kmedoids = kmedoids_clustering(X)
    groups = [[r, c] for r, c in zip(h_spk_ids, kmedoids.labels_)]

    tdir = f'{tts_dir}{orig_sent.replace(" ", "_")[:65]}/'
    for v in voices:
        tts_data, tts_align = get_tts_data(tdir, v, start_end_word_index)
        # NOTE: only the score and figures of the last voice survive this loop.
        best_cluster_score, tts_fig_p, fig_mid_p, fig_bad_p, tts_fig_e, fig_mid_e, fig_bad_e = \
            match_tts(groups, data, tts_data, tts_align, words, seg_aligns, v)

    return best_cluster_score, tts_fig_p, fig_mid_p, fig_bad_p, tts_fig_e, fig_mid_e, fig_bad_e
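
# Hedged end-to-end sketch; every path, id, and voice name below is a
# placeholder for whatever the calling application actually passes in.
def _example_cluster():
    return cluster(
        norm_sent="the quick brown fox",
        orig_sent="The quick brown fox",
        h_spk_ids=["rec_001", "rec_002", "rec_003"],
        h_align_dir="/data/aligns/",
        h_f0_dir="/data/f0/",
        h_wav_dir="/data/wav/",
        tts_dir="/data/tts/",
        voices=["voice_a"],
        start_end_word_index="3-4",
    )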


def spks_all_cdist(data, tts_data):
    """
    Pair every speaker utterance with every TTS utterance of the same words
    and collect their DTW distances. Legacy helper: it expects "-"-joined
    keys ending in two ids, unlike the "**" keys used elsewhere, and its
    arguments were module-level globals in an earlier version.
    """
    speaker_to_tts_dtw_dists = defaultdict(list)

    for key1, value1 in data.items():
        d = key1.split("-")
        words1 = d[:-2]
        id1, id2 = d[-2], d[-1]
        for key2, value2 in tts_data.items():
            d = key2.split("-")
            words2 = d[:-2]
            id3, id4 = d[-2], d[-1]
            if all(w1 == w2 for w1, w2 in zip(words1, words2)):
                speaker_to_tts_dtw_dists['-'.join(words1)].append(
                    (f"{id1}-{id2}_{id3}-{id4}", dtw_distance(value1, value2)))

    return speaker_to_tts_dtw_dists


def tts_cdist(kmedoids_cluster_dists, speaker_to_tts_dtw_dists):
    """
    Average the speaker-to-TTS DTW distances per cluster. Legacy helper: both
    arguments were module-level globals in an earlier version.
    """
    tts_dist_to_cluster = defaultdict(list)

    for words1, datas1 in kmedoids_cluster_dists.items():
        for cluster_id, sp_id1, arr in datas1:
            for words2, datas2 in speaker_to_tts_dtw_dists.items():
                for ids, dist in datas2:
                    sp_id2, tts_voice = ids.split("_")
                    if sp_id1 == sp_id2 and words1 == words2:
                        tts_dist_to_cluster[f"{words1}-{cluster_id}"].append(dist)

    return {key: np.mean(value) for key, value in tts_dist_to_cluster.items()}


def get_audio_part(start_time, end_time, rec_id, path):
    """
    Return the audio samples of a recording between start_time and end_time.
    """
    wav_path = os.path.join(path, rec_id + ".wav")
    audio, sr = librosa.load(wav_path, sr=16000)
    return audio[int(np.floor(start_time * sr)):int(np.ceil(end_time * sr))]


def plot_pitch_tts(speech_data, tts_data, tts_align, words, seg_aligns, cluster_id, voice):
    """Plot speaker pitch contours for one cluster together with the TTS contour."""
    colors = ["red", "green", "blue", "orange", "purple", "pink", "brown", "gray", "cyan"]
    cc = 0
    fig = plt.figure(figsize=(10, 5))
    plt.title(f"{words} - Pitch - Cluster {cluster_id}")
    for k, v in speech_data.items():
        spk = k.split('**')[1]
        word_times = seg_aligns[k]
        pitches = [p for p, e in v]
        # One pitch frame every 5 ms.
        pitch_xvals = [x * 0.005 for x in range(len(pitches))]

        # Center the x-axis on the first word boundary so speakers line up.
        if len(word_times) > 1:
            realign = np.mean([word_times[0][2], word_times[1][1]])
            pitch_xvals = [x - realign for x in pitch_xvals]
            word_times = [(w, s - realign, e - realign) for w, s, e in word_times]
            plt.axvline(x=0, color="gray", linestyle='--', linewidth=1,
                        label=f"{word_times[0][0]} -> {word_times[1][0]} boundary")

        if len(word_times) > 2:
            for i in range(1, len(word_times) - 1):
                bound_line = np.mean([word_times[i][2], word_times[i + 1][1]])
                plt.axvline(x=bound_line, color=colors[cc], linestyle='--', linewidth=1,
                            label=f"Speaker {spk} -> {word_times[i + 1][0]}")

        plt.scatter(pitch_xvals, pitches, color=colors[cc], label=f"Speaker {spk}")
        cc = (cc + 1) % len(colors)

    tpitches = [p for p, e in tts_data]
    t_xvals = [x * 0.005 for x in range(len(tpitches))]

    if len(tts_align) > 1:
        realign = tts_align[1][1]
        t_xvals = [x - realign for x in t_xvals]
        tts_align = [(w, s - realign) for w, s in tts_align]

    if len(tts_align) > 2:
        for i in range(2, len(tts_align)):
            plt.axvline(x=tts_align[i][1], color="black", linestyle='--', linewidth=1,
                        label=f"TTS -> {tts_align[i][0]}")
    plt.scatter(t_xvals, tpitches, color="black", label=f"TTS {voice}")

    return fig


def plot_pitch_cluster(speech_data, words, seg_aligns, cluster_id):
    """Plot speaker pitch contours for one cluster."""
    colors = ["red", "green", "blue", "orange", "purple", "pink", "brown", "gray", "cyan"]
    cc = 0
    fig = plt.figure(figsize=(8, 4))
    plt.title(f"{words} - Pitch - Cluster {cluster_id}")
    for k, v in speech_data.items():
        spk = k.split('**')[1]
        word_times = seg_aligns[k]
        pitches = [p for p, e in v]
        # One pitch frame every 5 ms.
        pitch_xvals = [x * 0.005 for x in range(len(pitches))]

        # Center the x-axis on the first word boundary so speakers line up.
        if len(word_times) > 1:
            realign = np.mean([word_times[0][2], word_times[1][1]])
            pitch_xvals = [x - realign for x in pitch_xvals]
            word_times = [(w, s - realign, e - realign) for w, s, e in word_times]
            plt.axvline(x=0, color="gray", linestyle='--', linewidth=1,
                        label=f"{word_times[0][0]} -> {word_times[1][0]} boundary")

        if len(word_times) > 2:
            for i in range(1, len(word_times) - 1):
                bound_line = np.mean([word_times[i][2], word_times[i + 1][1]])
                plt.axvline(x=bound_line, color=colors[cc], linestyle='--', linewidth=1,
                            label=f"Speaker {spk} -> {word_times[i + 1][0]}")

        plt.scatter(pitch_xvals, pitches, color=colors[cc], label=f"Speaker {spk}")
        cc = (cc + 1) % len(colors)

    return fig


def plot_rmse_tts(speech_data, tts_data, tts_align, words, seg_aligns, cluster_id, voice):
    """Plot speaker energy (RMSE) contours for one cluster together with the TTS contour."""
    colors = ["red", "green", "blue", "orange", "purple", "pink", "brown", "gray", "cyan"]
    cc = 0
    fig = plt.figure(figsize=(10, 5))
    plt.title(f"{words} - Energy - Cluster {cluster_id}")
    for k, v in speech_data.items():
        spk = k.split('**')[1]
        word_times = seg_aligns[k]
        rmse = [e for p, e in v]
        # RMSE was downsampled to the pitch track, so frames are 5 ms apart too.
        rmse_xvals = [x * 0.005 for x in range(len(rmse))]

        # Center the x-axis on the first word boundary so speakers line up.
        if len(word_times) > 1:
            realign = np.mean([word_times[0][2], word_times[1][1]])
            rmse_xvals = [x - realign for x in rmse_xvals]
            word_times = [(w, s - realign, e - realign) for w, s, e in word_times]
            plt.axvline(x=0, color="gray", linestyle='--', linewidth=1,
                        label=f"{word_times[0][0]} -> {word_times[1][0]} boundary")

        if len(word_times) > 2:
            for i in range(1, len(word_times) - 1):
                bound_line = np.mean([word_times[i][2], word_times[i + 1][1]])
                plt.axvline(x=bound_line, color=colors[cc], linestyle='--', linewidth=1,
                            label=f"Speaker {spk} -> {word_times[i + 1][0]}")

        plt.scatter(rmse_xvals, rmse, color=colors[cc], label=f"Speaker {spk}")
        cc = (cc + 1) % len(colors)

    trmse = [e for p, e in tts_data]
    t_xvals = [x * 0.005 for x in range(len(trmse))]

    if len(tts_align) > 1:
        realign = tts_align[1][1]
        t_xvals = [x - realign for x in t_xvals]
        tts_align = [(w, s - realign) for w, s in tts_align]

    if len(tts_align) > 2:
        for i in range(2, len(tts_align)):
            plt.axvline(x=tts_align[i][1], color="black", linestyle='--', linewidth=1,
                        label=f"TTS -> {tts_align[i][0]}")
    plt.scatter(t_xvals, trmse, color="black", label=f"TTS {voice}")

    return fig


def plot_rmse_cluster(speech_data, words, seg_aligns, cluster_id):
    """Plot speaker energy (RMSE) contours for one cluster."""
    colors = ["red", "green", "blue", "orange", "purple", "pink", "brown", "gray", "cyan"]
    cc = 0
    fig = plt.figure(figsize=(10, 5))
    plt.title(f"{words} - Energy - Cluster {cluster_id}")
    for k, v in speech_data.items():
        spk = k.split('**')[1]
        word_times = seg_aligns[k]
        rmse = [e for p, e in v]
        rmse_xvals = [x * 0.005 for x in range(len(rmse))]

        # Center the x-axis on the first word boundary so speakers line up.
        if len(word_times) > 1:
            realign = np.mean([word_times[0][2], word_times[1][1]])
            rmse_xvals = [x - realign for x in rmse_xvals]
            word_times = [(w, s - realign, e - realign) for w, s, e in word_times]
            plt.axvline(x=0, color="gray", linestyle='--', linewidth=1,
                        label=f"{word_times[0][0]} -> {word_times[1][0]} boundary")

        if len(word_times) > 2:
            for i in range(1, len(word_times) - 1):
                bound_line = np.mean([word_times[i][2], word_times[i + 1][1]])
                plt.axvline(x=bound_line, color=colors[cc], linestyle='--', linewidth=1,
                            label=f"Speaker {spk} -> {word_times[i + 1][0]}")

        plt.scatter(rmse_xvals, rmse, color=colors[cc], label=f"Speaker {spk}")
        cc = (cc + 1) % len(colors)

    return fig


def plot_clusters(X, y, word):
    """Scatter-plot 2-D samples in X colored by their cluster label."""
    u_labels = np.unique(y)
    for i in u_labels:
        plt.scatter(X[y == i, 0], X[y == i, 1], label=i)
    plt.title(word)
    plt.legend()
    # plt.show() is a no-op under the Agg backend; kept for interactive use.
    plt.show()