"""icij_utils.py

Building an SQL agent over the ICIJ financial data leaks files.

:author: Didier Guillevic
:date: 2025-01-12
"""

import logging
import os
import sqlite3
from pathlib import Path

import pandas as pd
from sqlalchemy import (
    Column,
    Float,
    Integer,
    MetaData,
    String,
    Table,
    create_engine,
    select,
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)


def _quote_identifier(name):
    """Return *name* as a double-quoted SQLite identifier.

    Embedded double quotes are doubled (SQLite's escaping rule for quoted
    identifiers), so unusual CSV headers cannot break the generated DDL.
    """
    return '"' + str(name).replace('"', '""') + '"'


class ICIJDataLoader:
    """Load the ICIJ leak CSV files into a local SQLite database."""

    def __init__(self, db_path='icij_data.db'):
        """Initialize the data loader with database path.

        Args:
            db_path: Path of the SQLite database file to create/fill.
        """
        # NOTE(review): ICIJDatabaseConnector defaults to 'icij_leaks.db', a
        # different file; pass the path explicitly to both classes so they agree.
        self.db_path = db_path
        # Recognised ICIJ CSV file name -> destination SQLite table name.
        self.table_mappings = {
            'nodes-addresses.csv': 'addresses',
            'nodes-entities.csv': 'entities',
            'nodes-intermediaries.csv': 'intermediaries',
            'nodes-officers.csv': 'officers',
            'nodes-others.csv': 'others',
            'relationships.csv': 'relationships',
        }

    def create_connection(self):
        """Open a sqlite3 connection to ``self.db_path``.

        Returns:
            A ``sqlite3.Connection``, or ``None`` if connecting failed (the
            error is logged).
        """
        try:
            return sqlite3.connect(self.db_path)
        except sqlite3.Error as e:
            logger.error("Error connecting to database: %s", e)
            return None

    @staticmethod
    def _sqlite_type(dtype):
        """Map a pandas dtype to an SQLite column type: INTEGER, REAL or TEXT."""
        if pd.api.types.is_integer_dtype(dtype):
            return 'INTEGER'
        if pd.api.types.is_float_dtype(dtype):
            return 'REAL'
        return 'TEXT'

    def create_table_from_csv(self, csv_path, table_name, conn):
        """Create a table based on CSV structure and load data.

        Column types are inferred from the first 5 rows only. SQLite column
        types are affinities rather than constraints, so later rows holding
        other value types still insert.

        Args:
            csv_path: Path to the CSV file to load.
            table_name: Table to create (if missing) and append rows to.
            conn: Open ``sqlite3.Connection``.

        Returns:
            True on success, False if any step failed (the error is logged).
        """
        # NOTE(review): the table is created with IF NOT EXISTS and rows are
        # appended, so re-running the loader on the same database duplicates data.
        try:
            # Sample a few rows to learn the column names and approximate types.
            sample = pd.read_csv(csv_path, nrows=5)
            column_defs = ', '.join(
                f'{_quote_identifier(col)} {self._sqlite_type(sample[col].dtype)}'
                for col in sample.columns
            )
            conn.execute(
                f'CREATE TABLE IF NOT EXISTS {_quote_identifier(table_name)} '
                f'({column_defs})'
            )

            # Load in chunks so large leak files never need to fit in memory.
            chunksize = 10000
            for chunk in pd.read_csv(csv_path, chunksize=chunksize):
                chunk.to_sql(table_name, conn, if_exists='append', index=False)

            logger.info("Successfully loaded %s", table_name)
            return True
        except Exception:
            # Best-effort per file: log with traceback and let the caller
            # continue with the remaining files.
            logger.exception("Error processing %s", csv_path)
            return False

    def load_all_files(self, data_directory):
        """Load all recognized CSV files from the directory into SQLite.

        Files listed in ``self.table_mappings`` that are absent from
        ``data_directory`` are skipped with a warning. After loading, the
        indexes from ``create_indexes`` are built and the work is committed.
        The connection is always closed.

        Args:
            data_directory: Directory containing the ICIJ CSV files.

        Returns:
            True if the loading pass completed (individual files may still
            have been skipped or failed), False if no connection could be
            opened or an unexpected error occurred.
        """
        conn = self.create_connection()
        if conn is None:
            return False

        try:
            data_path = Path(data_directory)
            files_processed = 0
            for csv_file, table_name in self.table_mappings.items():
                file_path = data_path / csv_file
                if not file_path.exists():
                    logger.warning("Skipping %s: not found in %s", csv_file, data_path)
                    continue
                logger.info("Processing %s...", csv_file)
                if self.create_table_from_csv(file_path, table_name, conn):
                    files_processed += 1

            # Create indexes for better query performance
            self.create_indexes(conn)
            conn.commit()
            logger.info("Successfully processed %d files", files_processed)
            return True
        except Exception:
            logger.exception("Error during data loading")
            return False
        finally:
            conn.close()

    def create_indexes(self, conn):
        """Create indexes for better query performance.

        Indexes cover entity/officer names and relationship endpoints. Each
        index is attempted independently. A failure, e.g. because the
        corresponding table was never loaded, is logged, and the remaining
        indexes are still attempted.
        """
        index_definitions = [
            'CREATE INDEX IF NOT EXISTS idx_entities_name ON entities(name)',
            'CREATE INDEX IF NOT EXISTS idx_officers_name ON officers(name)',
            'CREATE INDEX IF NOT EXISTS idx_relationships_from ON relationships(node_id_start)',
            'CREATE INDEX IF NOT EXISTS idx_relationships_to ON relationships(node_id_end)',
        ]
        for index_sql in index_definitions:
            try:
                conn.execute(index_sql)
            except sqlite3.Error as e:
                logger.warning("Error creating index: %s", e)


