import java.util.concurrent.atomic.AtomicLong

/*
 * Bulk-loads two CSV files into a JanusGraph database:
 *   - outputs/accounts.csv : one vertex per account
 *   - outputs/tx.csv       : one edge ("edgelabel") per transaction
 *
 * The working directory is taken from the 'user.working_dir' system property
 * (default '.'); the JanusGraph configuration is read from
 * janusgraph.properties in that directory.
 *
 * NOTE(review): the copy of this script that was reviewed had lost its line
 * breaks and a span of text in the middle (from the account header loop up to
 * the edge insertion). The regions marked "reconstructed" below were rebuilt
 * from the column lists in the comments, the declared *_idx variables, the
 * schema data types and the surviving addEdge call. Confirm them against the
 * actual CSV files before relying on this script.
 */

USER_DIR = System.getProperty('user.working_dir','.') + '/'
DATA_DIR = USER_DIR + "outputs/"
ACCT_CSV = DATA_DIR + "accounts.csv"
TX_CSV = DATA_DIR + "tx.csv"
PROP_FILE = USER_DIR + "janusgraph.properties"

println "Open JanusGraph database from " + PROP_FILE

counter = new AtomicLong()   // number of mutations issued so far
batchSize = 100000           // commit the transaction every batchSize mutations
cache = [:]                  // account id -> vertex, used to resolve edge endpoints

graph = JanusGraphFactory.open(PROP_FILE)

// create schema
mgmt = graph.openManagement()

// vertex schema
// ACCOUNT_ID,CUSTOMER_ID,INIT_BALANCE,COUNTRY,ACCOUNT_TYPE,IS_SAR,TX_BEHAVIOR_ID
mgmt.makePropertyKey('acct_id').dataType(String.class).make()       // ACCOUNT_ID
mgmt.makePropertyKey('cust_id').dataType(String.class).make()       // CUSTOMER_ID
mgmt.makePropertyKey('init_amount').dataType(Float.class).make()    // INIT_BALANCE
mgmt.makePropertyKey('country').dataType(String.class).make()       // COUNTRY
mgmt.makePropertyKey('acct_type').dataType(String.class).make()     // ACCOUNT_TYPE
mgmt.makePropertyKey('is_sar_acct').dataType(Boolean.class).make()  // IS_SAR
mgmt.makePropertyKey('behavior_id').dataType(Long.class).make()     // TX_BEHAVIOR_ID

// edge schema
// TX_ID,SENDER_ACCOUNT_ID,RECEIVER_ACCOUNT_ID,TX_TYPE,TX_AMOUNT,TIMESTAMP,IS_SAR,ALERT_ID
mgmt.makeEdgeLabel('edgelabel').make()
mgmt.makePropertyKey('tx_id').dataType(String.class).make()         // TX_ID
mgmt.makePropertyKey('tx_type').dataType(String.class).make()       // TX_TYPE
mgmt.makePropertyKey('amount').dataType(Float.class).make()         // TX_AMOUNT
mgmt.makePropertyKey('date').dataType(Long.class).make()            // TIMESTAMP
mgmt.makePropertyKey('is_sar_tx').dataType(Boolean.class).make()    // IS_SAR
mgmt.makePropertyKey('alert_id').dataType(Long.class).make()        // ALERT_ID

mgmt.commit()

// Counts one mutation and commits the open transaction on every batchSize-th call.
mutate = { ->
    if (0 == counter.incrementAndGet() % batchSize) {
        graph.tx().commit()
    }
}

// Adds an account vertex unless one with the same account id is already cached.
addVertex = { def acct, def cust, def amt, def country, def type, def is_sar, def behavior ->
    if (!cache.containsKey(acct)) {
        def v = graph.addVertex("acct_id", acct, "cust_id", cust, "init_amount", amt, "country", country,
                "acct_type", type, "is_sar_acct", is_sar, "behavior_id", behavior)
        mutate()
        cache[acct] = v
    }
}

// Sets a property on the cached vertex stored under `label`; `placeholder` is unused.
setProperty = { def placeholder, def label, def key, def value ->
    cache[label].property(key, value)
    mutate()
}

// Fails fast when a header lacks any expected column.
// `indices` maps column name -> resolved index (DEFAULT_INDEX when absent).
requireColumns = { String file, Map indices ->
    def missing = indices.findAll { name, idx -> idx == DEFAULT_INDEX }.keySet()
    if (!missing.isEmpty()) {
        throw new IllegalStateException("Missing column(s) " + missing + " in header of " + file)
    }
}

/**
 * Load account list (vertices)
 * ACCOUNT_ID,CUSTOMER_ID,INIT_BALANCE,COUNTRY,ACCOUNT_TYPE,IS_SAR,TX_BEHAVIOR_ID
 */
println "Start loading accounts from " + ACCT_CSV
line_counter = new AtomicLong()
reader = new BufferedReader(new FileReader(ACCT_CSV))

DEFAULT_INDEX = -1
acct_idx = DEFAULT_INDEX
cust_idx = DEFAULT_INDEX
amt_idx = DEFAULT_INDEX
country_idx = DEFAULT_INDEX
type_idx = DEFAULT_INDEX
sar_idx = DEFAULT_INDEX
behavior_idx = DEFAULT_INDEX

