import csv
import datetime
import json
import os
import shutil
import sys
from collections import defaultdict, Counter
from contextlib import ExitStack
from random import random

import numpy as np
from dateutil.parser import parse
from faker import Faker

from amlsim.account_data_type_lookup import AccountDataTypeLookup
def days_to_date(days, base_date=None):
    """Convert a day offset into a compact date string

    :param days: Number of days elapsed since the base date
    :param base_date: Date that corresponds to day 0 (default: 2017-01-01)
    :return: Date formatted as "YYYYMMDD"
    """
    if base_date is None:
        base_date = datetime.datetime(2017, 1, 1)
    date = base_date + datetime.timedelta(days=days)
    return date.strftime("%Y%m%d")
def get_simulator_name(csv_file):
    """Derive the simulator name from a transaction log file name

    The simulator name is made of the first four "_"-separated elements of the file name.
    :param csv_file: Transaction log file name
    :return: Simulator name
    """
    # Splitting at most four times is enough: only the first four elements are kept.
    leading = csv_file.split("_", 4)[:4]
    return "_".join(leading)
def get_name(acct_id):
    """Build the display name of an account

    :param acct_id: Account ID
    :return: "Account" followed by the account ID
    """
    return "".join(("Account", str(acct_id)))
def get_bank(acct_id):
    """Build the bank name associated with an account

    :param acct_id: Account ID
    :return: "Bank" followed by the account ID
    """
    return "".join(("Bank", str(acct_id)))
class AMLTypology:
    """Group of accounts and transactions that belong to one alert
    """

    def __init__(self, reason):
        """
        :param reason: Alert reason reported for this group
        """
        self.reason = reason
        self.is_sar = False  # Becomes True as soon as one SAR member is added
        self.main_acct = None  # Most recently added SAR member
        self.members = set()
        self.recorded_members = set()
        self.transactions = {}  # Transaction ID -> transaction tuple (see add_tx)
        self.total_amount = 0.0
        self.count = 0

    def add_member(self, member, is_sar):
        """Register an account as a member of this group

        :param member: Account ID
        :param is_sar: Whether the account is flagged as SAR
        """
        self.members.add(member)
        if not is_sar:
            return
        self.is_sar = True
        self.main_acct = member

    def add_tx(self, tx_id, amount, days, orig_acct, dest_acct, orig_name, dest_name, attr):
        """Register a transaction and update the running amount and count

        :param tx_id: Transaction ID (key of the stored transaction)
        :param amount: Transaction amount
        :param days: Day offset of the transaction
        :param orig_acct: Originator account ID
        :param dest_acct: Beneficiary account ID
        :param orig_name: Originator name
        :param dest_name: Beneficiary name
        :param attr: Additional attributes of the transaction
        """
        record = (amount, days, orig_acct, dest_acct, orig_name, dest_name, attr)
        self.transactions[tx_id] = record
        self.total_amount += amount
        self.count += 1

    def get_reason(self):
        """
        :return: Alert reason given at construction
        """
        return self.reason

    def get_start_date(self):
        """
        :return: Date string of the earliest registered transaction
        """
        earliest = min(record[1] for record in self.transactions.values())
        return days_to_date(earliest)

    def get_end_date(self):
        """
        :return: Date string of the latest registered transaction
        """
        latest = max(record[1] for record in self.transactions.values())
        return days_to_date(latest)