class ICIJDatabaseConnector:
    """SQLAlchemy access layer over an existing ICIJ SQLite database.

    The schema is reflected once, in ``__init__``. Tables created afterwards
    are not visible through ``metadata``.
    """

    def __init__(self, db_path='icij_leaks.db'):
        """Create the engine and session factory, and reflect existing tables.

        Args:
            db_path: Path to the SQLite database file.
        """
        # NOTE(review): ICIJDataLoader defaults to 'icij_data.db', a different
        # file; pass the path explicitly to keep both classes in agreement.
        self.engine = create_engine(f'sqlite:///{db_path}', echo=False)
        # Declarative base for ORM models; not used within this class.
        self.Base = declarative_base()
        self.Session = sessionmaker(bind=self.engine)
        self.metadata = MetaData()
        self.metadata.reflect(bind=self.engine)

    def get_engine(self):
        """Return the SQLAlchemy engine."""
        return self.engine

    def get_session(self):
        """Create and return a new session."""
        return self.Session()

    def get_table(self, table_name):
        """Return the reflected ``Table`` named ``table_name``, or None."""
        return self.metadata.tables.get(table_name)

    def list_tables(self):
        """List all available tables in the database."""
        return list(self.metadata.tables.keys())

    def get_table_schema(self, table_name):
        """Return ``{column name: SQL type string}`` for a table ({} if unknown)."""
        table = self.get_table(table_name)
        if table is None:
            return {}
        return {column.name: str(column.type) for column in table.columns}

    def get_full_database_schema(self):
        """Return ``{table name: column schema}`` for every reflected table."""
        return {
            table_name: self.get_table_schema(table_name)
            for table_name in self.list_tables()
        }

    def get_table_columns(self, table_name):
        """Return the column names of a table ([] if unknown)."""
        table = self.get_table(table_name)
        if table is None:
            return []
        return [column.name for column in table.columns]

    def query_table(self, table_name, limit=1):
        """Return up to ``limit`` rows of ``table_name`` as a list of dicts.

        Returns an empty list if the table is not known.
        """
        table = self.get_table(table_name)
        if table is None:
            return []
        stmt = select(table).limit(limit)
        with self.engine.connect() as connection:
            result = connection.execute(stmt)
            # Rows in SQLAlchemy 1.4+/2.0 are tuple-like; their column-name
            # mapping view is ``row._mapping`` (plain ``dict(row)`` fails in 2.0).
            return [dict(row._mapping) for row in result]


