""" |
Load output files from AMLSim and create NetworkX graph for analytics |
""" |
import os |
import sys |
import csv |
from datetime import datetime, timedelta |
from dateutil.parser import parse |
from collections import Counter |
import networkx as nx |
import json |
# Attribute keys stored on the graph built by load_result_csv().
ACCT_SAR = "sar"  # node attribute: boolean SAR flag of the account
TX_AMOUNT = "amount"  # edge attribute: transaction amount as a float
TX_DATE = "date"  # edge attribute: transaction date formatted as YYYY-MM-DD

# Step between successive degree thresholds in the hub-account reports.
# NOTE(review): count_hub_accounts() uses this name as a default argument and
# as the range step, but it was never defined, so importing the module raised
# NameError. The value 5 is inferred from the entry point's
# count_hub_accounts(5, 25) call -- please confirm the intended step.
DEGREE_STEP = 5
def load_base_csv(acct_csv, tx_csv, schema_data):
    """Placeholder for loading the base transaction graph (before running AMLSim).

    The loader is not implemented: the arguments are accepted but never used,
    no file is read and no graph is built.

    :param acct_csv: Account list CSV (unused)
    :param tx_csv: Transaction list CSV (unused)
    :param schema_data: Schema data from JSON file (unused)
    :return: Always None
    """
    return None
def load_result_csv(acct_csv: str, tx_csv: str, schema_data) -> nx.MultiDiGraph:
    """Load the account and transaction CSV files from AMLSim into a transaction graph.

    Every account row becomes a node carrying the boolean ``ACCT_SAR`` attribute and
    every transaction row becomes a directed edge carrying ``TX_AMOUNT`` (float) and
    ``TX_DATE`` (``YYYY-MM-DD`` string). The first line of each file is treated as a
    header and skipped; blank rows are ignored. Progress and summary counts are
    printed to stdout.

    :param acct_csv: Account list CSV
    :param tx_csv: Transaction list CSV
    :param schema_data: Schema data from JSON; its "account" and "transaction" entries
        are lists of column descriptions whose "dataType" identifies the column role
    :return: Transaction network as a NetworkX graph object
    :raises ValueError: If the schema lacks one of the required column data types
    """
    acct_cols = _find_schema_columns(
        schema_data["account"], "account", ("account_id", "sar_flag"))
    tx_schema = schema_data["transaction"]
    tx_cols = _find_schema_columns(
        tx_schema, "transaction", ("orig_id", "dest_id", "amount", "timestamp"))

    acct_id_idx = acct_cols["account_id"]
    acct_sar_idx = acct_cols["sar_flag"]
    tx_src_idx = tx_cols["orig_id"]
    tx_dst_idx = tx_cols["dest_id"]
    tx_amt_idx = tx_cols["amount"]
    tx_date_idx = tx_cols["timestamp"]

    # Timestamps are either parseable date strings or integer day offsets from base_date.
    is_date_type = tx_schema[tx_date_idx].get("valueType") == "date"
    base_date = datetime(1970, 1, 1)

    _g = nx.MultiDiGraph()
    num_accts = 0
    num_sar = 0
    num_txs = 0

    print("Load account list CSV file", acct_csv)
    with open(acct_csv, "r", newline="") as rf:  # newline="" as required by the csv module
        reader = csv.reader(rf)
        next(reader, None)  # skip the header; tolerate a completely empty file
        for row in reader:
            if not row:  # csv.reader yields [] for blank lines
                continue
            acct_id = row[acct_id_idx]
            is_sar = row[acct_sar_idx].lower() == "true"
            attr = {ACCT_SAR: is_sar}
            _g.add_node(acct_id, **attr)
            num_accts += 1
            if is_sar:
                num_sar += 1

    print("Number of total accounts: %d" % num_accts)
    # Guard against a header-only account file, which used to divide by zero here.
    sar_pct = num_sar / num_accts * 100 if num_accts else 0.0
    print("Number of SAR accounts: %d (%.2f%%)" % (num_sar, sar_pct))

    print("Loading transaction list CSV file", tx_csv)
    with open(tx_csv, "r", newline="") as rf:
        reader = csv.reader(rf)
        next(reader, None)
        for row in reader:
            if not row:
                continue
            src_id = row[tx_src_idx]
            dst_id = row[tx_dst_idx]
            amount = float(row[tx_amt_idx])
            if is_date_type:
                date = parse(row[tx_date_idx])
            else:
                date = base_date + timedelta(int(row[tx_date_idx]))
            date_str = date.strftime("%Y-%m-%d")
            attr = {TX_AMOUNT: amount, TX_DATE: date_str}
            _g.add_edge(src_id, dst_id, **attr)
            num_txs += 1
            if num_txs % 100000 == 0:
                print("Loaded %d transactions" % num_txs)

    print("Number of transactions: %d" % num_txs)
    return _g