try {
    // read the header
    line = reader.readLine()
    if (line == null) {
        throw new IllegalStateException("Empty file: " + ACCT_CSV)
    }
    line_counter.incrementAndGet()
    fields = line.split(',', -1)

    // NOTE(review): reconstructed - column names are taken from the header comment above.
    for (int i = 0; i < fields.length; i++) {
        switch (fields[i].trim().toUpperCase()) {
            case "ACCOUNT_ID":     acct_idx = i; break
            case "CUSTOMER_ID":    cust_idx = i; break
            case "INIT_BALANCE":   amt_idx = i; break
            case "COUNTRY":        country_idx = i; break
            case "ACCOUNT_TYPE":   type_idx = i; break
            case "IS_SAR":         sar_idx = i; break
            case "TX_BEHAVIOR_ID": behavior_idx = i; break
        }
    }
    requireColumns(ACCT_CSV, [ACCOUNT_ID: acct_idx, CUSTOMER_ID: cust_idx, INIT_BALANCE: amt_idx,
                              COUNTRY: country_idx, ACCOUNT_TYPE: type_idx, IS_SAR: sar_idx,
                              TX_BEHAVIOR_ID: behavior_idx])

    // NOTE(review): reconstructed - values are converted to the data types declared in the
    // schema; the boolean/number text formats are assumed, confirm against the CSV.
    while ((line = reader.readLine()) != null) {
        lineNo = line_counter.incrementAndGet()
        if (line.trim().isEmpty()) {
            continue
        }
        data = line.split(',', -1)
        if (data.length < fields.length) {
            throw new IllegalStateException(ACCT_CSV + " line " + lineNo + ": expected " +
                    fields.length + " fields but found " + data.length)
        }
        addVertex(data[acct_idx],
                  data[cust_idx],
                  data[amt_idx].toFloat(),
                  data[country_idx],
                  data[type_idx],
                  data[sar_idx].toBoolean(),
                  data[behavior_idx].toLong())
    }
} finally {
    reader.close()
}

/**
 * Load transaction list (edges)
 * TX_ID,SENDER_ACCOUNT_ID,RECEIVER_ACCOUNT_ID,TX_TYPE,TX_AMOUNT,TIMESTAMP,IS_SAR,ALERT_ID
 *
 * NOTE(review): this whole section up to the addEdge call is reconstructed.
 */
println "Start loading transactions from " + TX_CSV
line_counter = new AtomicLong()
reader = new BufferedReader(new FileReader(TX_CSV))

tx_id_idx = DEFAULT_INDEX
orig_idx = DEFAULT_INDEX
dest_idx = DEFAULT_INDEX
tx_type_idx = DEFAULT_INDEX
tx_amt_idx = DEFAULT_INDEX
date_idx = DEFAULT_INDEX
tx_sar_idx = DEFAULT_INDEX
alert_idx = DEFAULT_INDEX

try {
    // read the header
    line = reader.readLine()
    if (line == null) {
        throw new IllegalStateException("Empty file: " + TX_CSV)
    }
    line_counter.incrementAndGet()
    fields = line.split(',', -1)
    for (int i = 0; i < fields.length; i++) {
        switch (fields[i].trim().toUpperCase()) {
            case "TX_ID":               tx_id_idx = i; break
            case "SENDER_ACCOUNT_ID":   orig_idx = i; break
            case "RECEIVER_ACCOUNT_ID": dest_idx = i; break
            case "TX_TYPE":             tx_type_idx = i; break
            case "TX_AMOUNT":           tx_amt_idx = i; break
            case "TIMESTAMP":           date_idx = i; break
            case "IS_SAR":              tx_sar_idx = i; break
            case "ALERT_ID":            alert_idx = i; break
        }
    }
    requireColumns(TX_CSV, [TX_ID: tx_id_idx, SENDER_ACCOUNT_ID: orig_idx, RECEIVER_ACCOUNT_ID: dest_idx,
                            TX_TYPE: tx_type_idx, TX_AMOUNT: tx_amt_idx, TIMESTAMP: date_idx,
                            IS_SAR: tx_sar_idx, ALERT_ID: alert_idx])

    while ((line = reader.readLine()) != null) {
        lineNo = line_counter.incrementAndGet()
        if (line.trim().isEmpty()) {
            continue
        }
        data = line.split(',', -1)
        if (data.length < fields.length) {
            throw new IllegalStateException(TX_CSV + " line " + lineNo + ": expected " +
                    fields.length + " fields but found " + data.length)
        }
        tx_id = data[tx_id_idx]
        orig = data[orig_idx]
        dest = data[dest_idx]
        tx_type = data[tx_type_idx]
        amount = data[tx_amt_idx].toFloat()
        date = data[date_idx].toLong()       // assumes an integer timestamp - TODO confirm
        is_sar = data[tx_sar_idx].toBoolean()
        alert_id = data[alert_idx].toLong()  // assumes ALERT_ID is never blank - TODO confirm

        // Only connect accounts that were loaded as vertices.
        // NOTE(review): this guard is reconstructed; confirm that skipping unknown endpoints is intended.
        if (cache.containsKey(orig) && cache.containsKey(dest)) {
            // Original note (beginning lost in the damaged source):
            // ... "tkey" since "key" is protected value in namespace
            cache[orig].addEdge("edgelabel", cache[dest], "tx_id", tx_id, "tx_type", tx_type,
                    "amount", amount, "date", date, "is_sar_tx", is_sar, "alert_id", alert_id)
            mutate()
        }
    }
} finally {
    reader.close()
}

// Flush whatever is left from the last partial batch.
graph.tx().commit()

g = graph.traversal()
numV = g.V().count().next()
numE = g.E().count().next()
println "total vertices added: ${numV}"
println "total edges added: ${numE}"

g.close()
graph.close()
System.exit(0)