|
import csv |
|
import json |
|
import sys |
|
import os |
|
import shutil |
|
import datetime |
|
from dateutil.parser import parse |
|
from random import random |
|
from collections import defaultdict, Counter |
|
|
|
from amlsim.account_data_type_lookup import AccountDataTypeLookup |
|
from faker import Faker |
|
import numpy as np |
|
|
|
|
|
def days_to_date(days):
    """Return the calendar date *days* days after 2017-01-01 as "YYYYMMDD"."""
    origin = datetime.datetime(2017, 1, 1)
    return (origin + datetime.timedelta(days=days)).strftime("%Y%m%d")
|
|
|
|
|
def get_simulator_name(csv_file):
    """Convert log file name to the simulator name

    The simulator name is the first four underscore-separated tokens of the
    file name, re-joined with underscores.

    :param csv_file: Transaction log file name
    :return: Simulator name
    """
    head_tokens = csv_file.split("_")[:4]
    return "_".join(head_tokens)
|
|
|
|
|
def get_name(acct_id):
    """Return the synthetic display name for an account ID."""
    return f"Account{acct_id}"
|
|
|
|
|
def get_bank(acct_id):
    """Return the synthetic bank name for an account ID."""
    return f"Bank{acct_id}"
|
|
|
|
|
# Transaction types that represent physical cash movements; these are routed
# to the separate cash-transaction output file instead of the main one.
CASH_TYPES = {"CASH-IN", "CASH-OUT"}
|
|
|
|
|
class AMLTypology:
    """Suspicious transaction and account group
    """

    def __init__(self, reason):
        # True once any SAR-flagged member is added
        self.is_sar = False
        # Most recently added SAR-flagged member account
        self.main_acct = None
        # Typology reason label for this alert group
        self.reason = reason
        # tx_id -> (amount, days, orig_acct, dest_acct, orig_name, dest_name, attr)
        self.transactions = dict()
        # All member account IDs
        self.members = set()
        # Members already written to the SAR account output
        self.recorded_members = set()
        # Running totals over added transactions
        self.total_amount = 0.0
        self.count = 0

    def add_member(self, member, is_sar):
        """Register a member account; a SAR member also becomes the main account."""
        self.members.add(member)
        if is_sar:
            self.is_sar = True
            self.main_acct = member

    def add_tx(self, tx_id, amount, days, orig_acct, dest_acct, orig_name, dest_name, attr):
        """Record one transaction and update the running amount and count."""
        record = (amount, days, orig_acct, dest_acct, orig_name, dest_name, attr)
        self.transactions[tx_id] = record
        self.total_amount += amount
        self.count += 1

    def get_reason(self):
        """Return the typology reason label."""
        return self.reason

    def get_start_date(self):
        """Return the earliest transaction date as "YYYYMMDD"."""
        earliest = min(tx[1] for tx in self.transactions.values())
        return days_to_date(earliest)

    def get_end_date(self):
        """Return the latest transaction date as "YYYYMMDD"."""
        latest = max(tx[1] for tx in self.transactions.values())
        return days_to_date(latest)
