svincoff's picture
dependencies and embedding_exploration benchmark
c43fbc6
import pandas as pd
import numpy as np
import pickle
from sklearn.manifold import TSNE
import matplotlib.font_manager as fm
from matplotlib.font_manager import FontProperties
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import matplotlib.patches as patches
import seaborn as sns
import umap
import os
from fuson_plm.benchmarking.embed import embed_dataset_for_benchmark
import fuson_plm.benchmarking.embedding_exploration.config as config
from fuson_plm.utils.visualizing import set_font
from fuson_plm.utils.constants import TCGA_CODES, FODB_CODES, VALID_AAS, DELIMITERS
from fuson_plm.utils.logging import get_local_time, open_logfile, log_update, print_configpy
def get_dimred_embeddings(embeddings, dimred_type="umap"):
    """
    Reduce embeddings to two dimensions with the requested method.
    Parameters:
    - embeddings: Collection of embedding vectors; handed unchanged to the chosen reducer.
    - dimred_type: "umap" (default) or "tsne".
    Returns:
    - Whatever get_umap_embeddings / get_tsne_embeddings returns for these embeddings.
    Raises:
    - ValueError: if dimred_type is neither "umap" nor "tsne".
    """
    if dimred_type == "umap":
        return get_umap_embeddings(embeddings)
    if dimred_type == "tsne":
        return get_tsne_embeddings(embeddings)
    # Fail loudly instead of silently returning None, which would only surface
    # later as a confusing error when the result is turned into a DataFrame.
    raise ValueError(
        f"Unsupported dimred_type {dimred_type!r}; expected 'umap' or 'tsne'"
    )
def get_tsne_embeddings(embeddings):
    """
    Project embeddings to 2D with t-SNE.
    Parameters:
    - embeddings: Collection of embedding vectors; converted with np.array before fitting.
    Returns:
    - The output of TSNE.fit_transform for a 2-component model.
    """
    # Fixed seed for reproducibility; perplexity is pinned to 5.
    # NOTE(review): scikit-learn requires perplexity < number of samples — confirm inputs are large enough.
    reducer = TSNE(n_components=2, random_state=42, perplexity=5)
    return reducer.fit_transform(np.array(embeddings))
def get_umap_embeddings(embeddings):
    """
    Project embeddings to 2D with UMAP.
    Parameters:
    - embeddings: Collection of embedding vectors; converted with np.array before fitting.
    Returns:
    - The output of UMAP.fit_transform for a 2-component model.
    """
    # n_neighbors and metric are written out explicitly; the seed is fixed for reproducibility.
    reducer = umap.UMAP(
        n_components=2,
        random_state=42,
        n_neighbors=15,
        metric='euclidean',
    )
    return reducer.fit_transform(np.array(embeddings))
def plot_half_filled_circle(ax, x, y, left_color, right_color, size=100):
    """
    Draws a two-tone circular marker: one color on the left half, another on the right.
    Parameters:
    - ax: Matplotlib axis to draw on.
    - x, y: Coordinates of the marker center.
    - left_color: Color of the left half.
    - right_color: Color of the right half.
    - size: Marker size; the wedge radius is sqrt(size) / 100.
    """
    radius = (size ** 0.5) / 100  # Scale the radius
    center = (x, y)
    # (start angle, end angle, fill) per half, in degrees:
    # 90 -> 270 sweeps the left half, 270 -> 90 sweeps the right half.
    halves = (
        (90, 270, left_color),
        (270, 90, right_color),
    )
    # Left half is added first, then the right half.
    for theta_start, theta_end, fill in halves:
        wedge = patches.Wedge(center, radius, theta_start, theta_end, color=fill, ec="black")
        ax.add_patch(wedge)
