"""icij_utils.py

Building an SQL agent over the ICIJ financial data leaks files.

:author: Didier Guillevic
:date: 2025-01-12
"""

import logging
import sqlite3
from pathlib import Path

import pandas as pd
# `select` is required by ICIJDatabaseConnector.query_table below.
from sqlalchemy import create_engine, MetaData, select
from sqlalchemy.orm import declarative_base, sessionmaker

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ICIJDataLoader:
    """Loads the ICIJ leak CSV files into a local SQLite database."""

    def __init__(self, db_path='icij_data.db'):
        """Initialize the data loader with the target database path."""
        self.db_path = db_path
        self.table_mappings = {
            'nodes-addresses.csv': 'addresses',
            'nodes-entities.csv': 'entities',
            'nodes-intermediaries.csv': 'intermediaries',
            'nodes-officers.csv': 'officers',
            'nodes-others.csv': 'others',
            'relationships.csv': 'relationships'
        }

    def create_connection(self):
        """Create a database connection."""
        try:
            conn = sqlite3.connect(self.db_path)
            return conn
        except sqlite3.Error as e:
            print(f"Error connecting to database: {e}")
            return None

    def create_table_from_csv(self, csv_path, table_name, conn):
        """Create a table based on CSV structure and load data."""
        try:
            # Read the first few rows to get column names and types
            df = pd.read_csv(csv_path, nrows=5)
            
            # Create table with appropriate columns
            columns = []
            for col in df.columns:
                # Determine SQLite type based on pandas dtype
                dtype = df[col].dtype
                if 'int' in str(dtype):
                    sql_type = 'INTEGER'
                elif 'float' in str(dtype):
                    sql_type = 'REAL'
                else:
                    sql_type = 'TEXT'
                columns.append(f'"{col}" {sql_type}')
            
            # Create table
            create_table_sql = f'''CREATE TABLE IF NOT EXISTS {table_name} 
                                 ({', '.join(columns)})'''
            conn.execute(create_table_sql)
            
            # Load data in chunks to handle large files
            chunksize = 10000
            for chunk in pd.read_csv(csv_path, chunksize=chunksize):
                chunk.to_sql(table_name, conn, if_exists='append', index=False)
            
            print(f"Successfully loaded {table_name}")
            return True

        except Exception as e:
            print(f"Error processing {csv_path}: {e}")
            return False

    def load_all_files(self, data_directory):
        """Load all recognized CSV files from the directory into SQLite."""
        conn = self.create_connection()
        if not conn:
            return False

        try:
            data_path = Path(data_directory)
            files_processed = 0

            for csv_file, table_name in self.table_mappings.items():
                file_path = data_path / csv_file
                if file_path.exists():
                    print(f"Processing {csv_file}...")
                    if self.create_table_from_csv(file_path, table_name, conn):
                        files_processed += 1

            # Create indexes for better query performance
            self.create_indexes(conn)
            
            conn.commit()
            print(f"Successfully processed {files_processed} files")
            return True

        except Exception as e:
            print(f"Error during data loading: {e}")
            return False

        finally:
            conn.close()

    def create_indexes(self, conn):
        """Create indexes for better query performance."""
        index_definitions = [
            'CREATE INDEX IF NOT EXISTS idx_entities_name ON entities(name)',
            'CREATE INDEX IF NOT EXISTS idx_officers_name ON officers(name)',
            'CREATE INDEX IF NOT EXISTS idx_relationships_from ON relationships(node_id_start)',
            'CREATE INDEX IF NOT EXISTS idx_relationships_to ON relationships(node_id_end)'
        ]
        
        for index_sql in index_definitions:
            try:
                conn.execute(index_sql)
            except sqlite3.Error as e:
                print(f"Error creating index: {e}")


class ICIJDatabaseConnector:
    """SQLAlchemy wrapper for inspecting and querying the SQLite database."""

    def __init__(self, db_path='icij_leaks.db'):
        # Note: this default differs from ICIJDataLoader's 'icij_data.db';
        # pass the path used at load time to open the same database.
        # Create the SQLAlchemy engine
        self.engine = create_engine(f'sqlite:///{db_path}', echo=False)
        
        # Create declarative base
        self.Base = declarative_base()
        
        # Create session factory
        self.Session = sessionmaker(bind=self.engine)
        
        # Initialize metadata
        self.metadata = MetaData()
        
        # Reflect existing tables
        self.metadata.reflect(bind=self.engine)
        
    def get_engine(self):
        """Return the SQLAlchemy engine."""
        return self.engine
    
    def get_session(self):
        """Create and return a new session."""
        return self.Session()
    
    def get_table(self, table_name):
        """Get a table by name from the metadata."""
        return self.metadata.tables.get(table_name)
    
    def list_tables(self):
        """List all available tables in the database."""
        return list(self.metadata.tables.keys())

    def get_table_schema(self, table_name):
        """Get column names and their types for a specific table."""
        table = self.get_table(table_name)
        if table is not None:
            return {column.name: str(column.type) for column in table.columns}
        return {}
    
    def get_full_database_schema(self):
        """Get the schema for all tables in the database."""
        schema = {}
        for table_name in self.list_tables():
            schema[table_name] = self.get_table_schema(table_name)
        return schema

    def get_table_columns(self, table_name):
        """Get column names for a specific table."""
        table = self.get_table(table_name)
        if table is not None:
            return [column.name for column in table.columns]
        return []
    
    def query_table(self, table_name, limit=1):
        """Return up to `limit` rows from a table as a list of dicts."""
        table = self.get_table(table_name)
        if table is not None:
            stmt = select(table).limit(limit)
            with self.engine.connect() as connection:
                result = connection.execute(stmt)
                # Row objects expose their columns via ._mapping (SQLAlchemy 1.4+).
                return [dict(row._mapping) for row in result]
        return []

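# A minimal usage sketch for ICIJDatabaseConnector (assumes the SQLite file
# was already built by ICIJDataLoader; since the two classes use different
# default paths, pass the path explicitly):
#
#     connector = ICIJDatabaseConnector(db_path='icij_data.db')
#     print(connector.list_tables())
#     print(connector.get_table_schema('entities'))
#     print(connector.query_table('entities', limit=3))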

class ICIJDatabaseMetadata:
    """Holds detailed documentation about the ICIJ database structure."""
    
    # Comprehensive table documentation
    TABLE_DOCS = {
        'entities': (
            "Contains information about companies, trusts, and other entities mentioned in the leaks. "
            "These are typically offshore entities created in tax havens."
        ),
        
        'officers': (
            "Contains information about people or organizations connected to offshore entities. "
            "Officers can be directors, shareholders, beneficiaries, or have other roles."
        ),
        'intermediaries': (
            "Contains information about professional firms that help create and manage offshore entities. "
            "These are typically law firms, banks, or corporate service providers."
        ),
        'addresses': (
            "Contains physical address information connected to entities, officers, or intermediaries. "
            "Addresses can be shared between multiple parties."
        ),
        'others': (
            "Contains information about miscellaneous parties that don't fit into other categories. "
            "This includes vessel names, legal cases, events, and other related parties mentioned "
            "in the leaks that aren't classified as entities, officers, or intermediaries."
        ),
        'relationships': (
            "Defines connections between different nodes (entities, officers, intermediaries) in the database. "
            "Shows how different parties are connected to each other."
        )
    }
    
    # Detailed column documentation for each table
    COLUMN_DOCS = {
        'entities': {
            'name': "Legal name of the offshore entity",
            'original_name': "Name in original language/character set",
            'former_name': "Previous names of the entity",
            #'jurisdiction': "Country/region where the entity is registered",
            'jurisdiction_description': "Detailed description of the jurisdiction",
            'company_type': "Legal structure of the entity (e.g., corporation, trust)",
            'address': "Primary registered address",
            'internal_id': "Unique identifier within the leak data",
            'incorporation_date': "Date when the entity was created",
            'inactivation_date': "Date when the entity became inactive",
            'struck_off_date': "Date when entity was struck from register",
            'dorm_date': "Date when entity became dormant",
            'status': "Current status of the entity",
            'service_provider': "Firm that provided offshore services",
            'country_codes': "3-letter codes of the countries connected to the entity",
            'countries': "Full names of the countries connected to the entity",
            'sourceID': "Identifier for the leak source"
        },
        
        'others': {
            'name': "Name of the miscellaneous party or item",
            'type': "Type of the other party (e.g., vessel, legal case)",
            'incorporation_date': "Date of incorporation or creation if applicable",
            'jurisdiction': "2-letter code of the jurisdiction associated with the party",
            'jurisdiction_description': "Full name of the jurisdiction",
            'countries': "Countries associated with the party",
            'status': "Current status",
            'internal_id': "Unique identifier within the leak data",
            'address': "Associated address if available",
            'sourceID': "Identifier for the leak source",
            'valid_until': "Date until which the information is valid"
        },
        
        'officers': {
            'name': "Name of the individual or organization",
            'countries': "Full names of the countries connected to the officer",
            'country_codes': "3-letter codes of the countries connected to the officer",
            'sourceID': "Identifier for the leak source",
            'valid_until': "Date until which the information is valid",
        },
        
        'intermediaries': {
            'name': "Name of the professional firm",
            'internal_id': "Unique identifier within the leak data",
            'address': "Business address",
            'status': "Current status",
            'countries': "Countries where intermediary operates",
            'country_codes': "3 letter abbreviations of the countries where intermediary operates",
            'sourceID': "Identifier for the leak source"
        },
        
        'addresses': {
            'address': "Full address text",
            'name': "Name associated with address",
            'country_codes': "3 letter country codes for the address",
            'countries': "Full country names",
            'sourceID': "Identifier for the leak source",
            'valid_until': "Date until which address is valid",
        },
        
        'relationships': {
            'node_id_start': "Internal ID of the source node",
            'node_id_end': "Internal ID of the target node",
            'rel_type': "Type of relationship (e.g., shareholder, director)",
            'link': "Additional details about the relationship",
            'start_date': "When the relationship began",
            'end_date': "When the relationship ended",
            'sourceID': "Identifier for the leak source",
            'status': "Current status of the relationship"
        }
    }
    
    # Source documentation
    SOURCE_IDS = {
        "PANAMA_PAPERS": "Data from Panama Papers leak (2016)",
        "PARADISE_PAPERS": "Data from Paradise Papers leak (2017)",
        "BAHAMAS_LEAKS": "Data from Bahamas Leaks (2016)",
        "OFFSHORE_LEAKS": "Data from Offshore Leaks (2013)",
        "PANDORA_PAPERS": "Data from Pandora Papers leak (2021)"
    }
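

# A minimal sketch of how the documentation above might be rendered into a
# single schema description (e.g. as context for an SQL-generating agent).
# The function name and output format are illustrative, not part of the
# original module:
def describe_database() -> str:
    """Render TABLE_DOCS and COLUMN_DOCS as one plain-text schema description."""
    meta = ICIJDatabaseMetadata
    lines = []
    for table_name, table_doc in meta.TABLE_DOCS.items():
        lines.append(f"Table {table_name}: {table_doc}")
        # List every documented column under its table.
        for column, doc in meta.COLUMN_DOCS.get(table_name, {}).items():
            lines.append(f"  - {column}: {doc}")
        lines.append("")
    return "\n".join(lines)


if __name__ == '__main__':
    # Smoke test: build the database from a local CSV directory (hypothetical
    # path; adjust to wherever the ICIJ CSV files were downloaded), then
    # inspect it through the connector.
    loader = ICIJDataLoader(db_path='icij_data.db')
    loader.load_all_files('offshore_leaks_csvs')
    connector = ICIJDatabaseConnector(db_path='icij_data.db')
    print(connector.list_tables())
    print(describe_database())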