File size: 8,278 Bytes
2795186
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
"""
Load output files from AMLSim and create NetworkX graph for analytics
"""
import os
import sys
import csv
from datetime import datetime, timedelta
from dateutil.parser import parse
from collections import Counter
import networkx as nx
import json


# Account (vertex) and transaction (edge) attribute keys
ACCT_SAR = "sar"  # Node attribute: True if the account is flagged as SAR (suspicious)
TX_AMOUNT = "amount"  # Edge attribute: transaction amount (float)
TX_DATE = "date"  # Edge attribute: transaction date as a "YYYY-MM-DD" string

DEGREE_STEP = 5  # Interval of degrees


def load_base_csv(acct_csv, tx_csv, schema_data):
    """Load account and transaction list CSV from the transaction graph generator (before running AMLSim)

    NOTE(review): not yet implemented — currently always returns None.
    :param acct_csv: Account list CSV
    :param tx_csv: Transaction list CSV
    :param schema_data: Schema data from JSON file
    :return: Base transaction network as a NetworkX graph object (currently None)
    """
    # TODO: implement loading of the base (pre-simulation) transaction network
    return None


def load_result_csv(acct_csv: str, tx_csv: str, schema_data) -> nx.MultiDiGraph:
    """Load account list CSV and transaction list CSV from AMLSim and generate transaction graph
    :param acct_csv: Account list CSV
    :param tx_csv: Transaction list CSV
    :param schema_data: Schema data from JSON (expects "account" and "transaction" column lists)
    :return: Transaction network as a NetworkX graph object
    """
    # Column indices resolved from the schema; None until found below
    acct_id_idx = None   # account ID column in the account CSV
    acct_sar_idx = None  # SAR flag column in the account CSV
    tx_src_idx = None    # originator account ID column in the transaction CSV
    tx_dst_idx = None    # beneficiary account ID column in the transaction CSV
    tx_amt_idx = None    # amount column in the transaction CSV
    tx_date_idx = None   # timestamp column in the transaction CSV
    is_date_type = False  # True if the timestamp column holds date strings, False if step numbers
    base_date = datetime(1970, 1, 1)  # epoch used to convert simulation steps to dates

    # Resolve column indices from the schema definition
    for idx, col in enumerate(schema_data["account"]):
        data_type = col.get("dataType")
        if data_type == "account_id":
            acct_id_idx = idx
        elif data_type == "sar_flag":
            acct_sar_idx = idx
    for idx, col in enumerate(schema_data["transaction"]):
        data_type = col.get("dataType")
        if data_type == "orig_id":
            tx_src_idx = idx
        elif data_type == "dest_id":
            tx_dst_idx = idx
        elif data_type == "amount":
            tx_amt_idx = idx
        elif data_type == "timestamp":
            tx_date_idx = idx
            is_date_type = col.get("valueType") == "date"

    _g = nx.MultiDiGraph()
    num_accts = 0
    num_sar = 0
    num_txs = 0
    # Load account list CSV: one node per account, with its SAR flag
    print("Load account list CSV file", acct_csv)
    with open(acct_csv, "r") as rf:
        reader = csv.reader(rf)
        next(reader)  # Skip header
        for row in reader:
            acct_id = row[acct_id_idx]  # Account ID
            is_sar = row[acct_sar_idx].lower() == "true"  # SAR flag
            attr = {ACCT_SAR: is_sar}
            _g.add_node(acct_id, **attr)
            num_accts += 1
            if is_sar:
                num_sar += 1
    print("Number of total accounts: %d" % num_accts)
    # Guard against an empty account file (avoid ZeroDivisionError)
    sar_pct = num_sar / num_accts * 100 if num_accts else 0.0
    print("Number of SAR accounts: %d (%.2f%%)" % (num_sar, sar_pct))

    # Load transaction list CSV: one edge per transaction with amount and date
    print("Loading transaction list CSV file", tx_csv)
    with open(tx_csv, "r") as rf:
        reader = csv.reader(rf)
        next(reader)  # Skip header
        for row in reader:
            src_id = row[tx_src_idx]  # Originator account ID
            dst_id = row[tx_dst_idx]  # Beneficiary account ID
            amount = float(row[tx_amt_idx])  # TX_AMOUNT
            # Timestamp is either a parseable date string or a step count in days from base_date
            date = parse(row[tx_date_idx]) if is_date_type else base_date + timedelta(int(row[tx_date_idx]))
            date_str = date.strftime("%Y-%m-%d")
            attr = {TX_AMOUNT: amount, TX_DATE: date_str}
            _g.add_edge(src_id, dst_id, **attr)
            num_txs += 1
            if num_txs % 100000 == 0:
                print("Loaded %d transactions" % num_txs)
    print("Number of transactions: %d" % num_txs)
    return _g


