from aif360.detectors.mdss.ScoringFunctions.ScoringFunction import ScoringFunction
from aif360.detectors.mdss.generator import get_entire_subset, get_random_subset
from aif360.detectors.mdss.ScoringFunctions import (
    Bernoulli,
    BerkJones,
    Gaussian,
    ScoringFunction,
    Poisson,
)

import pandas as pd
import numpy as np

class MDSS(object):
    """Multi-Dimensional Subset Scan (MDSS).

    Searches over subsets of attribute values for the subgroup whose observed
    outcomes diverge most from their expectations, as measured by the supplied
    scoring function and penalized for subset complexity.
    """

    def __init__(self, scoring_function: ScoringFunction):
        self.scoring_function = scoring_function

    def get_aggregates(self, coordinates: pd.DataFrame, outcomes: pd.Series, expectations: pd.Series,
                       current_subset: dict, column_name: str, penalty: float):
        r"""
        Conditioned on the current subsets of values for all other attributes,
        compute the summed outcome (observed_sum = \sum_i y_i) and all expectations p_i
        for each value of the current attribute.
        Also use additive linear-time subset scanning to compute the set of distinct thresholds
        for which different subsets of attribute values have positive scores. Note that the number
        of such thresholds will be linear rather than exponential in the arity of the attribute.

        :param coordinates: data frame containing the covariates/features as columns
        :param expectations: data series containing the expectations/expected outcomes
        :param outcomes: data series containing the observed outcomes
        :param current_subset: current subset for which to compute aggregates
        :param column_name: name of the attribute to scan over
        :param penalty: penalty coefficient
        :return: dictionary of aggregates, sorted thresholds (roots), observed sum of the subset,
            array of observed expectations
        """

        # compute the subset of records matching the current subgroup along all other dimensions
        # temp_df includes the covariates x_i, outcome y_i, and predicted expectation p_i for each matching record
        if current_subset:
            to_choose = coordinates[current_subset.keys()].isin(current_subset).all(axis=1)
            temp_df = pd.concat([coordinates.loc[to_choose], outcomes[to_choose], expectations[to_choose]], axis=1)
        else:
            temp_df = pd.concat([coordinates, outcomes, expectations], axis=1)

        # these will be used to keep track of the aggregate values and the distinct thresholds to be considered
        aggregates = {}
        thresholds = set()

        scoring_function = self.scoring_function

        # consider each distinct value of the given attribute (column_name)
        for name, group in temp_df.groupby(column_name):
            # compute the sum of outcomes \sum_i y_i
            observed_sum = group.iloc[:, -2].sum()

            # all expectations p_i
            expectations = group.iloc[:, -1].values

            # compute q_min and q_max for the attribute value
            exist, q_mle, q_min, q_max = scoring_function.compute_qs(observed_sum, expectations, penalty)

            # Add to aggregates, and add q_min and q_max to thresholds.
            # Note that thresholds is a set, so duplicates will be removed automatically.
            if exist:
                aggregates[name] = {
                    'q_mle': q_mle,
                    'q_min': q_min,
                    'q_max': q_max,
                    'observed_sum': observed_sum,
                    'expectations': expectations
                }
                thresholds.update([q_min, q_max])

        # We also keep track of the summed outcomes \sum_i y_i and the expectations p_i for the case where
        # _all_ values of that attribute are considered (regardless of whether they contribute positively to score).
        # This is necessary because of the way we compute the penalty term: including all attribute values, equivalent
        # to ignoring the attribute, has the lowest penalty (of 0) and thus we need to score that subset as well.
        all_observed_sum = temp_df.iloc[:, -2].sum()
        all_expectations = temp_df.iloc[:, -1].values

        return [aggregates, sorted(thresholds), all_observed_sum, all_expectations]

    def choose_aggregates(self, aggregates: dict, thresholds: list, penalty: float, all_observed_sum: float,
                          all_expectations: list):
        """
        Having previously computed the aggregates and the distinct q thresholds
        to consider in the get_aggregates function, we are now ready to choose the best
        subset of attribute values for the given attribute.
        For each range defined by these thresholds, we will choose all of the positive contributions,
        compute the MLE value of q, and the corresponding score.
        We then pick the best q and score over all of the ranges considered.

        :param aggregates: dictionary of aggregates. For each feature value, it has q_mle, q_min, q_max,
            observed_sum, and the expectations
        :param thresholds: sorted thresholds (roots)
        :param penalty: penalty coefficient
        :param all_observed_sum: sum of observed binary outcomes for all i
        :param all_expectations: data series containing all the expectations/expected outcomes
        :return: [best subset of attribute values, best score]
        """

        # initialize
        best_score = 0
        best_names = []

        scoring_function = self.scoring_function

        # for each interval between consecutive thresholds
        for i in range(len(thresholds) - 1):
            threshold = (thresholds[i] + thresholds[i + 1]) / 2
            observed_sum = 0.0
            expectations = []
            names = []

            # keep only the aggregates which have a positive contribution to the score in that q range
            # we must keep track of the sum of outcome values as well as all predicted expectations
            for key, value in aggregates.items():
                if (value['q_min'] < threshold) & (value['q_max'] > threshold):
                    names.append(key)
                    observed_sum += value['observed_sum']
                    expectations = expectations + value['expectations'].tolist()

            if len(expectations) == 0:
                continue

            # compute the MLE value of q, making sure to only consider the desired direction (positive or negative)
            expectations = np.asarray(expectations)
            current_q_mle = scoring_function.qmle(observed_sum, expectations)

            # Compute the score for the given subset at the MLE value of q.
            # Notice that each included value gets a penalty, so the total penalty
            # is multiplied by the number of included values.
            current_interval_score = scoring_function.score(observed_sum, expectations, penalty * len(names),
                                                            current_q_mle)

            # keep track of the best score, best q, and best subset of attribute values found so far
            if current_interval_score > best_score:
                best_score = current_interval_score
                best_names = names

        # Now we also have to consider the case of including all attribute values,
        # including those that never make positive contributions to the score.
        # Note that the penalty term is 0 in this case. (We are neglecting penalties
        # from all other attributes, just considering the current attribute.)

        # compute the MLE value of q, making sure to only consider the desired direction (positive or negative)
        current_q_mle = scoring_function.qmle(all_observed_sum, all_expectations)

        # Compute the score for the given subset at the MLE value of q.
        # Again, the penalty (for that attribute) is 0 when all attribute values are included.
        current_score = scoring_function.score(all_observed_sum, all_expectations, 0, current_q_mle)

        # Keep track of the best score, best q, and best subset of attribute values found.
        # Note that if the best subset contains all values of the given attribute,
        # we return an empty list for best_names.
        if current_score > best_score:
            best_score = current_score
            best_names = []

        return [best_names, best_score]

    def score_current_subset(self, coordinates: pd.DataFrame, expectations: pd.Series, outcomes: pd.Series,
                             current_subset: dict, penalty: float):
        """
        Just scores the subset without performing ALTSS.
        We still need to determine the MLE value of q.

        :param coordinates: data frame containing the covariates/features as columns
        :param expectations: data series containing the expectations/expected outcomes
        :param outcomes: data series containing the observed outcomes
        :param current_subset: current subset to be scored
        :param penalty: penalty coefficient
        :return: penalized score of the subset
        """

        # compute the subset of records matching the current subgroup along all dimensions
        # temp_df includes the covariates x_i, outcome y_i, and predicted expectation p_i for each matching record
        if current_subset:
            to_choose = coordinates[current_subset.keys()].isin(current_subset).all(axis=1)
            temp_df = pd.concat([coordinates.loc[to_choose], outcomes[to_choose], expectations[to_choose]], axis=1)
        else:
            temp_df = pd.concat([coordinates, outcomes, expectations], axis=1)

        scoring_function = self.scoring_function

        # we must keep track of the sum of outcome values as well as all predicted expectations
        observed_sum = temp_df.iloc[:, -2].sum()
        expectations = temp_df.iloc[:, -1].values

        # compute the MLE value of q, making sure to only consider the desired direction (positive or negative)
        current_q_mle = scoring_function.qmle(observed_sum, expectations)

        # total_penalty = penalty * (total number of attribute values included in current_subset)
        total_penalty = 0
        for key, values in current_subset.items():
            total_penalty += len(values)

        total_penalty *= penalty

        # Compute and return the penalized score
        penalized_score = scoring_function.score(observed_sum, expectations, total_penalty, current_q_mle)
        return np.round(penalized_score, 4)

    def scan(self, coordinates: pd.DataFrame, expectations: pd.Series, outcomes: pd.Series, penalty: float,
             num_iters: int, verbose: bool = False, seed: int = 0, mode: str = 'binary'):
        """
        :param coordinates: data frame containing the covariates/features as columns
        :param expectations: data series containing the expectations/expected outcomes
        :param outcomes: data series containing the observed outcomes
        :param penalty: penalty coefficient
        :param num_iters: number of iterations (restarts)
        :param verbose: logging flag
        :param seed: numpy seed. Default equals 0
        :param mode: one of ['binary', 'continuous', 'nominal', 'ordinal']. Defaults to binary.
        :return: [best subset, best score]
        """
        np.random.seed(seed)

        # Reset indexes
        coordinates = coordinates.reset_index(drop=True)
        expectations = expectations.reset_index(drop=True)
        outcomes = outcomes.reset_index(drop=True)

        assert len(coordinates) == len(expectations) == len(outcomes), \
            'Lengths of coordinates, expectations, and outcomes should be equal.'

        # Check that the appropriate scoring function is used
        if isinstance(self.scoring_function, BerkJones):
            modes = ["binary", "continuous", "nominal", "ordinal"]
            assert mode in modes, f"Expected one of {modes} for BerkJones, got {mode}."

            # Ensure that BerkJones only works in autostrat mode (a single, constant expectation)
            unique_expectations = expectations.unique()
            if len(unique_expectations) != 1:
                raise Exception(
                    "BerkJones scorer supports scanning in autostrat mode only."
                )

            # Bin the continuous outcomes column for BerkJones in continuous mode
            alpha = self.scoring_function.alpha
            direction = self.scoring_function.direction
            if mode == "continuous":
                quantile = outcomes.quantile(alpha)
                outcomes = (outcomes > quantile).apply(int)

            # Flip outcomes to scan in the negative direction for BerkJones
            # This is equivalent to switching the p-values
            if direction == "negative":
                outcomes = 1 - outcomes

        if isinstance(self.scoring_function, Bernoulli):
            modes = ["binary", "nominal"]
            assert mode in modes, f"Expected one of {modes} for Bernoulli, got {mode}."

        if isinstance(self.scoring_function, Gaussian):
            assert mode == 'continuous', f"Expected continuous, got {mode}."

            # Set variance for Gaussian
            self.scoring_function.var = expectations.var()

            # Move the entire distribution to the positive axis
            shift = np.abs(expectations.min()) + np.abs(outcomes.min())
            outcomes = outcomes + shift
            expectations = expectations + shift

        if isinstance(self.scoring_function, Poisson):
            modes = ["binary", "ordinal"]
            assert mode in modes, f"Expected one of {modes} for Poisson, got {mode}."
        # initialize
        best_subset = {}
        best_score = -1e10
        best_scores = []

        for i in range(num_iters):
            # flags indicate whether the method has already optimized over subsets of a given attribute.
            # The iteration ends when it cannot further increase the score by optimizing over
            # subsets of any attribute, i.e., when all flags are 1.
            flags = np.empty(len(coordinates.columns))
            flags.fill(0)

            # Starting subset. Note that we start with all values for the first iteration
            # and random values for succeeding iterations.
            current_subset = get_entire_subset() if (i == 0) \
                else get_random_subset(coordinates, np.random.rand(1).item(), 10)

            # score the starting subset
            current_score = self.score_current_subset(
                coordinates=coordinates,
                expectations=expectations,
                outcomes=outcomes,
                penalty=penalty,
                current_subset=current_subset
            )

            while flags.sum() < len(coordinates.columns):
                # choose a random attribute that we haven't scanned yet
                attribute_number_to_scan = np.random.choice(len(coordinates.columns))
                while flags[attribute_number_to_scan]:
                    attribute_number_to_scan = np.random.choice(len(coordinates.columns))
                attribute_to_scan = coordinates.columns.values[attribute_number_to_scan]

                # clear the current subset of values for the attribute to be scanned
                if attribute_to_scan in current_subset:
                    del current_subset[attribute_to_scan]

                # call get_aggregates and choose_aggregates to find the best subset of attribute values
                aggregates, thresholds, all_observed_sum, all_expectations = self.get_aggregates(
                    coordinates=coordinates,
                    outcomes=outcomes,
                    expectations=expectations,
                    current_subset=current_subset,
                    column_name=attribute_to_scan,
                    penalty=penalty
                )

                temp_names, temp_score = self.choose_aggregates(
                    aggregates=aggregates,
                    thresholds=thresholds,
                    penalty=penalty,
                    all_observed_sum=all_observed_sum,
                    all_expectations=all_expectations
                )

                temp_subset = current_subset.copy()
                # update the subset only if temp_names is not empty
                if temp_names:
                    temp_subset[attribute_to_scan] = temp_names

                # Note that this call to score_current_subset ensures that
                # we are penalizing complexity for all attribute values.
                # The value of temp_score computed by choose_aggregates
                # above includes only the penalty for the current attribute.
                temp_score = self.score_current_subset(
                    coordinates=coordinates,
                    expectations=expectations,
                    outcomes=outcomes,
                    penalty=penalty,
                    current_subset=temp_subset
                )

                # reset flags to 0 if we have improved the score
                if temp_score > current_score + 1e-6:
                    flags.fill(0)

                # sanity check to make sure the score has not decreased
                # this check may not apply to Gaussian in penalized mode (TODO: check the maths again)
                if not isinstance(self.scoring_function, Gaussian) and penalty > 0:
                    assert (
                        temp_score >= current_score - 1e-6
                    ), "WARNING SCORE HAS DECREASED from %.6f to %.6f" % (
                        current_score,
                        temp_score,
                    )

                flags[attribute_number_to_scan] = 1
                current_subset = temp_subset
                current_score = temp_score

            # print out results for the current iteration
            if verbose:
                print("Subset found on iteration", i + 1, "of", num_iters, "with score", current_score, ":")
                print(current_subset)

            # update best_score and best_subset if necessary
            if current_score > best_score:
                best_subset = current_subset.copy()
                best_score = current_score

                if verbose:
                    print("Best score is now", best_score)

            elif verbose:
                print("Current score of", current_score, "does not beat best score of", best_score)

            best_scores.append(best_score)

        return best_subset, best_score
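

# Minimal usage sketch (illustrative only, not part of the library code above). It shows how the
# scanner would typically be driven: build covariates, observed binary outcomes y_i, and predicted
# probabilities p_i, then scan for the subgroup whose outcomes most exceed their expectations.
# Assumptions: the aif360 Bernoulli scorer accepts a `direction` keyword, and the synthetic data
# and penalty value below are purely hypothetical choices for demonstration.
if __name__ == '__main__':
    rng = np.random.default_rng(0)
    n = 200

    # synthetic covariates/features
    coordinates = pd.DataFrame({
        'age_bin': rng.choice(['<30', '30-50', '>50'], n),
        'sex': rng.choice(['F', 'M'], n),
    })
    outcomes = pd.Series(rng.binomial(1, 0.3, n))       # observed binary outcomes y_i
    expectations = pd.Series(np.full(n, 0.3))           # predicted probabilities p_i

    # scan in the positive direction: observed outcomes higher than expected
    scanner = MDSS(Bernoulli(direction='positive'))
    subset, score = scanner.scan(coordinates, expectations, outcomes,
                                 penalty=1e-17, num_iters=10, mode='binary')
    print(subset, score)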