def plot_umap_scatter_tftf_kk(df, filename="umap.png", dimred_type=None):
    """
    Plots a 2D scatterplot of dimensionality-reduced coordinates, colored by 'fusion_type'.
    Only for TF::TF and Kinase::Kinase fusions
    Parameters:
    - df (pd.DataFrame): DataFrame containing 'fusion_type' and the coordinate columns
      '<dimred_type>1' and '<dimred_type>2' (e.g. 'umap1'/'umap2' or 'tsne1'/'tsne2').
    - filename: Path the figure is saved to.
    - dimred_type: 'umap' or 'tsne'. When None, it is inferred from the coordinate
      columns present in df, falling back to 'umap'.
    Raises:
    - KeyError: if the coordinate columns are missing, or a 'fusion_type' value is
      not 'TF::TF' or 'Kinase::Kinase'.
    """
    set_font()
    # Axis-label text for each supported reduction
    axis_names = {
        "umap": "UMAP",
        "tsne": "t-SNE"
    }
    # The coordinate columns are named after the reduction that produced them, so
    # hard-coding 'umap1'/'umap2' breaks on t-SNE data; infer the prefix instead.
    if dimred_type is None:
        dimred_type = next(
            (name for name in axis_names
             if f"{name}1" in df.columns and f"{name}2" in df.columns),
            "umap",
        )
    x_col, y_col = f"{dimred_type}1", f"{dimred_type}2"
    axis_name = axis_names.get(dimred_type, dimred_type)
    # Define colors for each type
    colors = {
        "TF": "pink",
        "Kinase": "orange"
    }
    # Define the color for each fusion-type combination
    marker_colors = {
        "TF::TF": colors["TF"],
        "Kinase::Kinase": colors["Kinase"],
    }
    # Create the plot, padding the data range by 1 on every side
    fig, ax = plt.subplots(figsize=(10, 8))
    ax.set_xlim(df[x_col].min() - 1, df[x_col].max() + 1)
    ax.set_ylim(df[y_col].min() - 1, df[y_col].max() + 1)
    # One scatter call for all points (drawn in row order); an unknown fusion
    # type still raises KeyError here.
    point_colors = [marker_colors[fusion_type] for fusion_type in df["fusion_type"]]
    ax.scatter(df[x_col], df[y_col], c=point_colors, s=15, edgecolors="black", linewidths=0.5)
    # Add custom legend
    legend_elements = [
        patches.Patch(facecolor=color, edgecolor="black", label=label)
        for label, color in marker_colors.items()
    ]
    ax.legend(handles=legend_elements, title="Fusion Type", fontsize=16, title_fontsize=16)
    # Add labels and title
    ax.set_xlabel(f"{axis_name} 1", fontsize=20)
    ax.set_ylabel(f"{axis_name} 2", fontsize=20)
    ax.set_title("FusOn-pLM-embedded Transcription Factor and Kinase Fusions", fontsize=20)
    fig.tight_layout()
    # Save first, then show
    fig.savefig(filename, dpi=300)
    plt.show()
def plot_umap_scatter_half_filled(df, filename="umap.png"):
    """
    Scatterplot of UMAP coordinates where every point is a two-tone circle:
    the left half is colored by the head-gene type, the right half by the tail-gene type.
    Parameters:
    - df (pd.DataFrame): DataFrame containing 'umap1', 'umap2' and 'fusion_type' columns.
      'fusion_type' values must look like '<left>::<right>' with each side one of
      'TF', 'Kinase', 'Other'; anything else raises KeyError.
    - filename: Path the figure is saved to.
    """
    # Color of each gene type
    colors = {
        "TF": "pink",
        "Kinase": "orange",
        "Other": "grey"
    }
    # Every left/right pairing of the three gene types (nine combinations)
    marker_colors = {
        f"{left}::{right}": {"left": colors[left], "right": colors[right]}
        for left in colors
        for right in colors
    }
    # Set up the axes, padding the data range by 1 on every side
    fig, ax = plt.subplots(figsize=(10, 8))
    umap1, umap2 = df["umap1"], df["umap2"]
    ax.set_xlim(umap1.min() - 1, umap1.max() + 1)
    ax.set_ylim(umap2.min() - 1, umap2.max() + 1)
    # Draw one half-filled marker per row, in row order
    for fusion_type, x, y in zip(df["fusion_type"], umap1, umap2):
        halves = marker_colors[fusion_type]
        plot_half_filled_circle(ax, x, y, halves["left"], halves["right"], size=100)
    # Legend with one entry per gene type
    legend_elements = [
        patches.Patch(facecolor=color, edgecolor="black", label=label)
        for label, color in colors.items()
    ]
    ax.legend(handles=legend_elements, title="Type")
    # Labels and title
    plt.xlabel("UMAP 1")
    plt.ylabel("UMAP 2")
    plt.title("UMAP Scatter Plot")
    plt.tight_layout()
    # Write the figure to disk, then display it
    plt.savefig(filename, dpi=300)
    plt.show()