def load_alert_csv(_g, alert_acct_csv, alert_tx_csv, schema_data):
    """Load alert member and transaction lists

    NOTE(review): implementation appears unfinished — it only initializes a
    local column-index variable and returns None without reading either CSV
    or modifying the graph.
    :param _g: Transaction graph to annotate
    :param alert_acct_csv: Alert member (account) list CSV
    :param alert_tx_csv: Alert transaction list CSV
    :param schema_data: Schema data from JSON
    """
    acct_id_idx = None


class __TransactionGraphLoader:
    """Base loader: reads the simulation configuration and schema JSON files
    and holds the transaction graph under analysis.
    """

    def __init__(self, _conf_json):
        """
        :param _conf_json: Path of the simulation configuration JSON file
        """
        with open(_conf_json, "r") as rf:
            self.conf = json.load(rf)
        # The schema file location is given relative to the input directory
        schema_json = os.path.join(self.conf["input"]["directory"], self.conf["input"]["schema"])
        with open(schema_json, "r") as rf:
            self.schema = json.load(rf)
        self.output_conf = self.conf["output"]
        self.g = nx.MultiDiGraph()  # Transaction graph (populated by subclasses)
        self.sim_name = self.conf["general"]["simulation_name"]

    def get_graph(self):
        """Return the loaded transaction graph."""
        return self.g

    def count_hub_accounts(self, min_degree=DEGREE_STEP, max_degree=10):
        """Count number of "hub" accounts by degree
        :param min_degree: Minimum degree threshold to report
        :param max_degree: Maximum degree threshold to report
        """
        # dict(...) works with both the plain dict returned by NetworkX 1.x
        # and the DegreeView returned by NetworkX 2.x (which has no .values()).
        in_deg = Counter(dict(self.g.in_degree()).values())  # in-degree -> count
        out_deg = Counter(dict(self.g.out_degree()).values())  # out-degree -> count
        for th in range(min_degree, max_degree + 1, DEGREE_STEP):
            num_fan_in = sum(c for d, c in in_deg.items() if d >= th)
            num_fan_out = sum(c for d, c in out_deg.items() if d >= th)
            print("\tNumber of fan-in / fan-out patterns with", th, "or more neighbors:", num_fan_in, "/", num_fan_out)


class BaseGraphLoader(__TransactionGraphLoader):
    """Graph loader for the base (pre-simulation) transaction network."""

    def __init__(self, _conf_json):
        # Zero-argument super() is equivalent in Python 3
        super().__init__(_conf_json)


