"""
Generate a base transaction graph used in the simulator
"""

import networkx as nx
import numpy as np
import itertools
import random
import csv
import json
import os
import sys
import logging

from collections import Counter, defaultdict
from amlsim.nominator import Nominator
from amlsim.normal_model import NormalModel

from amlsim.random_amount import RandomAmount
from amlsim.rounded_amount import RoundedAmount

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

MAIN_ACCT_KEY = "main_acct"
IS_SAR_KEY = "is_sar"

DEFAULT_MARGIN_RATIO = 0.1


def parse_int(value):
    """ Convert string to int
    :param value: string value
    :return: int value if the parameter can be converted to int, otherwise None
    """
    try:
        return int(value)
    except (ValueError, TypeError):
        return None


def parse_float(value):
    """ Convert string to amount (float)
    :param value: string value
    :return: float value if the parameter can be converted to float, otherwise None
    """
    try:
        return float(value)
    except (ValueError, TypeError):
        return None


def parse_flag(value):
    """ Convert string to boolean (True or False)
    :param value: string value
    :return: True if the value equals "true" (case-insensitive), otherwise False
    """
    return isinstance(value, str) and value.lower() == "true"


def get_positive_or_none(value):
    """ Get positive value or None (used to parse simulation step parameters)
    :param value: Numerical value or None
    :return: If the value is positive, return this value. Otherwise, return None.
    """
    if value is None:
        return None
    else:
        return value if value > 0 else None


def directed_configuration_model(_in_deg, _out_deg, seed=0):
    """Generate a directed random graph with the given degree sequences and no self-loops.
    Based on nx.generators.degree_seq.directed_configuration_model
    :param _in_deg: Each list entry corresponds to the in-degree of a node.
    :param _out_deg: Each list entry corresponds to the out-degree of a node.
    :param seed: Seed for random number generator
    :return: MultiDiGraph without self-loops
    """
    if sum(_in_deg) != sum(_out_deg):
        raise nx.NetworkXError('Invalid degree sequences. Sequences must have equal sums.')

    random.seed(seed)
    n_in = len(_in_deg)
    n_out = len(_out_deg)
    if n_in < n_out:
        _in_deg.extend((n_out - n_in) * [0])
    else:
        _out_deg.extend((n_in - n_out) * [0])

    num_nodes = len(_in_deg)
    _g = nx.empty_graph(num_nodes, nx.MultiDiGraph())
    if num_nodes == 0 or max(_in_deg) == 0:
        return _g

    in_tmp_list = list()
    out_tmp_list = list()
    for n in _g.nodes():
        in_tmp_list.extend(_in_deg[n] * [n])
        out_tmp_list.extend(_out_deg[n] * [n])
    random.shuffle(in_tmp_list)
    random.shuffle(out_tmp_list)

    # Swap endpoints forward in the shuffled list to break up self-loops where possible
    num_edges = len(in_tmp_list)
    for i in range(num_edges):
        _src = out_tmp_list[i]
        _dst = in_tmp_list[i]
        if _src == _dst:
            for j in range(i + 1, num_edges):
                if _src != in_tmp_list[j]:
                    in_tmp_list[i], in_tmp_list[j] = in_tmp_list[j], in_tmp_list[i]
                    break

    _g.add_edges_from(zip(out_tmp_list, in_tmp_list))
    for idx, (_src, _dst) in enumerate(_g.edges()):
        if _src == _dst:
            logger.warning("Self loop from/to %d at %d" % (_src, idx))
    return _g
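
# A minimal usage sketch (illustrative only). The sequences below both sum to 4,
# so the result always has 3 nodes and 4 (multi-)edges, regardless of the seed:
#
#   >>> g = directed_configuration_model([2, 1, 1], [1, 2, 1], seed=42)
#   >>> g.number_of_nodes(), g.number_of_edges()
#   (3, 4)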


def get_degrees(deg_csv, num_v):
    """
    :param deg_csv: Degree distribution parameter CSV file
    :param num_v: Number of total account vertices
    :return: In-degree and out-degree sequence list
    """
    with open(deg_csv, "r") as rf:
        reader = csv.reader(rf)
        next(reader)  # Skip the header row
        return get_in_and_out_degrees(reader, num_v)


def get_in_and_out_degrees(iterable, num_v):
    _in_deg = list()
    _out_deg = list()

    for row in iterable:
        if row[0].startswith("#"):
            continue
        count = int(row[0])
        _in_deg.extend([int(row[1])] * count)
        _out_deg.extend([int(row[2])] * count)

    in_len, out_len = len(_in_deg), len(_out_deg)
    if in_len != out_len:
        raise ValueError("The lengths of the in-degree (%d) and out-degree (%d) sequences must be the same."
                         % (in_len, out_len))

    total_in_deg, total_out_deg = sum(_in_deg), sum(_out_deg)
    if total_in_deg != total_out_deg:
        raise ValueError("The sums of in-degrees (%d) and out-degrees (%d) must be the same."
                         % (total_in_deg, total_out_deg))

    if num_v % in_len != 0:
        raise ValueError("The number of total accounts (%d) "
                         "must be a multiple of the degree sequence length (%d)."
                         % (num_v, in_len))

    repeats = num_v // in_len
    _in_deg = _in_deg * repeats
    _out_deg = _out_deg * repeats

    return _in_deg, _out_deg
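
# Degree CSV sketch: after the header, each row is "count,in-degree,out-degree"
# and expands into `count` accounts with that degree pair. The whole sequence is
# then tiled to cover num_v accounts, e.g. with num_v = 6:
#
#   >>> get_in_and_out_degrees([["2", "1", "2"], ["1", "2", "0"]], 6)
#   ([1, 1, 2, 1, 1, 2], [2, 2, 0, 2, 2, 0])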


class TransactionGenerator:

    def __init__(self, conf, sim_name=None):
        """Initialize the transaction network from parameter files.
        :param conf: Configuration (parsed JSON) dict
        :param sim_name: Simulation name (overrides the simulation name in `conf`)
        """
        self.g = nx.DiGraph()
        self.num_accounts = 0
        self.hubs = set()
        self.attr_names = list()
        self.bank_to_accts = defaultdict(set)
        self.acct_to_bank = dict()
        self.normal_model_counts = dict()
        self.normal_models = list()
        self.normal_model_id = 1

        self.conf = conf

        general_conf = self.conf["general"]

        # Random seed (the RANDOM_SEED environment variable takes precedence)
        seed = general_conf.get("random_seed")
        env_seed = os.getenv("RANDOM_SEED")
        if env_seed is not None:
            seed = env_seed
        self.seed = seed if seed is None else int(seed)
        np.random.seed(self.seed)
        random.seed(self.seed)
        logger.info("Random seed: " + str(self.seed))

        if sim_name is None:
            sim_name = general_conf["simulation_name"]
        logger.info("Simulation name: " + sim_name)

        self.total_steps = parse_int(general_conf["total_steps"])

        # Default amount, balance and timestamp parameters
        default_conf = self.conf["default"]
        self.default_min_amount = parse_float(default_conf.get("min_amount"))
        self.default_max_amount = parse_float(default_conf.get("max_amount"))
        self.default_min_balance = parse_float(default_conf.get("min_balance"))
        self.default_max_balance = parse_float(default_conf.get("max_balance"))
        self.default_start_step = parse_int(default_conf.get("start_step"))
        self.default_end_step = parse_int(default_conf.get("end_step"))
        self.default_start_range = parse_int(default_conf.get("start_range"))
        self.default_end_range = parse_int(default_conf.get("end_range"))
        self.default_model = parse_int(default_conf.get("transaction_model"))

        # Ratio of the amount kept by intermediary accounts in AML typologies
        self.margin_ratio = parse_float(default_conf.get("margin_ratio", DEFAULT_MARGIN_RATIO))
        if not 0.0 <= self.margin_ratio <= 1.0:
            raise ValueError("Margin ratio in AML typologies (%f) must be within [0.0, 1.0]" % self.margin_ratio)

        self.default_bank_id = default_conf.get("bank_id")

        # Input file settings
        input_conf = self.conf["input"]
        self.input_dir = input_conf["directory"]
        self.account_file = input_conf["accounts"]
        self.alert_file = input_conf["alert_patterns"]
        self.normal_models_file = input_conf["normal_models"]
        self.degree_file = input_conf["degree"]
        self.type_file = input_conf["transaction_type"]
        self.is_aggregated = input_conf["is_aggregated_accounts"]

        # Output (temporal) file settings
        output_conf = self.conf["temporal"]
        self.output_dir = os.path.join(output_conf["directory"], sim_name)
        self.out_tx_file = output_conf["transactions"]
        self.out_account_file = output_conf["accounts"]
        self.out_alert_member_file = output_conf["alert_members"]
        self.out_normal_models_file = output_conf["normal_models"]

        # Graph generator parameters
        other_conf = self.conf["graph_generator"]
        self.degree_threshold = parse_int(other_conf["degree_threshold"])
        high_risk_countries_str = other_conf.get("high_risk_countries", "")
        high_risk_business_str = other_conf.get("high_risk_business", "")
        self.high_risk_countries = set(high_risk_countries_str.split(","))
        self.high_risk_business = set(high_risk_business_str.split(","))

        self.edge_id = 0
        self.alert_id = 0
        self.alert_groups = dict()

        # Mapping from AML typology names to model IDs
        self.alert_types = {"fan_out": 1, "fan_in": 2, "cycle": 3, "bipartite": 4, "stack": 5,
                            "random": 6, "scatter_gather": 7, "gather_scatter": 8}

        self.acct_file = os.path.join(self.input_dir, self.account_file)

        def get_types(type_csv):
            tx_types = list()
            with open(type_csv, "r") as _rf:
                reader = csv.reader(_rf)
                next(reader)  # Skip the header row
                for row in reader:
                    if row[0].startswith("#"):
                        continue
                    ttype = row[0]
                    tx_types.extend([ttype] * int(row[1]))
            return tx_types

        self.tx_types = get_types(os.path.join(self.input_dir, self.type_file))
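
        # Transaction type CSV sketch ("type,frequency" rows after a header), e.g.
        #   WIRE,2
        #   CREDIT,1
        # yields self.tx_types == ["WIRE", "WIRE", "CREDIT"], from which
        # write_transaction_list() later samples uniformly at random.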

    def check_hub_exists(self):
        """Validate that one or more hub accounts exist as main account candidates of AML typologies
        """
        if not self.hubs:
            raise ValueError("No main account candidates found. "
                             "Please try again with a smaller value of the 'degree_threshold' parameter in conf.json.")

    def set_main_acct_candidates(self):
        """ Set self.hubs to be a set of hub nodes
        Throw an error if not done successfully.
        """
        hub_list = self.hub_nodes()
        self.hubs = set(hub_list)
        self.check_hub_exists()

    def hub_nodes(self):
        """Choose hub accounts with a larger degree than the specified threshold
        as the main account candidates of alert transaction sets
        """
        nodes = [n for n in self.g.nodes()
                 if self.degree_threshold <= self.g.in_degree(n)
                 or self.degree_threshold <= self.g.out_degree(n)]
        return nodes

    def check_account_exist(self, aid):
        """Validate the existence of a specified account. If absent, raise KeyError.
        :param aid: Account ID
        """
        if not self.g.has_node(aid):
            raise KeyError("Account %s does not exist" % str(aid))

    def check_account_absent(self, aid):
        """Validate the absence of a specified account
        :param aid: Account ID
        :return: True if an account with the specified ID has not yet been added
        """
        if self.g.has_node(aid):
            logger.warning("Account %s already exists" % str(aid))
            return False
        else:
            return True

    def get_all_bank_ids(self):
        """Get a list of all bank IDs
        :return: Bank ID list
        """
        return list(self.bank_to_accts.keys())

    def get_typology_members(self, num, bank_id=""):
        """Choose accounts randomly as members of AML typologies from one or multiple banks.
        :param num: Number of total account vertices (including the main account)
        :param bank_id: If specified, members are chosen from the single bank with this ID.
        If empty (default), members are chosen from all banks randomly.
        :return: Main account and list of member account IDs
        """
        if num <= 1:
            raise ValueError("The number of members must be more than 1")

        if bank_id in self.bank_to_accts:
            bank_accts = self.bank_to_accts[bank_id]
            main_candidates = self.hubs & bank_accts
            main_acct = random.sample(main_candidates, 1)[0]
            self.remove_typology_candidate(main_acct)
            sub_accts = random.sample(bank_accts, num - 1)
            for n in sub_accts:
                self.remove_typology_candidate(n)

            members = [main_acct] + sub_accts
            return main_acct, members

        elif bank_id == "":
            self.check_hub_exists()
            main_acct = random.sample(self.hubs, 1)[0]
            self.remove_typology_candidate(main_acct)

            sub_accts = random.sample(self.acct_to_bank.keys(), num - 1)
            for n in sub_accts:
                self.remove_typology_candidate(n)
            members = [main_acct] + sub_accts
            return main_acct, members

        else:
            raise KeyError("No such bank ID: %s" % bank_id)

    def load_account_list(self):
        """Load and add account vertices from a CSV file
        """
        if self.is_aggregated:
            self.load_account_list_param()
        else:
            self.load_account_list_raw()

    def load_account_list_raw(self):
        """Load and add account vertices from a CSV file (self.acct_file) with raw account info
        header: uuid,seq,first_name,last_name,street_addr,city,state,zip,gender,phone_number,birth_date,ssn
        """
        if self.default_min_balance is None:
            raise KeyError("Option 'default_min_balance' is required to load a raw account list")
        min_balance = self.default_min_balance

        if self.default_max_balance is None:
            raise KeyError("Option 'default_max_balance' is required to load a raw account list")
        max_balance = self.default_max_balance

        start_day = get_positive_or_none(self.default_start_step)
        end_day = get_positive_or_none(self.default_end_step)
        start_range = get_positive_or_none(self.default_start_range)
        end_range = get_positive_or_none(self.default_end_range)
        default_model = self.default_model if self.default_model is not None else 1

        self.attr_names.extend(["first_name", "last_name", "street_addr", "city", "state", "zip",
                                "gender", "phone_number", "birth_date", "ssn", "lon", "lat"])

        with open(self.acct_file, "r") as rf:
            reader = csv.reader(rf)
            header = next(reader)
            name2idx = {n: i for i, n in enumerate(header)}
            idx_aid = name2idx["uuid"]
            idx_first_name = name2idx["first_name"]
            idx_last_name = name2idx["last_name"]
            idx_street_addr = name2idx["street_addr"]
            idx_city = name2idx["city"]
            idx_state = name2idx["state"]
            idx_zip = name2idx["zip"]
            idx_gender = name2idx["gender"]
            idx_phone_number = name2idx["phone_number"]
            idx_birth_date = name2idx["birth_date"]
            idx_ssn = name2idx["ssn"]
            idx_lon = name2idx["lon"]
            idx_lat = name2idx["lat"]

            default_country = "US"
            default_acct_type = "I"

            count = 0
            for row in reader:
                if row[0].startswith("#"):
                    continue
                aid = row[idx_aid]
                first_name = row[idx_first_name]
                last_name = row[idx_last_name]
                street_addr = row[idx_street_addr]
                city = row[idx_city]
                state = row[idx_state]
                zip_code = row[idx_zip]
                gender = row[idx_gender]
                phone_number = row[idx_phone_number]
                birth_date = row[idx_birth_date]
                ssn = row[idx_ssn]
                lon = row[idx_lon]
                lat = row[idx_lat]
                model = default_model

                if start_day is not None and start_range is not None:
                    start = start_day + random.randrange(start_range)
                else:
                    start = -1

                if end_day is not None and end_range is not None:
                    end = end_day - random.randrange(end_range)
                else:
                    end = -1

                attr = {"first_name": first_name, "last_name": last_name, "street_addr": street_addr,
                        "city": city, "state": state, "zip": zip_code, "gender": gender,
                        "phone_number": phone_number, "birth_date": birth_date, "ssn": ssn, "lon": lon, "lat": lat}

                init_balance = random.uniform(min_balance, max_balance)
                # Pass the scheduling attributes and an empty normal-model list so raw
                # accounts carry the same attributes as aggregated ones
                self.add_account(aid, init_balance=init_balance, start=start, end=end, model=model,
                                 country=default_country, business=default_acct_type, is_sar=False,
                                 normal_models=list(), **attr)
                count += 1

    def set_num_accounts(self):
        with open(self.acct_file, "r") as rf:
            reader = csv.reader(rf)

            header = next(reader)

            count = 0
            for row in reader:
                if row[0].startswith("#"):
                    continue
                num = int(row[header.index('count')])
                count += num

        self.num_accounts = count

    def load_account_list_param(self):
        """Load and add account vertices from a CSV file (self.acct_file) with aggregated parameters
        Each row may represent two or more accounts
        """
        with open(self.acct_file, "r") as rf:
            reader = csv.reader(rf)

            header = next(reader)

            acct_id = 0
            for row in reader:
                if row[0].startswith("#"):
                    continue
                num = int(row[header.index('count')])
                min_balance = parse_float(row[header.index('min_balance')])
                max_balance = parse_float(row[header.index('max_balance')])
                country = row[header.index('country')]
                business = row[header.index('business_type')]
                bank_id = row[header.index('bank_id')]
                if not bank_id:  # CSV fields are strings; an empty field means "unspecified"
                    bank_id = self.default_bank_id

                for i in range(num):
                    init_balance = random.uniform(min_balance, max_balance)
                    self.add_account(acct_id, init_balance=init_balance, country=country, business=business,
                                     bank_id=bank_id, is_sar=False, normal_models=list())
                    acct_id += 1

        logger.info("Generated %d accounts." % self.num_accounts)

    def generate_normal_transactions(self):
        """Generate the base directed graph from degree sequences and store it in self.g
        (the base transaction graph, not the complete transaction graph)
        TODO: Add options to call scale-free generator functions directly instead of loading degree CSV files
        """
        deg_file = os.path.join(self.input_dir, self.degree_file)
        in_deg, out_deg = get_degrees(deg_file, self.num_accounts)
        G = directed_configuration_model(in_deg, out_deg, self.seed)
        G = nx.DiGraph(G)  # Collapse parallel edges of the MultiDiGraph
        self.g = G

        logger.info("Add %d base transactions" % self.g.number_of_edges())
        nodes = self.g.nodes()
        for src_i, dst_i in self.g.edges():
            src = nodes[src_i]
            dst = nodes[dst_i]
            self.add_edge_info(src, dst)

    def add_account(self, acct_id, **attr):
        """Add an account vertex (or set the attributes of an existing vertex)
        :param acct_id: Account ID
        :param attr: Account attributes, such as init_balance (initial balance),
        start (the day the account opened), end (the day the account closed),
        country (country name), business (business type) and bank_id (bank ID)
        :return:
        """
        if attr.get('bank_id') is None:  # Fall back to the default bank ID if none is given
            attr['bank_id'] = self.default_bank_id

        self.g.node[acct_id] = attr

        self.bank_to_accts[attr['bank_id']].add(acct_id)
        self.acct_to_bank[acct_id] = attr['bank_id']

    def remove_typology_candidate(self, acct):
        """Remove an account vertex from the AML typology member candidates
        :param acct: Account ID
        """
        self.hubs.discard(acct)
        bank_id = self.acct_to_bank[acct]
        del self.acct_to_bank[acct]
        self.bank_to_accts[bank_id].discard(acct)

    def add_edge_info(self, orig, bene):
        """Add metadata to an existing edge. Based on add_transaction,
        which will eventually be removed.
        :param orig: Originator account ID
        :param bene: Beneficiary account ID
        :return:
        """
        self.check_account_exist(orig)
        self.check_account_exist(bene)
        if orig == bene:
            raise ValueError("Self loop from/to %s is not allowed for transaction networks" % str(orig))
        self.g.edge[orig][bene]['edge_id'] = self.edge_id
        self.edge_id += 1

    def add_subgraph(self, members, topology):
        """Add a subgraph over existing account vertices with the given graph topology
        :param members: Account vertex list
        :param topology: Topology graph
        :return:
        """
        if len(members) != topology.number_of_nodes():
            raise nx.NetworkXError("The number of account vertices does not match")

        node_map = dict(zip(topology.nodes(), members))  # Map each topology node to an account ID
        for e in topology.edges():
            src = node_map[e[0]]
            dst = node_map[e[1]]
            self.g.add_edge(src, dst)
            self.add_edge_info(src, dst)

    def load_edgelist(self, members, csv_name):
        """Load an edgelist and add edges between existing account vertices
        :param members: Account vertex list
        :param csv_name: Edgelist file name
        :return:
        """
        topology = nx.DiGraph()
        topology = nx.read_edgelist(csv_name, delimiter=",", create_using=topology)
        self.add_subgraph(members, topology)

    def mark_active_edges(self):
        # In networkx 1.x, subgraphs share edge attribute dictionaries with the
        # original graph, so setting attributes on the subgraph also marks self.g
        nx.set_edge_attributes(self.g, 'active', False)
        for normal_model in self.normal_models:
            subgraph = self.g.subgraph(normal_model.node_ids)
            nx.set_edge_attributes(subgraph, 'active', True)

    def load_normal_models(self):
        """Load a normal model parameter file
        """
        normal_models_file = os.path.join(self.input_dir, self.normal_models_file)
        with open(normal_models_file, "r") as csvfile:
            reader = csv.reader(csvfile)
            self.read_normal_models(reader)

    def read_normal_models(self, reader):
        """Parse the normal model parameter file
        (currently only 'type' and 'count' are used to seed the nominator)
        """
        header = next(reader)

        self.nominator = Nominator(self.g, self.degree_threshold)

        for row in reader:
            count = int(row[header.index('count')])
            type = row[header.index('type')]
            schedule_id = int(row[header.index('schedule_id')])
            min_accounts = int(row[header.index('min_accounts')])
            max_accounts = int(row[header.index('max_accounts')])
            min_period = int(row[header.index('min_period')])
            max_period = int(row[header.index('max_period')])
            bank_id = row[header.index('bank_id')]
            if not bank_id:
                bank_id = self.default_bank_id

            self.nominator.initialize_count(type, count)

    def build_normal_models(self):
        while self.nominator.has_more():
            for type in self.nominator.types():
                count = self.nominator.count(type)
                if count > 0:
                    self.choose_normal_model(type)
                    self.normal_model_id += 1
        logger.info("Generated %d normal models." % len(self.normal_models))
        logger.info("Normal model counts %s", self.nominator.used_count_dict)

    def choose_normal_model(self, type):
        if type == 'fan_in':
            self.fan_in_model(type)
        elif type == 'fan_out':
            self.fan_out_model(type)
        elif type == 'forward':
            self.forward_model(type)
        elif type == 'single':
            self.single_model(type)
        elif type == 'mutual':
            self.mutual_model(type)
        elif type == 'periodical':
            self.periodical_model(type)

    def fan_in_model(self, type):
        node_id = self.nominator.next(type)

        if node_id is None:
            return

        candidates = self.nominator.fan_in_breakdown(type, node_id)

        if not candidates:
            raise ValueError('A nominated fan-in node must always have candidates')

        # Detach the candidates from other normal models of the same type
        normal_models = self.nominator.normal_models_in_type_relationship(type, node_id, {node_id})
        for nm in normal_models:
            nm.remove_node_ids(candidates)

        result_ids = candidates | {node_id}
        normal_model = NormalModel(self.normal_model_id, type, result_ids, node_id)

        for result_id in result_ids:
            self.g.node[result_id]['normal_models'].append(normal_model)

        self.normal_models.append(normal_model)

        self.nominator.post_fan_in(node_id, type)

    def fan_out_model(self, type):
        node_id = self.nominator.next(type)

        if node_id is None:
            return

        candidates = self.nominator.fan_out_breakdown(type, node_id)

        if not candidates:
            raise ValueError('A nominated fan-out node must always have candidates')

        # Detach the candidates from other normal models of the same type
        normal_models = self.nominator.normal_models_in_type_relationship(type, node_id, {node_id})
        for nm in normal_models:
            nm.remove_node_ids(candidates)

        result_ids = candidates | {node_id}
        normal_model = NormalModel(self.normal_model_id, type, result_ids, node_id)
        for result_id in result_ids:
            self.g.node[result_id]['normal_models'].append(normal_model)

        self.normal_models.append(normal_model)

        self.nominator.post_fan_out(node_id, type)

    def forward_model(self, type):
        node_id = self.nominator.next(type)

        if node_id is None:
            return

        succ_ids = self.g.successors(node_id)
        pred_ids = self.g.predecessors(node_id)

        node_sets = [{node_id, pred_id, succ_id} for pred_id in pred_ids for succ_id in succ_ids]

        node_set = next(
            s for s in node_sets if not self.nominator.is_in_type_relationship(type, node_id, s)
        )
        normal_model = NormalModel(self.normal_model_id, type, list(node_set), node_id)
        for acct_id in node_set:
            self.g.node[acct_id]['normal_models'].append(normal_model)

        self.normal_models.append(normal_model)

        self.nominator.post_forward(node_id, type)

    def single_model(self, type):
        node_id = self.nominator.next(type)

        if node_id is None:
            return

        succ_ids = self.g.successors(node_id)
        succ_id = next(succ_id for succ_id in succ_ids
                       if not self.nominator.is_in_type_relationship(type, node_id, {node_id, succ_id}))

        result_ids = {node_id, succ_id}
        normal_model = NormalModel(self.normal_model_id, type, result_ids, node_id)
        for acct_id in result_ids:
            self.g.node[acct_id]['normal_models'].append(normal_model)

        self.normal_models.append(normal_model)

        self.nominator.post_single(node_id, type)

    def periodical_model(self, type):
        node_id = self.nominator.next(type)

        if node_id is None:
            return

        succ_ids = self.g.successors(node_id)
        succ_id = next(succ_id for succ_id in succ_ids
                       if not self.nominator.is_in_type_relationship(type, node_id, {node_id, succ_id}))

        result_ids = {node_id, succ_id}
        normal_model = NormalModel(self.normal_model_id, type, result_ids, node_id)
        for acct_id in result_ids:
            self.g.node[acct_id]['normal_models'].append(normal_model)

        self.normal_models.append(normal_model)

        self.nominator.post_periodical(node_id, type)

    def mutual_model(self, type):
        node_id = self.nominator.next(type)

        if node_id is None:
            return

        succ_ids = self.g.successors(node_id)
        succ_id = next(succ_id for succ_id in succ_ids
                       if not self.nominator.is_in_type_relationship(type, node_id, {node_id, succ_id}))

        result_ids = {node_id, succ_id}
        normal_model = NormalModel(self.normal_model_id, type, result_ids, node_id)
        for acct_id in result_ids:
            self.g.node[acct_id]['normal_models'].append(normal_model)

        self.normal_models.append(normal_model)

        self.nominator.post_mutual(node_id, type)

    def load_alert_patterns(self):
        """Load an AML typology parameter file
        :return:
        """
        alert_file = os.path.join(self.input_dir, self.alert_file)

        idx_num = None
        idx_type = None
        idx_schedule = None
        idx_min_accts = None
        idx_max_accts = None
        idx_min_amt = None
        idx_max_amt = None
        idx_min_period = None
        idx_max_period = None
        idx_bank = None
        idx_sar = None

        with open(alert_file, "r") as rf:
            reader = csv.reader(rf)

            header = next(reader)
            for i, k in enumerate(header):
                if k == "count":
                    idx_num = i
                elif k == "type":
                    idx_type = i
                elif k == "schedule_id":
                    idx_schedule = i
                elif k == "min_accounts":
                    idx_min_accts = i
                elif k == "max_accounts":
                    idx_max_accts = i
                elif k == "min_amount":
                    idx_min_amt = i
                elif k == "max_amount":
                    idx_max_amt = i
                elif k == "min_period":
                    idx_min_period = i
                elif k == "max_period":
                    idx_max_period = i
                elif k == "bank_id":
                    idx_bank = i
                elif k == "is_sar":
                    idx_sar = i
                else:
                    logger.warning("Unknown column name in %s: %s" % (alert_file, k))

            count = 0
            for row in reader:
                if len(row) == 0 or row[0].startswith("#"):
                    continue
                num_patterns = int(row[idx_num])
                typology_name = row[idx_type]
                schedule = int(row[idx_schedule])
                min_accts = int(row[idx_min_accts])
                max_accts = int(row[idx_max_accts])
                min_amount = parse_float(row[idx_min_amt])
                max_amount = parse_float(row[idx_max_amt])
                min_period = parse_int(row[idx_min_period])
                max_period = parse_int(row[idx_max_period])
                bank_id = row[idx_bank] if idx_bank is not None else ""
                is_sar = parse_flag(row[idx_sar])

                if typology_name not in self.alert_types:
                    logger.warning("Pattern type name (%s) must be one of %s"
                                   % (typology_name, str(list(self.alert_types.keys()))))
                    continue

                for i in range(num_patterns):
                    num_accts = random.randrange(min_accts, max_accts + 1)
                    period = random.randrange(min_period, max_period + 1)
                    self.add_aml_typology(is_sar, typology_name, num_accts, min_amount, max_amount,
                                          period, bank_id, schedule)
                    count += 1
                    if count % 1000 == 0:
                        logger.info("Created %d alerts" % count)

    def add_aml_typology(self, is_sar, typology_name, num_accounts, min_amount, max_amount,
                         period, bank_id="", schedule=1):
        """Add an AML typology transaction set
        :param is_sar: Whether the alerted transaction set is a SAR (True) or a false alert (False)
        :param typology_name: Name of the pattern type
        ("fan_in", "fan_out", "cycle", "bipartite", "stack", "random", "scatter_gather" or "gather_scatter")
        :param num_accounts: Number of transaction members (accounts)
        :param min_amount: Minimum amount of the transaction
        :param max_amount: Maximum amount of the transaction
        :param period: Period (number of days) covering all transactions
        :param bank_id: Bank ID from which members are chosen. If empty, members are chosen from all banks.
        :param schedule: AML pattern transaction schedule model ID
        """

        def add_node(_acct, _bank_id):
            """Mark a member account as SAR and add it to the typology subgraph
            :param _acct: Account ID
            :param _bank_id: Bank ID (currently unused)
            """
            attr_dict = self.g.node[_acct]
            attr_dict[IS_SAR_KEY] = True

            sub_g.add_node(_acct, attr_dict)

        def add_main_acct():
            """Choose a main account ID and its bank ID from the hub accounts
            :return: main account ID and bank ID
            """
            self.check_hub_exists()
            _main_acct = random.sample(self.hubs, 1)[0]
            _main_bank_id = self.acct_to_bank[_main_acct]
            self.remove_typology_candidate(_main_acct)
            add_node(_main_acct, _main_bank_id)
            return _main_acct, _main_bank_id

        def add_edge(_orig, _bene, _amount, _date):
            """Add a transaction edge to the AML typology subgraph as well as the whole transaction graph
            :param _orig: Originator account ID
            :param _bene: Beneficiary account ID
            :param _amount: Transaction amount
            :param _date: Transaction timestamp
            """
            sub_g.add_edge(_orig, _bene, amount=_amount, date=_date)
            self.g.add_edge(_orig, _bene)
            self.add_edge_info(_orig, _bene)

        if bank_id == "" and len(self.bank_to_accts) >= 2:
            is_external = True
        elif bank_id != "" and bank_id not in self.bank_to_accts:
            raise KeyError("No such bank ID: %s" % bank_id)
        else:
            is_external = False

        start_date = random.randrange(0, self.total_steps - period + 1)
        end_date = start_date + period - 1

        model_id = self.alert_types[typology_name]
        sub_g = nx.DiGraph(model_id=model_id, reason=typology_name, scheduleID=schedule,
                           start=start_date, end=end_date)

        if typology_name == "fan_in":  # Multiple accounts send to the main account
            main_acct, main_bank_id = add_main_acct()
            num_neighbors = num_accounts - 1
            amount = RoundedAmount(min_amount, max_amount).getAmount()

            if is_external:
                sub_bank_candidates = [b for b, nbs in self.bank_to_accts.items()
                                       if b != main_bank_id and len(nbs) >= num_neighbors]
                if not sub_bank_candidates:
                    logger.warning("No banks with an appropriate number of neighboring accounts found.")
                    return
                sub_bank_id = random.choice(sub_bank_candidates)
            else:
                sub_bank_id = main_bank_id
            sub_accts = random.sample(self.bank_to_accts[sub_bank_id], num_neighbors)
            for n in sub_accts:
                self.remove_typology_candidate(n)
                add_node(n, sub_bank_id)

            for orig in sub_accts:
                date = random.randrange(start_date, end_date + 1)
                add_edge(orig, main_acct, amount, date)

        elif typology_name == "fan_out":  # The main account sends to multiple accounts
            main_acct, main_bank_id = add_main_acct()
            num_neighbors = num_accounts - 1
            amount = RoundedAmount(min_amount, max_amount).getAmount()

            if is_external:
                sub_bank_candidates = [b for b, nbs in self.bank_to_accts.items()
                                       if b != main_bank_id and len(nbs) >= num_neighbors]
                if not sub_bank_candidates:
                    logger.warning("No banks with an appropriate number of neighboring accounts found.")
                    return
                sub_bank_id = random.choice(sub_bank_candidates)
            else:
                sub_bank_id = main_bank_id
            sub_accts = random.sample(self.bank_to_accts[sub_bank_id], num_neighbors)
            for n in sub_accts:
                self.remove_typology_candidate(n)
                add_node(n, sub_bank_id)

            for bene in sub_accts:
                date = random.randrange(start_date, end_date + 1)
                add_edge(main_acct, bene, amount, date)

        elif typology_name == "bipartite":  # Each originator account sends to each beneficiary account
            orig_bank_id = random.choice(self.get_all_bank_ids())
            if is_external:
                bene_bank_id = random.choice([b for b in self.get_all_bank_ids() if b != orig_bank_id])
            else:
                bene_bank_id = orig_bank_id

            num_orig_accts = num_accounts // 2
            num_bene_accts = num_accounts - num_orig_accts

            orig_accts = random.sample(self.bank_to_accts[orig_bank_id], num_orig_accts)
            for n in orig_accts:
                self.remove_typology_candidate(n)
                add_node(n, orig_bank_id)
            main_acct = orig_accts[0]

            bene_accts = random.sample(self.bank_to_accts[bene_bank_id], num_bene_accts)
            for n in bene_accts:
                self.remove_typology_candidate(n)
                add_node(n, bene_bank_id)

            for orig, bene in itertools.product(orig_accts, bene_accts):
                amount = RandomAmount(min_amount, max_amount).getAmount()
                date = random.randrange(start_date, end_date + 1)
                add_edge(orig, bene, amount, date)

        elif typology_name == "stack":  # Two stacked bipartite layers: originators -> intermediaries -> beneficiaries
            if is_external:
                if len(self.get_all_bank_ids()) >= 3:
                    [orig_bank_id, mid_bank_id, bene_bank_id] = random.sample(self.get_all_bank_ids(), 3)
                else:
                    [orig_bank_id, mid_bank_id] = random.sample(self.get_all_bank_ids(), 2)
                    bene_bank_id = orig_bank_id
            else:
                orig_bank_id = mid_bank_id = bene_bank_id = random.sample(self.get_all_bank_ids(), 1)[0]

            num_orig_accts = num_mid_accts = num_accounts // 3
            num_bene_accts = num_accounts - num_orig_accts * 2

            orig_accts = random.sample(self.bank_to_accts[orig_bank_id], num_orig_accts)
            for n in orig_accts:
                self.remove_typology_candidate(n)
                add_node(n, orig_bank_id)
            main_acct = orig_accts[0]

            mid_accts = random.sample(self.bank_to_accts[mid_bank_id], num_mid_accts)
            for n in mid_accts:
                self.remove_typology_candidate(n)
                add_node(n, mid_bank_id)
            bene_accts = random.sample(self.bank_to_accts[bene_bank_id], num_bene_accts)
            for n in bene_accts:
                self.remove_typology_candidate(n)
                add_node(n, bene_bank_id)

            for orig, bene in itertools.product(orig_accts, mid_accts):
                amount = RandomAmount(min_amount, max_amount).getAmount()
                date = random.randrange(start_date, end_date + 1)
                add_edge(orig, bene, amount, date)

            for orig, bene in itertools.product(mid_accts, bene_accts):
                amount = RandomAmount(min_amount, max_amount).getAmount()
                date = random.randrange(start_date, end_date + 1)
                add_edge(orig, bene, amount, date)

        elif typology_name == "random":  # A random chain of transactions
            amount = RandomAmount(min_amount, max_amount).getAmount()
            date = random.randrange(start_date, end_date + 1)

            if is_external:
                all_bank_ids = self.get_all_bank_ids()
                bank_id_iter = itertools.cycle(all_bank_ids)
                prev_acct = None
                main_acct = None
                for _ in range(num_accounts):
                    bank_id = next(bank_id_iter)
                    next_acct = random.sample(self.bank_to_accts[bank_id], 1)[0]
                    if prev_acct is None:
                        main_acct = next_acct
                    else:
                        add_edge(prev_acct, next_acct, amount, date)
                    self.remove_typology_candidate(next_acct)
                    add_node(next_acct, bank_id)
                    prev_acct = next_acct

            else:
                main_acct, main_bank_id = add_main_acct()
                sub_accts = random.sample(self.bank_to_accts[main_bank_id], num_accounts - 1)
                for n in sub_accts:
                    self.remove_typology_candidate(n)
                    add_node(n, main_bank_id)
                prev_acct = main_acct
                for _ in range(num_accounts - 1):
                    next_acct = random.choice([n for n in sub_accts if n != prev_acct])
                    add_edge(prev_acct, next_acct, amount, date)
                    prev_acct = next_acct

        elif typology_name == "cycle":  # A cycle of transactions over all member accounts
            amount = RandomAmount(min_amount, max_amount).getAmount()
            dates = sorted([random.randrange(start_date, end_date + 1) for _ in range(num_accounts)])

            if is_external:
                all_accts = list()
                all_bank_ids = self.get_all_bank_ids()
                remain_num = num_accounts

                while all_bank_ids:
                    num_accts_per_bank = remain_num // len(all_bank_ids)
                    bank_id = all_bank_ids.pop()
                    new_members = random.sample(self.bank_to_accts[bank_id], num_accts_per_bank)
                    all_accts.extend(new_members)

                    remain_num -= len(new_members)
                    for n in new_members:
                        self.remove_typology_candidate(n)
                        add_node(n, bank_id)
                main_acct = all_accts[0]
            else:
                main_acct, main_bank_id = add_main_acct()
                sub_accts = random.sample(self.bank_to_accts[main_bank_id], num_accounts - 1)
                for n in sub_accts:
                    self.remove_typology_candidate(n)
                    add_node(n, main_bank_id)
                all_accts = [main_acct] + sub_accts

            for i in range(num_accounts):
                orig_i = i
                bene_i = (i + 1) % num_accounts
                orig_acct = all_accts[orig_i]
                bene_acct = all_accts[bene_i]
                date = dates[i]

                add_edge(orig_acct, bene_acct, amount, date)
                margin = amount * self.margin_ratio
                amount = amount - margin
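                # Each hop forwards the previous amount minus the intermediary's
                # margin; e.g. with margin_ratio = 0.1 and an initial amount of
                # 1000.00, successive hops carry 1000.00, 900.00, 810.00, ...
                # (a geometric decay by a factor of 1 - margin_ratio).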

        elif typology_name == "scatter_gather":  # Scatter-Gather (fan-out -> fan-in)
            if is_external:
                if len(self.get_all_bank_ids()) >= 3:
                    [orig_bank_id, mid_bank_id, bene_bank_id] = random.sample(self.get_all_bank_ids(), 3)
                else:
                    [orig_bank_id, mid_bank_id] = random.sample(self.get_all_bank_ids(), 2)
                    bene_bank_id = orig_bank_id
            else:
                orig_bank_id = mid_bank_id = bene_bank_id = random.sample(self.get_all_bank_ids(), 1)[0]

            main_acct = orig_acct = random.sample(self.bank_to_accts[orig_bank_id], 1)[0]
            self.remove_typology_candidate(orig_acct)
            add_node(orig_acct, orig_bank_id)
            mid_accts = random.sample(self.bank_to_accts[mid_bank_id], num_accounts - 2)
            for n in mid_accts:
                self.remove_typology_candidate(n)
                add_node(n, mid_bank_id)
            bene_acct = random.sample(self.bank_to_accts[bene_bank_id], 1)[0]
            self.remove_typology_candidate(bene_acct)
            add_node(bene_acct, bene_bank_id)

            # All scatter transactions happen before mid_date, all gather transactions after
            mid_date = (start_date + end_date) // 2

            for i in range(len(mid_accts)):
                mid_acct = mid_accts[i]
                scatter_amount = RandomAmount(min_amount, max_amount).getAmount()
                margin = scatter_amount * self.margin_ratio  # Margin kept by the intermediary
                amount = scatter_amount - margin
                scatter_date = random.randrange(start_date, mid_date)
                gather_date = random.randrange(mid_date, end_date + 1)

                add_edge(orig_acct, mid_acct, scatter_amount, scatter_date)
                add_edge(mid_acct, bene_acct, amount, gather_date)

        elif typology_name == "gather_scatter":  # Gather-Scatter (fan-in -> fan-out)
            if is_external:
                if len(self.get_all_bank_ids()) >= 3:
                    [orig_bank_id, mid_bank_id, bene_bank_id] = random.sample(self.get_all_bank_ids(), 3)
                else:
                    [orig_bank_id, mid_bank_id] = random.sample(self.get_all_bank_ids(), 2)
                    bene_bank_id = orig_bank_id
            else:
                orig_bank_id = mid_bank_id = bene_bank_id = random.sample(self.get_all_bank_ids(), 1)[0]

            num_orig_accts = num_bene_accts = (num_accounts - 1) // 2

            orig_accts = random.sample(self.bank_to_accts[orig_bank_id], num_orig_accts)
            for n in orig_accts:
                self.remove_typology_candidate(n)
                add_node(n, orig_bank_id)
            main_acct = mid_acct = random.sample(self.bank_to_accts[mid_bank_id], 1)[0]
            self.remove_typology_candidate(mid_acct)
            add_node(mid_acct, mid_bank_id)
            bene_accts = random.sample(self.bank_to_accts[bene_bank_id], num_bene_accts)
            for n in bene_accts:
                self.remove_typology_candidate(n)
                add_node(n, bene_bank_id)

            accumulated_amount = 0.0
            mid_date = (start_date + end_date) // 2
            amount = RandomAmount(min_amount, max_amount).getAmount()

            for i in range(num_orig_accts):
                orig_acct = orig_accts[i]
                date = random.randrange(start_date, mid_date)
                add_edge(orig_acct, mid_acct, amount, date)
                accumulated_amount += amount

            # Scatter the gathered total evenly across the beneficiary accounts
            scatter_amount = accumulated_amount / num_bene_accts
            for i in range(num_bene_accts):
                bene_acct = bene_accts[i]
                date = random.randrange(mid_date, end_date + 1)
                add_edge(mid_acct, bene_acct, scatter_amount, date)

        else:
            logger.warning("Unknown AML typology name: %s" % typology_name)
            return

        sub_g.graph[MAIN_ACCT_KEY] = main_acct
        sub_g.graph[IS_SAR_KEY] = is_sar
        self.alert_groups[self.alert_id] = sub_g
        self.alert_id += 1

    def write_account_list(self):
        os.makedirs(self.output_dir, exist_ok=True)
        acct_file = os.path.join(self.output_dir, self.out_account_file)
        with open(acct_file, "w") as wf:
            writer = csv.writer(wf)
            base_attrs = ["ACCOUNT_ID", "CUSTOMER_ID", "INIT_BALANCE", "COUNTRY",
                          "ACCOUNT_TYPE", "IS_SAR", "BANK_ID"]
            writer.writerow(base_attrs + self.attr_names)
            for n in self.g.nodes(data=True):
                aid = n[0]
                cid = "C_" + str(aid)
                prop = n[1]
                balance = "{0:.2f}".format(prop["init_balance"])
                country = prop["country"]
                business = prop["business"]
                is_sar = "true" if prop[IS_SAR_KEY] else "false"
                bank_id = prop["bank_id"]
                values = [aid, cid, balance, country, business, is_sar, bank_id]
                for attr_name in self.attr_names:
                    values.append(prop[attr_name])
                writer.writerow(values)
        logger.info("Exported %d accounts to %s" % (self.g.number_of_nodes(), acct_file))

    def write_transaction_list(self):
        """Write all active (normal model) transactions"""
        tx_file = os.path.join(self.output_dir, self.out_tx_file)
        tx_count = 0
        with open(tx_file, "w") as wf:
            writer = csv.writer(wf)
            writer.writerow(["id", "src", "dst", "ttype"])
            for e in self.g.edges(data=True):
                src = e[0]
                dst = e[1]
                attr = e[2]
                tid = attr['edge_id']
                tx_type = random.choice(self.tx_types)
                if attr['active']:
                    writer.writerow([tid, src, dst, tx_type])
                    tx_count += 1
        logger.info("Exported %d transactions to %s" % (tx_count, tx_file))

    def write_alert_account_list(self):
        def get_out_edge_attrs(g, vid, name):
            # Collect the attribute values of all edges incident to the vertex
            return [v for k, v in nx.get_edge_attributes(g, name).items() if (k[0] == vid or k[1] == vid)]

        acct_count = 0
        alert_member_file = os.path.join(self.output_dir, self.out_alert_member_file)
        logger.info("Output alert member list to: " + alert_member_file)
        with open(alert_member_file, "w") as wf:
            writer = csv.writer(wf)
            base_attrs = ["alertID", "reason", "accountID", "isMain", "isSAR", "modelID",
                          "minAmount", "maxAmount", "startStep", "endStep", "scheduleID", "bankID"]
            writer.writerow(base_attrs + self.attr_names)
            for gid, sub_g in self.alert_groups.items():
                main_id = sub_g.graph[MAIN_ACCT_KEY]
                model_id = sub_g.graph["model_id"]
                schedule_id = sub_g.graph["scheduleID"]
                reason = sub_g.graph["reason"]
                start = sub_g.graph["start"]
                end = sub_g.graph["end"]
                for n in sub_g.nodes():
                    is_main = "true" if n == main_id else "false"
                    is_sar = "true" if sub_g.graph[IS_SAR_KEY] else "false"
                    min_amt = '{:.2f}'.format(min(get_out_edge_attrs(sub_g, n, "amount")))
                    max_amt = '{:.2f}'.format(max(get_out_edge_attrs(sub_g, n, "amount")))
                    min_step = start
                    max_step = end
                    bank_id = sub_g.node[n]["bank_id"]
                    values = [gid, reason, n, is_main, is_sar, model_id, min_amt, max_amt,
                              min_step, max_step, schedule_id, bank_id]
                    prop = self.g.node[n]
                    for attr_name in self.attr_names:
                        values.append(prop[attr_name])
                    writer.writerow(values)
                    acct_count += 1

        logger.info("Exported %d members for %d AML typologies to %s" %
                    (acct_count, len(self.alert_groups), alert_member_file))

    def write_normal_models(self):
        output_file = os.path.join(self.output_dir, self.out_normal_models_file)
        with open(output_file, "w") as wf:
            writer = csv.writer(wf)
            column_headers = ["modelID", "type", "accountID", "isMain", "isSAR", "scheduleID"]
            writer.writerow(column_headers)

            for normal_model in self.normal_models:
                for account_id in normal_model.node_ids:
                    values = [normal_model.id, normal_model.type, account_id,
                              normal_model.is_main(account_id), False, 2]
                    writer.writerow(values)

    def count_fan_in_out_patterns(self, threshold=2):
        """Count the number of fan-in and fan-out patterns in the generated transaction graph
        """
        in_deg = Counter(self.g.in_degree().values())
        out_deg = Counter(self.g.out_degree().values())
        for th in range(2, threshold + 1):
            num_fan_in = sum([c for d, c in in_deg.items() if d >= th])
            num_fan_out = sum([c for d, c in out_deg.items() if d >= th])
            logger.info("\tNumber of fan-in / fan-out patterns with %d neighbors: %d / %d"
                        % (th, num_fan_in, num_fan_out))

        main_in_deg = Counter()
        main_out_deg = Counter()
        for sub_g in self.alert_groups.values():
            main_acct = sub_g.graph[MAIN_ACCT_KEY]
            main_in_deg[self.g.in_degree(main_acct)] += 1
            main_out_deg[self.g.out_degree(main_acct)] += 1
        for th in range(2, threshold + 1):
            num_fan_in = sum([c for d, c in main_in_deg.items() if d >= th])
            num_fan_out = sum([c for d, c in main_out_deg.items() if d >= th])
            logger.info("\tNumber of alerted fan-in / fan-out patterns with %d neighbors: %d / %d"
                        % (th, num_fan_in, num_fan_out))


if __name__ == "__main__":
    argv = sys.argv
    argc = len(argv)
    if argc < 2:
        print("Usage: python3 %s [ConfJSON] (SimName)" % argv[0])
        sys.exit(1)

    _conf_file = argv[1]
    _sim_name = argv[2] if argc >= 3 else None

    # If the DEGREE environment variable is set, log fan-in/fan-out pattern counts
    deg_param = os.getenv("DEGREE")
    degree_threshold = 0 if deg_param is None else int(deg_param)

    with open(_conf_file, "r") as rf:
        conf = json.load(rf)

    txg = TransactionGenerator(conf, _sim_name)
    txg.set_num_accounts()
    txg.generate_normal_transactions()
    txg.load_account_list()
    if degree_threshold > 0:
        logger.info("Generated normal transaction network")
        txg.count_fan_in_out_patterns(degree_threshold)
    txg.load_normal_models()

    txg.build_normal_models()
    txg.set_main_acct_candidates()
    txg.load_alert_patterns()
    txg.mark_active_edges()

    if degree_threshold > 0:
        logger.info("Added alert transaction patterns")
        txg.count_fan_in_out_patterns(degree_threshold)
    txg.write_account_list()
    txg.write_transaction_list()
    txg.write_alert_account_list()
    txg.write_normal_models()