|
import os |
|
import sys |
|
import csv |
|
import json |
|
from collections import defaultdict |
|
import networkx as nx |
|
from datetime import datetime |
|
import logging |
|
|
|
|
|
def col2idx(cols):
    """Map each column name to its position in the header row.

    :param cols: Iterable of column names (e.g. a CSV header row)
    :return: dict of column name to zero-based index (last occurrence wins
        for duplicate names)
    """
    return {col: idx for idx, col in enumerate(cols)}
|
|
|
|
|
def load_alert_param(_alert_param_csv):
    """Load an alert parameter file

    :param _alert_param_csv: Alert parameter CSV file
    :return: dict of line number of the parameter file and parameter set as dict
    """
    param_data = dict()
    with open(_alert_param_csv, "r") as _rf:
        reader = csv.reader(_rf)
        header = next(reader)
        # Resolve all column positions up front so a missing column fails fast.
        name2idx = {col: idx for idx, col in enumerate(header)}
        count_idx = name2idx["count"]
        type_idx = name2idx["type"]
        schedule_idx = name2idx["schedule_id"]
        min_acct_idx = name2idx["min_accounts"]
        max_acct_idx = name2idx["max_accounts"]
        min_amt_idx = name2idx["min_amount"]
        max_amt_idx = name2idx["max_amount"]
        min_period_idx = name2idx["min_period"]
        max_period_idx = name2idx["max_period"]
        bank_idx = name2idx["bank_id"]
        sar_idx = name2idx["is_sar"]

        # Data rows start at line 2 (line 1 is the header).
        for line_num, row in enumerate(reader, start=2):
            param_data[line_num] = {
                "count": int(row[count_idx]),
                "type": row[type_idx],
                # A positive schedule ID means transactions must be ordered.
                "ordered": int(row[schedule_idx]) > 0,
                "accounts": (int(row[min_acct_idx]), int(row[max_acct_idx])),
                "amount": (float(row[min_amt_idx]), float(row[max_amt_idx])),
                "period": (int(row[min_period_idx]), int(row[max_period_idx])),
                # An empty bank ID means the alert spans multiple banks.
                "multiple_banks": row[bank_idx] == "",
                "sar": row[sar_idx].lower() == "true",
            }

    return param_data
|
|
|
|
|
def load_alert_tx(_alert_tx_schema, _alert_tx_csv):
    """Load an alert-related transaction CSV file and construct subgraphs

    :param _alert_tx_schema: Schema (list of column descriptor dicts) for the alert transaction CSV
    :param _alert_tx_csv: Alert transaction CSV file path
    :return: dict of alert ID and alert transaction subgraph
    """
    # Locate the column index for each dataType of interest
    # (last occurrence wins, matching an if/elif scan of the schema).
    idx_by_type = {col.get("dataType"): i for i, col in enumerate(_alert_tx_schema)}
    alert_idx = idx_by_type.get("alert_id")
    type_idx = idx_by_type.get("alert_type")
    orig_idx = idx_by_type.get("orig_id")
    bene_idx = idx_by_type.get("dest_id")
    amt_idx = idx_by_type.get("amount")
    date_idx = idx_by_type.get("timestamp")

    alert_graphs = defaultdict(nx.DiGraph)
    with open(_alert_tx_csv, "r") as _rf:
        reader = csv.reader(_rf)
        next(reader)  # Skip the header row
        for row in reader:
            alert_id = row[alert_idx]
            sub_g = alert_graphs[alert_id]
            # The timestamp may carry a time part ("YYYY-MM-DDT..."); keep the date only.
            tx_date = datetime.strptime(row[date_idx].split("T")[0], "%Y-%m-%d")
            sub_g.add_edge(row[orig_idx], row[bene_idx], amount=float(row[amt_idx]), date=tx_date)
            sub_g.graph["alert_id"] = alert_id
            sub_g.graph["alert_type"] = row[type_idx]

    return alert_graphs