|
|
|
|
|
class Schema:
    """Output CSV schema loaded from the schema JSON data.

    For each output table section ("account", "transaction", alert files,
    party files, mappings), the constructor records each column's name,
    default value and value type, a name->index map, and the index of
    specially-tagged columns (the "dataType" key). The ``get_*_row`` methods
    build output rows from positional values plus extra named attributes,
    converting "date"-typed columns from day offsets to ISO 8601.
    """

    # Per-section mapping from a column's "dataType" tag to the instance
    # attribute that stores that column's index.
    _TX_DTYPES = {
        "transaction_id": "tx_id_idx",
        "timestamp": "tx_time_idx",
        "amount": "tx_amount_idx",
        "transaction_type": "tx_type_idx",
        "orig_id": "tx_orig_idx",
        "dest_id": "tx_dest_idx",
        "sar_flag": "tx_sar_idx",
        "alert_id": "tx_alert_idx",
    }
    _ALERT_ACCT_DTYPES = {
        "alert_id": "alert_acct_alert_idx",
        "alert_type": "alert_acct_reason_idx",
        "account_id": "alert_acct_id_idx",
        "account_name": "alert_acct_name_idx",
        "sar_flag": "alert_acct_sar_idx",
        "model_id": "alert_acct_model_idx",
        "schedule_id": "alert_acct_schedule_idx",
        "bank_id": "alert_acct_bank_idx",
    }
    _ALERT_TX_DTYPES = {
        "alert_id": "alert_tx_id_idx",
        "alert_type": "alert_tx_type_idx",
        "sar_flag": "alert_tx_sar_idx",
        "transaction_id": "alert_tx_idx",
        "orig_id": "alert_tx_orig_idx",
        "dest_id": "alert_tx_dest_idx",
        "transaction_type": "alert_tx_tx_type_idx",
        "amount": "alert_tx_amount_idx",
        "timestamp": "alert_tx_time_idx",
    }
    _PARTY_IND_DTYPES = {"party_id": "party_ind_id_idx"}
    _PARTY_ORG_DTYPES = {"party_id": "party_org_id_idx"}
    _ACCT_PARTY_DTYPES = {
        "mapping_id": "acct_party_mapping_idx",
        "account_id": "acct_party_acct_idx",
        "party_id": "acct_party_party_idx",
    }
    _PARTY_PARTY_DTYPES = {
        "ref_id": "party_party_ref_idx",
        "first_id": "party_party_first_idx",
        "second_id": "party_party_second_idx",
    }

    def __init__(self, data, base_date):
        """
        :param data: Parsed schema JSON (dict of section name -> column list)
        :param base_date: datetime origin used to convert day offsets to dates
        """
        self._base_date = base_date
        self.data = data

        # Per-section column metadata containers. Attribute names are part of
        # the public interface (used by LogConverter) and must not change.
        for prefix in ("acct", "tx", "alert_acct", "alert_tx",
                       "party_ind", "party_org", "acct_party", "party_party"):
            setattr(self, prefix + "_num_cols", None)
            setattr(self, prefix + "_names", list())
            setattr(self, prefix + "_defaults", list())
            setattr(self, prefix + "_types", list())
            setattr(self, prefix + "_name2idx", dict())

        # Column-index attributes, all initialized to None. The acct_* index
        # attributes are never assigned by parsing (the "account" section has
        # no dataType handling), matching the original behavior.
        idx_attrs = ["acct_id_idx", "acct_name_idx", "acct_balance_idx",
                     "acct_start_idx", "acct_end_idx", "acct_sar_idx",
                     "acct_model_idx", "acct_bank_idx"]
        for dtype_map in (self._TX_DTYPES, self._ALERT_ACCT_DTYPES,
                          self._ALERT_TX_DTYPES, self._PARTY_IND_DTYPES,
                          self._PARTY_ORG_DTYPES, self._ACCT_PARTY_DTYPES,
                          self._PARTY_PARTY_DTYPES):
            idx_attrs.extend(dtype_map.values())
        for attr in idx_attrs:
            setattr(self, attr, None)

        self._parse()

    def _parse_section(self, columns, prefix, dtype_map, record_name2idx=True):
        """Parse one schema section into the ``prefix``-named attribute lists.

        :param columns: List of column definition dicts for this section
        :param prefix: Attribute prefix, e.g. "tx" -> tx_names, tx_defaults, ...
        :param dtype_map: dataType tag -> index-attribute name for this section
        :param record_name2idx: Whether to populate the name->index map
            (the "account" section historically does not)
        """
        names = getattr(self, prefix + "_names")
        defaults = getattr(self, prefix + "_defaults")
        types = getattr(self, prefix + "_types")
        name2idx = getattr(self, prefix + "_name2idx")
        setattr(self, prefix + "_num_cols", len(columns))

        for idx, col in enumerate(columns):
            name = col["name"]
            names.append(name)
            defaults.append(col.get("defaultValue", ""))
            types.append(col.get("valueType", "string"))
            if record_name2idx:
                name2idx[name] = idx
            attr = dtype_map.get(col.get("dataType"))
            if attr is not None:
                setattr(self, attr, idx)

    def _parse(self):
        """Parse all schema sections from the loaded JSON data."""
        self._parse_section(self.data["account"], "acct", {}, record_name2idx=False)
        self._parse_section(self.data["transaction"], "tx", self._TX_DTYPES)
        self._parse_section(self.data["alert_member"], "alert_acct", self._ALERT_ACCT_DTYPES)
        self._parse_section(self.data["alert_tx"], "alert_tx", self._ALERT_TX_DTYPES)
        self._parse_section(self.data["party_individual"], "party_ind", self._PARTY_IND_DTYPES)
        self._parse_section(self.data["party_organization"], "party_org", self._PARTY_ORG_DTYPES)
        self._parse_section(self.data["account_mapping"], "acct_party", self._ACCT_PARTY_DTYPES)
        self._parse_section(self.data["resolved_entities"], "party_party", self._PARTY_PARTY_DTYPES)

    def days2date(self, _days):
        """Get date as ISO 8601 format from days from the "base_date". If failed, return an empty string.
        :param _days: Days from the "base_date"
        :return: Date as ISO 8601 format
        """
        try:
            num_days = int(_days)
        except ValueError:
            return ""
        dt = self._base_date + datetime.timedelta(num_days)
        return dt.isoformat() + "Z"

    def _build_row(self, prefix, indexed_values, attr):
        """Build one output row for the ``prefix`` section.

        Starts from the section defaults, applies positional (index, value)
        pairs, then overrides from named attributes, and finally converts
        "date"-typed columns via :meth:`days2date`.
        """
        row = list(getattr(self, prefix + "_defaults"))
        for idx, value in indexed_values:
            row[idx] = value
        name2idx = getattr(self, prefix + "_name2idx")
        for name, value in attr.items():
            if name in name2idx:
                row[name2idx[name]] = value
        for idx, v_type in enumerate(getattr(self, prefix + "_types")):
            if v_type == "date":
                row[idx] = self.days2date(row[idx])
        return row

    def get_tx_row(self, _tx_id, _timestamp, _amount, _tx_type, _orig, _dest, _is_sar, _alert_id, **attr):
        """Build one transaction output row."""
        return self._build_row("tx", [
            (self.tx_id_idx, _tx_id),
            (self.tx_time_idx, _timestamp),
            (self.tx_amount_idx, _amount),
            (self.tx_type_idx, _tx_type),
            (self.tx_orig_idx, _orig),
            (self.tx_dest_idx, _dest),
            (self.tx_sar_idx, _is_sar),
            (self.tx_alert_idx, _alert_id),
        ], attr)

    def get_alert_acct_row(self, _alert_id, _reason, _acct_id, _acct_name, _is_sar,
                           _model_id, _schedule_id, _bank_id, **attr):
        """Build one alert-member (account) output row."""
        return self._build_row("alert_acct", [
            (self.alert_acct_alert_idx, _alert_id),
            (self.alert_acct_reason_idx, _reason),
            (self.alert_acct_id_idx, _acct_id),
            (self.alert_acct_name_idx, _acct_name),
            (self.alert_acct_sar_idx, _is_sar),
            (self.alert_acct_model_idx, _model_id),
            (self.alert_acct_schedule_idx, _schedule_id),
            (self.alert_acct_bank_idx, _bank_id),
        ], attr)

    def get_alert_tx_row(self, _alert_id, _alert_type, _is_sar, _tx_id, _orig, _dest,
                         _tx_type, _amount, _timestamp, **attr):
        """Build one alert-transaction output row."""
        return self._build_row("alert_tx", [
            (self.alert_tx_id_idx, _alert_id),
            (self.alert_tx_type_idx, _alert_type),
            (self.alert_tx_sar_idx, _is_sar),
            (self.alert_tx_idx, _tx_id),
            (self.alert_tx_orig_idx, _orig),
            (self.alert_tx_dest_idx, _dest),
            (self.alert_tx_tx_type_idx, _tx_type),
            (self.alert_tx_amount_idx, _amount),
            (self.alert_tx_time_idx, _timestamp),
        ], attr)

    def get_party_ind_row(self, _party_id, **attr):
        """Build one individual-party output row."""
        return self._build_row("party_ind", [(self.party_ind_id_idx, _party_id)], attr)

    def get_party_org_row(self, _party_id, **attr):
        """Build one organization-party output row."""
        return self._build_row("party_org", [(self.party_org_id_idx, _party_id)], attr)

    def get_acct_party_row(self, _mapping_id, _acct_id, _party_id, **attr):
        """Build one account-to-party mapping output row."""
        return self._build_row("acct_party", [
            (self.acct_party_mapping_idx, _mapping_id),
            (self.acct_party_acct_idx, _acct_id),
            (self.acct_party_party_idx, _party_id),
        ], attr)

    def get_party_party_row(self, _ref_id, _first_id, _second_id, **attr):
        """Build one resolved-entities (party-to-party link) output row."""
        return self._build_row("party_party", [
            (self.party_party_ref_idx, _ref_id),
            (self.party_party_first_idx, _first_id),
            (self.party_party_second_idx, _second_id),
        ], attr)