def get_gene_type(gene, d):
    """
    Classify a gene as 'Kinase', 'TF' or 'Other'.
    Parameters:
    - gene: Gene name to look up.
    - d (dict): Maps gene name -> annotation; 'kinase' and 'tf' are the recognized values.
    Returns:
    - 'Kinase' if d[gene] == 'kinase', 'TF' if d[gene] == 'tf', otherwise 'Other'.
      'Other' covers both genes missing from d and genes whose annotation is not
      recognized, so the function never returns None.
    """
    annotation = d.get(gene)
    if annotation == 'kinase':
        return 'Kinase'
    if annotation == 'tf':
        return 'TF'
    return 'Other'
def get_tf_and_kinase_fusions_dataset():
    """
    Builds the table of TF::TF and Kinase::Kinase fusion sequences to embed and plot.
    Reads "data/salokas_2020_tableS3.csv" (columns 'Gene', 'Kinase or TF') and
    "../../data/blast/fuson_ht_db.csv" (columns 'fusiongenes', 'aa_seq'), labels the head and
    tail gene of every fusion with get_gene_type, and keeps the rows whose combined
    label is 'TF::TF' or 'Kinase::Kinase'.
    Returns:
    - pd.DataFrame with columns 'sequence', 'ID', 'fusion_type', 'type'
      ('type' is always 'fusion'); TF::TF rows come first, then Kinase::Kinase rows.
    """
    # Load TF and Kinase annotations
    tf_kinase_parts = pd.read_csv("data/salokas_2020_tableS3.csv")
    print(tf_kinase_parts)
    ht_tf_kinase_dict = dict(zip(tf_kinase_parts['Gene'], tf_kinase_parts['Kinase or TF']))
    # This one has each row with one fusiongene name
    fuson_ht_db = pd.read_csv("../../data/blast/fuson_ht_db.csv")
    fuson_ht_db[['hg', 'tg']] = fuson_ht_db['fusiongenes'].str.split("::", expand=True)
    fuson_ht_db['hg_type'] = fuson_ht_db['hg'].apply(lambda x: get_gene_type(x, ht_tf_kinase_dict))
    fuson_ht_db['tg_type'] = fuson_ht_db['tg'].apply(lambda x: get_gene_type(x, ht_tf_kinase_dict))
    fuson_ht_db['fusion_type'] = fuson_ht_db['hg_type'] + '::' + fuson_ht_db['tg_type']
    fuson_ht_db['type'] = 'fusion'
    # Categories are set manually. The previous value_counts()-based derivation was
    # immediately overwritten and raised KeyError on pandas >= 2.0, where
    # reset_index() no longer yields an 'index' column.
    categories = ["TF::TF", "Kinase::Kinase"]
    print(categories)
    # Stack the rows of each category, in category order, with a fresh index
    plot_df = pd.concat(
        [fuson_ht_db.loc[fuson_ht_db['fusion_type'] == cat] for cat in categories],
        ignore_index=True,
    )
    print(plot_df['fusion_type'].value_counts())
    # Keep only the columns needed downstream, renamed to what the embedding step expects
    plot_df = plot_df[['aa_seq', 'fusiongenes', 'fusion_type', 'type']].rename(
        columns={'aa_seq': 'sequence', 'fusiongenes': 'ID'}
    )
    return plot_df