|
|
|
|
|
def satisfies_params(alert_sub_g, param):
    """Check whether the given alert subgraph satisfies the given parameter

    :param alert_sub_g: Alert subgraph
    :param param: Alert parameters as dict from a parameter file
    :return: If the subgraph satisfies all of the given parameter, return True.
    """
    alert_id = alert_sub_g.graph["alert_id"]
    num_accounts = alert_sub_g.number_of_nodes()
    tx_attrs = [attr for _, _, attr in alert_sub_g.edges(data=True)]
    dates = [attr["date"] for attr in tx_attrs]
    start_date = min(dates)
    end_date = max(dates)
    period = (end_date - start_date).days + 1
    # Amount of the first transaction on the earliest date
    init_amount = next(attr["amount"] for attr in tx_attrs if attr["date"] == start_date)
    alert_type = param["type"]

    # Topology check for the known structured alert types
    topology_checks = {
        "cycle": is_cycle,
        "scatter_gather": is_scatter_gather,
        "gather_scatter": is_gather_scatter,
    }
    check = topology_checks.get(alert_type)
    if check is not None and not check(alert_sub_g):
        return False

    min_acct, max_acct = param["accounts"]
    if not min_acct <= num_accounts <= max_acct:
        logging.info("Alert %s: The number of accounts %d is not within [%d, %d]"
                     % (alert_id, num_accounts, min_acct, max_acct))
        return False

    min_amt, max_amt = param["amount"]
    if not min_amt <= init_amount <= max_amt:
        logging.info("Alert %s: initial amount %f is not within [%f, %f]" % (alert_id, init_amount, min_amt, max_amt))
        return False

    min_period, max_period = param["period"]
    if not min_period <= period <= max_period:
        logging.info("Alert %s: period %d is not within [%d, %d]" % (alert_id, period, min_period, max_period))
        return False

    return True
|
|
|
|
|
def is_cycle(alert_sub_g: nx.DiGraph, is_ordered: bool = True):
    """Check whether the given alert subgraph forms a single cycle pattern.

    :param alert_sub_g: Alert subgraph
    :param is_ordered: If True, also require the cycle transactions to be
        chronologically ordered with strictly decreasing amounts.
    :return: True if the subgraph is a valid cycle pattern, otherwise False.
    """
    alert_id = alert_sub_g.graph["alert_id"]
    # Materialize the edge view: edges(data=True) returns a view in
    # networkx 2.x, which has no sort() method.
    edges = list(alert_sub_g.edges(data=True))
    cycles = list(nx.simple_cycles(alert_sub_g))
    if len(cycles) != 1:
        logging.info("Alert %s is not a cycle pattern" % alert_id)
        return False
    if is_ordered:
        edges.sort(key=lambda e: e[2]["date"])
        next_orig = None
        next_amt = sys.float_info.max
        next_date = datetime.strptime("1970-01-01", "%Y-%m-%d")
        for orig, bene, attr in edges:
            # Each transaction must originate from the beneficiary of the previous one
            if next_orig is not None and orig != next_orig:
                logging.info("Alert %s is not a cycle pattern" % alert_id)
                return False
            else:
                next_orig = bene

            # Amounts must strictly decrease along the cycle
            # (bug fix: the previous "==" check only rejected exactly equal amounts,
            # contradicting the "unordered" log message).
            amount = attr["amount"]
            if next_amt <= amount:
                logging.info("Alert %s cycle transaction amounts are unordered" % alert_id)
                return False
            else:
                next_amt = amount

            # Transactions must be chronologically ordered
            date = attr["date"]
            if date < next_date:
                logging.info("Alert %s cycle transactions are chronologically unordered" % alert_id)
                return False
            else:
                next_date = date
    return True