class ResultGraphLoader(__TransactionGraphLoader):
    """Graph loader for the generated (AMLSim output) transaction network."""

    def __init__(self, _conf_json):
        """
        :param _conf_json: Path of the simulation configuration JSON file
        """
        super().__init__(_conf_json)

        # Create a transaction graph from output files
        output_dir = os.path.join(self.output_conf["directory"], self.sim_name)
        acct_file = self.output_conf["accounts"]
        tx_file = self.output_conf["transactions"]
        alert_acct_file = self.output_conf["alert_members"]
        alert_tx_file = self.output_conf["alert_transactions"]

        acct_path = os.path.join(output_dir, acct_file)
        tx_path = os.path.join(output_dir, tx_file)
        self.g = load_result_csv(acct_path, tx_path, self.schema)
        # Count normal/SAR accounts once from the node attributes
        sar_flags = nx.get_node_attributes(self.g, ACCT_SAR)
        self.num_sar_accts = sum(1 for flag in sar_flags.values() if flag)
        self.num_normal_accts = len(sar_flags) - self.num_sar_accts

    def count_hub_accounts(self, min_degree=DEGREE_STEP, max_degree=10):
        """Count "hub" accounts by degree, overall and split by normal/SAR group.
        :param min_degree: Minimum degree threshold to report
        :param max_degree: Maximum degree threshold to report
        """
        super().count_hub_accounts(min_degree, max_degree)

        # Extract the same statistical data for normal and alert account vertices.
        # nx.get_node_attributes and dict(...) around the degree views work with
        # both NetworkX 1.x and 2.x (G.node[k] is 1.x-only); .get(k, False)
        # treats nodes created implicitly by add_edge (no SAR attribute) as
        # normal instead of raising KeyError.
        sar_flags = nx.get_node_attributes(self.g, ACCT_SAR)
        in_deg = dict(self.g.in_degree())
        out_deg = dict(self.g.out_degree())
        normal_in_deg = Counter(v for k, v in in_deg.items() if not sar_flags.get(k, False))
        normal_out_deg = Counter(v for k, v in out_deg.items() if not sar_flags.get(k, False))
        sar_in_deg = Counter(v for k, v in in_deg.items() if sar_flags.get(k, False))
        sar_out_deg = Counter(v for k, v in out_deg.items() if sar_flags.get(k, False))

        self._print_fan_stats("normal", normal_in_deg, normal_out_deg,
                              self.num_normal_accts, min_degree, max_degree)
        self._print_fan_stats("SAR", sar_in_deg, sar_out_deg,
                              self.num_sar_accts, min_degree, max_degree)

    def _print_fan_stats(self, label, in_deg, out_deg, num_accts, min_degree, max_degree):
        """Print fan-in/fan-out pattern counts and ratios for one account group.
        :param label: Group name used in the output ("normal" or "SAR")
        :param in_deg: Counter of in-degree -> number of accounts
        :param out_deg: Counter of out-degree -> number of accounts
        :param num_accts: Total number of accounts in the group
        """
        print("Number of fan-in / fan-out patterns for %d %s accounts" % (num_accts, label))
        for th in range(min_degree, max_degree + 1, DEGREE_STEP):
            num_fan_in = sum(c for d, c in in_deg.items() if d >= th)
            num_fan_out = sum(c for d, c in out_deg.items() if d >= th)
            # Guard against an empty group (avoid ZeroDivisionError)
            ratio_fan_in = num_fan_in / num_accts if num_accts else 0.0
            ratio_fan_out = num_fan_out / num_accts if num_accts else 0.0
            print("\tNumber of fan-in / fan-out patterns with %d or more neighbors: %d (%.2f%%)/ %d (%.2f%%)" %
                  (th, num_fan_in, ratio_fan_in * 100, num_fan_out, ratio_fan_out * 100))


if __name__ == "__main__":
    argv = sys.argv
    if len(argv) < 2:
        print("Usage: python3 %s [ConfJSON]" % argv[0])
        # sys.exit is always available; the exit() builtin is injected by the
        # optional site module and not guaranteed to exist (e.g. under -S)
        sys.exit(1)

    conf_json = argv[1]

    # Base transaction network (as input of the simulator) analytics
    # bgl = BaseGraphLoader(conf_json)

    # Generated transaction network analysis as the final result
    rgl = ResultGraphLoader(conf_json)
    rgl.count_hub_accounts(5, 25)