|
|
|
|
|
class LogConverter:
    """Convert raw AMLSim output logs into schema-conformant CSV files.

    Reads the simulator's account list, transaction log and alert-member list
    from the input directory and writes accounts, transactions, cash
    transactions, alert transactions/members, party, mapping and resolved
    entity CSVs into the working directory.
    """

    def __init__(self, conf, sim_name=None, fake=None):
        """
        :param conf: Parsed configuration JSON (dict)
        :param sim_name: Simulation name; defaults to conf["general"]["simulation_name"]
        :param fake: Faker instance (indexable by locale) used to generate
            synthetic PII for account rows; required by convert_acct_tx
        """
        self.reports = dict()      # alert_id -> AMLTypology
        self.org_types = dict()    # account_id (int) -> account/organization type string

        self.fake = fake

        general_conf = conf.get('general', {})
        input_conf = conf.get('temporal', {})
        output_conf = conf.get('output', {})

        self.sim_name = sim_name if sim_name is not None else general_conf.get("simulation_name", "sample")
        print("Simulation name:", self.sim_name)

        self.input_dir = os.path.join(input_conf.get('directory', ''), self.sim_name)
        self.work_dir = os.path.join(output_conf.get('directory', ''), self.sim_name)
        if not os.path.isdir(self.work_dir):
            os.makedirs(self.work_dir)

        # Load the output schema definition.
        param_dir = conf.get('input', {}).get('directory', '')
        schema_file = conf.get('input', {}).get('schema', '')
        base_date_str = general_conf.get('base_date', '2017-01-01')
        base_date = parse(base_date_str)

        json_file = os.path.join(param_dir, schema_file)
        with open(json_file, "r") as rf:
            data = json.load(rf)
        self.schema = Schema(data, base_date)

        # Input file paths/names.
        self.log_file = os.path.join(self.work_dir, output_conf["transaction_log"])
        self.in_acct_file = input_conf["accounts"]
        self.group_file = input_conf["alert_members"]

        # Output file names (written under work_dir).
        self.out_acct_file = output_conf["accounts"]
        self.tx_file = output_conf["transactions"]
        self.cash_tx_file = output_conf["cash_transactions"]
        self.sar_acct_file = output_conf["sar_accounts"]
        self.alert_tx_file = output_conf["alert_transactions"]
        self.alert_acct_file = output_conf["alert_members"]

        self.party_individual_file = output_conf["party_individuals"]
        self.party_organization_file = output_conf["party_organizations"]
        self.account_mapping_file = output_conf["account_mapping"]
        self.resolved_entities_file = output_conf["resolved_entities"]

        # Copy the diameter log (if the simulator produced one) as-is.
        dia_log = output_conf["diameter_log"]
        src_dia_path = os.path.join(self.input_dir, dia_log)
        dst_dia_path = os.path.join(self.work_dir, dia_log)
        if os.path.exists(src_dia_path):
            shutil.copy(src_dia_path, dst_dia_path)

    def convert_acct_tx(self):
        """Convert the account list and transaction log to output CSV files.

        Writes account rows (augmented with Faker-generated PII), party and
        account-mapping rows, then deduplicated transaction rows (cash
        transactions go to a separate file) and alert-transaction rows.
        If the DEGREE environment variable is set, also prints fan-in/fan-out
        pattern statistics up to that degree threshold.
        """
        print("Convert transaction list from %s to %s, %s and %s" % (
            self.log_file, self.tx_file, self.cash_tx_file, self.alert_tx_file))

        in_acct_f = open(os.path.join(self.input_dir, self.in_acct_file), "r")
        in_tx_f = open(self.log_file, "r")

        out_acct_f = open(os.path.join(self.work_dir, self.out_acct_file), "w")
        out_tx_f = open(os.path.join(self.work_dir, self.tx_file), "w")
        out_cash_tx_f = open(os.path.join(self.work_dir, self.cash_tx_file), "w")
        out_alert_tx_f = open(os.path.join(self.work_dir, self.alert_tx_file), "w")

        out_ind_f = open(os.path.join(self.work_dir, self.party_individual_file), "w")
        out_org_f = open(os.path.join(self.work_dir, self.party_organization_file), "w")
        out_map_f = open(os.path.join(self.work_dir, self.account_mapping_file), "w")
        out_ent_f = open(os.path.join(self.work_dir, self.resolved_entities_file), "w")

        reader = csv.reader(in_acct_f)
        acct_writer = csv.writer(out_acct_f)
        acct_writer.writerow(self.schema.acct_names)

        ind_writer = csv.writer(out_ind_f)
        ind_writer.writerow(self.schema.party_ind_names)
        org_writer = csv.writer(out_org_f)
        org_writer.writerow(self.schema.party_org_names)
        map_writer = csv.writer(out_map_f)
        map_writer.writerow(self.schema.acct_party_names)
        ent_writer = csv.writer(out_ent_f)
        ent_writer.writerow(self.schema.party_party_names)

        header = next(reader)

        mapping_id = 1

        lookup = AccountDataTypeLookup()
        us_gen = self.fake['en_US']  # locale-specific Faker generator

        for row in reader:
            output_row = list(self.schema.acct_defaults)

            acct_type = ""
            acct_id = ""

            gender = np.random.choice(['Male', 'Female'], p=[0.5, 0.5])

            # Retry until Faker yields an address of the parseable form
            # "street\ncity, ST zip" (some Faker addresses have extra lines).
            good_address = False
            while not good_address:
                address = us_gen.address()
                split1 = address.split('\n')
                street_address = split1[0]
                split2 = split1[1].split(', ')
                if len(split2) == 2:
                    good_address = True

            city = split2[0]
            split3 = split2[1].split(' ')
            state = split3[0]
            postcode = split3[1]

            for output_index, output_item in enumerate(self.schema.data['account']):
                if 'dataType' in output_item:
                    output_type = output_item['dataType']
                    input_type = lookup.inputType(output_type)

                    try:
                        input_index = header.index(input_type)
                    except ValueError:
                        # Input column missing for this output column: skip it.
                        continue

                    if output_type == "start_time":
                        # Keep only non-negative start days.
                        try:
                            start = int(row[input_index])
                            if start >= 0:
                                output_row[output_index] = start
                        except ValueError:
                            pass

                    elif output_type == "end_time":
                        # Keep only positive end days.
                        try:
                            end = int(row[input_index])
                            if end > 0:
                                output_row[output_index] = end
                        except ValueError:
                            pass

                    elif output_type == "account_id":
                        acct_id = row[input_index]
                        output_row[output_index] = acct_id

                    elif output_type == "account_type":
                        acct_type = row[input_index]
                        output_row[output_index] = acct_type

                    else:
                        output_row[output_index] = row[input_index]

                    if 'valueType' in output_item:
                        if output_item['valueType'] == 'date':
                            output_row[output_index] = self.schema.days2date(output_row[output_index])

                # Fill PII columns with synthetic Faker-generated values.
                if 'name' in output_item:
                    if output_item['name'] == 'first_name':
                        output_row[output_index] = us_gen.first_name_male() if gender == "Male" else us_gen.first_name_female()

                    elif output_item['name'] == 'last_name':
                        output_row[output_index] = us_gen.last_name_male() if gender == "Male" else us_gen.last_name_female()

                    elif output_item['name'] == 'street_addr':
                        output_row[output_index] = street_address

                    elif output_item['name'] == 'city':
                        output_row[output_index] = city

                    elif output_item['name'] == 'state':
                        output_row[output_index] = state

                    elif output_item['name'] == 'country':
                        output_row[output_index] = "US"

                    elif output_item['name'] == 'zip':
                        output_row[output_index] = postcode

                    elif output_item['name'] == 'gender':
                        output_row[output_index] = gender

                    elif output_item['name'] == 'birth_date':
                        output_row[output_index] = us_gen.date_of_birth()

                    elif output_item['name'] == 'ssn':
                        output_row[output_index] = us_gen.ssn()

                    elif output_item['name'] == 'lat':
                        output_row[output_index] = us_gen.latitude()

                    elif output_item['name'] == 'lon':
                        output_row[output_index] = us_gen.longitude()

            acct_writer.writerow(output_row)
            self.org_types[int(acct_id)] = acct_type

            # Randomly assign each account's owning party as an individual
            # or an organization (50/50).
            is_individual = random() >= 0.5
            party_id = str(acct_id)
            if is_individual:
                output_row = self.schema.get_party_ind_row(party_id)
                ind_writer.writerow(output_row)
            else:
                output_row = self.schema.get_party_org_row(party_id)
                org_writer.writerow(output_row)

            output_row = self.schema.get_acct_party_row(mapping_id, acct_id, party_id)
            map_writer.writerow(output_row)
            mapping_id += 1

        in_acct_f.close()
        # BUG FIX: out_acct_f was previously never closed, leaking the file
        # handle and risking unflushed account rows.
        out_acct_f.close()
        out_ind_f.close()
        out_org_f.close()
        out_map_f.close()
        out_ent_f.close()

        # Deduplication sets keyed by (orig, dest, type, amount, date).
        tx_set = set()
        cash_tx_set = set()

        reader = csv.reader(in_tx_f)
        tx_writer = csv.writer(out_tx_f)
        cash_tx_writer = csv.writer(out_cash_tx_f)
        alert_tx_writer = csv.writer(out_alert_tx_f)

        header = next(reader)
        indices = {name: index for index, name in enumerate(header)}
        num_columns = len(header)

        tx_header = self.schema.tx_names
        alert_header = self.schema.alert_tx_names
        tx_writer.writerow(tx_header)
        cash_tx_writer.writerow(tx_header)
        alert_tx_writer.writerow(alert_header)

        step_idx = indices["step"]
        amt_idx = indices["amount"]
        orig_idx = indices["nameOrig"]
        dest_idx = indices["nameDest"]
        sar_idx = indices["isSAR"]
        alert_idx = indices["alertID"]
        type_idx = indices["type"]

        tx_id = 1
        for row in reader:
            if len(row) < num_columns:
                continue  # skip truncated rows
            try:
                days = int(row[step_idx])
                date_str = str(days)
                amount = row[amt_idx]
                orig_id = row[orig_idx]
                dest_id = row[dest_idx]
                sar_id = int(row[sar_idx])
                alert_id = int(row[alert_idx])

                is_sar = sar_id > 0
                is_alert = alert_id >= 0
                ttype = row[type_idx]
            except ValueError:
                continue  # skip rows with unparseable numeric fields

            attr = {name: row[index] for name, index in indices.items()}
            if ttype in CASH_TYPES:
                cash_tx = (orig_id, dest_id, ttype, amount, date_str)
                if cash_tx not in cash_tx_set:
                    cash_tx_set.add(cash_tx)
                    output_row = self.schema.get_tx_row(tx_id, date_str, amount, ttype, orig_id, dest_id,
                                                        is_sar, alert_id, **attr)
                    cash_tx_writer.writerow(output_row)
            else:
                tx = (orig_id, dest_id, ttype, amount, date_str)
                if tx not in tx_set:
                    output_row = self.schema.get_tx_row(tx_id, date_str, amount, ttype, orig_id, dest_id,
                                                        is_sar, alert_id, **attr)
                    tx_writer.writerow(output_row)
                    tx_set.add(tx)
                    if is_alert:
                        alert_type = self.reports.get(alert_id).get_reason()
                        alert_row = self.schema.get_alert_tx_row(alert_id, alert_type, is_sar, tx_id, orig_id, dest_id,
                                                                 ttype, amount, date_str, **attr)
                        alert_tx_writer.writerow(alert_row)

            if tx_id % 1000000 == 0:
                print("Converted %d transactions." % tx_id)
            tx_id += 1

        in_tx_f.close()
        out_tx_f.close()
        out_cash_tx_f.close()
        out_alert_tx_f.close()

        # Optional fan-in/fan-out degree statistics (DEGREE env var sets the
        # maximum neighbor-count threshold to report).
        deg_param = os.getenv("DEGREE")
        if deg_param:
            max_threshold = int(deg_param)
            pred = defaultdict(set)
            succ = defaultdict(set)
            for orig, dest, _, _, _ in tx_set:
                pred[dest].add(orig)
                succ[orig].add(dest)
            in_degrees = [len(nbs) for nbs in pred.values()]
            out_degrees = [len(nbs) for nbs in succ.values()]
            in_deg = Counter(in_degrees)
            out_deg = Counter(out_degrees)
            for th in range(2, max_threshold+1):
                num_fan_in = sum([c for d, c in in_deg.items() if d >= th])
                num_fan_out = sum([c for d, c in out_deg.items() if d >= th])
                print("Number of fan-in / fan-out patterns with", th, "neighbors", num_fan_in, "/", num_fan_out)

    def convert_alert_members(self):
        """Load alert member rows, build AMLTypology reports keyed by alert ID,
        and write the alert-member output CSV.
        """
        input_file = self.group_file
        output_file = self.alert_acct_file

        print("Load alert groups: %s" % input_file)
        rf = open(os.path.join(self.input_dir, input_file), "r")
        wf = open(os.path.join(self.work_dir, output_file), "w")
        reader = csv.reader(rf)
        header = next(reader)
        indices = {name: index for index, name in enumerate(header)}

        writer = csv.writer(wf)
        header = self.schema.alert_acct_names
        writer.writerow(header)

        for row in reader:
            reason = row[indices["reason"]]
            alert_id = int(row[indices["alertID"]])
            account_id = int(row[indices["accountID"]])
            is_sar = row[indices["isSAR"]].lower() == "true"
            model_id = row[indices["modelID"]]
            schedule_id = row[indices["scheduleID"]]
            bank_id = row[indices["bankID"]]

            if alert_id not in self.reports:
                self.reports[alert_id] = AMLTypology(reason)
            self.reports[alert_id].add_member(account_id, is_sar)

            attr = {name: row[index] for name, index in indices.items()}
            # The account ID doubles as the account name in the output row.
            output_row = self.schema.get_alert_acct_row(alert_id, reason, account_id, account_id, is_sar,
                                                        model_id, schedule_id, bank_id, **attr)
            writer.writerow(output_row)

    def output_sar_cases(self):
        """Extract SAR account list involved in alert transactions from transaction log file
        """
        input_file = self.log_file
        output_file = os.path.join(self.work_dir, self.sar_acct_file)

        print("Convert SAR typologies from %s to %s" % (input_file, output_file))
        with open(input_file, "r") as rf:
            reader = csv.reader(rf)
            alerts = self.sar_accounts(reader)

        with open(output_file, "w") as wf:
            writer = csv.writer(wf)
            self.write_sar_accounts(writer, alerts)

    def sar_accounts(self, reader):
        """Collect SAR account tuples from the transaction log.

        First attaches each alerted transaction to its typology report, then
        emits one record per not-yet-recorded account appearing in an alerted
        transaction.

        :param reader: csv.reader over the transaction log (header included)
        :return: List of (alert_id, account_id, customer_id, event_date,
            reason, account_type, is_sar) tuples
        """
        header = next(reader)
        indices = {name: index for index, name in enumerate(header)}
        columns = len(header)

        tx_id = 0
        for row in reader:
            if len(row) < columns:
                continue
            try:
                days = int(row[indices["step"]])
                amount = float(row[indices["amount"]])
                orig = int(row[indices["nameOrig"]])
                dest = int(row[indices["nameDest"]])
                alert_id = int(row[indices["alertID"]])
                orig_name = "C_%d" % orig
                dest_name = "C_%d" % dest
            except ValueError:
                continue

            if alert_id >= 0 and alert_id in self.reports:
                attr = {name: row[index] for name, index in indices.items()}
                self.reports[alert_id].add_tx(tx_id, amount, days, orig, dest, orig_name, dest_name, attr)
            tx_id += 1

        sar_accounts = list()
        count = 0
        num_reports = len(self.reports)
        for sar_id, typology in self.reports.items():
            if typology.count == 0:
                continue  # typology had no matching transactions
            reason = typology.get_reason()
            is_sar = "YES" if typology.is_sar else "NO"
            for key, transaction in typology.transactions.items():
                amount, step, orig_acct, dest_acct, orig_name, dest_name, attr = transaction

                # Record each account at most once across all typologies.
                if (self.account_recorded(orig_acct)
                        and self.account_recorded(dest_acct)):
                    continue
                if not self.account_recorded(orig_acct):
                    acct_id = orig_acct
                    cust_id = orig_name
                    typology.recorded_members.add(acct_id)
                    sar_accounts.append((sar_id, acct_id, cust_id, days_to_date(step), reason, self.org_type(acct_id), is_sar))
                if not self.account_recorded(dest_acct):
                    acct_id = dest_acct
                    cust_id = dest_name
                    typology.recorded_members.add(acct_id)
                    sar_accounts.append((sar_id, acct_id, cust_id, days_to_date(step), reason, self.org_type(acct_id), is_sar))

            count += 1
            if count % 100 == 0:
                print("SAR Typologies: %d/%d" % (count, num_reports))
        return sar_accounts

    def org_type(self, acct_id):
        """Map the simulator account type code to the output organization type."""
        return "INDIVIDUAL" if self.org_types[acct_id] == "I" else "COMPANY"

    def write_sar_accounts(self, writer, sar_accounts):
        """Write the SAR account header and rows to a csv.writer."""
        writer.writerow(
            ["ALERT_ID", "ACCOUNT_ID", "CUSTOMER_ID", "EVENT_DATE",
             "ALERT_TYPE", "ACCOUNT_TYPE", "IS_SAR"])

        for alert in sar_accounts:
            writer.writerow(alert)

    def account_recorded(self, acct_id):
        """Return True if the account was already recorded by any typology."""
        for sar_id, typology in self.reports.items():
            if acct_id in typology.recorded_members:
                return True
        return False
|
|
|
|
|
if __name__ == "__main__":
    argv = sys.argv

    if len(argv) < 2:
        print("Usage: python3 %s [ConfJSON]" % argv[0])
        sys.exit(1)  # sys.exit: the exit() builtin is not guaranteed outside the REPL

    _conf_json = argv[1]
    _sim_name = argv[2] if len(argv) >= 3 else None

    with open(_conf_json, "r") as rf:
        conf = json.load(rf)

    # Seed Faker deterministically so the generated PII is reproducible.
    fake = Faker(['en_US'])
    Faker.seed(0)
    # BUG FIX: a throwaway LogConverter (without the Faker instance) was
    # previously constructed first, redundantly re-reading the schema and
    # re-copying logs; construct the converter exactly once.
    converter = LogConverter(conf, _sim_name, fake)
    converter.convert_alert_members()
    converter.convert_acct_tx()
    converter.output_sar_cases()
|
|