def make_tf_and_kinase_fusions_plot(seqs_with_embeddings, savedir = '', dimred_type='umap'):
    """
    Saves the source data for the TF/kinase fusion plot, then draws the plot.
    Parameters:
    - seqs_with_embeddings (pd.DataFrame): must contain 'sequence', 'fusion_type', 'ID' and
      the coordinate columns f'{dimred_type}1' and f'{dimred_type}2'.
    - savedir: Directory for the CSV and PNG outputs (created if needed).
      An empty string means the current directory.
    - dimred_type: Prefix of the coordinate columns and of the output file names.
    Side effects:
    - Reads "../../data/fuson_db.csv" to map each sequence to its 'seq_id'.
    - Writes '<dimred_type>_tf_and_kinase_fusions_source_data.csv' and
      '<dimred_type>_tf_and_kinase_fusions_visualization.png' into savedir.
    """
    fuson_db = pd.read_csv("../../data/fuson_db.csv")
    seq_id_dict = dict(zip(fuson_db['aa_seq'], fuson_db['seq_id']))
    # Work on an explicit copy so adding 'seq_id' neither touches the caller's
    # frame nor triggers pandas' chained-assignment warning.
    data = seqs_with_embeddings[[f'{dimred_type}1', f'{dimred_type}2', 'sequence', 'fusion_type', 'ID']].copy()
    # add sequence ids so we can save results/sequence
    data['seq_id'] = data['sequence'].map(seq_id_dict)
    # os.makedirs('') raises, so fall back to the current directory for an empty savedir
    tfkinase_save_dir = savedir if savedir else '.'
    os.makedirs(tfkinase_save_dir, exist_ok=True)
    data.to_csv(f"{tfkinase_save_dir}/{dimred_type}_tf_and_kinase_fusions_source_data.csv", index=False)
    plot_umap_scatter_tftf_kk(data, filename=f"{tfkinase_save_dir}/{dimred_type}_tf_and_kinase_fusions_visualization.png")
def tf_and_kinase_fusions_plot(dimred_types, output_dir):
    """
    Builds the TF::TF / Kinase::Kinase fusion dataset, makes the embeddings, THEN calls the plot.
    Parameters:
    - dimred_types: Iterable of reduction names (e.g. "umap", "tsne") to plot.
    - output_dir: Root results directory; plots go to
      '<output_dir>/<dimred_type>_plots/<model path>/tf_and_kinase'.
    Side effects:
    - Writes the dataset to "data/tf_and_kinase_fusions.csv" before embedding it.
    Raises:
    - Exception: if an embedding file cannot be read (the original error is chained).
    """
    plot_df = get_tf_and_kinase_fusions_dataset()
    plot_df.to_csv("data/tf_and_kinase_fusions.csv", index=False)
    # embed the dataset; the result maps each embedding pkl path to its details
    input_fname = 'tf_and_kinase'
    all_embedding_paths = embed_dataset_for_benchmark(
        fuson_ckpts=config.FUSON_PLM_CKPT,
        input_data_path='data/tf_and_kinase_fusions.csv', input_fname=input_fname,
        average=True, seq_col='sequence',
        benchmark_fusonplm=True,
        benchmark_esm=False,
        benchmark_fo_puncta_ml=False,
        overwrite=config.PERMISSION_TO_OVERWRITE)
    # For each of the models we are benchmarking, load embeddings and make plots
    log_update("\nEmbedding sequences")
    for embedding_path, _details in all_embedding_paths.items():
        log_update(f"\tBenchmarking embeddings at: {embedding_path}")
        # NOTE: pickle.load is only safe on files produced by this pipeline
        try:
            with open(embedding_path, "rb") as f:
                embeddings = pickle.load(f)
        except Exception as e:
            # keep the original cause attached so read failures stay debuggable
            raise Exception(f"Cannot read embeddings from {embedding_path}") from e
        # one row per sequence: column 0 holds the dict key, column 1 the embedding
        seqs_with_embeddings = pd.DataFrame.from_dict(embeddings.items())
        seqs_with_embeddings = seqs_with_embeddings.rename(columns={0: 'sequence', 1: 'embedding'})
        # attach fusion metadata; the inner join keeps only sequences present in plot_df
        seqs_with_embeddings = pd.merge(seqs_with_embeddings, plot_df, on='sequence', how='inner')
        for dimred_type in dimred_types:
            dimred_embeddings = get_dimred_embeddings(seqs_with_embeddings['embedding'].tolist(), dimred_type=dimred_type)
            # turn the result into a dataframe, and add it to seqs_with_embeddings
            data = pd.DataFrame(dimred_embeddings, columns=[f'{dimred_type}1', f'{dimred_type}2'])
            seqs_with_embeddings[[f'{dimred_type}1', f'{dimred_type}2']] = data
            # make subdirectory: everything after 'embeddings/' in the path, minus the file name
            intermediate = '/'.join(embedding_path.split('embeddings/')[1].split('/')[0:-1])
            cur_output_dir = f"{output_dir}/{dimred_type}_plots/{intermediate}/{input_fname}"
            os.makedirs(cur_output_dir, exist_ok=True)
            make_tf_and_kinase_fusions_plot(seqs_with_embeddings, savedir = cur_output_dir, dimred_type=dimred_type)
