""" |
Combine AMLSim outputs into a single dataset |
""" |
import os |
import sys |
from collections import Counter |
import csv |
import json |
import datetime |
from dateutil.parser import parse |
class Schema: |
def __init__(self, json_file, base_date): |
self._base_date = base_date |
with open(json_file, "r") as rf: |
self.data = json.load(rf) |
self.acct_num_cols = None |
self.acct_names = list() |
self.acct_defaults = list() |
self.acct_types = list() |
self.acct_name2idx = dict() |
self.acct_id_idx = None |
self.acct_name_idx = None |
self.acct_balance_idx = None |
self.acct_start_idx = None |
self.acct_end_idx = None |
self.acct_sar_idx = None |
self.acct_model_idx = None |
self.acct_bank_idx = None |
self.tx_num_cols = None |
self.tx_names = list() |
self.tx_defaults = list() |
self.tx_types = list() |
self.tx_name2idx = dict() |
self.tx_id_idx = None |
self.tx_time_idx = None |
self.tx_amount_idx = None |
self.tx_type_idx = None |
self.tx_orig_idx = None |
self.tx_dest_idx = None |
self.tx_sar_idx = None |
self.tx_alert_idx = None |
self.alert_acct_num_cols = None |
self.alert_acct_names = list() |
self.alert_acct_defaults = list() |
self.alert_acct_types = list() |
self.alert_acct_name2idx = dict() |
self.alert_acct_alert_idx = None |
self.alert_acct_reason_idx = None |
self.alert_acct_id_idx = None |
self.alert_acct_name_idx = None |
self.alert_acct_sar_idx = None |
self.alert_acct_model_idx = None |
self.alert_acct_schedule_idx = None |
self.alert_acct_bank_idx = None |
self.alert_tx_num_cols = None |
self.alert_tx_names = list() |
self.alert_tx_defaults = list() |
self.alert_tx_types = list() |
self.alert_tx_name2idx = dict() |
self.alert_tx_alert_idx = None |
self.alert_tx_alert_type_idx = None |
self.alert_tx_sar_idx = None |
self.alert_tx_tx_idx = None |
self.alert_tx_orig_idx = None |
self.alert_tx_dest_idx = None |
self.alert_tx_tx_type_idx = None |
self.alert_tx_amount_idx = None |
self.alert_tx_date_idx = None |
self._parse() |
def _parse(self): |
acct_data = self.data["account"] |
tx_data = self.data["transaction"] |
alert_tx_data = self.data["alert_tx"] |
alert_acct_data = self.data["alert_member"] |
self.acct_num_cols = len(acct_data) |
self.tx_num_cols = len(tx_data) |
self.alert_tx_num_cols = len(alert_tx_data) |
self.alert_acct_num_cols = len(alert_acct_data) |
for idx, col in enumerate(acct_data): |
name = col["name"] |
v_type = col.get("valueType", "string") |
d_type = col.get("dataType") |
default = col.get("defaultValue", "") |
self.acct_names.append(name) |
self.acct_defaults.append(default) |
self.acct_types.append(v_type) |
self.acct_name2idx[name] = idx |
if d_type is None: |
continue |
if d_type == "account_id": |
self.acct_id_idx = idx |
elif d_type == "account_name": |
self.acct_name_idx = idx |
elif d_type == "initial_balance": |
self.acct_balance_idx = idx |
elif d_type == "start_time": |
self.acct_start_idx = idx |
elif d_type == "end_time": |
self.acct_end_idx = idx |
elif d_type == "sar_flag": |
self.acct_sar_idx = idx |
elif d_type == "model_id": |
self.acct_model_idx = idx |
elif d_type == "bank_id": |
self.acct_bank_idx = idx |
for idx, col in enumerate(tx_data): |
name = col["name"] |
v_type = col.get("valueType", "string") |
d_type = col.get("dataType") |
default = col.get("defaultValue", "") |
self.tx_names.append(name) |
self.tx_defaults.append(default) |
self.tx_types.append(v_type) |
self.tx_name2idx[name] = idx |
if d_type is None: |
continue |
if d_type == "transaction_id": |
self.tx_id_idx = idx |
elif d_type == "timestamp": |
self.tx_time_idx = idx |
elif d_type == "amount": |
self.tx_amount_idx = idx |
elif d_type == "transaction_type": |
self.tx_type_idx = idx |
elif d_type == "orig_id": |
self.tx_orig_idx = idx |
elif d_type == "dest_id": |
self.tx_dest_idx = idx |
elif d_type == "sar_flag": |
self.tx_sar_idx = idx |
elif d_type == "alert_id": |
self.tx_alert_idx = idx |
for idx, col in enumerate(alert_acct_data): |
name = col["name"] |
v_type = col.get("valueType", "string") |
d_type = col.get("dataType") |
default = col.get("defaultValue", "") |
self.alert_acct_names.append(name) |
self.alert_acct_defaults.append(default) |
self.alert_acct_types.append(v_type) |
self.alert_acct_name2idx[name] = idx |
if d_type is None: |
continue |
if d_type == "alert_id": |
self.alert_acct_alert_idx = idx |
elif d_type == "alert_type": |
self.alert_acct_reason_idx = idx |
elif d_type == "account_id": |
self.alert_acct_id_idx = idx |
elif d_type == "account_name": |
self.alert_acct_name_idx = idx |
elif d_type == "sar_flag": |
self.alert_acct_sar_idx = idx |
elif d_type == "model_id": |
self.alert_acct_model_idx = idx |
elif d_type == "schedule_id": |
self.alert_acct_schedule_idx = idx |
elif d_type == "bank_id": |
self.alert_acct_bank_idx = idx |
for idx, col in enumerate(alert_tx_data): |
name = col["name"] |
v_type = col.get("valueType", "string") |
d_type = col.get("dataType") |
default = col.get("defaultValue", "") |
self.alert_tx_names.append(name) |
self.alert_tx_defaults.append(default) |
self.alert_tx_types.append(v_type) |
self.alert_tx_name2idx[name] = idx |
if d_type is None: |
continue |
if d_type == "alert_id": |
self.alert_tx_alert_idx = idx |
elif d_type == "alert_type": |
self.alert_tx_alert_type_idx = idx |
elif d_type == "sar_flag": |
self.alert_tx_sar_idx = idx |
elif d_type == "transaction_id": |
self.alert_tx_tx_idx = idx |
elif d_type == "orig_id": |
self.alert_tx_orig_idx = idx |
elif d_type == "dest_id": |
self.alert_tx_dest_idx = idx |
elif d_type == "transaction_type": |
self.alert_tx_tx_type_idx = idx |
elif d_type == "amount": |
self.alert_tx_amount_idx = idx |
elif d_type == "timestamp": |
self.alert_tx_date_idx = idx |
def days2date(self, _days): |
"""Get date as ISO 8601 format from days from the "base_date". If failed, return an empty string. |
:param _days: Days from the "base_date" |
:return: Date as ISO 8601 format |
""" |
try: |
num_days = int(_days) |
except ValueError: |
return "" |
dt = self._base_date + datetime.timedelta(num_days) |
return dt.isoformat() + "Z" |
def get_acct_row(self, acct_id, acct_name, init_balance, start_str, end_str, is_sar, model_id, bank_id, **attr): |
row = list(self.acct_defaults) |
row[self.acct_id_idx] = acct_id |
row[self.acct_name_idx] = acct_name |
row[self.acct_balance_idx] = init_balance |
try: |
start = int(start_str) |
if start >= 0: |
row[self.acct_start_idx] = start |
except ValueError: |
pass |
try: |
end = int(end_str) |
if end > 0: |
row[self.acct_end_idx] = end |
except ValueError: |
pass |
row[self.acct_sar_idx] = is_sar |
row[self.acct_model_idx] = model_id |
row[self.acct_bank_idx] = bank_id |
for name, value in attr.items(): |
if name in self.acct_name2idx: |
idx = self.acct_name2idx[name] |
row[idx] = value |
return row |
def get_tx_row(self, _tx_id, _timestamp, _amount, _tx_type, _orig, _dest, _is_sar, _alert_id, **attr): |
row = list(self.tx_defaults) |
row[self.tx_id_idx] = _tx_id |
row[self.tx_time_idx] = _timestamp |
row[self.tx_amount_idx] = _amount |
row[self.tx_type_idx] = _tx_type |
row[self.tx_orig_idx] = _orig |
row[self.tx_dest_idx] = _dest |
row[self.tx_sar_idx] = _is_sar |
row[self.tx_alert_idx] = _alert_id |
for name, value in attr.items(): |
if name in self.tx_name2idx: |
idx = self.tx_name2idx[name] |
row[idx] = value |
return row |
def get_alert_acct_row(self, _alert_id, _reason, _acct_id, _acct_name, _is_sar, |
_model_id, _schedule_id, _bank_id, **attr): |
row = list(self.alert_acct_defaults) |
row[self.alert_acct_alert_idx] = _alert_id |
row[self.alert_acct_reason_idx] = _reason |
row[self.alert_acct_id_idx] = _acct_id |
row[self.alert_acct_name_idx] = _acct_name |
row[self.alert_acct_sar_idx] = _is_sar |
row[self.alert_acct_model_idx] = _model_id |
row[self.alert_acct_schedule_idx] = _schedule_id |
row[self.alert_acct_bank_idx] = _bank_id |
for name, value in attr.items(): |
if name in self.alert_acct_name2idx: |
idx = self.alert_acct_name2idx[name] |
row[idx] = value |
return row |
def get_alert_tx_row(self, _alert_id, _alert_type, _is_sar, _tx_id, _orig, _dest, |
_tx_type, _amount, _timestamp, **attr): |
row = list(self.alert_tx_defaults) |
row[self.alert_tx_alert_idx] = _alert_id |
row[self.alert_tx_alert_type_idx] = _alert_type |
row[self.alert_tx_sar_idx] = _is_sar |
row[self.alert_tx_tx_idx] = _tx_id |
row[self.alert_tx_orig_idx] = _orig |
row[self.alert_tx_dest_idx] = _dest |
row[self.alert_tx_tx_type_idx] = _tx_type |
row[self.alert_tx_amount_idx] = _amount |
row[self.alert_tx_date_idx] = _timestamp |
for name, value in attr.items(): |
if name in self.alert_tx_name2idx: |
idx = self.alert_tx_name2idx[name] |
row[idx] = value |
return row |
def load_output_conf_json(conf_json): |
with open(conf_json, "r") as rf: |
conf = json.load(rf) |
out_dir = os.path.join(conf["output"]["directory"], conf["general"]["simulation_name"]) |
acct_file = conf["output"]["accounts"] |
tx_file = conf["output"]["transactions"] |
cash_file = conf["output"]["cash_transactions"] |
alert_acct_file = conf["output"]["alert_members"] |
alert_tx_file = conf["output"]["alert_transactions"] |
acct_path = os.path.join(out_dir, acct_file) |
tx_path = os.path.join(out_dir, tx_file) |
cash_path = os.path.join(out_dir, cash_file) |
alert_acct_path = os.path.join(out_dir, alert_acct_file) |
alert_tx_path = os.path.join(out_dir, alert_tx_file) |
schema_path = os.path.join(conf["input"]["directory"], conf["input"]["schema"]) |
base_date = parse(conf["general"]["base_date"]) |
schema = Schema(schema_path, base_date) |
return acct_path, tx_path, cash_path, alert_acct_path, alert_tx_path, schema |
def load_input_conf_json(conf_json): |
with open(conf_json, "r") as rf: |
conf = json.load(rf) |
in_dir = conf["input"]["directory"] |
acct_file = conf["input"]["accounts"] |
alert_file = conf["input"]["alert_patterns"] |
deg_file = conf["input"]["degree"] |
tx_file = conf["input"]["transaction_type"] |
acct_path = os.path.join(in_dir, acct_file) |
alert_path = os.path.join(in_dir, alert_file) |
deg_path = os.path.join(in_dir, deg_file) |
tx_path = os.path.join(in_dir, tx_file) |
return acct_path, alert_path, deg_path, tx_path |
class Combiner: |
def __init__(self, out_conf_json, out_sim_name=None): |
self.last_acct_id = 0 |
self.last_tx_id = 0 |
self.last_alert_id = 0 |
with open(out_conf_json, "r") as rf: |
out_conf = json.load(rf) |
in_dir = out_conf["input"]["directory"] |
os.makedirs(in_dir, exist_ok=True) |
self.in_acct_path, self.in_alert_path, self.in_deg_path, self.in_tx_path = load_input_conf_json(out_conf_json) |
with open(self.in_acct_path, "w") as wf: |
writer = csv.writer(wf, lineterminator='\n') |
writer.writerow(["count", "min_balance", "max_balance", "start_day", "end_day", |
"country", "business_type", "model", "bank_id"]) |
with open(self.in_alert_path, "w") as wf: |
writer = csv.writer(wf, lineterminator='\n') |
writer.writerow(["count", "type", "schedule_id", "min_accounts", "max_accounts", |
"min_amount", "max_amount", "min_period", "max_period", "bank_id", "is_sar"]) |
with open(self.in_tx_path, "w") as wf: |
writer = csv.writer(wf, lineterminator='\n') |
writer.writerow(["Type", "Frequency"]) |
self.in_deg = Counter() |
self.out_deg = Counter() |
if out_sim_name is None: |
out_sim_name = out_conf["general"]["simulation_name"] |
out_dir = os.path.join(out_conf["output"]["directory"], out_sim_name) |
os.makedirs(out_dir, exist_ok=True) |
self.out_acct_path, self.out_tx_path, self.out_cash_path, \ |
self.out_alert_acct_path, self.out_alert_tx_path, self.out_schema = load_output_conf_json(out_conf_json) |
with open(self.out_acct_path, "w") as wf: |
writer = csv.writer(wf, lineterminator='\n') |
writer.writerow(self.out_schema.acct_names) |
with open(self.out_tx_path, "w") as wf: |
writer = csv.writer(wf, lineterminator='\n') |
writer.writerow(self.out_schema.tx_names) |
with open(self.out_cash_path, "w") as wf: |
writer = csv.writer(wf, lineterminator='\n') |
writer.writerow(self.out_schema.tx_names) |
with open(self.out_alert_acct_path, "w") as wf: |
writer = csv.writer(wf, lineterminator='\n') |
writer.writerow(self.out_schema.alert_acct_names) |
with open(self.out_alert_tx_path, "w") as wf: |
writer = csv.writer(wf, lineterminator='\n') |
writer.writerow(self.out_schema.alert_tx_names) |
def append_input_data(self, in_conf_json): |
in_acct_path, in_alert_path, in_deg_path, in_tx_path = load_input_conf_json(in_conf_json) |
wf = open(self.in_acct_path, "a") |
writer = csv.writer(wf, lineterminator='\n') |
rf = open(in_acct_path, "r") |
reader = csv.reader(rf) |
next(reader) |
for row in reader: |
writer.writerow(row) |
rf.close() |
wf.close() |
wf = open(self.in_alert_path, "a") |
writer = csv.writer(wf, lineterminator='\n') |
rf = open(in_alert_path, "r") |
reader = csv.reader(rf) |
next(reader) |
for row in reader: |
writer.writerow(row) |
rf.close() |
wf.close() |
with open(in_deg_path, "r") as rf: |
reader = csv.reader(rf) |
next(reader) |
for row in reader: |
deg = int(row[0]) |
in_num = int(row[1]) |
out_num = int(row[2]) |
self.in_deg[deg] += in_num |
self.out_deg[deg] += out_num |
wf = open(self.in_tx_path, "a") |
writer = csv.writer(wf, lineterminator='\n') |
rf = open(in_tx_path, "r") |
reader = csv.reader(rf) |
next(reader) |
for row in reader: |
writer.writerow(row) |
rf.close() |
wf.close() |
def write_degrees(self): |
degrees = sorted(set(self.in_deg.keys()) | set(self.out_deg.keys())) |
with open(self.in_deg_path, "w") as wf: |
writer = csv.writer(wf) |
writer.writerow(["Count", "In-degree", "Out-degree"]) |
for d in degrees: |
writer.writerow([d, self.in_deg[d], self.out_deg[d]]) |
def append_output_data(self, in_conf_json): |
in_acct_path, in_tx_path, in_cash_path, in_alert_acct_path, in_alert_tx_path, in_schema = \ |
load_output_conf_json(in_conf_json) |
max_acct_id = 0 |
max_tx_id = 0 |
max_alert_id = 0 |
wf = open(self.out_acct_path, "a") |
writer = csv.writer(wf, lineterminator='\n') |
rf = open(in_acct_path, "r") |
reader = csv.reader(rf) |
id_idx = in_schema.acct_id_idx |
name_idx = in_schema.acct_name_idx |
balance_idx = in_schema.acct_balance_idx |
start_idx = in_schema.acct_start_idx |
end_idx = in_schema.acct_end_idx |
sar_idx = in_schema.acct_sar_idx |
model_idx = in_schema.acct_model_idx |
bank_idx = in_schema.acct_bank_idx |
next(reader) |
for row in reader: |
acct_id = int(row[id_idx]) + self.last_acct_id |
max_acct_id = max(max_acct_id, acct_id) |
acct_name = row[name_idx] |
balance = row[balance_idx] |
start = row[start_idx] |
end = row[end_idx] |
is_sar = row[sar_idx] |
model = row[model_idx] |
bank_id = row[bank_idx] |
output_row = self.out_schema.get_acct_row(acct_id, acct_name, balance, start, end, is_sar, model, bank_id) |
writer.writerow(output_row) |
rf.close() |
wf.close() |
id_idx = in_schema.tx_id_idx |
orig_idx = in_schema.tx_orig_idx |
bene_idx = in_schema.tx_dest_idx |
date_idx = in_schema.tx_time_idx |
amt_idx = in_schema.tx_amount_idx |
type_idx = in_schema.tx_type_idx |
sar_idx = in_schema.tx_sar_idx |
alert_idx = in_schema.tx_alert_idx |
wf = open(self.out_tx_path, "a") |
writer = csv.writer(wf, lineterminator='\n') |
rf = open(in_tx_path, "r") |
reader = csv.reader(rf) |
next(reader) |
for row in reader: |
tx_id = int(row[id_idx]) + self.last_tx_id |
max_tx_id = max(max_tx_id, tx_id) |
orig_id = int(row[orig_idx]) + self.last_acct_id |
bene_id = int(row[bene_idx]) + self.last_acct_id |
date = row[date_idx] |
amount = row[amt_idx] |
tx_type = row[type_idx] |
is_sar = row[sar_idx] |
alert_id = int(row[alert_idx]) |
if alert_id >= 0 and self.last_alert_id is not None: |
alert_id += self.last_alert_id |
max_alert_id = max(max_alert_id, alert_id) |
output_row = self.out_schema.get_tx_row(tx_id, date, amount, tx_type, orig_id, bene_id, is_sar, alert_id) |
writer.writerow(output_row) |
rf.close() |
wf.close() |
wf = open(self.out_cash_path, "a") |
writer = csv.writer(wf, lineterminator='\n') |
rf = open(in_cash_path, "r") |
reader = csv.reader(rf) |
next(reader) |
for row in reader: |
tx_id = int(row[id_idx]) + self.last_tx_id |
max_tx_id = max(max_tx_id, tx_id) |
orig_id = int(row[orig_idx]) + self.last_acct_id |
bene_id = int(row[bene_idx]) + self.last_acct_id |
date = row[date_idx] |
amount = row[amt_idx] |
tx_type = row[type_idx] |
is_sar = row[sar_idx] |
alert_id = int(row[alert_idx]) |
if alert_id >= 0 and self.last_alert_id is not None: |
alert_id += self.last_alert_id |
max_alert_id = max(max_alert_id, alert_id) |
output_row = self.out_schema.get_tx_row(tx_id, date, amount, tx_type, orig_id, bene_id, is_sar, alert_id) |
writer.writerow(output_row) |
rf.close() |
wf.close() |
alert_idx = in_schema.alert_acct_alert_idx |
acct_idx = in_schema.alert_acct_id_idx |
reason_idx = in_schema.alert_acct_reason_idx |
sar_idx = in_schema.alert_acct_sar_idx |
model_idx = in_schema.alert_acct_model_idx |
schedule_idx = in_schema.alert_acct_schedule_idx |
bank_idx = in_schema.alert_acct_bank_idx |
wf = open(self.out_alert_acct_path, "a") |
writer = csv.writer(wf, lineterminator='\n') |
rf = open(in_alert_acct_path, "r") |
reader = csv.reader(rf) |
next(reader) |
for row in reader: |
if self.last_alert_id is None: |
writer.writerow(self.out_schema.alert_acct_names) |
self.last_alert_id = 0 |
alert_id = int(row[alert_idx]) + self.last_alert_id |
reason = row[reason_idx] |
acct_id = int(row[acct_idx]) + self.last_acct_id |
is_sar = row[sar_idx] |
model = row[model_idx] |
schedule = row[schedule_idx] |
bank_id = row[bank_idx] |
output_row = self.out_schema.get_alert_acct_row(alert_id, reason, acct_id, acct_id, is_sar, model, |
schedule, bank_id) |
writer.writerow(output_row) |
rf.close() |
wf.close() |
alert_idx = in_schema.alert_tx_alert_idx |
type_idx = in_schema.alert_tx_alert_type_idx |
sar_idx = in_schema.alert_tx_sar_idx |
tx_idx = in_schema.alert_tx_tx_idx |
orig_idx = in_schema.alert_tx_orig_idx |
bene_idx = in_schema.alert_tx_dest_idx |
tx_type_idx = in_schema.alert_tx_tx_type_idx |
amt_idx = in_schema.alert_tx_amount_idx |
date_idx = in_schema.alert_tx_date_idx |
wf = open(self.out_alert_tx_path, "a") |
writer = csv.writer(wf, lineterminator='\n') |
rf = open(in_alert_tx_path, "r") |
reader = csv.reader(rf) |
next(reader) |
for row in reader: |
alert_id = int(row[alert_idx]) + self.last_alert_id |
max_alert_id = max(max_alert_id, alert_id) |
alert_type = row[type_idx] |
is_sar = row[sar_idx] |
tx_id = int(row[tx_idx]) + self.last_tx_id |
orig_id = int(row[orig_idx]) + self.last_acct_id |
bene_id = int(row[bene_idx]) + self.last_acct_id |
tx_type = row[tx_type_idx] |
amount = row[amt_idx] |
date = row[date_idx] |
output_row = self.out_schema.get_alert_tx_row(alert_id, alert_type, is_sar, tx_id, orig_id, bene_id, |
tx_type, amount, date) |
writer.writerow(output_row) |
rf.close() |
wf.close() |
self.last_acct_id = (max_acct_id + 1) |
self.last_tx_id = (max_tx_id + 1) |
self.last_alert_id = (max_alert_id + 1) |
if __name__ == "__main__": |
argv = sys.argv |
argc = len(argv) |
if argc < 4 or argc % 2 == 0: |
print("Usage: python3 %s [OutputConfJSON] [OutputSimName] ([InputConfJSON] [Repetitions]...)" % argv[0]) |
exit(1) |
_conf_json = argv[1] |
_sim_name = argv[2] |
com = Combiner(_conf_json, _sim_name) |
for i in range(3, argc, 2): |
_in_conf_json = argv[i] |
_rep = int(argv[i+1]) |
for j in range(_rep): |
print("Loading %s: %d/%d" % (_in_conf_json, j, _rep)) |
com.append_input_data(_in_conf_json) |
com.append_output_data(_in_conf_json) |
com.write_degrees() |