def _find_schema_columns(columns, section, data_types):
    """Map each requested ``dataType`` to the index of its column in one schema section.

    :param columns: List of column descriptions (dicts) of the schema section
    :param section: Section name, used only in the error message
    :param data_types: Data type names that must be present
    :return: Dict from data type name to column index
    :raises ValueError: If any requested data type has no column
    """
    found = {}
    for idx, col in enumerate(columns):
        data_type = col.get("dataType")
        if data_type in data_types:
            found[data_type] = idx  # when a type is repeated, the last column wins
    missing = [name for name in data_types if name not in found]
    if missing:
        raise ValueError(
            "Schema section '%s' has no column with dataType: %s" % (section, ", ".join(missing)))
    return found
def load_alert_csv(_g, alert_acct_csv, alert_tx_csv, schema_data):
    """Load alert member and transaction lists (not implemented yet).

    Placeholder: the arguments are accepted but ignored, no file is read and
    the given graph is not modified.

    :param _g: Transaction graph (unused)
    :param alert_acct_csv: Alert member list CSV (unused)
    :param alert_tx_csv: Alert transaction list CSV (unused)
    :param schema_data: Schema data from JSON (unused)
    :return: Always None
    """
    return None
class __TransactionGraphLoader:
    """Common base of the graph loaders: reads the configuration and schema JSON files
    and holds the transaction graph in ``self.g``.
    """

    def __init__(self, _conf_json):
        """Read the configuration file and the schema file it refers to.

        :param _conf_json: Path to the configuration JSON file; it must provide
            input.directory, input.schema, output and general.simulation_name
        """
        with open(_conf_json, "r") as rf:
            self.conf = json.load(rf)
        schema_json = os.path.join(self.conf["input"]["directory"], self.conf["input"]["schema"])
        with open(schema_json, "r") as rf:
            self.schema = json.load(rf)
        self.output_conf = self.conf["output"]
        self.g = nx.MultiDiGraph()  # starts empty; subclasses may replace it with loaded data
        self.sim_name = self.conf["general"]["simulation_name"]

    def get_graph(self):
        """Return the transaction graph held by this loader."""
        return self.g

    def count_hub_accounts(self, min_degree=DEGREE_STEP, max_degree=10):
        """Count number of "hub" accounts by degree and print one line per threshold.

        Thresholds run from ``min_degree`` to ``max_degree`` (inclusive) in steps of
        ``DEGREE_STEP``; for each one, the number of nodes whose in-degree (fan-in) and
        out-degree (fan-out) reach the threshold is printed.

        :param min_degree: First degree threshold
        :param max_degree: Largest degree threshold (inclusive)
        """
        # dict(...) accepts both the plain dict returned by NetworkX 1.x and the
        # DegreeView returned by NetworkX 2.x, which has no .values() method.
        in_deg = Counter(dict(self.g.in_degree()).values())
        out_deg = Counter(dict(self.g.out_degree()).values())
        for th in range(min_degree, max_degree + 1, DEGREE_STEP):
            num_fan_in = sum(c for d, c in in_deg.items() if d >= th)
            num_fan_out = sum(c for d, c in out_deg.items() if d >= th)
            print("\tNumber of fan-in / fan-out patterns with", th, "or more neighbors:", num_fan_in, "/", num_fan_out)
class BaseGraphLoader(__TransactionGraphLoader):
    """Graph loader that performs only the configuration and schema loading of its parent."""

    def __init__(self, _conf_json):
        """Delegate all initialisation to the parent class.

        :param _conf_json: Path to the configuration JSON file
        """
        super().__init__(_conf_json)
