""" Load output files from AMLSim and create NetworkX graph for analytics """

import csv
import json
import os
import sys
from collections import Counter
from datetime import datetime, timedelta

import networkx as nx
from dateutil.parser import parse

# Account (vertex) and transaction (edge) attribute keys
ACCT_SAR = "sar"
TX_AMOUNT = "amount"
TX_DATE = "date"

DEGREE_STEP = 5  # Interval of degrees


def _column_indices(columns, wanted, section):
    """Locate the schema columns whose ``dataType`` is one of ``wanted``

    :param columns: List of column definitions (dicts) from the schema
    :param wanted: Iterable of required ``dataType`` names
    :param section: Schema section name, used only in the error message
    :return: Dict mapping each wanted ``dataType`` to its column index
    :raises ValueError: If any wanted ``dataType`` is absent from ``columns``
    """
    found = dict.fromkeys(wanted)
    for idx, col in enumerate(columns):
        data_type = col.get("dataType")
        if data_type in found:
            # When a dataType is declared more than once, the last column wins
            found[data_type] = idx
    missing = sorted(name for name, idx in found.items() if idx is None)
    if missing:
        raise ValueError("Schema section '%s' has no column with dataType: %s"
                         % (section, ", ".join(missing)))
    return found


def _count_at_least(deg_counter, threshold):
    """Sum the counts of all degrees in ``deg_counter`` that are >= ``threshold``"""
    return sum(c for d, c in deg_counter.items() if d >= threshold)


def _print_fan_ratios(in_deg, out_deg, num_accts, min_degree, max_degree):
    """Print fan-in / fan-out counts and percentages for each degree threshold

    :param in_deg: Counter of in-degree -> number of accounts
    :param out_deg: Counter of out-degree -> number of accounts
    :param num_accts: Number of accounts used as the denominator of the ratios
    :param min_degree: First degree threshold
    :param max_degree: Last degree threshold (inclusive)
    """
    for th in range(min_degree, max_degree + 1, DEGREE_STEP):
        num_fan_in = _count_at_least(in_deg, th)
        num_fan_out = _count_at_least(out_deg, th)
        # Guard against an empty account class instead of dividing by zero
        ratio_fan_in = num_fan_in / num_accts if num_accts else 0.0
        ratio_fan_out = num_fan_out / num_accts if num_accts else 0.0
        print("\tNumber of fan-in / fan-out patterns with %d or more neighbors: %d (%.2f%%)/ %d (%.2f%%)"
              % (th, num_fan_in, ratio_fan_in * 100, num_fan_out, ratio_fan_out * 100))


def load_base_csv(acct_csv, tx_csv, schema_data):
    """Load account and transaction list CSV from the transaction graph generator (before running AMLSim)

    Placeholder: no loading is performed yet and ``None`` is always returned.

    :param acct_csv: Account list CSV
    :param tx_csv: Transaction list CSV
    :param schema_data: Schema data from JSON file
    :return: Base transaction network as a NetworkX graph object (currently always ``None``)
    """
    return None


