import numpy as np
import pandas as pd
from typing import Callable, Union
from sklearn.metrics import (
    roc_auc_score,
    brier_score_loss,
    log_loss,
    accuracy_score,
    f1_score,
    average_precision_score,
)
from sklearn.calibration import calibration_curve
from fairlearn.metrics import make_derived_metric

true_positive_score = lambda y_true, y_pred: (y_true & y_pred).sum() / y_true.sum()
false_positive_score = (
    lambda y_true, y_pred: ((1 - y_true) & y_pred).sum() / ((1 - y_true)).sum()
)
false_negative_score = lambda y_true, y_pred: 1 - true_positive_score(y_true, y_pred)
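
# Illustrative example (toy data, not part of the module):
#   y_true = np.array([1, 1, 0, 0, 1]); y_pred = np.array([1, 0, 1, 0, 1])
#   true_positive_score(y_true, y_pred)   -> 2/3 (two of three positives caught)
#   false_positive_score(y_true, y_pred)  -> 1/2 (one of two negatives flagged)
#   false_negative_score(y_true, y_pred)  -> 1/3 (complement of the TPR)

# Metric names that (presumably) are evaluated on hard class predictions (y_pred)
# rather than on predicted probabilities.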
Y_PRED_METRICS = (
    "auprc_diff",
    "auprc_ratio",
    "acc_diff",
    "f1_diff",
    "f1_ratio",
    "equalized_odds_diff",
    "equalized_odds_ratio",
)


def average_log_loss_score(y_true, y_pred):
    """Average log loss (cross-entropy) over all samples."""
    # sklearn's log_loss already averages over samples (normalize=True by default),
    # so no extra aggregation is needed here.
    return log_loss(y_true, y_pred)


def miscalibration_score(y_true, y_pred, n_bins=10):
    """Miscalibration score. Calibration is the difference between the predicted and the true probability of the positive class."""
    prob_true, prob_pred = calibration_curve(y_true, y_pred, n_bins=n_bins)
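    # Illustrative: a model that always predicts 0.9 for a class with a 0.5 base
    # rate is off by 0.4 in its single populated bin, so the score is roughly 0.4;
    # a well-calibrated model scores close to 0.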
    return np.mean(np.abs(prob_true - prob_pred))


def get_qf_from_str(
    metric: str, transform: str = "difference"
) -> Union[Callable[[pd.Series, pd.Series, pd.Series], float], str]:
    """Get the quality function from a string.

    Args:
        metric (str): Name of the metric. If the name ends in "_diff" or "_ratio",
            the transform is inferred from the suffix.
        transform (str): Type of transform. Can be "difference", "ratio", or another
            fairlearn-supported transform.

    Returns:
        The quality function for the selected metric: either a fairlearn derived
        metric (a callable) or a string (for equalized odds, which is already
        defined in fairsd).
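
    Example (illustrative sketch; the sensitive-feature column name is hypothetical):

        qf = get_qf_from_str("accuracy_diff")
        # For most metrics, qf is a fairlearn derived metric and is called as
        #     qf(y_true, y_pred, sensitive_features=df["sex"])
        # For equalized odds, the metric name string is returned instead and is
        # resolved downstream by fairsd.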
    """
    # Preprocess metric string. If it ends with "diff" or "ratio", set transform accordingly
    if metric.split("_")[-1] == "diff":
        transform = "difference"
    elif metric.split("_")[-1] == "ratio":
        transform = "ratio"

    metric = trim_transform_from_str(metric).lower()

    if metric in ("equalized_odds", "eo", "eo_diff"):
        qf = (
            "equalized_odds_difference"
            if transform == "difference"
            else "equalized_odds_ratio"
        )
    elif metric in ("brier_score", "brier_score_loss"):
        qf = make_derived_metric(metric=brier_score_loss, transform=transform)
    elif metric in ("log_loss", "loss", "total_loss"):
        qf = make_derived_metric(metric=log_loss, transform=transform)
    elif metric in ("accuracy", "accuracy_score", "acc"):
        qf = make_derived_metric(metric=accuracy_score, transform=transform)
    elif metric in ("f1", "f1_score"):
        qf = make_derived_metric(metric=f1_score, transform=transform)
    elif metric in ("al", "average_loss", "average_log_loss"):
        qf = make_derived_metric(metric=average_log_loss_score, transform=transform)
    elif metric in ("roc_auc", "auroc", "auc_roc", "roc_auc_score"):
        qf = make_derived_metric(metric=roc_auc_score, transform=transform)
    elif metric in ("miscalibration", "miscal", "cal", "calibration"):
        qf = make_derived_metric(metric=miscalibration_score, transform=transform)
    elif metric in (
        "auprc",
        "pr_auc",
        "precision_recall_auc",
        "average_precision_score",
    ):
        qf = make_derived_metric(metric=average_precision_score, transform=transform)
    elif metric in ("false_positive_rate", "fpr"):
        qf = make_derived_metric(metric=false_positive_score, transform=transform)
    elif metric in ("true_positive_rate", "tpr"):
        qf = make_derived_metric(metric=true_positive_score, transform=transform)
    elif metric in ("fnr", "false_negative_rate"):
        qf = make_derived_metric(metric=false_negative_score, transform=transform)
    else:
        raise ValueError(
            f"Metric: {metric} not supported. "
            "Metric must be one of the following: "
            "equalized_odds, brier_score_loss, log_loss, accuracy_score, average_loss, "
            "roc_auc_diff, miscalibration_diff, auprc_diff, fpr_diff, tpr_diff"
        )

    return qf


def get_name_from_metric_str(metric: str) -> str:
    """Get the name of the metric from a string nicely formatted."""
    metric = trim_transform_from_str(metric)
    if metric in ("equalized_odds", "eo"):
        return "Equalized Odds"
    # Split on underscores; keep known acronyms uppercase and capitalize other words
    return " ".join(
        [
            word.upper()
            if word in ("auprc", "auroc", "auc", "roc", "prc", "tpr", "fpr", "fnr")
            else word.capitalize()
            for word in metric.split("_")
        ]
    )


def trim_transform_from_str(metric: str) -> str:
    """Trim the transform from a string."""
    if metric.split("_")[-1] == "diff" or metric.split("_")[-1] == "ratio":
        metric = "_".join(metric.split("_")[:-1])
    return metric


def get_quality_metric_from_str(
    metric: str,
) -> Callable[[pd.Series, pd.Series], Union[float, str]]:
    """Get the quality metric from a string."""

    if metric.split("_")[-1] == "diff" or metric.split("_")[-1] == "ratio":
        metric = "_".join(metric.split("_")[:-1]).lower()

    if metric in ("equalized_odds", "eo"):
        # Get max of tpr and fpr
        return (
            lambda y_true, y_pred: 
            "TPR: "
            + str(true_positive_score(y_true, y_pred).round(3))
            + "; FPR: "
            + str(false_positive_score(y_true, y_pred).round(3))
        )
    elif metric in ("brier_score", "brier_score_loss"):
        quality_metric = brier_score_loss
    elif metric in ("log_loss", "loss", "total_loss"):
        quality_metric = log_loss
    elif metric in ("accuracy", "accuracy_score", "acc"):
        quality_metric = accuracy_score
    elif metric in ("f1", "f1_score"):
        quality_metric = f1_score
    elif metric in ("al", "average_loss", "average_log_loss"):
        quality_metric = average_log_loss_score
    elif metric in ("roc_auc", "auroc", "auc_roc", "roc_auc_score"):
        quality_metric = roc_auc_score
    elif metric in ("miscalibration", "miscal"):
        quality_metric = miscalibration_score
    elif metric in (
        "auprc",
        "pr_auc",
        "precision_recall_auc",
        "average_precision_score",
    ):
        quality_metric = average_precision_score
    elif metric in ("false_positive_rate", "fpr"):
        quality_metric = false_positive_score
    elif metric in ("true_positive_rate", "tpr"):
        quality_metric = true_positive_score
    elif metric in ("fnr", "false_negative_rate"):
        quality_metric = false_negative_score
    else:
        raise ValueError(
            f"Metric: {metric} not supported. "
            "Metric must be one of the following: "
            "equalized_odds, brier_score_loss, log_loss, accuracy_score, "
            "average_loss, roc_auc_diff, miscalibration_diff, auprc_diff, fpr_diff, tpr_diff"
        )

    return lambda y_true, y_pred: round(quality_metric(y_true, y_pred), 3)


def sort_quality_metrics_df(
    result_set_df: pd.DataFrame, quality_metric: str
) -> pd.DataFrame:
    """Sort the result set dataframe by the quality metric."""
    # Ratio metrics: sort by the subgroup quality in descending order so the
    # highest-quality (most interesting) subgroups appear first.
    if quality_metric.split("_")[-1] == "ratio":
        result_set_df = result_set_df.sort_values(by="quality", ascending=False)
    elif quality_metric.split("_")[-1] in ("difference", "diff"):
        # Difference of score metrics where higher is better (accuracy, AUC-based
        # metrics, F1): subgroups with the lowest scores are the most problematic,
        # so sort metric_score in ascending order.
        if (
            "acc" in quality_metric
            or "au" in quality_metric
            or "f1" in quality_metric
        ):
            result_set_df = result_set_df.sort_values(
                by="metric_score", ascending=True
            )
        # Difference of loss-style metrics where lower is better (Brier score,
        # log loss, miscalibration): subgroups with the highest scores are the most
        # problematic, so sort metric_score in descending order.
        else:
            result_set_df = result_set_df.sort_values(
                by="metric_score", ascending=False
            )
    else:
        raise ValueError(
            "Metric must be either a difference or a ratio! "
            f"Provided metric: {quality_metric}"
        )

    return result_set_df
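

if __name__ == "__main__":
    # Minimal usage sketch on toy data (the subgroup descriptions and values below
    # are made up for illustration; real result sets come from fairsd).
    demo = pd.DataFrame(
        {
            "description": ["sex = 'F'", "age < 30", "region = 'north'"],
            "quality": [0.12, 0.30, 0.05],
            "metric_score": [0.81, 0.64, 0.90],
        }
    )
    # "acc" appears in the metric name, so subgroups are shown worst-accuracy first.
    print(sort_quality_metrics_df(demo, "accuracy_diff"))
    print(get_name_from_metric_str("roc_auc_diff"))  # -> "ROC AUC"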