import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import soundfile as sf
from collections import defaultdict
from dtw import dtw
from sklearn_extra.cluster import KMedoids
from copy import deepcopy
import os, librosa, json

# based on original implementation by
# https://colab.research.google.com/drive/1RApnJEocx3-mqdQC2h5SH8vucDkSlQYt?authuser=1#scrollTo=410ecd91fa29bc73
# by Magnús Freyr Morthens 2023, supported by Rannís NSN


def z_score(x, mean, std):
    return (x - mean) / std


# output
# {'013823-0457777': [('hvaða', 0.89, 1.35),
#                     ('sjúkdómar', 1.35, 2.17),
#                     ('geta', 2.17, 2.4),
#                     ('fylgt', 2.4, 2.83),
#                     ('óbeinum', 2.83, 3.29),
#                     ('reykingum', 3.29, 3.9)],
#  '014226-0508808': [('hvaða', 1.03, 1.45),
#                     ('sjúkdómar', 1.45, 2.28),
#                     ('geta', 2.41, 2.7),
#                     ('fylgt', 2.7, 3.09),
#                     ('óbeinum', 3.09, 3.74),
#                     ('reykingum', 3.74, 4.42)],
#  '013726-0843679': [('hvaða', 0.87, 1.14),
#                     ('sjúkdómar', 1.14, 1.75),
#                     ('geta', 1.75, 1.96),
#                     ('fylgt', 1.96, 2.27),
#                     ('óbeinum', 2.27, 2.73),
#                     ('reykingum', 2.73, 3.27)]}
# takes a list of human SPEAKER IDS, not the whole meta db
def get_word_aligns(rec_ids, norm_sent, aln_dir):
    """
    Returns a dictionary of word alignments for a given sentence.
    """
    word_aligns = defaultdict(list)

    for rec in rec_ids:
        slist = norm_sent.split(" ")
        aln_path = os.path.join(aln_dir, f'{rec}.tsv')
        with open(aln_path) as f:
            lines = f.read().splitlines()
        lines = [l.split('\t') for l in lines]
        try:
            assert len(lines) == len(slist)
            word_aligns[rec] = [(w, float(s), float(e)) for w, s, e in lines]
        except:
            print(slist, lines, "<---- something didn't match")

    return word_aligns


def get_pitches(start_time, end_time, id, path):
    """
    Returns an array of pitch values for a given speech.
    Reads from a .f0 file of Time, F0, IsVoiced.
    """
    f = os.path.join(path, id + ".f0")
    with open(f) as f:
        lines = f.read().splitlines()
    lines = [[float(x) for x in line.split()] for line in lines]  # split lines into floats
    pitches = []

    # find the mean of all pitches in the whole sentence
    mean = np.mean([line[1] for line in lines if line[2] != -1])
    # find the std of all pitches in the whole sentence
    std = np.std([line[1] for line in lines if line[2] != -1])

    for line in lines:
        time, pitch, is_pitch = line
        if start_time <= time <= end_time:
            if is_pitch:
                pitches.append(z_score(pitch, mean, std))
            else:
                #pitches.append(z_score(fifth_percentile, mean, std))
                pitches.append(-0.99)

    return pitches


# jcheng used energy from esps get_f0
# get_f0 says (?):
#   The RMS value of each record is computed based on a 30 msec hanning
#   window with its left edge placed 5 msec before the beginning of the
#   frame.
# jcheng z-scored the energies, per file.
# TODO: implement that?
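# A minimal sketch of the per-file energy z-scoring mentioned in the TODO above.
# The name zscore_rmse is illustrative only (not part of the original pipeline);
# it assumes a 1-D numpy array of RMSE values for one whole file, like the one
# get_rmse() below returns.
def zscore_rmse(rmse):
    # normalise energy within one file, analogous to z_score() for pitch
    return (rmse - np.mean(rmse)) / np.std(rmse)
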
""" f = os.path.join(path, id + ".wav") audio, sr = librosa.load(f, sr=16000) segment = audio[int(np.floor(start_time * sr)):int(np.ceil(end_time * sr))] rmse = librosa.feature.rms(y=segment) rmse = rmse[0] #idx = np.round(np.linspace(0, len(rmse) - 1, pitch_len)).astype(int) return rmse#[idx] def downsample_rmse2pitch(rmse,pitch_len): idx = np.round(np.linspace(0, len(rmse) - 1, pitch_len)).astype(int) return rmse[idx] # parse user input string to usable word indices for the sentence # TODO handle cases def parse_word_indices(start_end_word_index): ixs = start_end_word_index.split('-') if len(ixs) == 1: s = int(ixs[0]) e = int(ixs[0]) else: s = int(ixs[0]) e = int(ixs[-1]) return s-1,e-1 # take any (1stword, lastword) or (word) # unit and prepare data for that unit def get_data(norm_sent,h_spk_ids, h_align_dir, h_f0_dir, h_wav_dir, start_end_word_index): """ Returns a dictionary of pitch, rmse, and spectral centroids values for a given sentence/word combinations. """ s_ix, e_ix = parse_word_indices(start_end_word_index) words = '_'.join(norm_sent.split(' ')[s_ix:e_ix+1]) word_aligns = get_word_aligns(h_spk_ids,norm_sent,h_align_dir) data = defaultdict(list) align_data = defaultdict(list) for id, word_al in word_aligns.items(): start_time = word_al[s_ix][1] end_time = word_al[e_ix][2] seg_aligns = word_al[s_ix:e_ix+1] seg_aligns = [(w,round(s-start_time,2),round(e-start_time,2)) for w,s,e in seg_aligns] pitches = get_pitches(start_time, end_time, id, h_f0_dir) rmses = get_rmse(start_time, end_time, id, h_wav_dir) rmses = downsample_rmse2pitch(rmses,len(pitches)) #spectral_centroids = get_spectral_centroids(start_time, end_time, id, wav_dir, len(pitches)) pitches_cpy = np.array(deepcopy(pitches)) rmses_cpy = np.array(deepcopy(rmses)) d = [[p, r] for p, r in zip(pitches_cpy, rmses_cpy)] #words = "-".join(word_combs) data[f"{words}**{id}"] = d align_data[f"{words}**{id}"] = seg_aligns return words, data, align_data def dtw_distance(x, y): """ Returns the DTW distance between two pitch sequences. """ alignment = dtw(x, y, keep_internals=True) return alignment.normalizedDistance # recs is a sorted list of rec IDs # all recs/data contain the same words # rec1 and rec2 can be the same def pair_dists(data,words,recs): dtw_dists = [] for rec1 in recs: key1 = f'{words}**{rec1}' val1 = data[key1] for rec2 in recs: key2 = f'{words}**{rec2}' val2 = data[key2] dtw_dists.append((f"{rec1}**{rec2}", dtw_distance(val1, val2))) #for key1, value1 in data.items(): # d1 = key1.split("**") # words1 = d1[0] # if not words: # words = words1 # spk1 = d1[1] # for key2, value2 in data.items(): # d2 = key2.split("**") # words2 = d2[0] # spk2 = d2[1] # if all([w0 == w2 for w0, w2 in zip(words.split('_'), words2.split('_'))]): #dtw_dists[words1].append((f"{spk1}**{spk2}", dtw_distance(value1, value2))) # dtw_dists.append((f"{spk1}**{spk2}", dtw_distance(value1, value2))) return dtw_dists # dtw dists is the dict from units to list of tuples # or: now just the list not labelled with the unit. 
# dtw dists is the dict from units to list of tuples
# or: now just the list not labelled with the unit.
# {'hvaða-sjúkdómar':
#   [('013823-0457777_013823-0457777', 0.0),
#    ('013823-0457777_013698-0441666', 0.5999433281203399),
#    ('013823-0457777_014675-0563760', 0.4695447105594414),
#    ('014226-0508808_013823-0457777', 0.44080874425223393),
#    ('014226-0508808_014226-0508808', 0.0),
#    ('014226-0508808_013726-0843679', 0.5599404672667414),
#    ('014226-0508808_013681-0442313', 0.6871330752342419)]
# }
# the 0-distance self-comparisons are present here
# along with both copies of symmetric Speaker1**Speaker2, Speaker2**Speaker1


# TODO
# make n_clusters a param with default 3
def kmedoids_clustering(X):
    kmedoids = KMedoids(n_clusters=3, random_state=0).fit(X)
    y_km = kmedoids.labels_
    return y_km, kmedoids


def get_tts_data(tdir, voice, start_end_word_index):
    with open(f'{tdir}{voice}.json') as f:
        speechmarks = json.load(f)
    speechmarks = speechmarks['alignments']

    sr = 16000
    tts_audio, _ = librosa.load(f'{tdir}{voice}.wav', sr=sr)

    # TODO
    # tts operates on punctuated version
    # so clean this up instead of assuming it will work
    s_ix, e_ix = parse_word_indices(start_end_word_index)

    # TODO
    # default speechmarks return word start time only -
    # this cannot describe pauses
    #######
    s_tts = speechmarks[s_ix]["time"] / 1000
    if e_ix + 1 < len(speechmarks):  # if user doesn't want final word, which has no end time mark,
        e_tts = speechmarks[e_ix + 1]["time"] / 1000
        tts_segment = tts_audio[int(np.floor(s_tts * sr)):int(np.ceil(e_tts * sr))]
    else:
        tts_segment = tts_audio[int(np.floor(s_tts * sr)):]
        e_tts = len(tts_audio) / sr
        # TODO not ideal as probably silence padding on end file?

    tts_align = [(speechmarks[ix]["value"], speechmarks[ix]["time"]) for ix in range(s_ix, e_ix + 1)]
    tts_align = [(w, s / 1000) for w, s in tts_align]
    tts_align = [(w, round(s - s_tts, 3)) for w, s in tts_align]

    tts_f0 = get_pitches(s_tts, e_tts, voice, tdir)
    tts_rmse = get_rmse(s_tts, e_tts, voice, tdir)
    tts_rmse = downsample_rmse2pitch(tts_rmse, len(tts_f0))
    t_pitches_cpy = np.array(deepcopy(tts_f0))
    t_rmses_cpy = np.array(deepcopy(tts_rmse))
    tts_data = [[p, r] for p, r in zip(t_pitches_cpy, t_rmses_cpy)]

    return tts_data, tts_align


def match_tts(clusters, speech_data, tts_data, tts_align, words, seg_aligns, voice):
    tts_info = []
    for label in set([c for r, c in clusters]):
        recs = [r for r, c in clusters if c == label]
        dists = []
        for rec in recs:
            key = f'{words}**{rec}'
            dists.append(dtw_distance(tts_data, speech_data[key]))
        tts_info.append((label, np.nanmean(dists)))

    tts_info = sorted(tts_info, key=lambda x: x[1])
    best_cluster = tts_info[0][0]
    best_cluster_score = tts_info[0][1]
    matched_data = {f'{words}**{r}': speech_data[f'{words}**{r}'] for r, c in clusters if c == best_cluster}

    # now do graphs of matched_data with tts_data
    # and report best_cluster_score

    mid_cluster = tts_info[1][0]
    mid_data = {f'{words}**{r}': speech_data[f'{words}**{r}'] for r, c in clusters if c == mid_cluster}
    bad_cluster = tts_info[2][0]
    bad_data = {f'{words}**{r}': speech_data[f'{words}**{r}'] for r, c in clusters if c == bad_cluster}

    tts_fig_p = plot_pitch_tts(matched_data, tts_data, tts_align, words, seg_aligns, best_cluster, voice)
    fig_mid_p = plot_pitch_cluster(mid_data, words, seg_aligns, mid_cluster)
    fig_bad_p = plot_pitch_cluster(bad_data, words, seg_aligns, bad_cluster)
    tts_fig_e = plot_rmse_tts(matched_data, tts_data, tts_align, words, seg_aligns, best_cluster, voice)
    fig_mid_e = plot_rmse_cluster(mid_data, words, seg_aligns, mid_cluster)
    fig_bad_e = plot_rmse_cluster(bad_data, words, seg_aligns, bad_cluster)

    return best_cluster_score, tts_fig_p, fig_mid_p, fig_bad_p, tts_fig_e, fig_mid_e, fig_bad_e
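
# Sketch for the "make n_clusters a param" TODO on kmedoids_clustering() above:
# the same sklearn_extra call, with the cluster count exposed. Illustrative only,
# not wired into cluster() below (match_tts currently assumes exactly 3 clusters).
def kmedoids_clustering_n(X, n_clusters=3):
    kmedoids = KMedoids(n_clusters=n_clusters, random_state=0).fit(X)
    return kmedoids.labels_, kmedoids
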
# since clustering strictly operates on X,
# once a duration metric is reduced down to pair-distances,
# it no longer matters that duration and pitch/energy had different dimensionality
# TODO option to dtw on the 3 feats pitch/energy/duration separately -
# check whether it is possible to cluster with a 3-dim distance matrix,
# or whether it cannot take that input in multidimensional space;
# the 3 dists can still be averaged to flatten, if appropriately scaled
def cluster(norm_sent, orig_sent, h_spk_ids, h_align_dir, h_f0_dir, h_wav_dir, tts_dir, voices, start_end_word_index):
    h_spk_ids = sorted(h_spk_ids)
    nsents = len(h_spk_ids)
    words, data, seg_aligns = get_data(norm_sent, h_spk_ids, h_align_dir, h_f0_dir, h_wav_dir, start_end_word_index)

    dtw_dists = pair_dists(data, words, h_spk_ids)

    kmedoids_cluster_dists = []

    X = [d[1] for d in dtw_dists]
    X = [X[i:i+nsents] for i in range(0, len(X), nsents)]
    X = np.array(X)

    y_km, kmedoids = kmedoids_clustering(X)
    #plot_clusters(X, y_km, words)
    #c1, c2, c3 = [X[np.where(kmedoids.labels_ == i)] for i in range(3)]

    result = zip(X, kmedoids.labels_)

    groups = [[r, c] for r, c in zip(h_spk_ids, kmedoids.labels_)]

    # tts: assume the first 65 chars of the sentence are enough to find its directory
    tdir = f'{tts_dir}{orig_sent.replace(" ", "_")[:65]}/'
    for v in voices:
        tts_data, tts_align = get_tts_data(tdir, v, start_end_word_index)

        # match the data with a cluster -----
        best_cluster_score, tts_fig_p, fig_mid_p, fig_bad_p, tts_fig_e, fig_mid_e, fig_bad_e = match_tts(groups, data, tts_data, tts_align, words, seg_aligns, v)

    # only supports one voice at a time currently
    return best_cluster_score, tts_fig_p, fig_mid_p, fig_bad_p, tts_fig_e, fig_mid_e, fig_bad_e

    #return words, kmedoids_cluster_dists, group


# TODO there IS something for making tts_data,
# but it will probably need to be rewritten largely from scratch here.

# TODO this one is very helpful,
# but mind whether the dictionaries were adjusted earlier.
def spks_all_cdist():
    speaker_to_tts_dtw_dists = defaultdict(list)

    for key1, value1 in data.items():
        d = key1.split("-")
        words1 = d[:-2]
        id1, id2 = d[-2], d[-1]
        for key2, value2 in tts_data.items():
            d = key2.split("-")
            words2 = d[:-2]
            id3, id4 = d[-2], d[-1]
            if all([w1 == w2 for w1, w2 in zip(words1, words2)]):
                speaker_to_tts_dtw_dists[f"{'-'.join(words1)}"].append((f"{id1}-{id2}_{id3}-{id4}", dtw_distance(value1, value2)))

    return speaker_to_tts_dtw_dists


# TODO I think this is also useful,
# but work out how it behaves with the dict formats,
# keying by word index instead of word text, ***********
# and for 1-word or 3+-word units...
def tts_cdist():
    tts_dist_to_cluster = defaultdict(list)

    for words1, datas1 in kmedoids_cluster_dists.items():
        for d1 in datas1:
            cluster, sp_id1, arr = d1
            for words2, datas2 in speaker_to_tts_dtw_dists.items():
                for d2 in datas2:
                    ids, dist = d2
                    sp_id2, tts_alfur = ids.split("_")
                    if sp_id1 == sp_id2 and words1 == words2:
                        tts_dist_to_cluster[f"{words1}-{cluster}"].append(dist)

    tts_mean_dist_to_cluster = {
        key: np.mean(value) for key, value in tts_dist_to_cluster.items()
    }
    return tts_mean_dist_to_cluster
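
# Sketch of the per-feature idea from the comments above cluster(): run DTW separately
# on pitch and on energy, then average the two distance matrices after scaling each to
# a comparable range. Hypothetical helper (averaged_feature_dists), assuming each entry
# of `data` is a list of [pitch, rmse] frames as built by get_data().
def averaged_feature_dists(data, words, recs):
    n = len(recs)
    mats = []
    for feat_ix in range(2):  # 0 = pitch, 1 = energy
        dmat = np.zeros((n, n))
        for i in range(n):
            for j in range(n):
                x = [frame[feat_ix] for frame in data[f'{words}**{recs[i]}']]
                y = [frame[feat_ix] for frame in data[f'{words}**{recs[j]}']]
                dmat[i, j] = dtw_distance(x, y)
        scale = np.max(dmat)
        mats.append(dmat / scale if scale > 0 else dmat)  # crude per-feature scaling
    return np.mean(mats, axis=0)
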
""" f = os.path.join(path, id + ".wav") audio, sr = librosa.load(f, sr=16000) segment = audio[int(np.floor(start_time * sr)):int(np.ceil(end_time * sr))] return segment def plot_pitch_tts(speech_data,tts_data, tts_align,words,seg_aligns,cluster_id, voice): colors = ["red", "green", "blue", "orange", "purple", "pink", "brown", "gray", "cyan"] cc = 0 fig = plt.figure(figsize=(10, 5)) plt.title(f"{words} - Pitch - Cluster {cluster_id}") for k,v in speech_data.items(): spk = k.split('**')[1] word_times = seg_aligns[k] pitches = [p for p,e in v] # datapoint interval is 0.005 seconds pitch_xvals = [x*0.005 for x in range(len(pitches))] # centre around the first word boundary - # if 3+ words, too bad. if len(word_times)>1: realign = np.mean([word_times[0][2],word_times[1][1]]) pitch_xvals = [x - realign for x in pitch_xvals] word_times = [(w,s-realign,e-realign) for w,s,e in word_times] plt.axvline(x= 0, color="gray", linestyle='--', linewidth=1, label=f"{word_times[0][0]} -> {word_times[1][0]} boundary") if len(word_times)>2: for i in range(1,len(word_times)-1): bound_line = np.mean([word_times[i][2],word_times[i+1][1]]) plt.axvline(x=bound_line, color=colors[cc], linestyle='--', linewidth=1, label=f"Speaker {spk} -> {word_times[i+1][0]}") plt.scatter(pitch_xvals, pitches, color=colors[cc], label=f"Speaker {spk}") cc += 1 if cc >= len(colors): cc=0 tpitches = [p for p,e in tts_data] t_xvals = [x*0.005 for x in range(len(tpitches))] if len(tts_align)>1: realign = tts_align[1][1] t_xvals = [x - realign for x in t_xvals] tts_align = [(w,s-realign) for w,s in tts_align] if len(tts_align)>2: for i in range(2,len(tts_align)): bound_line = tts_align[i][1] plt.axvline(x=bound_line, color="black", linestyle='--', linewidth=1, label=f"TTS -> {tts_align[i][0]}") plt.scatter(t_xvals, tpitches, color="black", label=f"TTS {voice}") #plt.legend() #plt.show() return fig def plot_pitch_cluster(speech_data,words,seg_aligns,cluster_id): colors = ["red", "green", "blue", "orange", "purple", "pink", "brown", "gray", "cyan"] cc = 0 fig = plt.figure(figsize=(8, 4)) plt.title(f"{words} - Pitch - Cluster {cluster_id}") for k,v in speech_data.items(): spk = k.split('**')[1] word_times = seg_aligns[k] pitches = [p for p,e in v] # datapoint interval is 0.005 seconds pitch_xvals = [x*0.005 for x in range(len(pitches))] # centre around the first word boundary - # if 3+ words, too bad. 
        if len(word_times) > 1:
            realign = np.mean([word_times[0][2], word_times[1][1]])
            pitch_xvals = [x - realign for x in pitch_xvals]
            word_times = [(w, s - realign, e - realign) for w, s, e in word_times]
            plt.axvline(x=0, color="gray", linestyle='--', linewidth=1, label=f"{word_times[0][0]} -> {word_times[1][0]} boundary")

        if len(word_times) > 2:
            for i in range(1, len(word_times) - 1):
                bound_line = np.mean([word_times[i][2], word_times[i+1][1]])
                plt.axvline(x=bound_line, color=colors[cc], linestyle='--', linewidth=1, label=f"Speaker {spk} -> {word_times[i+1][0]}")

        plt.scatter(pitch_xvals, pitches, color=colors[cc], label=f"Speaker {spk}")
        cc += 1
        if cc >= len(colors):
            cc = 0

    #plt.legend()
    #plt.show()

    return fig


def plot_rmse_tts(speech_data, tts_data, tts_align, words, seg_aligns, cluster_id, voice):
    colors = ["red", "green", "blue", "orange", "purple", "pink", "brown", "gray", "cyan"]
    cc = 0
    fig = plt.figure(figsize=(10, 5))
    plt.title(f"{words} - Energy - Cluster {cluster_id}")
    for k, v in speech_data.items():
        spk = k.split('**')[1]
        word_times = seg_aligns[k]

        rmse = [e for p, e in v]
        # datapoint interval is 0.005 seconds
        rmse_xvals = [x * 0.005 for x in range(len(rmse))]

        # centre around the first word boundary -
        # if 3+ words, too bad.
        if len(word_times) > 1:
            realign = np.mean([word_times[0][2], word_times[1][1]])
            rmse_xvals = [x - realign for x in rmse_xvals]
            word_times = [(w, s - realign, e - realign) for w, s, e in word_times]
            plt.axvline(x=0, color="gray", linestyle='--', linewidth=1, label=f"{word_times[0][0]} -> {word_times[1][0]} boundary")

        if len(word_times) > 2:
            for i in range(1, len(word_times) - 1):
                bound_line = np.mean([word_times[i][2], word_times[i+1][1]])
                plt.axvline(x=bound_line, color=colors[cc], linestyle='--', linewidth=1, label=f"Speaker {spk} -> {word_times[i+1][0]}")

        plt.plot(rmse_xvals, rmse, color=colors[cc], label=f"Speaker {spk}")
        cc += 1
        if cc >= len(colors):
            cc = 0

    trmse = [e for p, e in tts_data]
    t_xvals = [x * 0.005 for x in range(len(trmse))]

    if len(tts_align) > 1:
        realign = tts_align[1][1]
        t_xvals = [x - realign for x in t_xvals]
        tts_align = [(w, s - realign) for w, s in tts_align]

    if len(tts_align) > 2:
        for i in range(2, len(tts_align)):
            bound_line = tts_align[i][1]
            plt.axvline(x=bound_line, color="black", linestyle='--', linewidth=1, label=f"TTS -> {tts_align[i][0]}")

    plt.plot(t_xvals, trmse, color="black", label=f"TTS {voice}")

    #plt.legend()
    #plt.show()

    return fig


def plot_rmse_cluster(speech_data, words, seg_aligns, cluster_id):
    colors = ["red", "green", "blue", "orange", "purple", "pink", "brown", "gray", "cyan"]
    cc = 0
    fig = plt.figure(figsize=(10, 5))
    plt.title(f"{words} - Energy - Cluster {cluster_id}")
    for k, v in speech_data.items():
        spk = k.split('**')[1]
        word_times = seg_aligns[k]

        rmse = [e for p, e in v]
        # datapoint interval is 0.005 seconds
        rmse_xvals = [x * 0.005 for x in range(len(rmse))]

        # centre around the first word boundary -
        # if 3+ words, too bad.
        if len(word_times) > 1:
            realign = np.mean([word_times[0][2], word_times[1][1]])
            rmse_xvals = [x - realign for x in rmse_xvals]
            word_times = [(w, s - realign, e - realign) for w, s, e in word_times]
            plt.axvline(x=0, color="gray", linestyle='--', linewidth=1, label=f"{word_times[0][0]} -> {word_times[1][0]} boundary")

        if len(word_times) > 2:
            for i in range(1, len(word_times) - 1):
                bound_line = np.mean([word_times[i][2], word_times[i+1][1]])
                plt.axvline(x=bound_line, color=colors[cc], linestyle='--', linewidth=1, label=f"Speaker {spk} -> {word_times[i+1][0]}")

        plt.plot(rmse_xvals, rmse, color=colors[cc], label=f"Speaker {spk}")
        cc += 1
        if cc >= len(colors):
            cc = 0

    return fig


# want to:
# - find the tts best cluster
# - find the avg dist for tts within that cluster
# - find the avg dist for any human to the rest of its cluster

# see near the end of the notebook for a very nice way to grab timespans of tts audio
# (or just the start/end timestamps to mark them) from the alignment json,
# based on word position index -
# so probably really do show the user the sentence with each word numbered.

# THEN there is -
# Plot pitch, rmse, and spectral centroid for each word combination for each speaker
# - this is one person-token per graph and has a word division line - not sure it works for >2 words.
# it might be good to do this for tts at least.

# Plot pitch values for each word combination for each speaker in each cluster (with word boundaries)
# - multiple speakers (one cluster) per graph - this will be good to show, with tts on top.
# may want to recentre it around a word boundary, at least if there are only 2 words;
# or just pick: centre around the 1st word boundary and good luck if there are more.
# - the same as above, but rmse

# go all the way to the bottom of the notebook to see graphs with a tts voice added onto one cluster.

# will need:
# the whole sentence text (index, word) pairs
# the indices of the units the user wants
# human meta db of all human recordings
# tts dir, human wav + align + f0 dirs
# list of tts voices
# an actual wav file for each human rec, probably
# params like: use f0, use rmse, (use dur), [.....]
# .. check.


def plot_clusters(X, y, word):
    u_labels = np.unique(y)

    # plot the results
    for i in u_labels:
        plt.scatter(X[y == i, 0], X[y == i, 1], label=i)
    plt.title(word)
    plt.legend()
    plt.show()
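
# Hypothetical usage sketch: the paths, speaker IDs, and voice name below are
# placeholders, not real data. It clusters recordings of one sentence on words 1-2
# ("hvaða sjúkdómar") and compares one TTS voice against the resulting clusters.
if __name__ == "__main__":
    norm_sent = "hvaða sjúkdómar geta fylgt óbeinum reykingum"
    orig_sent = "Hvaða sjúkdómar geta fylgt óbeinum reykingum?"
    spk_ids = ["013823-0457777", "014226-0508808", "013726-0843679"]
    score, *figs = cluster(norm_sent, orig_sent, spk_ids,
                           "alignments/", "f0/", "wav/",
                           "tts/", ["Alfur"], "1-2")
    print("distance from TTS to its best-matched cluster:", score)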