from typing import Union
from aif360.detectors.mdss.ScoringFunctions import (
Bernoulli,
BerkJones,
Gaussian,
ScoringFunction,
Poisson,
)
from aif360.detectors.mdss.MDSS import MDSS
import pandas as pd
def bias_scan(
data: pd.DataFrame,
observations: pd.Series,
expectations: Union[pd.Series, pd.DataFrame] = None,
favorable_value: Union[str, float] = None,
overpredicted: bool = True,
scoring: Union[str, ScoringFunction] = "Bernoulli",
num_iters: int = 10,
penalty: float = 1e-17,
mode: str = "binary",
**kwargs,
):
"""
scan to find the highest scoring subset of records
:param data (dataframe): the dataset (containing the features) the model was trained on
:param observations (series): ground truth (correct) target values
:param expectations (series, dataframe, optional): pandas series estimated targets
as returned by a model for binary, continuous and ordinal modes.
If mode is nominal, this is a dataframe with columns containing expectations for each nominal class.
If None, model is assumed to be a dumb model that predicts the mean of the targets
or 1/(num of categories) for nominal mode.
:param favorable_value(str, float, optional): Should be high or low or float if the mode in [binary, ordinal, or continuous].
If float, value has to be minimum or maximum in the observations column. Defaults to high if None for these modes.
Support for float left in to keep the intuition clear in binary classification tasks.
If mode is nominal, favorable values should be one of the unique categories in the observations.
Defaults to a one-vs-all scan if None for nominal mode.
:param overpredicted (bool, optional): flag for group to scan for.
True means we scan for a group whose expectations/predictions are systematically higher than observed.
In other words, True means we scan for a group whose observeed is systematically lower than the expectations.
False means we scan for a group whose expectations/predictions are systematically lower than observed.
In other words, False means we scan for a group whose observed is systematically higher than the expectations.
:param scoring (str or class): One of 'Bernoulli', 'Gaussian', 'Poisson', or 'BerkJones' or subclass of
:class:`aif360.metrics.mdss.ScoringFunctions.ScoringFunction`.
:param num_iters (int, optional): number of iterations (random restarts). Should be positive.
:param penalty (float,optional): penalty term. Should be positive. The penalty term as with any regularization parameter may need to be
tuned for ones use case. The higher the penalty, the less complex (number of features and feature values) the
highest scoring subset that gets returned is.
:param mode: one of ['binary', 'continuous', 'nominal', 'ordinal']. Defaults to binary.
In nominal mode, up to 10 categories are supported by default.
To increase this, pass in keyword argument max_nominal = integer value.
:returns: the highest scoring subset and the score or dict of the highest scoring subset and the score for each category in nominal mode
"""
# Ensure correct mode is passed in.
modes = ["binary", "continuous", "nominal", "ordinal"]
assert mode in modes, f"Expected one of {modes}, got {mode}."
# Set correct favorable value (this tells us if higher or lower is better)
min_val, max_val = observations.min(), observations.max()
uniques = list(observations.unique())
if favorable_value == 'high':
favorable_value = max_val
elif favorable_value == 'low':
favorable_value = min_val
elif favorable_value is None:
if mode in ["binary", "ordinal", "continuous"]:
favorable_value = max_val # Default to higher is better
elif mode == "nominal":
favorable_value = "flag-all" # Default to scan through all categories
assert favorable_value in [
"flag-all",
*uniques,
], f"Expected one of {uniques}, got {favorable_value}."
assert favorable_value in [
min_val,
max_val,
"flag-all",
*uniques,
], f"Favorable_value should be high, low, or one of categories {uniques}, got {favorable_value}."
# Set appropriate direction for scanner depending on mode and overppredicted flag
if mode in ["ordinal", "continuous"]:
if favorable_value == max_val:
kwargs["direction"] = "negative" if overpredicted else "positive"
else:
kwargs["direction"] = "positive" if overpredicted else "negative"
else:
kwargs["direction"] = "negative" if overpredicted else "positive"
# Set expectations to mean targets for non-nominal modes
if expectations is None and mode != "nominal":
expectations = pd.Series(observations.mean(), index=observations.index)
# Set appropriate scoring function
if scoring == "Bernoulli":
scoring = Bernoulli(**kwargs)
elif scoring == "BerkJones":
scoring = BerkJones(**kwargs)
elif scoring == "Gaussian":
scoring = Gaussian(**kwargs)
elif scoring == "Poisson":
scoring = Poisson(**kwargs)
else:
scoring = scoring(**kwargs)
if mode == "binary": # Flip observations if favorable_value is 0 in binary mode.
observations = pd.Series(observations == favorable_value, dtype=int)
elif mode == "nominal":
unique_outs = set(sorted(observations.unique()))
size_unique_outs = len(unique_outs)
if expectations is not None: # Set expectations to 1/(num of categories) for nominal mode
expectations_cols = set(sorted(expectations.columns))
assert (
unique_outs == expectations_cols
), f"Expected {unique_outs} in expectation columns, got {expectations_cols}"
else:
expectations = pd.Series(
1 / observations.nunique(), index=observations.index
)
max_nominal = kwargs.get("max_nominal", 10)
assert (
size_unique_outs <= max_nominal
), f"Nominal mode only support up to {max_nominal} labels, got {size_unique_outs}. Use keyword argument max_nominal to increase the limit."
if favorable_value != "flag-all": # If favorable flag is set, use one-vs-others strategy to scan, else use one-vs-all strategy
observations = observations.map({favorable_value: 1})
observations = observations.fillna(0)
if isinstance(expectations, pd.DataFrame):
expectations = expectations[favorable_value]
else:
results = {}
orig_observations = observations.copy()
orig_expectations = expectations.copy()
for unique in uniques:
observations = orig_observations.map({unique: 1})
observations = observations.fillna(0)
if isinstance(expectations, pd.DataFrame):
expectations = orig_expectations[unique]
scanner = MDSS(scoring)
result = scanner.scan(
data, expectations, observations, penalty, num_iters, mode=mode
)
results[unique] = result
return results
scanner = MDSS(scoring)
return scanner.scan(data, expectations, observations, penalty, num_iters, mode=mode)