class Schema:
    """Column layout of every output table, parsed from the schema JSON

    For each table the schema keeps the column names, default values and value types
    (parallel lists), a column-name -> index dictionary, and the index of each column
    that is identified by a "dataType" entry. The get_*_row() methods build one output
    row for the corresponding table.

    The acct_*_idx attributes are declared for completeness but are not assigned by
    the parser; they stay None.
    """

    # (key in the schema JSON, attribute prefix, {dataType: name of the index attribute})
    _TABLES = (
        ("account", "acct", {}),
        ("transaction", "tx", {
            "transaction_id": "tx_id_idx",
            "timestamp": "tx_time_idx",
            "amount": "tx_amount_idx",
            "transaction_type": "tx_type_idx",
            "orig_id": "tx_orig_idx",
            "dest_id": "tx_dest_idx",
            "sar_flag": "tx_sar_idx",
            "alert_id": "tx_alert_idx",
        }),
        ("alert_member", "alert_acct", {
            "alert_id": "alert_acct_alert_idx",
            "alert_type": "alert_acct_reason_idx",
            "account_id": "alert_acct_id_idx",
            "account_name": "alert_acct_name_idx",
            "sar_flag": "alert_acct_sar_idx",
            "model_id": "alert_acct_model_idx",
            "schedule_id": "alert_acct_schedule_idx",
            "bank_id": "alert_acct_bank_idx",
        }),
        ("alert_tx", "alert_tx", {
            "alert_id": "alert_tx_id_idx",
            "alert_type": "alert_tx_type_idx",
            "sar_flag": "alert_tx_sar_idx",
            "transaction_id": "alert_tx_idx",
            "orig_id": "alert_tx_orig_idx",
            "dest_id": "alert_tx_dest_idx",
            "transaction_type": "alert_tx_tx_type_idx",
            "amount": "alert_tx_amount_idx",
            "timestamp": "alert_tx_time_idx",
        }),
        ("party_individual", "party_ind", {
            "party_id": "party_ind_id_idx",
        }),
        ("party_organization", "party_org", {
            "party_id": "party_org_id_idx",
        }),
        ("account_mapping", "acct_party", {
            "mapping_id": "acct_party_mapping_idx",
            "account_id": "acct_party_acct_idx",
            "party_id": "acct_party_party_idx",
        }),
        ("resolved_entities", "party_party", {
            "ref_id": "party_party_ref_idx",
            "first_id": "party_party_first_idx",
            "second_id": "party_party_second_idx",
        }),
    )

    def __init__(self, data, base_date):
        """
        :param data: Parsed schema JSON; must contain one column list per table key
        :param base_date: datetime that corresponds to day 0 of the simulation
        """
        self._base_date = base_date
        self.data = data

        # Account list
        self.acct_num_cols = None
        self.acct_names = list()
        self.acct_defaults = list()
        self.acct_types = list()
        self.acct_name2idx = dict()
        self.acct_id_idx = None
        self.acct_name_idx = None
        self.acct_balance_idx = None
        self.acct_start_idx = None
        self.acct_end_idx = None
        self.acct_sar_idx = None
        self.acct_model_idx = None
        self.acct_bank_idx = None

        # Transaction list
        self.tx_num_cols = None
        self.tx_names = list()
        self.tx_defaults = list()
        self.tx_types = list()
        self.tx_name2idx = dict()
        self.tx_id_idx = None
        self.tx_time_idx = None
        self.tx_amount_idx = None
        self.tx_type_idx = None
        self.tx_orig_idx = None
        self.tx_dest_idx = None
        self.tx_sar_idx = None
        self.tx_alert_idx = None

        # Alert member list
        self.alert_acct_num_cols = None
        self.alert_acct_names = list()
        self.alert_acct_defaults = list()
        self.alert_acct_types = list()
        self.alert_acct_name2idx = dict()
        self.alert_acct_alert_idx = None
        self.alert_acct_reason_idx = None
        self.alert_acct_id_idx = None
        self.alert_acct_name_idx = None
        self.alert_acct_sar_idx = None
        self.alert_acct_model_idx = None
        self.alert_acct_schedule_idx = None
        self.alert_acct_bank_idx = None

        # Alert transaction list
        self.alert_tx_num_cols = None
        self.alert_tx_names = list()
        self.alert_tx_defaults = list()
        self.alert_tx_types = list()
        self.alert_tx_name2idx = dict()
        self.alert_tx_id_idx = None
        self.alert_tx_type_idx = None
        self.alert_tx_sar_idx = None
        self.alert_tx_idx = None
        self.alert_tx_orig_idx = None
        self.alert_tx_dest_idx = None
        self.alert_tx_tx_type_idx = None
        self.alert_tx_amount_idx = None
        self.alert_tx_time_idx = None

        # Individual party list
        self.party_ind_num_cols = None
        self.party_ind_names = list()
        self.party_ind_defaults = list()
        self.party_ind_types = list()
        self.party_ind_name2idx = dict()
        self.party_ind_id_idx = None

        # Organization party list
        self.party_org_num_cols = None
        self.party_org_names = list()
        self.party_org_defaults = list()
        self.party_org_types = list()
        self.party_org_name2idx = dict()
        self.party_org_id_idx = None

        # Account-party mapping list
        self.acct_party_num_cols = None
        self.acct_party_names = list()
        self.acct_party_defaults = list()
        self.acct_party_types = list()
        self.acct_party_name2idx = dict()
        self.acct_party_mapping_idx = None
        self.acct_party_acct_idx = None
        self.acct_party_party_idx = None

        # Party-party (resolved entities) list
        self.party_party_num_cols = None
        self.party_party_names = list()
        self.party_party_defaults = list()
        self.party_party_types = list()
        self.party_party_name2idx = dict()
        self.party_party_ref_idx = None
        self.party_party_first_idx = None
        self.party_party_second_idx = None

        self._parse()

    def _parse(self):
        """Load the column definitions of every table from the schema data

        :raise KeyError: If a table key or a column "name" is missing from the schema
        """
        # Resolve every table first so that a missing key is reported before anything is loaded.
        tables = [(self.data[key], prefix, index_attrs) for key, prefix, index_attrs in self._TABLES]
        for columns, prefix, index_attrs in tables:
            self._load_columns(columns, prefix, index_attrs)

    def _load_columns(self, columns, prefix, index_attrs):
        """Fill the "<prefix>_*" attributes of one table from its column definitions

        :param columns: List of column definitions (dictionaries) of the table
        :param prefix: Attribute prefix of the table (e.g. "tx")
        :param index_attrs: Dictionary from "dataType" value to the index attribute to assign
        """
        names = getattr(self, prefix + "_names")
        defaults = getattr(self, prefix + "_defaults")
        types = getattr(self, prefix + "_types")
        name2idx = getattr(self, prefix + "_name2idx")
        setattr(self, prefix + "_num_cols", len(columns))

        for idx, col in enumerate(columns):
            name = col["name"]
            names.append(name)
            defaults.append(col.get("defaultValue", ""))
            types.append(col.get("valueType", "string"))
            name2idx[name] = idx

            d_type = col.get("dataType")
            # Columns without a (known) data type are plain attribute columns.
            index_attr = index_attrs.get(d_type) if isinstance(d_type, str) else None
            if index_attr is not None:
                setattr(self, index_attr, idx)

    def days2date(self, _days):
        """Get date as ISO 8601 format from days from the "base_date". If failed, return an empty string.

        :param _days: Days from the "base_date"
        :return: Date as ISO 8601 format
        """
        try:
            num_days = int(_days)
        except (TypeError, ValueError):
            # Empty or null defaults (and any other non-numeric value) have no date.
            return ""
        dt = self._base_date + datetime.timedelta(num_days)
        return dt.isoformat() + "Z"

    def _build_row(self, defaults, types, name2idx, fixed, attr):
        """Build one output row of a table

        Values are applied in this order: column defaults, the "fixed" values, then any
        attribute whose name matches a column name. Finally every column whose value type
        is "date" is converted with days2date().
        :param defaults: Default value of each column
        :param types: Value type of each column
        :param name2idx: Dictionary from column name to column index
        :param fixed: Iterable of (column index, value) pairs
        :param attr: Dictionary of additional attributes keyed by column name
        :return: Row as a list
        """
        row = list(defaults)
        for idx, value in fixed:
            row[idx] = value
        for name, value in attr.items():
            idx = name2idx.get(name)
            if idx is not None:
                row[idx] = value
        for idx, v_type in enumerate(types):
            if v_type == "date":
                row[idx] = self.days2date(row[idx])
        return row

    def get_tx_row(self, _tx_id, _timestamp, _amount, _tx_type, _orig, _dest, _is_sar, _alert_id, **attr):
        """Build a row of the transaction table

        :param attr: Additional values keyed by column name
        :return: Row as a list
        """
        fixed = (
            (self.tx_id_idx, _tx_id),
            (self.tx_time_idx, _timestamp),
            (self.tx_amount_idx, _amount),
            (self.tx_type_idx, _tx_type),
            (self.tx_orig_idx, _orig),
            (self.tx_dest_idx, _dest),
            (self.tx_sar_idx, _is_sar),
            (self.tx_alert_idx, _alert_id),
        )
        return self._build_row(self.tx_defaults, self.tx_types, self.tx_name2idx, fixed, attr)

    def get_alert_acct_row(self, _alert_id, _reason, _acct_id, _acct_name, _is_sar,
                           _model_id, _schedule_id, _bank_id, **attr):
        """Build a row of the alert member table

        :param attr: Additional values keyed by column name
        :return: Row as a list
        """
        fixed = (
            (self.alert_acct_alert_idx, _alert_id),
            (self.alert_acct_reason_idx, _reason),
            (self.alert_acct_id_idx, _acct_id),
            (self.alert_acct_name_idx, _acct_name),
            (self.alert_acct_sar_idx, _is_sar),
            (self.alert_acct_model_idx, _model_id),
            (self.alert_acct_schedule_idx, _schedule_id),
            (self.alert_acct_bank_idx, _bank_id),
        )
        return self._build_row(self.alert_acct_defaults, self.alert_acct_types,
                               self.alert_acct_name2idx, fixed, attr)

    def get_alert_tx_row(self, _alert_id, _alert_type, _is_sar, _tx_id, _orig, _dest,
                         _tx_type, _amount, _timestamp, **attr):
        """Build a row of the alert transaction table

        :param attr: Additional values keyed by column name
        :return: Row as a list
        """
        fixed = (
            (self.alert_tx_id_idx, _alert_id),
            (self.alert_tx_type_idx, _alert_type),
            (self.alert_tx_sar_idx, _is_sar),
            (self.alert_tx_idx, _tx_id),
            (self.alert_tx_orig_idx, _orig),
            (self.alert_tx_dest_idx, _dest),
            (self.alert_tx_tx_type_idx, _tx_type),
            (self.alert_tx_amount_idx, _amount),
            (self.alert_tx_time_idx, _timestamp),
        )
        return self._build_row(self.alert_tx_defaults, self.alert_tx_types,
                               self.alert_tx_name2idx, fixed, attr)

    def get_party_ind_row(self, _party_id, **attr):
        """Build a row of the individual party table

        :param attr: Additional values keyed by column name
        :return: Row as a list
        """
        fixed = ((self.party_ind_id_idx, _party_id),)
        return self._build_row(self.party_ind_defaults, self.party_ind_types,
                               self.party_ind_name2idx, fixed, attr)

    def get_party_org_row(self, _party_id, **attr):
        """Build a row of the organization party table

        :param attr: Additional values keyed by column name
        :return: Row as a list
        """
        fixed = ((self.party_org_id_idx, _party_id),)
        return self._build_row(self.party_org_defaults, self.party_org_types,
                               self.party_org_name2idx, fixed, attr)

    def get_acct_party_row(self, _mapping_id, _acct_id, _party_id, **attr):
        """Build a row of the account-party mapping table

        :param attr: Additional values keyed by column name
        :return: Row as a list
        """
        fixed = (
            (self.acct_party_mapping_idx, _mapping_id),
            (self.acct_party_acct_idx, _acct_id),
            (self.acct_party_party_idx, _party_id),
        )
        return self._build_row(self.acct_party_defaults, self.acct_party_types,
                               self.acct_party_name2idx, fixed, attr)

    def get_party_party_row(self, _ref_id, _first_id, _second_id, **attr):
        """Build a row of the party-party (resolved entities) table

        :param attr: Additional values keyed by column name
        :return: Row as a list
        """
        fixed = (
            (self.party_party_ref_idx, _ref_id),
            (self.party_party_first_idx, _first_id),
            (self.party_party_second_idx, _second_id),
        )
        return self._build_row(self.party_party_defaults, self.party_party_types,
                               self.party_party_name2idx, fixed, attr)