def load_result_csv(acct_csv: str, tx_csv: str, schema_data) -> nx.MultiDiGraph:
    """Load account list CSV and transaction list CSV from AMLSim and generate transaction graph

    Each account row becomes a vertex carrying the ``ACCT_SAR`` flag and each
    transaction row becomes an edge carrying ``TX_AMOUNT`` and ``TX_DATE``
    (formatted as ``YYYY-MM-DD``). The first row of each CSV is skipped as a header
    and blank rows are ignored.

    :param acct_csv: Account list CSV
    :param tx_csv: Transaction list CSV
    :param schema_data: Schema data from JSON
    :return: Transaction network as a NetworkX graph object
    :raises ValueError: If the schema lacks one of the required column data types
    """
    acct_cols = _column_indices(schema_data["account"], ("account_id", "sar_flag"), "account")
    tx_schema = schema_data["transaction"]
    tx_cols = _column_indices(tx_schema, ("orig_id", "dest_id", "amount", "timestamp"), "transaction")

    acct_id_idx = acct_cols["account_id"]
    acct_sar_idx = acct_cols["sar_flag"]
    tx_src_idx = tx_cols["orig_id"]
    tx_dst_idx = tx_cols["dest_id"]
    tx_amt_idx = tx_cols["amount"]
    tx_date_idx = tx_cols["timestamp"]

    # Timestamps are either parseable date strings or integer day offsets from base_date
    is_date_type = tx_schema[tx_date_idx].get("valueType") == "date"
    base_date = datetime(1970, 1, 1)

    _g = nx.MultiDiGraph()
    num_accts = 0
    num_sar = 0
    num_txs = 0

    # Load account list CSV
    print("Load account list CSV file", acct_csv)
    with open(acct_csv, "r", newline="") as rf:
        reader = csv.reader(rf)
        next(reader, None)  # Skip header (tolerate a completely empty file)
        for row in reader:
            if not row:
                continue  # csv.reader yields [] for blank lines
            acct_id = row[acct_id_idx]  # Account ID
            is_sar = row[acct_sar_idx].lower() == "true"  # SAR flag
            _g.add_node(acct_id, **{ACCT_SAR: is_sar})
            num_accts += 1
            if is_sar:
                num_sar += 1

    print("Number of total accounts: %d" % num_accts)
    sar_pct = num_sar / num_accts * 100 if num_accts else 0.0
    print("Number of SAR accounts: %d (%.2f%%)" % (num_sar, sar_pct))

    # Load transaction list CSV
    print("Loading transaction list CSV file", tx_csv)
    with open(tx_csv, "r", newline="") as rf:
        reader = csv.reader(rf)
        next(reader, None)  # Skip header (tolerate a completely empty file)
        for row in reader:
            if not row:
                continue
            src_id = row[tx_src_idx]  # Originator account ID
            dst_id = row[tx_dst_idx]  # Beneficiary account ID
            amount = float(row[tx_amt_idx])  # Transaction amount
            raw_date = row[tx_date_idx]
            if is_date_type:
                date = parse(raw_date)
            else:
                date = base_date + timedelta(days=int(raw_date))
            attr = {TX_AMOUNT: amount, TX_DATE: date.strftime("%Y-%m-%d")}
            _g.add_edge(src_id, dst_id, **attr)
            num_txs += 1
            if num_txs % 100000 == 0:
                print("Loaded %d transactions" % num_txs)

    print("Number of transactions: %d" % num_txs)
    return _g


def load_alert_csv(_g, alert_acct_csv, alert_tx_csv, schema_data):
    """Load alert member and transaction lists

    Placeholder: no alert data is loaded yet; the graph is left untouched and
    ``None`` is returned.

    :param _g: Transaction graph
    :param alert_acct_csv: Alert member list CSV
    :param alert_tx_csv: Alert transaction list CSV
    :param schema_data: Schema data from JSON
    """
    return None


class __TransactionGraphLoader:
    """Common base of the graph loaders: reads the configuration and schema JSON files"""

    def __init__(self, _conf_json):
        """Read the configuration JSON and the schema JSON it points to

        :param _conf_json: Path to the configuration JSON file
        """
        with open(_conf_json, "r") as rf:
            self.conf = json.load(rf)

        input_conf = self.conf["input"]
        schema_json = os.path.join(input_conf["directory"], input_conf["schema"])
        with open(schema_json, "r") as rf:
            self.schema = json.load(rf)

        self.output_conf = self.conf["output"]
        self.g = nx.MultiDiGraph()  # Empty until a subclass loads data into it
        self.sim_name = self.conf["general"]["simulation_name"]

    def get_graph(self):
        """Return the transaction graph held by this loader"""
        return self.g

    def count_hub_accounts(self, min_degree=DEGREE_STEP, max_degree=10):
        """Count number of "hub" accounts by degree

        Prints, for every threshold from ``min_degree`` to ``max_degree`` in steps
        of ``DEGREE_STEP``, how many vertices have at least that in- / out-degree.

        :param min_degree: First degree threshold
        :param max_degree: Last degree threshold (inclusive)
        """
        # dict(...) accepts both the plain dict returned by NetworkX 1.x and
        # the DegreeView returned by NetworkX 2.x and later
        in_deg = Counter(dict(self.g.in_degree()).values())  # in-degree, count
        out_deg = Counter(dict(self.g.out_degree()).values())  # out-degree, count
        for th in range(min_degree, max_degree + 1, DEGREE_STEP):
            num_fan_in = _count_at_least(in_deg, th)
            num_fan_out = _count_at_least(out_deg, th)
            print("\tNumber of fan-in / fan-out patterns with", th, "or more neighbors:",
                  num_fan_in, "/", num_fan_out)


