# AMLSim / scripts / convert_logs.py
import csv
import json
import sys
import os
import shutil
import datetime
from dateutil.parser import parse
from random import random
from collections import defaultdict, Counter
from amlsim.account_data_type_lookup import AccountDataTypeLookup
from faker import Faker
import numpy as np
def days_to_date(days):
    """Return the calendar date *days* days after 2017-01-01 as a "YYYYMMDD" string."""
    base = datetime.datetime(2017, 1, 1)
    return (base + datetime.timedelta(days=days)).strftime("%Y%m%d")
def get_simulator_name(csv_file):
    """Convert log file name to the simulator name.

    The simulator name is the first four underscore-separated tokens of the
    file name (fewer if the name has fewer tokens).

    :param csv_file: Transaction log file name
    :return: Simulator name
    """
    return "_".join(csv_file.split("_")[:4])
def get_name(acct_id):
    """Return the display name for an account ID, e.g. 123 -> "Account123"."""
    return f"Account{acct_id!s}"
def get_bank(acct_id):
    """Return the bank label for an account ID, e.g. 5 -> "Bank5"."""
    return f"Bank{acct_id!s}"
# Transaction types treated as cash deposits/withdrawals rather than
# account-to-account transfers; these are written to the separate cash
# transaction output file.
CASH_TYPES = {"CASH-IN", "CASH-OUT"}
class AMLTypology:
    """One suspicious (alert) group: its member accounts and transactions."""

    def __init__(self, reason):
        self.is_sar = False            # True once any member is SAR-flagged
        self.main_acct = None          # The SAR-flagged main account ID
        self.reason = reason           # Textual description of the SAR/alert
        self.transactions = dict()     # Transaction ID -> attribute tuple
        self.members = set()           # Accounts involved in the alert transactions
        self.recorded_members = set()  # Accounts already written out (deduplication)
        self.total_amount = 0.0        # Running sum of transaction amounts
        self.count = 0                 # Number of recorded transactions

    def add_member(self, member, is_sar):
        """Register an account; a SAR-flagged member becomes the main account."""
        self.members.add(member)
        if is_sar:
            self.is_sar = True
            self.main_acct = member

    def add_tx(self, tx_id, amount, days, orig_acct, dest_acct, orig_name, dest_name, attr):
        """Record a transaction and update the running amount/count totals."""
        self.transactions[tx_id] = (amount, days, orig_acct, dest_acct, orig_name, dest_name, attr)
        self.total_amount += amount
        self.count += 1

    def get_reason(self):
        """Return the alert description."""
        return self.reason

    def get_start_date(self):
        """Return the "YYYYMMDD" date of the earliest recorded transaction."""
        earliest = min(tx[1] for tx in self.transactions.values())
        return days_to_date(earliest)

    def get_end_date(self):
        """Return the "YYYYMMDD" date of the latest recorded transaction."""
        latest = max(tx[1] for tx in self.transactions.values())
        return days_to_date(latest)