class LogConverter:
    """Convert the raw simulator output into the CSV files described by the schema

    The conversion steps share state: convert_acct_tx() looks up the alert reasons
    loaded by convert_alert_members(), and sar_accounts() uses the account types
    collected by convert_acct_tx(). Call them in that order.
    """

    # Titles of the SAR account CSV, one per field of the tuples built by sar_accounts().
    # NOTE(review): the header literal was truncated in the reviewed source and has been
    # reconstructed to match the seven tuple fields - confirm the exact column titles.
    SAR_ACCOUNT_HEADER = ["ALERT_ID", "MAIN_ACCOUNT_ID", "MAIN_CUSTOMER_ID", "EVENT_DATE",
                          "ALERT_TYPE", "ACCOUNT_TYPE", "IS_SAR"]

    def __init__(self, conf, sim_name=None, fake=None):
        """Read the configuration, load the schema and prepare the output directory

        :param conf: Configuration dictionary with "general", "temporal", "output" and "input" sections
        :param sim_name: Simulation name (default: "simulation_name" of the general section, or "sample")
        :param fake: Faker instance indexable by locale; required by convert_acct_tx()
        :raise KeyError: If a required file name is missing from the configuration
        """
        self.reports = dict()  # Alert ID -> AMLTypology
        self.org_types = dict()  # Account ID (int) -> account type read from the account list
        self.fake = fake

        general_conf = conf.get('general', {})
        input_conf = conf.get('temporal', {})
        output_conf = conf.get('output', {})

        self.sim_name = sim_name if sim_name is not None else general_conf.get("simulation_name", "sample")
        print("Simulation name:", self.sim_name)

        self.input_dir = os.path.join(input_conf.get('directory', ''), self.sim_name)
        self.work_dir = os.path.join(output_conf.get('directory', ''), self.sim_name)
        os.makedirs(self.work_dir, exist_ok=True)

        param_dir = conf.get('input', {}).get('directory', '')
        schema_file = conf.get('input', {}).get('schema', '')
        base_date_str = general_conf.get('base_date', '2017-01-01')
        base_date = parse(base_date_str)

        json_file = os.path.join(param_dir, schema_file)
        with open(json_file, "r") as rf:
            data = json.load(rf)
            self.schema = Schema(data, base_date)

        # The transaction log is read from the output (work) directory.
        self.log_file = os.path.join(self.work_dir, output_conf["transaction_log"])
        self.in_acct_file = input_conf["accounts"]
        self.group_file = input_conf["alert_members"]
        self.out_acct_file = output_conf["accounts"]
        self.tx_file = output_conf["transactions"]
        self.cash_tx_file = output_conf["cash_transactions"]
        self.sar_acct_file = output_conf["sar_accounts"]
        self.alert_tx_file = output_conf["alert_transactions"]
        self.alert_acct_file = output_conf["alert_members"]
        self.party_individual_file = output_conf["party_individuals"]
        self.party_organization_file = output_conf["party_organizations"]
        self.account_mapping_file = output_conf["account_mapping"]
        self.resolved_entities_file = output_conf["resolved_entities"]

        # Carry the diameter log over to the output directory when the simulator produced one.
        dia_log = output_conf["diameter_log"]
        src_dia_path = os.path.join(self.input_dir, dia_log)
        dst_dia_path = os.path.join(self.work_dir, dia_log)
        if os.path.exists(src_dia_path):
            shutil.copy(src_dia_path, dst_dia_path)

    def convert_acct_tx(self):
        """Convert the account list and the transaction log into the output CSV files

        Writes the account, party, account mapping, resolved entities, transaction,
        cash transaction and alert transaction files. When the "DEGREE" environment
        variable is set, fan-in / fan-out statistics are printed afterwards.
        """
        print("Convert transaction list from %s to %s, %s and %s" % (
            self.log_file, self.tx_file, self.cash_tx_file, self.alert_tx_file))

        # Every file is opened up front (so a missing input fails before any conversion)
        # and is closed on exit, including when a conversion step raises.
        with ExitStack() as stack:
            def open_csv(path, mode):
                # newline="" is what the csv module expects from its file objects.
                return stack.enter_context(open(path, mode, newline=""))

            def in_work_dir(name):
                return os.path.join(self.work_dir, name)

            in_acct_f = open_csv(os.path.join(self.input_dir, self.in_acct_file), "r")
            in_tx_f = open_csv(self.log_file, "r")
            out_acct_f = open_csv(in_work_dir(self.out_acct_file), "w")
            out_tx_f = open_csv(in_work_dir(self.tx_file), "w")
            out_cash_tx_f = open_csv(in_work_dir(self.cash_tx_file), "w")
            out_alert_tx_f = open_csv(in_work_dir(self.alert_tx_file), "w")
            out_ind_f = open_csv(in_work_dir(self.party_individual_file), "w")
            out_org_f = open_csv(in_work_dir(self.party_organization_file), "w")
            out_map_f = open_csv(in_work_dir(self.account_mapping_file), "w")
            out_ent_f = open_csv(in_work_dir(self.resolved_entities_file), "w")

            self._convert_accounts(in_acct_f, out_acct_f, out_ind_f, out_org_f, out_map_f, out_ent_f)
            tx_set = self._convert_transactions(in_tx_f, out_tx_f, out_cash_tx_f, out_alert_tx_f)

        self._report_degrees(tx_set)

    @staticmethod
    def _fake_us_address(us_gen):
        """Generate a fake address that splits into street, city, state and postcode

        :param us_gen: Faker generator of the en_US locale
        :return: Tuple of (street address, city, state, postcode)
        """
        while True:
            lines = us_gen.address().split('\n')
            street_address = lines[0]
            locality = lines[1].split(', ')
            # Addresses whose second line is not "<city>, <state> <postcode>" are discarded.
            if len(locality) == 2:
                state_and_code = locality[1].split(' ')
                return street_address, locality[0], state_and_code[0], state_and_code[1]

    @staticmethod
    def _fake_field_makers(us_gen, gender, address):
        """Map account column names to callables that produce the fake value of that column

        Values are produced lazily so that the generator is only consumed for columns
        that are present in the schema.
        :param us_gen: Faker generator of the en_US locale
        :param gender: "Male" or "Female"
        :param address: Tuple returned by _fake_us_address()
        :return: Dictionary from column name to a zero-argument callable
        """
        street_address, city, state, postcode = address
        is_male = gender == "Male"
        return {
            'first_name': lambda: us_gen.first_name_male() if is_male else us_gen.first_name_female(),
            'last_name': lambda: us_gen.last_name_male() if is_male else us_gen.last_name_female(),
            'street_addr': lambda: street_address,
            'city': lambda: city,
            'state': lambda: state,
            'country': lambda: "US",
            'zip': lambda: postcode,
            'gender': lambda: gender,
            'birth_date': lambda: us_gen.date_of_birth(),
            'ssn': lambda: us_gen.ssn(),
            'lat': lambda: us_gen.latitude(),
            'lon': lambda: us_gen.longitude(),
        }

    def _convert_accounts(self, in_acct_f, out_acct_f, out_ind_f, out_org_f, out_map_f, out_ent_f):
        """Write the account, party and account mapping files from the input account list

        The resolved entities file only receives its header row. The account type of
        every account is remembered in "org_types".
        """
        reader = csv.reader(in_acct_f)
        acct_writer = csv.writer(out_acct_f)
        acct_writer.writerow(self.schema.acct_names)
        ind_writer = csv.writer(out_ind_f)
        ind_writer.writerow(self.schema.party_ind_names)
        org_writer = csv.writer(out_org_f)
        org_writer.writerow(self.schema.party_org_names)
        map_writer = csv.writer(out_map_f)
        map_writer.writerow(self.schema.acct_party_names)
        ent_writer = csv.writer(out_ent_f)
        ent_writer.writerow(self.schema.party_party_names)

        header = next(reader)
        lookup = AccountDataTypeLookup()
        us_gen = self.fake['en_US']
        account_columns = self.schema.data['account']

        for mapping_id, row in enumerate(reader, start=1):
            output_row = list(self.schema.acct_defaults)
            acct_type = ""
            acct_id = ""

            gender = np.random.choice(['Male', 'Female'], p=[0.5, 0.5])
            address = self._fake_us_address(us_gen)
            fake_fields = self._fake_field_makers(us_gen, gender, address)

            for output_index, output_item in enumerate(account_columns):
                if 'dataType' in output_item:
                    output_type = output_item['dataType']
                    input_type = lookup.inputType(output_type)
                    try:
                        input_index = header.index(input_type)
                    except ValueError:
                        # The input file has no column for this data type: keep the default.
                        continue

                    if output_type == "start_time":
                        try:
                            start = int(row[input_index])
                            if start >= 0:
                                output_row[output_index] = start
                        except ValueError:
                            pass  # Non-numeric start time: keep the default
                    elif output_type == "end_time":
                        try:
                            end = int(row[input_index])
                            if end > 0:
                                output_row[output_index] = end
                        except ValueError:
                            pass  # Non-numeric end time: keep the default
                    elif output_type == "account_id":
                        acct_id = row[input_index]
                        output_row[output_index] = acct_id
                    elif output_type == "account_type":
                        acct_type = row[input_index]
                        output_row[output_index] = acct_type
                    else:
                        output_row[output_index] = row[input_index]

                if output_item.get('valueType') == 'date':
                    output_row[output_index] = self.schema.days2date(output_row[output_index])

                if 'name' in output_item:
                    make_value = fake_fields.get(output_item['name'])
                    if make_value is not None:
                        output_row[output_index] = make_value()

            acct_writer.writerow(output_row)
            self.org_types[int(acct_id)] = acct_type

            # Each account gets exactly one party, individual or organization at random.
            is_individual = random() >= 0.5
            party_id = str(acct_id)
            if is_individual:
                ind_writer.writerow(self.schema.get_party_ind_row(party_id))
            else:
                org_writer.writerow(self.schema.get_party_org_row(party_id))

            map_writer.writerow(self.schema.get_acct_party_row(mapping_id, acct_id, party_id))

    def _convert_transactions(self, in_tx_f, out_tx_f, out_cash_tx_f, out_alert_tx_f):
        """Write the transaction, cash transaction and alert transaction files from the log

        Rows with missing columns or non-numeric step / SAR / alert values are skipped,
        and duplicated transactions are written only once.
        :return: Set of (orig, dest, type, amount, date) tuples of account-to-account transactions
        """
        tx_set = set()
        cash_tx_set = set()

        reader = csv.reader(in_tx_f)
        tx_writer = csv.writer(out_tx_f)
        cash_tx_writer = csv.writer(out_cash_tx_f)
        alert_tx_writer = csv.writer(out_alert_tx_f)

        header = next(reader)
        indices = {name: index for index, name in enumerate(header)}
        num_columns = len(header)

        tx_writer.writerow(self.schema.tx_names)
        cash_tx_writer.writerow(self.schema.tx_names)
        alert_tx_writer.writerow(self.schema.alert_tx_names)

        step_idx = indices["step"]
        amt_idx = indices["amount"]
        orig_idx = indices["nameOrig"]
        dest_idx = indices["nameDest"]
        sar_idx = indices["isSAR"]
        alert_idx = indices["alertID"]
        type_idx = indices["type"]

        tx_id = 1
        for row in reader:
            if len(row) < num_columns:
                continue
            try:
                days = int(row[step_idx])
                date_str = str(days)
                amount = row[amt_idx]
                orig_id = row[orig_idx]
                dest_id = row[dest_idx]
                sar_id = int(row[sar_idx])
                alert_id = int(row[alert_idx])
                is_sar = sar_id > 0
                is_alert = alert_id >= 0
                ttype = row[type_idx]
            except ValueError:
                continue

            attr = {name: row[index] for name, index in indices.items()}
            tx_key = (orig_id, dest_id, ttype, amount, date_str)

            # NOTE(review): CASH_TYPES is not defined in the reviewed part of this file;
            # it must exist at module level.
            if ttype in CASH_TYPES:
                if tx_key not in cash_tx_set:
                    cash_tx_set.add(tx_key)
                    output_row = self.schema.get_tx_row(tx_id, date_str, amount, ttype, orig_id, dest_id,
                                                        is_sar, alert_id, **attr)
                    cash_tx_writer.writerow(output_row)
            else:
                if tx_key not in tx_set:
                    output_row = self.schema.get_tx_row(tx_id, date_str, amount, ttype, orig_id, dest_id,
                                                        is_sar, alert_id, **attr)
                    tx_writer.writerow(output_row)
                    tx_set.add(tx_key)
                # NOTE(review): nesting reconstructed - alert rows are written for
                # account-to-account transactions only; confirm.
                if is_alert:
                    alert_type = self.reports.get(alert_id).get_reason()
                    alert_row = self.schema.get_alert_tx_row(alert_id, alert_type, is_sar, tx_id, orig_id, dest_id,
                                                             ttype, amount, date_str, **attr)
                    alert_tx_writer.writerow(alert_row)

            if tx_id % 1000000 == 0:
                print("Converted %d transactions." % tx_id)
            tx_id += 1

        return tx_set

    @staticmethod
    def _report_degrees(tx_set):
        """Print fan-in / fan-out statistics when the "DEGREE" environment variable is set

        :param tx_set: Set of transaction tuples whose first two fields are originator and beneficiary
        """
        deg_param = os.getenv("DEGREE")
        if not deg_param:
            return
        max_threshold = int(deg_param)

        pred = defaultdict(set)
        succ = defaultdict(set)
        for orig, dest, _, _, _ in tx_set:
            pred[dest].add(orig)
            succ[orig].add(dest)

        in_deg = Counter(len(nbs) for nbs in pred.values())
        out_deg = Counter(len(nbs) for nbs in succ.values())
        for th in range(2, max_threshold + 1):
            num_fan_in = sum(c for d, c in in_deg.items() if d >= th)
            num_fan_out = sum(c for d, c in out_deg.items() if d >= th)
            print("Number of fan-in / fan-out patterns with", th, "neighbors", num_fan_in, "/", num_fan_out)

    def convert_alert_members(self):
        """Load the alert member list, register the alert groups and write the alert member file
        """
        input_file = self.group_file
        output_file = self.alert_acct_file
        print("Load alert groups: %s" % input_file)

        in_path = os.path.join(self.input_dir, input_file)
        out_path = os.path.join(self.work_dir, output_file)
        with open(in_path, "r", newline="") as rf, open(out_path, "w", newline="") as wf:
            reader = csv.reader(rf)
            header = next(reader)
            indices = {name: index for index, name in enumerate(header)}

            writer = csv.writer(wf)
            writer.writerow(self.schema.alert_acct_names)

            for row in reader:
                reason = row[indices["reason"]]
                alert_id = int(row[indices["alertID"]])
                account_id = int(row[indices["accountID"]])
                is_sar = row[indices["isSAR"]].lower() == "true"
                model_id = row[indices["modelID"]]
                schedule_id = row[indices["scheduleID"]]
                bank_id = row[indices["bankID"]]

                if alert_id not in self.reports:
                    self.reports[alert_id] = AMLTypology(reason)
                self.reports[alert_id].add_member(account_id, is_sar)

                attr = {name: row[index] for name, index in indices.items()}
                # The account ID is also used as the account name.
                output_row = self.schema.get_alert_acct_row(alert_id, reason, account_id, account_id, is_sar,
                                                            model_id, schedule_id, bank_id, **attr)
                writer.writerow(output_row)

    def output_sar_cases(self):
        """Extract SAR account list involved in alert transactions from transaction log file
        """
        input_file = self.log_file
        output_file = os.path.join(self.work_dir, self.sar_acct_file)
        print("Convert SAR typologies from %s to %s" % (input_file, output_file))

        with open(input_file, "r", newline="") as rf:
            reader = csv.reader(rf)
            alerts = self.sar_accounts(reader)
        with open(output_file, "w", newline="") as wf:
            writer = csv.writer(wf)
            self.write_sar_accounts(writer, alerts)

    def sar_accounts(self, reader):
        """Collect the accounts involved in alert transactions

        Every account is reported at most once, with the first alert and transaction
        in which it is encountered.
        :param reader: CSV reader of the transaction log, positioned at the header row
        :return: List of (alert ID, account ID, customer ID, date, reason, account type, SAR flag) tuples
        """
        header = next(reader)
        indices = {name: index for index, name in enumerate(header)}
        columns = len(header)

        tx_id = 0
        for row in reader:
            if len(row) < columns:
                continue
            try:
                days = int(row[indices["step"]])
                amount = float(row[indices["amount"]])
                orig = int(row[indices["nameOrig"]])
                dest = int(row[indices["nameDest"]])
                alert_id = int(row[indices["alertID"]])
                orig_name = "C_%d" % orig
                dest_name = "C_%d" % dest
            except ValueError:
                continue
            if alert_id >= 0 and alert_id in self.reports:
                attr = {name: row[index] for name, index in indices.items()}
                self.reports[alert_id].add_tx(tx_id, amount, days, orig, dest, orig_name, dest_name, attr)
                tx_id += 1

        # Accounts already recorded by any alert group; kept in one set so that the
        # membership test does not scan every group for every transaction.
        recorded = set()
        for typology in self.reports.values():
            recorded.update(typology.recorded_members)

        sar_accounts = list()
        count = 0
        num_reports = len(self.reports)
        for sar_id, typology in self.reports.items():
            if typology.count == 0:
                continue
            reason = typology.get_reason()
            is_sar = "YES" if typology.is_sar else "NO"
            for transaction in typology.transactions.values():
                amount, step, orig_acct, dest_acct, orig_name, dest_name, attr = transaction
                for acct_id, cust_id in ((orig_acct, orig_name), (dest_acct, dest_name)):
                    if acct_id in recorded:
                        continue
                    recorded.add(acct_id)
                    typology.recorded_members.add(acct_id)
                    sar_accounts.append((sar_id, acct_id, cust_id, days_to_date(step), reason,
                                         self.org_type(acct_id), is_sar))
            count += 1
            if count % 100 == 0:
                print("SAR Typologies: %d/%d" % (count, num_reports))
        return sar_accounts

    def org_type(self, acct_id):
        """Get the party type of an account

        :param acct_id: Account ID as stored in "org_types"
        :return: "INDIVIDUAL" if the account type is "I", otherwise "COMPANY"
        :raise KeyError: If the account is unknown
        """
        return "INDIVIDUAL" if self.org_types[acct_id] == "I" else "COMPANY"

    def write_sar_accounts(self, writer, sar_accounts):
        """Write the header row followed by one row per SAR account

        :param writer: CSV writer of the SAR account file
        :param sar_accounts: Iterable of tuples as returned by sar_accounts()
        """
        writer.writerow(self.SAR_ACCOUNT_HEADER)
        for alert in sar_accounts:
            writer.writerow(alert)

    def account_recorded(self, acct_id):
        """Check whether any alert group has already recorded the account

        :param acct_id: Account ID
        :return: True if the account is a recorded member of at least one alert group
        """
        return any(acct_id in typology.recorded_members for typology in self.reports.values())
if __name__ == "__main__":
    # Usage: python3 <this script> ConfJSON [SimulationName]
    argv = sys.argv
    if len(argv) < 2:
        print("Usage: python3 %s [ConfJSON]" % argv[0])
        sys.exit(1)

    _conf_json = argv[1]
    _sim_name = argv[2] if len(argv) >= 3 else None  # Overrides the name given in the configuration

    with open(_conf_json, "r") as rf:
        conf = json.load(rf)

    # Seed the fake data generator so that generated personal data is reproducible.
    fake = Faker(['en_US'])
    Faker.seed(0)

    # The converter is built once, with the generator it needs for the account conversion.
    converter = LogConverter(conf, _sim_name, fake)
    # Alert members must be loaded first: the transaction conversion looks up their alert reasons.
    converter.convert_alert_members()
    converter.convert_acct_tx()
    converter.output_sar_cases()