class BaseGraphLoader(__TransactionGraphLoader):
    """Loader for the base transaction network; only reads configuration and schema"""

    def __init__(self, _conf_json):
        """
        :param _conf_json: Path to the configuration JSON file
        """
        super().__init__(_conf_json)


class ResultGraphLoader(__TransactionGraphLoader):
    """Loader that builds the transaction graph from the simulator output files"""

    def __init__(self, _conf_json):
        """Load the account and transaction CSV files named in the output configuration

        :param _conf_json: Path to the configuration JSON file
        """
        super().__init__(_conf_json)

        # Create a transaction graph from output files.
        # Alert member / alert transaction files are not read here because
        # load_alert_csv is still a placeholder.
        output_dir = os.path.join(self.output_conf["directory"], self.sim_name)
        acct_path = os.path.join(output_dir, self.output_conf["accounts"])
        tx_path = os.path.join(output_dir, self.output_conf["transactions"])

        self.g = load_result_csv(acct_path, tx_path, self.schema)

        sar_flags = nx.get_node_attributes(self.g, ACCT_SAR)
        self.num_sar_accts = sum(1 for flag in sar_flags.values() if flag)
        self.num_normal_accts = len(sar_flags) - self.num_sar_accts

    def count_hub_accounts(self, min_degree=DEGREE_STEP, max_degree=10):
        """Count "hub" accounts overall, then separately for normal and SAR accounts

        Only vertices that carry a SAR flag (i.e. those listed in the account CSV)
        contribute to the per-class statistics, matching how the class totals are
        counted; vertices that appear only in the transaction list are excluded.

        :param min_degree: First degree threshold
        :param max_degree: Last degree threshold (inclusive)
        """
        super().count_hub_accounts(min_degree, max_degree)

        # Extract the same statistical data for normal and alert account vertices.
        # dict(...) and get_node_attributes work across NetworkX versions, unlike
        # DegreeView.items() and the G.node mapping removed in NetworkX 2.4.
        in_degrees = dict(self.g.in_degree())
        out_degrees = dict(self.g.out_degree())
        sar_flags = nx.get_node_attributes(self.g, ACCT_SAR)

        normal_accts = [n for n, flag in sar_flags.items() if not flag]
        sar_accts = [n for n, flag in sar_flags.items() if flag]

        print("Number of fan-in / fan-out patterns for %d normal accounts" % self.num_normal_accts)
        _print_fan_ratios(Counter(in_degrees[n] for n in normal_accts),
                          Counter(out_degrees[n] for n in normal_accts),
                          self.num_normal_accts, min_degree, max_degree)

        print("Number of fan-in / fan-out patterns for %d SAR accounts" % self.num_sar_accts)
        _print_fan_ratios(Counter(in_degrees[n] for n in sar_accts),
                          Counter(out_degrees[n] for n in sar_accts),
                          self.num_sar_accts, min_degree, max_degree)


def _main(argv):
    """Script entry point: print hub-account statistics for the configured simulation

    :param argv: Command line arguments; ``argv[1]`` is the configuration JSON path
    """
    if len(argv) < 2:
        print("Usage: python3 %s [ConfJSON]" % argv[0])
        sys.exit(1)
    conf_json = argv[1]

    # Base transaction network (as input of the simulator) analytics is
    # currently disabled; BaseGraphLoader(conf_json) would be the entry point.

    # Generated transaction network analysis as the final result
    rgl = ResultGraphLoader(conf_json)
    rgl.count_hub_accounts(5, 25)


if __name__ == "__main__":
    _main(sys.argv)