def make_fusion_v_parts_favorites_plot(seqs_with_embeddings, savedir = None, dimred_type='umap'):
    """
    Make plots showing that favorite fusions (e.g. PAX3::FOXO1, EWS::FLI1, SS18::SSX1, EML4::ALK)
    are embedded distinctly from their heads and tails.
    Draws one panel per fusion on a 2x3 grid (at most six fusions; unused panels are hidden).
    Parameters:
    - seqs_with_embeddings (pd.DataFrame): must contain 'sequence' and the coordinate
      columns f'{dimred_type}1' and f'{dimred_type}2'.
    - savedir: Directory (must already exist) for the PNG and CSV outputs. None means the current directory.
    - dimred_type: 'umap' or 'tsne'; selects the coordinate columns, axis labels and file names.
    Side effects:
    - Reads "data/top_genes.csv" (merged on 'sequence'; 'gene' and 'id' columns are expected
      after the merge) and "../../data/fuson_db.csv" (for 'seq_id').
    - Writes '<dimred_type>_favorites_visualization.png' and
      '<dimred_type>_favorites_source_data.csv' into savedir.
    Raises:
    - KeyError: if dimred_type is not 'umap' or 'tsne', or expected columns are missing.
    """
    set_font()
    if savedir is None:
        savedir = '.'
    dim1, dim2 = f'{dimred_type}1', f'{dimred_type}2'
    axis_label = {'tsne': 't-SNE', 'umap': 'UMAP'}[dimred_type]

    # Load the favorite genes (fusions plus their head and tail genes) and attach their metadata
    data = pd.read_csv("data/top_genes.csv")
    seqs_with_embeddings = pd.merge(seqs_with_embeddings, data, on="sequence")

    # Fusions are recognized by '::' in the gene name; their two sides are the heads and tails
    is_fusion = seqs_with_embeddings["gene"].str.contains("::")
    fusion_parts = seqs_with_embeddings.loc[is_fusion, "gene"].str.split("::", expand=True)
    heads = fusion_parts[0].tolist()
    tails = fusion_parts[1].tolist()
    # Assignment order matters: a gene that is both a head and a tail ends up as a tail
    seqs_with_embeddings["Type"] = ""
    seqs_with_embeddings.loc[is_fusion, "Type"] = "fusion_embeddings"
    seqs_with_embeddings.loc[seqs_with_embeddings["gene"].isin(heads), "Type"] = "h_embeddings"
    seqs_with_embeddings.loc[seqs_with_embeddings["gene"].isin(tails), "Type"] = "t_embeddings"

    # make merge: one row per fusion joined to its head and tail sequences
    merge = seqs_with_embeddings.loc[is_fusion, ['gene', 'sequence']].reset_index(drop=True)
    gene_parts = merge["gene"].str.split("::", expand=True)
    merge["head"] = gene_parts[0]
    merge["tail"] = gene_parts[1]
    merge = pd.merge(merge, seqs_with_embeddings[['gene', 'sequence']].rename(
        columns={'gene': 'head', 'sequence': 'h_sequence'}),
        on='head', how='left'
    )
    merge = pd.merge(merge, seqs_with_embeddings[['gene', 'sequence']].rename(
        columns={'gene': 'tail', 'sequence': 't_sequence'}),
        on='tail', how='left'
    )

    # Define colors, markers and legend labels for each point type
    colors = {
        'fusion_embeddings': '#cf9dfa',
        'h_embeddings': '#eb8888',
        't_embeddings': '#5fa3e3',
    }
    markers = {
        'fusion_embeddings': 'o',
        'h_embeddings': '^',
        't_embeddings': 'v'
    }
    label_map = {
        'fusion_embeddings': 'Fusion',
        'h_embeddings': 'Head',
        't_embeddings': 'Tail',
    }

    # Create a 2x3 grid of plots
    fig, axes = plt.subplots(2, 3, figsize=(18, 12))
    # Axis range is set manually for cleaner plotting.
    # NOTE(review): these limits are applied to every dimred_type — confirm they suit t-SNE output.
    x_min, x_max = 11, 16
    y_min, y_max = 10, 22
    # Determine tick positions
    x_ticks = np.arange(x_min, x_max + 1, 1)
    y_ticks = np.arange(y_min, y_max + 1, 1)

    # Flatten the axes array for easier iteration
    axes = axes.flatten()
    panels = list(zip(merge['gene'], merge['head'], merge['tail']))
    for i, ax in enumerate(axes):
        # Fewer fusions than panels: hide the spare axes instead of failing on a missing row
        if i >= len(panels):
            ax.set_visible(False)
            continue
        fgene_name, hgene, tgene = panels[i]
        # Keep only the fusion and its own head and tail
        panel_data = seqs_with_embeddings[seqs_with_embeddings['gene'].isin([fgene_name, hgene, tgene])]
        # Plot each type
        for emb_type in panel_data['Type'].unique():
            subset = panel_data[panel_data['Type'] == emb_type]
            ax.scatter(subset[dim1], subset[dim2], label=label_map[emb_type], color=colors[emb_type], marker=markers[emb_type], s=120, zorder=3)
        ax.set_title(f'{fgene_name}', fontsize=44)
        ax.set_xlabel(f'{axis_label} 1', fontsize=44)
        ax.set_ylabel(f'{axis_label} 2', fontsize=44)
        ax.grid(True, which='both', linestyle='--', linewidth=0.5, color='gray', zorder=1)
        # Set the same limits and ticks for all axes
        ax.set_xlim(x_min, x_max)
        ax.set_ylim(y_min, y_max)
        ax.set_xticks(x_ticks)
        ax.set_yticks(y_ticks)
        # Rotate x-axis labels
        ax.set_xticklabels(ax.get_xticks(), rotation=45, ha='right')
        ax.tick_params(axis='x', labelsize=16)
        ax.tick_params(axis='y', labelsize=16)
        # The explicit per-label size below overrides the tick_params size for the existing labels
        for label in ax.get_xticklabels():
            label.set_fontsize(24)
        for label in ax.get_yticklabels():
            label.set_fontsize(24)
        # Only the first panel carries the legend
        if i == 0:
            legend = ax.legend(fontsize=20, markerscale=2, loc='best')
            for text in legend.get_texts():
                text.set_fontsize(24)

    # Adjust layout to prevent overlap
    fig.tight_layout()
    # Save BEFORE showing: on interactive backends the figure may be gone once
    # plt.show() returns, which would leave a blank image on disk.
    fig.savefig(f'{savedir}/{dimred_type}_favorites_visualization.png', dpi=300)
    plt.show()

    # Save the data, using the coordinate columns of the reduction actually plotted
    seq_to_id_dict = pd.read_csv("../../data/fuson_db.csv")
    seq_to_id_dict = dict(zip(seq_to_id_dict['aa_seq'], seq_to_id_dict['seq_id']))
    seqs_with_embeddings['seq_id'] = seqs_with_embeddings['sequence'].map(seq_to_id_dict)
    seqs_with_embeddings[[dim1, dim2, 'sequence', 'Type', 'gene', 'id', 'seq_id']].to_csv(f"{savedir}/{dimred_type}_favorites_source_data.csv", index=False)