|
|
|
|
|
def is_scatter_gather(alert_sub_g: nx.DiGraph, is_ordered: bool = True):
    """Check whether the given alert subgraph is a scatter-gather pattern
    (one originator fans out to middle accounts, which all forward to one beneficiary).

    :param alert_sub_g: Alert subgraph
    :param is_ordered: If True, also require each gather transaction to occur on or
        after its scatter transaction, with a strictly smaller amount.
    :return: True if the subgraph is a valid scatter-gather pattern, otherwise False.
    """
    alert_id = alert_sub_g.graph["alert_id"]
    num_accts = alert_sub_g.number_of_nodes()
    num_mid = num_accts - 2  # All accounts except the originator and the beneficiary
    out_degrees = alert_sub_g.out_degree()
    in_degrees = alert_sub_g.in_degree()
    orig = None
    bene = None
    mid_accts = list()
    for n, out_d in out_degrees.items():
        in_d = in_degrees[n]
        if out_d == num_mid:
            # Originator: sends to every middle account, receives nothing
            orig = n
            if in_d != 0:
                logging.info("Alert %s is not a scatter-gather pattern: invalid vertex degree %d -> [%s] -> %d"
                             % (alert_id, in_d, n, out_d))
                return False
        elif out_d == 0:
            # Beneficiary: receives from every middle account, sends nothing
            bene = n
            if in_d != num_mid:
                logging.info("Alert %s is not a scatter-gather pattern: invalid vertex degree %d -> [%s] -> %d"
                             % (alert_id, in_d, n, out_d))
                return False
        elif out_d == 1:
            # Middle account: exactly one incoming and one outgoing transaction
            mid_accts.append(n)
            if in_d != 1:
                logging.info("Alert %s is not a scatter-gather pattern: invalid vertex degree %d -> [%s] -> %d"
                             % (alert_id, in_d, n, out_d))
                return False
        else:
            logging.info("Alert %s is not a scatter-gather pattern: invalid vertex degree %d -> [%s] -> %d"
                         % (alert_id, in_d, n, out_d))
            return False
    if len(mid_accts) != num_mid:
        logging.info("Not a scatter-gather pattern: " + alert_id)
        return False

    if is_ordered:
        for mid in mid_accts:
            scatter_attr = alert_sub_g.get_edge_data(orig, mid)
            gather_attr = alert_sub_g.get_edge_data(mid, bene)
            if scatter_attr is None:
                logging.info("Alert %s is not a scatter-gather pattern: scatter edge %s -> %s not found"
                             % (alert_id, orig, mid))
                return False
            elif gather_attr is None:
                logging.info("Alert %s is not a scatter-gather pattern: gather edge %s -> %s not found"
                             % (alert_id, mid, bene))
                # Bug fix: previously fell through without returning and then
                # crashed with a TypeError on gather_attr["date"] below.
                return False

            # The gather transaction must not precede its scatter transaction
            scatter_date = scatter_attr["date"]
            gather_date = gather_attr["date"]
            if scatter_date > gather_date:
                logging.info("Alert %s scatter-gather transactions are chronologically unordered" % alert_id)
                return False
            # The forwarded (gather) amount must be strictly smaller than the scattered amount
            scatter_amount = scatter_attr["amount"]
            gather_amount = gather_attr["amount"]
            if scatter_amount <= gather_amount:
                logging.info("Alert %s scatter-gather transaction amounts are unordered" % alert_id)
                return False

    return True
|
|
|
|
|
def is_gather_scatter(alert_sub_g: nx.DiGraph, is_ordered: bool = True):
    """Check whether the given alert subgraph is a gather-scatter pattern
    (originators all send to a single hub, which fans out to beneficiaries).

    :param alert_sub_g: Alert subgraph
    :param is_ordered: If True, also require every scatter transaction to occur
        after all gather transactions, with an amount strictly smaller than the
        average gathered amount.
    :return: True if the subgraph is a valid gather-scatter pattern, otherwise False.
    """
    alert_id = alert_sub_g.graph["alert_id"]
    num_accts = alert_sub_g.number_of_nodes()
    out_degrees = alert_sub_g.out_degree()
    in_degrees = alert_sub_g.in_degree()

    # Originators send exactly one transaction and receive none;
    # beneficiaries receive exactly one transaction and send none.
    orig_accts = [n for n, d in out_degrees.items() if d == 1 and in_degrees[n] == 0]
    bene_accts = [n for n, d in in_degrees.items() if d == 1 and out_degrees[n] == 0]
    num_orig = len(orig_accts)
    num_bene = len(bene_accts)
    # The hub receives from all originators and sends to all beneficiaries.
    hub_accts = [n for n, d in out_degrees.items() if d == num_bene and in_degrees[n] == num_orig]
    if len(hub_accts) != 1 or (num_orig + num_bene + 1) != num_accts:
        logging.info("Alert %s is not a gather-scatter pattern" % alert_id)
        return False

    hub = hub_accts[0]
    last_gather_date = datetime.strptime("1970-01-01", "%Y-%m-%d")
    total_gather_amount = 0.0
    for orig in orig_accts:
        attr = alert_sub_g.get_edge_data(orig, hub)
        if attr is None:
            logging.info("Alert %s is not a gather-scatter pattern: gather edge %s -> %s not found"
                         % (alert_id, orig, hub))
            return False
        date = attr["date"]
        amount = attr["amount"]
        last_gather_date = max(last_gather_date, date)
        total_gather_amount += amount

    if is_ordered:
        max_scatter_amount = total_gather_amount / num_bene
        for bene in bene_accts:
            attr = alert_sub_g.get_edge_data(hub, bene)
            if attr is None:
                # Consistency fix: log the missing scatter edge like the gather
                # loop above does instead of silently returning.
                logging.info("Alert %s is not a gather-scatter pattern: scatter edge %s -> %s not found"
                             % (alert_id, hub, bene))
                return False
            date = attr["date"]
            amount = attr["amount"]
            if date < last_gather_date:
                logging.info("Alert %s gather-scatter transactions are chronologically unordered " % alert_id)
                return False
            elif max_scatter_amount <= amount:
                logging.info("Alert %s gather-scatter transaction amounts are unordered" % alert_id)
                return False

    return True