class ResultGraphLoader(__TransactionGraphLoader):
    """Loader that builds the transaction graph from the AMLSim output CSV files and
    reports hub accounts separately for normal and SAR accounts.
    """

    def __init__(self, _conf_json):
        """Load the account and transaction CSV files named in the output configuration.

        The files are looked up in ``<output.directory>/<simulation_name>``.

        :param _conf_json: Path to the configuration JSON file
        """
        super(ResultGraphLoader, self).__init__(_conf_json)
        output_dir = os.path.join(self.output_conf["directory"], self.sim_name)
        acct_path = os.path.join(output_dir, self.output_conf["accounts"])
        tx_path = os.path.join(output_dir, self.output_conf["transactions"])

        self.g = load_result_csv(acct_path, tx_path, self.schema)

        # Only nodes that carry the SAR attribute are counted as accounts.
        sar_flags = nx.get_node_attributes(self.g, ACCT_SAR)
        self.num_sar_accts = sum(1 for flag in sar_flags.values() if flag)
        self.num_normal_accts = len(sar_flags) - self.num_sar_accts

    def count_hub_accounts(self, min_degree=DEGREE_STEP, max_degree=10):
        """Print the overall hub-account counts, then the same breakdown with ratios
        for normal accounts and for SAR accounts.

        Nodes without the SAR attribute (for example nodes that exist only because a
        transaction referred to them) are left out of the per-group breakdown, in line
        with how ``num_normal_accts`` and ``num_sar_accts`` are counted.

        :param min_degree: First degree threshold
        :param max_degree: Largest degree threshold (inclusive)
        """
        super(ResultGraphLoader, self).count_hub_accounts(min_degree, max_degree)

        # dict(...) works with both the dict of NetworkX 1.x and the DegreeView of 2.x;
        # get_node_attributes avoids G.node, which was removed in NetworkX 2.4.
        in_deg = dict(self.g.in_degree())
        out_deg = dict(self.g.out_degree())
        sar_flags = nx.get_node_attributes(self.g, ACCT_SAR)
        normal_nodes = [n for n, flag in sar_flags.items() if not flag]
        sar_nodes = [n for n, flag in sar_flags.items() if flag]

        print("Number of fan-in / fan-out patterns for %d normal accounts" % self.num_normal_accts)
        self._print_degree_ratios(
            Counter(in_deg[n] for n in normal_nodes),
            Counter(out_deg[n] for n in normal_nodes),
            self.num_normal_accts, min_degree, max_degree)

        print("Number of fan-in / fan-out patterns for %d SAR accounts" % self.num_sar_accts)
        self._print_degree_ratios(
            Counter(in_deg[n] for n in sar_nodes),
            Counter(out_deg[n] for n in sar_nodes),
            self.num_sar_accts, min_degree, max_degree)

    @staticmethod
    def _print_degree_ratios(in_deg, out_deg, num_accts, min_degree, max_degree):
        """Print, per degree threshold, how many accounts of one group reach it and their share.

        :param in_deg: Counter mapping in-degree to number of accounts
        :param out_deg: Counter mapping out-degree to number of accounts
        :param num_accts: Size of the group, used as the ratio denominator
        :param min_degree: First degree threshold
        :param max_degree: Largest degree threshold (inclusive)
        """
        for th in range(min_degree, max_degree + 1, DEGREE_STEP):
            num_fan_in = sum(c for d, c in in_deg.items() if d >= th)
            num_fan_out = sum(c for d, c in out_deg.items() if d >= th)
            # An empty group (e.g. a simulation without SAR accounts) reports 0% instead
            # of raising ZeroDivisionError.
            ratio_fan_in = num_fan_in / num_accts if num_accts else 0.0
            ratio_fan_out = num_fan_out / num_accts if num_accts else 0.0
            print("\tNumber of fan-in / fan-out patterns with %d or more neighbors: %d (%.2f%%)/ %d (%.2f%%)" %
                  (th, num_fan_in, ratio_fan_in * 100, num_fan_out, ratio_fan_out * 100))
if __name__ == "__main__":
    # Entry point: load the simulation result named by the configuration file and
    # print the hub-account report for the thresholds between 5 and 25.
    argv = sys.argv
    if len(argv) < 2:
        print("Usage: python3 %s [ConfJSON]" % argv[0])
        # sys.exit() rather than exit(): the latter is injected by the site module for
        # interactive use and is not guaranteed to exist (e.g. under "python -S").
        sys.exit(1)

    conf_json = argv[1]
    rgl = ResultGraphLoader(conf_json)
    rgl.count_hub_accounts(5, 25)