class Schema:
    """Column layout of every output CSV file, loaded from the schema JSON.

    For each schema section this object records, in column order, the column
    names, default values and value types, plus a name->index mapping and the
    indices of columns with a special "dataType" role (IDs, timestamps,
    amounts, flags, ...).  The ``get_*_row`` helpers build one CSV row for
    the corresponding output file.

    Refactoring note: the original implementation repeated the same
    parse/build loop once per section; it is now driven by the ``_SECTIONS``
    table.  As a fix, the account section now also populates
    ``acct_name2idx`` (previously left empty).
    """

    # Section table: (JSON key in the schema file, attribute prefix,
    # mapping from "dataType" value -> the index attribute it designates).
    _SECTIONS = [
        ("account", "acct", {}),  # account columns carry no dataType dispatch
        ("transaction", "tx", {
            "transaction_id": "tx_id_idx", "timestamp": "tx_time_idx",
            "amount": "tx_amount_idx", "transaction_type": "tx_type_idx",
            "orig_id": "tx_orig_idx", "dest_id": "tx_dest_idx",
            "sar_flag": "tx_sar_idx", "alert_id": "tx_alert_idx"}),
        ("alert_member", "alert_acct", {
            "alert_id": "alert_acct_alert_idx", "alert_type": "alert_acct_reason_idx",
            "account_id": "alert_acct_id_idx", "account_name": "alert_acct_name_idx",
            "sar_flag": "alert_acct_sar_idx", "model_id": "alert_acct_model_idx",
            "schedule_id": "alert_acct_schedule_idx", "bank_id": "alert_acct_bank_idx"}),
        ("alert_tx", "alert_tx", {
            "alert_id": "alert_tx_id_idx", "alert_type": "alert_tx_type_idx",
            "sar_flag": "alert_tx_sar_idx", "transaction_id": "alert_tx_idx",
            "orig_id": "alert_tx_orig_idx", "dest_id": "alert_tx_dest_idx",
            "transaction_type": "alert_tx_tx_type_idx", "amount": "alert_tx_amount_idx",
            "timestamp": "alert_tx_time_idx"}),
        ("party_individual", "party_ind", {"party_id": "party_ind_id_idx"}),
        ("party_organization", "party_org", {"party_id": "party_org_id_idx"}),
        ("account_mapping", "acct_party", {
            "mapping_id": "acct_party_mapping_idx",
            "account_id": "acct_party_acct_idx",
            "party_id": "acct_party_party_idx"}),
        ("resolved_entities", "party_party", {
            "ref_id": "party_party_ref_idx",
            "first_id": "party_party_first_idx",
            "second_id": "party_party_second_idx"}),
    ]

    def __init__(self, data, base_date):
        """
        :param data: Parsed schema JSON (section name -> list of column dicts)
        :param base_date: datetime used as day zero when converting day
            offsets to ISO dates
        """
        self._base_date = base_date
        self.data = data
        # Per-section containers, e.g. tx_names, tx_defaults, tx_types,
        # tx_name2idx, tx_num_cols.
        for _key, prefix, _dtype_map in self._SECTIONS:
            setattr(self, prefix + "_num_cols", None)
            setattr(self, prefix + "_names", list())
            setattr(self, prefix + "_defaults", list())
            setattr(self, prefix + "_types", list())
            setattr(self, prefix + "_name2idx", dict())
        # Special-column indices all default to None.  The acct_* indices are
        # kept for backward compatibility even though the account section
        # does not dispatch on "dataType" (they stayed None in the original).
        idx_attrs = ["acct_id_idx", "acct_name_idx", "acct_balance_idx",
                     "acct_start_idx", "acct_end_idx", "acct_sar_idx",
                     "acct_model_idx", "acct_bank_idx"]
        for _key, _prefix, dtype_map in self._SECTIONS:
            idx_attrs.extend(dtype_map.values())
        for attr in idx_attrs:
            setattr(self, attr, None)
        self._parse()

    def _parse(self):
        """Read every schema section and fill the per-section attributes."""
        for key, prefix, dtype_map in self._SECTIONS:
            columns = self.data[key]
            setattr(self, prefix + "_num_cols", len(columns))
            names = getattr(self, prefix + "_names")
            defaults = getattr(self, prefix + "_defaults")
            types = getattr(self, prefix + "_types")
            name2idx = getattr(self, prefix + "_name2idx")
            for idx, col in enumerate(columns):
                name = col["name"]
                names.append(name)
                defaults.append(col.get("defaultValue", ""))
                types.append(col.get("valueType", "string"))
                name2idx[name] = idx
                d_type = col.get("dataType")
                if d_type in dtype_map:
                    setattr(self, dtype_map[d_type], idx)

    def days2date(self, _days):
        """Get date as ISO 8601 format from days from the "base_date". If failed, return an empty string.

        :param _days: Days from the "base_date"
        :return: Date as ISO 8601 format (UTC), or "" if _days is not an integer
        """
        try:
            num_days = int(_days)
        except ValueError:
            return ""
        dt = self._base_date + datetime.timedelta(num_days)
        return dt.isoformat() + "Z"  # UTC

    def _build_row(self, prefix, fixed, attr):
        """Build one CSV row for a section.

        :param prefix: Section attribute prefix (e.g. "tx")
        :param fixed: (column index, value) pairs for the mandatory columns
        :param attr: Extra values keyed by column name; unknown names are
            ignored, known names override the fixed values
        :return: List of column values, with "date" columns converted from
            day offsets to ISO dates
        """
        row = list(getattr(self, prefix + "_defaults"))
        for idx, value in fixed:
            row[idx] = value
        name2idx = getattr(self, prefix + "_name2idx")
        for name, value in attr.items():
            if name in name2idx:
                row[name2idx[name]] = value
        for idx, v_type in enumerate(getattr(self, prefix + "_types")):
            if v_type == "date":
                row[idx] = self.days2date(row[idx])  # convert days to date
        return row

    def get_tx_row(self, _tx_id, _timestamp, _amount, _tx_type, _orig, _dest, _is_sar, _alert_id, **attr):
        """Build one row for the transaction (or cash transaction) CSV file."""
        fixed = [(self.tx_id_idx, _tx_id), (self.tx_time_idx, _timestamp),
                 (self.tx_amount_idx, _amount), (self.tx_type_idx, _tx_type),
                 (self.tx_orig_idx, _orig), (self.tx_dest_idx, _dest),
                 (self.tx_sar_idx, _is_sar), (self.tx_alert_idx, _alert_id)]
        return self._build_row("tx", fixed, attr)

    def get_alert_acct_row(self, _alert_id, _reason, _acct_id, _acct_name, _is_sar,
                           _model_id, _schedule_id, _bank_id, **attr):
        """Build one row for the alert member CSV file."""
        fixed = [(self.alert_acct_alert_idx, _alert_id), (self.alert_acct_reason_idx, _reason),
                 (self.alert_acct_id_idx, _acct_id), (self.alert_acct_name_idx, _acct_name),
                 (self.alert_acct_sar_idx, _is_sar), (self.alert_acct_model_idx, _model_id),
                 (self.alert_acct_schedule_idx, _schedule_id), (self.alert_acct_bank_idx, _bank_id)]
        return self._build_row("alert_acct", fixed, attr)

    def get_alert_tx_row(self, _alert_id, _alert_type, _is_sar, _tx_id, _orig, _dest,
                         _tx_type, _amount, _timestamp, **attr):
        """Build one row for the alert transaction CSV file."""
        fixed = [(self.alert_tx_id_idx, _alert_id), (self.alert_tx_type_idx, _alert_type),
                 (self.alert_tx_sar_idx, _is_sar), (self.alert_tx_idx, _tx_id),
                 (self.alert_tx_orig_idx, _orig), (self.alert_tx_dest_idx, _dest),
                 (self.alert_tx_tx_type_idx, _tx_type), (self.alert_tx_amount_idx, _amount),
                 (self.alert_tx_time_idx, _timestamp)]
        return self._build_row("alert_tx", fixed, attr)

    def get_party_ind_row(self, _party_id, **attr):
        """Build one row for the individual party CSV file."""
        return self._build_row("party_ind", [(self.party_ind_id_idx, _party_id)], attr)

    def get_party_org_row(self, _party_id, **attr):
        """Build one row for the organization party CSV file."""
        return self._build_row("party_org", [(self.party_org_id_idx, _party_id)], attr)

    def get_acct_party_row(self, _mapping_id, _acct_id, _party_id, **attr):
        """Build one row for the account-party mapping CSV file."""
        fixed = [(self.acct_party_mapping_idx, _mapping_id),
                 (self.acct_party_acct_idx, _acct_id),
                 (self.acct_party_party_idx, _party_id)]
        return self._build_row("acct_party", fixed, attr)

    def get_party_party_row(self, _ref_id, _first_id, _second_id, **attr):
        """Build one row for the resolved entities (party-party) CSV file."""
        fixed = [(self.party_party_ref_idx, _ref_id),
                 (self.party_party_first_idx, _first_id),
                 (self.party_party_second_idx, _second_id)]
        return self._build_row("party_party", fixed, attr)