class ICIJDatabaseMetadata:
    """Holds detailed documentation about the ICIJ database structure."""

    # Comprehensive table documentation
    TABLE_DOCS = {
        'entities': (
            "Contains information about companies, trusts, and other entities mentioned in the leaks. "
            "These are typically offshore entities created in tax havens."
        ),
        'officers': (
            "Contains information about people or organizations connected to offshore entities. "
            "Officers can be directors, shareholders, beneficiaries, or have other roles."
        ),
        'intermediaries': (
            "Contains information about professional firms that help create and manage offshore entities. "
            "These are typically law firms, banks, or corporate service providers."
        ),
        'addresses': (
            "Contains physical address information connected to entities, officers, or intermediaries. "
            "Addresses can be shared between multiple parties."
        ),
        'others': (
            "Contains information about miscellaneous parties that don't fit into other categories. "
            "This includes vessel names, legal cases, events, and other related parties mentioned "
            "in the leaks that aren't classified as entities, officers, or intermediaries."
        ),
        'relationships': (
            "Defines connections between different nodes (entities, officers, intermediaries) in the database. "
            "Shows how different parties are connected to each other."
        ),
    }

    # Detailed column documentation for each table
    COLUMN_DOCS = {
        'entities': {
            'name': "Legal name of the offshore entity",
            'original_name': "Name in original language/character set",
            'former_name': "Previous names of the entity",
            #'jurisdiction': "Country/region where the entity is registered",
            'jurisdiction_description': "Detailed description of the jurisdiction",
            'company_type': "Legal structure of the entity (e.g., corporation, trust)",
            'address': "Primary registered address",
            'internal_id': "Unique identifier within the leak data",
            'incorporation_date': "Date when the entity was created",
            'inactivation_date': "Date when the entity became inactive",
            'struck_off_date': "Date when entity was struck from register",
            'dorm_date': "Date when entity became dormant",
            'status': "Current status of the entity",
            'service_provider': "Firm that provided offshore services",
            'country_codes': '3 letter abbreviations of country names',
            'countries': 'name of country',
            'sourceID': "Identifier for the leak source",
        },
        'others': {
            'name': "Name of the miscellaneous party or item",
            'type': "Type of the other party (e.g., vessel, legal case)",
            'incorporation_date': "Date of incorporation or creation if applicable",
            'jurisdiction': "2 letter code of the Jurisdiction associated with the party",
            # Column names use underscores (matches the 'entities' key above);
            # the previous hyphenated key could never match the real column.
            'jurisdiction_description': 'full name of the jurisdiction',
            'countries': "Countries associated with the party",
            'status': "Current status",
            'internal_id': "Unique identifier within the leak data",
            'address': "Associated address if available",
            'sourceID': "Identifier for the leak source",
            'valid_until': "Date until which the information is valid",
        },
        'officers': {
            'name': "Name of the individual or organization",
            'countries': 'full name of the country connected to the officer',
            'country_codes': "3 letter code of the countries connected to the officer",
            'sourceID': "Identifier for the leak source",
            'valid_until': "Date until which the information is valid",
        },
        'intermediaries': {
            'name': "Name of the professional firm",
            'internal_id': "Unique identifier within the leak data",
            'address': "Business address",
            'status': "Current status",
            'countries': "Countries where intermediary operates",
            'country_codes': "3 letter abbreviations of the countries where intermediary operates",
            'sourceID': "Identifier for the leak source",
        },
        'addresses': {
            'address': "Full address text",
            'name': "Name associated with address",
            'country_codes': "3 letter country codes for the address",
            'countries': "Full country names",
            'sourceID': "Identifier for the leak source",
            'valid_until': "Date until which address is valid",
        },
        'relationships': {
            'node_id_start': "Internal ID of the source node",
            'node_id_end': "Internal ID of the target node",
            'rel_type': "Type of relationship (e.g., shareholder, director)",
            'link': "Additional details about the relationship",
            'start_date': "When the relationship began",
            'end_date': "When the relationship ended",
            'sourceID': "Identifier for the leak source",
            'status': "Current status of the relationship",
        },
    }

    # Source documentation
    SOURCE_IDS = {
        "PANAMA_PAPERS": "Data from Panama Papers leak (2016)",
        "PARADISE_PAPERS": "Data from Paradise Papers leak (2017)",
        "BAHAMAS_LEAKS": "Data from Bahamas Leaks (2016)",
        "OFFSHORE_LEAKS": "Data from Offshore Leaks (2013)",
        "PANDORA_PAPERS": "Data from Pandora Papers leak (2021)",
    }