import os
import math

import numpy as np
import pandas as pd

from tqdm.auto import tqdm
from rdkit.ML.Scoring.Scoring import CalcBEDROC
from sklearn.metrics import accuracy_score, balanced_accuracy_score, roc_auc_score, average_precision_score, \
    matthews_corrcoef, precision_score, recall_score, f1_score, confusion_matrix


def specificity_score(true_labels, predicted_labels):
    """Return the specificity (true-negative rate) for binary labels."""
    tn, fp, _, _ = confusion_matrix(true_labels, predicted_labels).ravel()
    specificity = tn / (tn + fp)
    return specificity
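
# Example (hypothetical toy data, not from the pipeline): for true labels [0, 0, 1, 1] and
# predicted labels [0, 1, 1, 1], confusion_matrix gives tn = 1 and fp = 1, so
# specificity_score returns 1 / (1 + 1) = 0.5.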


MAIN_DIR = ''


def balanced_mcc_score(sensitivity, specificity, prevalence):
    """Return the Matthews correlation coefficient at the given
    sensitivity, specificity and prevalence.

    Parameters
    ----------
    sensitivity : float
        The sensitivity of the model
    specificity : float
        The specificity of the model
    prevalence : float
        The prevalence of the test set

    Returns
    -------
    float
        Matthews correlation coefficient as a float
    """
    numerator = sensitivity + specificity - 1
    denominator_first_term = sensitivity + (1 - specificity) * (1 - prevalence) / prevalence
    denominator_second_term = specificity + (1 - sensitivity) * prevalence / (1 - prevalence)
    denominator = math.sqrt(denominator_first_term * denominator_second_term)

    # In these degenerate cases the numerator and the denominator are both zero;
    # return 0 instead of dividing by zero.
    if sensitivity == 1 and specificity == 0:
        denominator = 1.
    if sensitivity == 0 and specificity == 1:
        denominator = 1.

    return numerator / denominator
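
# Sanity check (illustrative values, not taken from the data): at prevalence 0.5 the expression
# reduces to (sens + spec - 1) / sqrt((sens + 1 - spec) * (spec + 1 - sens)), so e.g.
# balanced_mcc_score(0.8, 0.7, 0.5) = 0.5 / sqrt(1.1 * 0.9) ≈ 0.503.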


def ef_top_per(predictions, prevalence, top_frac=0.01):
    """Return the enrichment factor over the top `top_frac` of ranked predictions."""
    n = int(len(predictions) * top_frac)
    predictions = sorted(predictions, reverse=True)[:n]
    f = np.sum(np.round(predictions)) / n
    return f / prevalence
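
# Illustrative example (hypothetical numbers): with 1000 predictions, prevalence 0.05 and the
# default top_frac of 0.01, the 10 top-ranked predictions are kept; if 8 of them round to 1,
# the function returns 0.8 / 0.05 = 16. At least 1 / top_frac predictions are needed for n >= 1.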


def compute_metrics(df):
    """
    Compute a set of classification metrics for a single set of predictions.

    Args:
        df : dataframe with true labels in the 'Label' column and probabilistic predictions in the 'Prediction' column

    Returns:
        df_metrics: dataframe with metric names in the 'Metric' column and values in the 'Value' column
    """
    true_labels = df['Label']
    prevalence = sum(true_labels) / len(true_labels)
    predictions = df['Prediction']

    # Threshold-based metrics (probabilities rounded at 0.5)
    acc = accuracy_score(true_labels, predictions.round())
    bacc = balanced_accuracy_score(true_labels, predictions.round())
    precision = precision_score(true_labels, predictions.round(), zero_division=0.0)
    recall = recall_score(true_labels, predictions.round())
    specificity = specificity_score(true_labels, predictions.round())
    mcc = matthews_corrcoef(true_labels, predictions.round())
    bmcc = balanced_mcc_score(recall, specificity, prevalence)
    f1 = f1_score(true_labels, predictions.round())

    # Ranking-based metrics (raw probabilities)
    auc = roc_auc_score(true_labels, predictions)
    ap = average_precision_score(true_labels, predictions)
    dap = ap - prevalence
    scores = df.sort_values(by='Prediction', ascending=False)[['Label', 'Prediction']].values
    bedroc = CalcBEDROC(scores, 0, 20)
    ef = ef_top_per(predictions, prevalence, 0.01)

    metrics_dict = {'ACC': acc, 'BACC': bacc, 'MCC': mcc, 'BMCC': bmcc, 'Precision': precision, 'Recall': recall, 'F1-score': f1,
                    'AUC': auc, 'dAP': dap, 'BEDROC': bedroc, 'EF-1%': ef}
    df_metrics = pd.DataFrame(metrics_dict.items(), columns=['Metric', 'Value'])

    return df_metrics
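
# Minimal usage sketch (hypothetical data, not part of the pipeline):
#   df_example = pd.DataFrame({'Label': labels, 'Prediction': probabilities})
#   compute_metrics(df_example)
# This returns a long-format dataframe with one row per metric; the input needs both classes present
# and at least 100 rows so that the EF-1% slice is non-empty.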


def get_metrics(
        tasks: list[str] = ['AID', 'UID'],
        models: list[str] = ['MHNfs', 'RF'],
        settings: list[str] = ['1+1x3', '1+3x3', '1+7x3', '2+2x3', '2+6x3', '2+14x3', '4+4x3', '4+12x3', '4+28x3', '8+8x3', '8+24x3', '8+56x3'],
        overwrite: bool = False):
    """
    Compute classification metrics for every combination of task, model, setting, target and fold,
    and write the aggregated results to a single file. Existing results are extended unless
    `overwrite` is True.
    """
    file = f'{MAIN_DIR}/results_used.csv.gz'

    if overwrite:
        df = pd.DataFrame()
    else:
        df = pd.read_csv(file)

    path_preprocessed = ""
    df_pubchem = pd.read_csv(path_preprocessed)

    for task in tasks:
        for model in models:
            for setting in settings:
                pred_dir = f'{MAIN_DIR}/predictions/{model}/{task}/{setting}'
                try:
                    # Target IDs come from the prediction file names (with the '.csv' extension stripped).
                    targets = [x[:-4] for x in os.listdir(pred_dir)]
                    pubchem_targets = df_pubchem[task].astype(str).unique().tolist()

                    for target in tqdm(targets, desc=f'{task} - {model} - {setting}'):

                        # Skip targets that are not in the preprocessed PubChem table.
                        if target not in pubchem_targets:
                            continue

                        # Skip combinations that have already been computed, unless overwriting.
                        if not overwrite and any((df['Model'] == model) & (df['Setting'] == setting) & (df['Task'] == task) & (df['TID'] == target)):
                            continue

                        df_task = pd.read_csv(f'{pred_dir}/{target}.csv')

                        # Target IDs may be stored as strings or as integers in the PubChem table.
                        try:
                            org = df_pubchem.loc[df_pubchem[task] == target, 'Organism'].values[0]
                            l1 = df_pubchem.loc[df_pubchem[task] == target, 'L1'].values[0]
                        except IndexError:
                            org = df_pubchem.loc[df_pubchem[task] == int(target), 'Organism'].values[0]
                            l1 = df_pubchem.loc[df_pubchem[task] == int(target), 'L1'].values[0]
                        if l1 is None:
                            print(target, l1)

                        for fold in df_task.Fold.unique():
                            metrics = compute_metrics(df_task[df_task.Fold == fold]).assign(
                                Model=model, Task=task, TID=target, Organism=org, L1=l1, Setting=setting, Fold=fold,
                            ).rename(columns={'Target': task})
                            df = pd.concat([df, metrics], ignore_index=True)
                except Exception as e:
                    print(e)
                    raise

    df.to_csv(file, index=False)
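
# Expected layout, inferred from the code above: per-target prediction files live at
# {MAIN_DIR}/predictions/{model}/{task}/{setting}/{target}.csv with 'Label', 'Prediction' and 'Fold'
# columns, and the aggregated metrics table is written to {MAIN_DIR}/results_used.csv.gz.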


if __name__ == '__main__':
    get_metrics(settings=['1+7x3', '2+6x3', '4+4x3', '2+14x3', '4+12x3', '8+8x3'], overwrite=True)