class LogConverter:
    """Convert AMLSim raw logs into the final account/transaction/alert CSV files.

    Fixes over the original: ``out_acct_f`` was opened but never closed in
    ``convert_acct_tx`` (risking unflushed output), and ``convert_alert_members``
    leaked both of its file handles.
    """

    def __init__(self, conf, sim_name=None, fake=None):
        """
        :param conf: Parsed configuration JSON (general/input/temporal/output)
        :param sim_name: Simulation name; defaults to conf["general"]["simulation_name"]
        :param fake: Faker proxy used to synthesize personal account attributes
        """
        self.reports = dict()  # Alert ID -> AMLTypology (SAR transaction subgraph)
        self.org_types = dict()  # Account ID -> organization type code (e.g. "I")
        self.fake = fake
        general_conf = conf.get('general', {})
        input_conf = conf.get('temporal', {})  # Input of this converter is the temporal directory
        output_conf = conf.get('output', {})
        self.sim_name = sim_name if sim_name is not None else general_conf.get("simulation_name", "sample")
        print("Simulation name:", self.sim_name)
        self.input_dir = os.path.join(input_conf.get('directory', ''), self.sim_name)
        self.work_dir = os.path.join(output_conf.get('directory', ''), self.sim_name)
        os.makedirs(self.work_dir, exist_ok=True)
        param_dir = conf.get('input', {}).get('directory', '')
        schema_file = conf.get('input', {}).get('schema', '')
        base_date_str = general_conf.get('base_date', '2017-01-01')
        base_date = parse(base_date_str)
        json_file = os.path.join(param_dir, schema_file)
        with open(json_file, "r") as rf:
            data = json.load(rf)
        self.schema = Schema(data, base_date)
        # Input files
        self.log_file = os.path.join(self.work_dir, output_conf["transaction_log"])
        self.in_acct_file = input_conf["accounts"]  # Account list file from the transaction graph generator
        self.group_file = input_conf["alert_members"]  # Alert account list file from the transaction graph generator
        # Output files
        self.out_acct_file = output_conf["accounts"]  # All account list file
        self.tx_file = output_conf["transactions"]  # All transaction list file
        self.cash_tx_file = output_conf["cash_transactions"]  # Cash transaction list file
        self.sar_acct_file = output_conf["sar_accounts"]  # SAR account list file
        self.alert_tx_file = output_conf["alert_transactions"]  # Alert transaction list file
        self.alert_acct_file = output_conf["alert_members"]  # Alert account list file
        self.party_individual_file = output_conf["party_individuals"]
        self.party_organization_file = output_conf["party_organizations"]
        self.account_mapping_file = output_conf["account_mapping"]
        self.resolved_entities_file = output_conf["resolved_entities"]
        # Copy the diameter CSV file to the work directory if it exists
        dia_log = output_conf["diameter_log"]
        src_dia_path = os.path.join(self.input_dir, dia_log)
        dst_dia_path = os.path.join(self.work_dir, dia_log)
        if os.path.exists(src_dia_path):
            shutil.copy(src_dia_path, dst_dia_path)

    @staticmethod
    def _random_us_address(us_gen):
        """Draw Faker addresses until one parses as (street, city, state, zip).

        Faker's address() sometimes yields a second line that is not shaped
        like "City, ST ZIP" (e.g. military addresses); those are retried.
        """
        while True:
            lines = us_gen.address().split('\n')
            street_address = lines[0]
            parts = lines[1].split(', ')
            if len(parts) == 2:
                tokens = parts[1].split(' ')
                return street_address, parts[0], tokens[0], tokens[1]

    def convert_acct_tx(self):
        """Convert the account list and raw transaction log into output CSVs.

        Writes the account, party (individual/organization), account-party
        mapping, transaction, cash transaction and alert transaction files.
        ``convert_alert_members`` must run first so alert reasons resolve.
        """
        print("Convert transaction list from %s to %s, %s and %s" % (
            self.log_file, self.tx_file, self.cash_tx_file, self.alert_tx_file))
        in_acct_f = open(os.path.join(self.input_dir, self.in_acct_file), "r")  # Input account file
        in_tx_f = open(self.log_file, "r")  # Transaction log file from the Java simulator
        out_acct_f = open(os.path.join(self.work_dir, self.out_acct_file), "w")  # Output account file
        out_tx_f = open(os.path.join(self.work_dir, self.tx_file), "w")  # Output transaction file
        out_cash_tx_f = open(os.path.join(self.work_dir, self.cash_tx_file), "w")  # Output cash transaction file
        out_alert_tx_f = open(os.path.join(self.work_dir, self.alert_tx_file), "w")  # Output alert transaction file
        out_ind_f = open(os.path.join(self.work_dir, self.party_individual_file), "w")  # Party individuals
        out_org_f = open(os.path.join(self.work_dir, self.party_organization_file), "w")  # Party organizations
        out_map_f = open(os.path.join(self.work_dir, self.account_mapping_file), "w")  # Account mappings
        out_ent_f = open(os.path.join(self.work_dir, self.resolved_entities_file), "w")  # Resolved entities

        # Load the account list and write account / party / mapping rows
        reader = csv.reader(in_acct_f)
        acct_writer = csv.writer(out_acct_f)
        acct_writer.writerow(self.schema.acct_names)  # write header
        ind_writer = csv.writer(out_ind_f)
        ind_writer.writerow(self.schema.party_ind_names)
        org_writer = csv.writer(out_org_f)
        org_writer.writerow(self.schema.party_org_names)
        map_writer = csv.writer(out_map_f)
        map_writer.writerow(self.schema.acct_party_names)
        ent_writer = csv.writer(out_ent_f)
        ent_writer.writerow(self.schema.party_party_names)
        header = next(reader)
        mapping_id = 1  # Mapping ID for the account-party list
        lookup = AccountDataTypeLookup()
        us_gen = self.fake['en_US']
        for row in reader:
            output_row = list(self.schema.acct_defaults)
            acct_type = ""
            acct_id = ""
            gender = np.random.choice(['Male', 'Female'], p=[0.5, 0.5])
            street_address, city, state, postcode = self._random_us_address(us_gen)
            for output_index, output_item in enumerate(self.schema.data['account']):
                if 'dataType' in output_item:
                    output_type = output_item['dataType']
                    input_type = lookup.inputType(output_type)
                    try:
                        input_index = header.index(input_type)
                    except ValueError:  # No matching input column; skip this output column entirely
                        continue
                    if output_type == "start_time":
                        try:
                            start = int(row[input_index])
                            if start >= 0:
                                output_row[output_index] = start
                        except ValueError:  # If failed, keep the default value
                            pass
                    elif output_type == "end_time":
                        try:
                            end = int(row[input_index])
                            if end > 0:
                                output_row[output_index] = end
                        except ValueError:  # If failed, keep the default value
                            pass
                    elif output_type == "account_id":
                        acct_id = row[input_index]
                        output_row[output_index] = acct_id
                    elif output_type == "account_type":
                        acct_type = row[input_index]
                        output_row[output_index] = acct_type
                    else:
                        output_row[output_index] = row[input_index]
                if 'valueType' in output_item:
                    if output_item['valueType'] == 'date':
                        output_row[output_index] = self.schema.days2date(output_row[output_index])
                if 'name' in output_item:
                    # Synthesize personal attributes with Faker for known columns
                    col_name = output_item['name']
                    if col_name == 'first_name':
                        output_row[output_index] = us_gen.first_name_male() if gender == "Male" else us_gen.first_name_female()
                    elif col_name == 'last_name':
                        output_row[output_index] = us_gen.last_name_male() if gender == "Male" else us_gen.last_name_female()
                    elif col_name == 'street_addr':
                        output_row[output_index] = street_address
                    elif col_name == 'city':
                        output_row[output_index] = city
                    elif col_name == 'state':
                        output_row[output_index] = state
                    elif col_name == 'country':
                        output_row[output_index] = "US"
                    elif col_name == 'zip':
                        output_row[output_index] = postcode
                    elif col_name == 'gender':
                        output_row[output_index] = gender
                    elif col_name == 'birth_date':
                        output_row[output_index] = us_gen.date_of_birth()
                    elif col_name == 'ssn':
                        output_row[output_index] = us_gen.ssn()
                    elif col_name == 'lat':
                        output_row[output_index] = us_gen.latitude()
                    elif col_name == 'lon':
                        output_row[output_index] = us_gen.longitude()
            acct_writer.writerow(output_row)
            self.org_types[int(acct_id)] = acct_type
            # Write a party row per account
            is_individual = random() >= 0.5  # 50%: individual, 50%: organization
            party_id = str(acct_id)
            if is_individual:  # Individual
                ind_writer.writerow(self.schema.get_party_ind_row(party_id))
            else:  # Organization
                org_writer.writerow(self.schema.get_party_org_row(party_id))
            # Write account-party mapping row
            map_writer.writerow(self.schema.get_acct_party_row(mapping_id, acct_id, party_id))
            mapping_id += 1
        in_acct_f.close()
        out_acct_f.close()  # fix: was never closed, risking unflushed account rows
        out_ind_f.close()
        out_org_f.close()
        out_map_f.close()
        out_ent_f.close()

        # Avoid duplicated transaction CSV rows in the log file
        tx_set = set()
        cash_tx_set = set()
        # Load the transaction log from the Java simulator
        reader = csv.reader(in_tx_f)
        tx_writer = csv.writer(out_tx_f)
        cash_tx_writer = csv.writer(out_cash_tx_f)
        alert_tx_writer = csv.writer(out_alert_tx_f)
        header = next(reader)
        indices = {name: index for index, name in enumerate(header)}
        num_columns = len(header)
        tx_header = self.schema.tx_names
        alert_header = self.schema.alert_tx_names
        tx_writer.writerow(tx_header)
        cash_tx_writer.writerow(tx_header)
        alert_tx_writer.writerow(alert_header)
        step_idx = indices["step"]
        amt_idx = indices["amount"]
        orig_idx = indices["nameOrig"]
        dest_idx = indices["nameDest"]
        sar_idx = indices["isSAR"]
        alert_idx = indices["alertID"]
        type_idx = indices["type"]
        tx_id = 1
        for row in reader:
            if len(row) < num_columns:  # skip truncated rows
                continue
            try:
                days = int(row[step_idx])
                date_str = str(days)  # day offset; Schema converts "date" columns to ISO dates
                amount = row[amt_idx]  # transaction amount
                orig_id = row[orig_idx]  # originator ID
                dest_id = row[dest_idx]  # beneficiary ID
                sar_id = int(row[sar_idx])  # SAR transaction index
                alert_id = int(row[alert_idx])  # Alert ID (negative when not in an alert)
                is_sar = sar_id > 0
                is_alert = alert_id >= 0
                ttype = row[type_idx]
            except ValueError:  # skip rows with non-numeric mandatory fields
                continue
            attr = {name: row[index] for name, index in indices.items()}
            if ttype in CASH_TYPES:  # Cash transactions
                cash_tx = (orig_id, dest_id, ttype, amount, date_str)
                if cash_tx not in cash_tx_set:
                    cash_tx_set.add(cash_tx)
                    output_row = self.schema.get_tx_row(tx_id, date_str, amount, ttype, orig_id, dest_id,
                                                        is_sar, alert_id, **attr)
                    cash_tx_writer.writerow(output_row)
            else:  # Account-to-account transactions including alert transactions
                tx = (orig_id, dest_id, ttype, amount, date_str)
                if tx not in tx_set:
                    output_row = self.schema.get_tx_row(tx_id, date_str, amount, ttype, orig_id, dest_id,
                                                        is_sar, alert_id, **attr)
                    tx_writer.writerow(output_row)
                    tx_set.add(tx)
                    if is_alert:  # Alert transactions
                        alert_type = self.reports.get(alert_id).get_reason()
                        alert_row = self.schema.get_alert_tx_row(alert_id, alert_type, is_sar, tx_id,
                                                                 orig_id, dest_id, ttype, amount,
                                                                 date_str, **attr)
                        alert_tx_writer.writerow(alert_row)
            if tx_id % 1000000 == 0:
                print("Converted %d transactions." % tx_id)
            tx_id += 1
        in_tx_f.close()
        out_tx_f.close()
        out_cash_tx_f.close()
        out_alert_tx_f.close()

        # Count degrees (fan-in/out patterns), enabled via the DEGREE env var
        deg_param = os.getenv("DEGREE")
        if deg_param:
            max_threshold = int(deg_param)
            pred = defaultdict(set)  # Account -> predecessor accounts
            succ = defaultdict(set)  # Account -> successor accounts
            for orig, dest, _, _, _ in tx_set:
                pred[dest].add(orig)
                succ[orig].add(dest)
            in_deg = Counter(len(nbs) for nbs in pred.values())
            out_deg = Counter(len(nbs) for nbs in succ.values())
            for th in range(2, max_threshold + 1):
                num_fan_in = sum(c for d, c in in_deg.items() if d >= th)
                num_fan_out = sum(c for d, c in out_deg.items() if d >= th)
                print("Number of fan-in / fan-out patterns with", th, "neighbors", num_fan_in, "/", num_fan_out)

    def convert_alert_members(self):
        """Convert the alert member list and register one AMLTypology per alert."""
        input_file = self.group_file
        output_file = self.alert_acct_file
        print("Load alert groups: %s" % input_file)
        # fix: use context managers; both handles were previously leaked
        with open(os.path.join(self.input_dir, input_file), "r") as rf, \
                open(os.path.join(self.work_dir, output_file), "w") as wf:
            reader = csv.reader(rf)
            header = next(reader)
            indices = {name: index for index, name in enumerate(header)}
            writer = csv.writer(wf)
            writer.writerow(self.schema.alert_acct_names)
            for row in reader:
                reason = row[indices["reason"]]
                alert_id = int(row[indices["alertID"]])
                account_id = int(row[indices["accountID"]])
                is_sar = row[indices["isSAR"]].lower() == "true"
                model_id = row[indices["modelID"]]
                schedule_id = row[indices["scheduleID"]]
                bank_id = row[indices["bankID"]]
                if alert_id not in self.reports:
                    self.reports[alert_id] = AMLTypology(reason)
                self.reports[alert_id].add_member(account_id, is_sar)
                attr = {name: row[index] for name, index in indices.items()}
                output_row = self.schema.get_alert_acct_row(alert_id, reason, account_id, account_id, is_sar,
                                                            model_id, schedule_id, bank_id, **attr)
                writer.writerow(output_row)

    def output_sar_cases(self):
        """Extract SAR account list involved in alert transactions from transaction log file."""
        input_file = self.log_file
        output_file = os.path.join(self.work_dir, self.sar_acct_file)
        print("Convert SAR typologies from %s to %s" % (input_file, output_file))
        with open(input_file, "r") as rf:
            reader = csv.reader(rf)
            alerts = self.sar_accounts(reader)
        with open(output_file, "w") as wf:
            writer = csv.writer(wf)
            self.write_sar_accounts(writer, alerts)

    def sar_accounts(self, reader):
        """Collect SAR account rows from the transaction log.

        First registers every alert transaction with its AMLTypology, then
        emits one row per account the first time it appears in any typology.

        :param reader: csv.reader over the raw transaction log
        :return: List of SAR account tuples
        """
        header = next(reader)
        indices = {name: index for index, name in enumerate(header)}
        columns = len(header)
        tx_id = 0
        for row in reader:
            if len(row) < columns:
                continue
            try:
                days = int(row[indices["step"]])
                amount = float(row[indices["amount"]])
                orig = int(row[indices["nameOrig"]])
                dest = int(row[indices["nameDest"]])
                alert_id = int(row[indices["alertID"]])
                orig_name = "C_%d" % orig
                dest_name = "C_%d" % dest
            except ValueError:
                continue
            if alert_id >= 0 and alert_id in self.reports:  # SAR transactions
                attr = {name: row[index] for name, index in indices.items()}
                self.reports[alert_id].add_tx(tx_id, amount, days, orig, dest, orig_name, dest_name, attr)
            tx_id += 1
        sar_accounts = list()
        count = 0
        num_reports = len(self.reports)
        for sar_id, typology in self.reports.items():
            if typology.count == 0:  # no transactions recorded for this alert
                continue
            reason = typology.get_reason()
            is_sar = "YES" if typology.is_sar else "NO"  # SAR or false alert
            for key, transaction in typology.transactions.items():
                amount, step, orig_acct, dest_acct, orig_name, dest_name, attr = transaction
                if (self.account_recorded(orig_acct)
                        and self.account_recorded(dest_acct)):
                    continue  # both endpoints already recorded
                if not self.account_recorded(orig_acct):
                    typology.recorded_members.add(orig_acct)
                    sar_accounts.append((sar_id, orig_acct, orig_name, days_to_date(step), reason,
                                         self.org_type(orig_acct), is_sar))
                if not self.account_recorded(dest_acct):
                    typology.recorded_members.add(dest_acct)
                    sar_accounts.append((sar_id, dest_acct, dest_name, days_to_date(step), reason,
                                         self.org_type(dest_acct), is_sar))
            count += 1
            if count % 100 == 0:
                print("SAR Typologies: %d/%d" % (count, num_reports))
        return sar_accounts

    def org_type(self, acct_id):
        """Map the stored account type code to the output label."""
        return "INDIVIDUAL" if self.org_types[acct_id] == "I" else "COMPANY"

    def write_sar_accounts(self, writer, sar_accounts):
        """Write the SAR account rows with a fixed header."""
        writer.writerow(
            ["ALERT_ID", "ACCOUNT_ID", "CUSTOMER_ID", "EVENT_DATE",
             "ALERT_TYPE", "ACCOUNT_TYPE", "IS_SAR"])
        for alert in sar_accounts:
            writer.writerow(alert)

    def account_recorded(self, acct_id):
        """Return True if the account is already recorded in any typology."""
        return any(acct_id in typology.recorded_members
                   for typology in self.reports.values())
if __name__ == "__main__":
    argv = sys.argv
    if len(argv) < 2:
        print("Usage: python3 %s [ConfJSON] [SimName]" % argv[0])
        sys.exit(1)  # sys.exit: the exit() builtin may be unavailable without site
    _conf_json = argv[1]
    _sim_name = argv[2] if len(argv) >= 3 else None  # optional simulation name override
    with open(_conf_json, "r") as rf:
        conf = json.load(rf)
    fake = Faker(['en_US'])
    Faker.seed(0)  # deterministic synthetic account attributes
    # fix: construct the converter exactly once (it was previously built
    # twice, repeating all of __init__'s file I/O and discarding the first)
    converter = LogConverter(conf, _sim_name, fake)
    converter.convert_alert_members()  # must run before convert_acct_tx (alert reasons)
    converter.convert_acct_tx()
    converter.output_sar_cases()