def fusion_v_parts_favorites(dimred_types, output_dir):
    """
    Makes the embeddings, THEN calls the plot. only on the favorites in data/top_genes.csv
    Parameters:
    - dimred_types: Iterable of reduction names (e.g. "umap", "tsne") to plot.
    - output_dir: Root results directory; plots go to
      '<output_dir>/<dimred_type>_plots/<model path>/favorites'.
    Raises:
    - Exception: if an embedding file cannot be read (the original error is chained).
    """
    # embed the dataset; the result maps each embedding pkl path to its details
    input_fname = 'favorites'
    all_embedding_paths = embed_dataset_for_benchmark(
        fuson_ckpts=config.FUSON_PLM_CKPT,
        input_data_path='data/top_genes.csv', input_fname=input_fname,
        average=True, seq_col='sequence',
        benchmark_fusonplm=True,
        benchmark_esm=False,
        benchmark_fo_puncta_ml=False,
        overwrite=config.PERMISSION_TO_OVERWRITE)
    # For each of the models we are benchmarking, load embeddings and make plots
    log_update("\nEmbedding sequences")
    for embedding_path, _details in all_embedding_paths.items():
        log_update(f"\tBenchmarking embeddings at: {embedding_path}")
        # NOTE: pickle.load is only safe on files produced by this pipeline
        try:
            with open(embedding_path, "rb") as f:
                embeddings = pickle.load(f)
        except Exception as e:
            # keep the original cause attached so read failures stay debuggable
            raise Exception(f"Cannot read embeddings from {embedding_path}") from e
        # one row per sequence: column 0 holds the dict key, column 1 the embedding
        seqs_with_embeddings = pd.DataFrame.from_dict(embeddings.items())
        seqs_with_embeddings = seqs_with_embeddings.rename(columns={0: 'sequence', 1: 'embedding'})
        for dimred_type in dimred_types:
            dimred_embeddings = get_dimred_embeddings(seqs_with_embeddings['embedding'].tolist(), dimred_type=dimred_type)
            # turn the result into a dataframe, and add it to seqs_with_embeddings
            data = pd.DataFrame(dimred_embeddings, columns=[f'{dimred_type}1', f'{dimred_type}2'])
            seqs_with_embeddings[[f'{dimred_type}1', f'{dimred_type}2']] = data
            # make subdirectory: everything after 'embeddings/' in the path, minus the file name
            intermediate = '/'.join(embedding_path.split('embeddings/')[1].split('/')[0:-1])
            cur_output_dir = f"{output_dir}/{dimred_type}_plots/{intermediate}/{input_fname}"
            os.makedirs(cur_output_dir, exist_ok=True)
            make_fusion_v_parts_favorites_plot(seqs_with_embeddings, savedir = cur_output_dir, dimred_type=dimred_type)
def main():
    """
    Entry point: creates a timestamped results directory, then runs both
    embedding-exploration analyses while the log file is open.
    """
    # results/<local time> holds everything produced by this run
    os.makedirs('results', exist_ok=True)
    output_dir = f'results/{get_local_time()}'
    os.makedirs(output_dir, exist_ok=True)

    # Collect the reductions enabled in config and create a plot folder for each
    dimred_types = []
    for flag_name, method in (("PLOT_UMAP", "umap"), ("PLOT_TSNE", "tsne")):
        if getattr(config, flag_name):
            dimred_types.append(method)
            os.makedirs(f"{output_dir}/{method}_plots", exist_ok=True)

    log_path = f'{output_dir}/embedding_exploration_log.txt'
    with open_logfile(log_path):
        print_configpy(config)
        # fusions vs. their head and tail parts
        fusion_v_parts_favorites(dimred_types, output_dir)
        # TF::TF and Kinase::Kinase fusions
        tf_and_kinase_fusions_plot(dimred_types, output_dir)


if __name__ == "__main__":
    main()