|
|
|
|
|
class AlertValidator:
    """Validates generated alert subgraphs against the alert parameter file.

    Loads the simulator configuration, schema, alert parameters, and the
    alert transaction output, then checks each alert subgraph against all
    parameter sets of the matching alert type.
    """

    def __init__(self, conf_json, sim_name=None):
        """Initialize the validator from a configuration JSON file.

        :param conf_json: Path to the configuration JSON file
        :param sim_name: Simulation name; defaults to the one in the configuration
        """
        with open(conf_json, "r") as rf:
            self.conf = json.load(rf)

        self.sim_name = sim_name if sim_name is not None else self.conf["general"]["simulation_name"]
        self.input_dir = self.conf["input"]["directory"]
        self.output_dir = os.path.join(self.conf["output"]["directory"], self.sim_name)

        # Load the schema once and reuse it below
        # (it was previously loaded twice from the same path).
        schema_path = os.path.join(self.input_dir, self.conf["input"]["schema"])
        with open(schema_path, "r") as rf:
            self.schema = json.load(rf)

        # Make sure the output directory exists before opening the log file there
        os.makedirs(self.output_dir, exist_ok=True)
        log_file = os.path.join(self.output_dir, "alert_validations.log")
        logging.basicConfig(filename=log_file, filemode="w", level=logging.INFO)

        # Alert parameter sets keyed by their line number in the parameter file
        self.alert_param_file = self.conf["input"]["alert_patterns"]
        alert_param_path = os.path.join(self.input_dir, self.alert_param_file)
        self.alert_params = load_alert_param(alert_param_path)

        # Alert transaction subgraphs keyed by alert ID
        alert_tx_file = self.conf["output"]["alert_transactions"]
        alert_tx_path = os.path.join(self.output_dir, alert_tx_file)
        self.alert_graphs = load_alert_tx(self.schema["alert_tx"], alert_tx_path)

    def validate_single(self, alert_id):
        """Validate one alert subgraph against all parameter sets of its type.

        :param alert_id: Alert ID of the subgraph to validate
        :return: True if the subgraph matches at least one parameter set
        :raises KeyError: If the alert ID does not exist
        """
        if alert_id not in self.alert_graphs:
            raise KeyError("No such alert ID: " + alert_id)
        sub_g = self.alert_graphs[alert_id]
        alert_type = sub_g.graph["alert_type"]
        for line_num, param in self.alert_params.items():
            if param["type"] != alert_type:
                continue
            if satisfies_params(sub_g, param):
                logging.info("The alert %s subgraph matches the parameter %s:%d, data %s" %
                             (alert_id, self.alert_param_file, line_num, str(param)))
                return True
        # No parameter set matched (the original for/else was equivalent to this,
        # since the loop body never used break)
        logging.warning("The alert subgraph (ID:%s, Type:%s) does not match any parameter sets"
                        % (alert_id, alert_type))
        return False

    def validate_all(self):
        """Validate all loaded alert subgraphs and print a summary line."""
        num_alerts = len(self.alert_graphs)
        num_matched = sum(1 for alert_id in self.alert_graphs if self.validate_single(alert_id))
        num_unmatched = num_alerts - num_matched
        print("Total number of alerts: %d, matched: %d, unmatched: %d" % (num_alerts, num_matched, num_unmatched))
|
|
|
|
|
if __name__ == "__main__":
    args = sys.argv
    if len(args) < 2:
        print("Usage: python3 %s [ConfJson] [SimName]" % args[0])
        exit(1)

    # The simulation name argument is optional; fall back to the configuration file
    validator = AlertValidator(args[1], args[2] if len(args) >= 3 else None)
    validator.validate_all()
|
|