"""icij_utils.py
Building an SQL agent over the ICIJ financial data leaks files.
:author: Didier Guillevic
:date: 2025-01-12
"""
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
import pandas as pd
import sqlite3
from pathlib import Path

from sqlalchemy import create_engine, select, MetaData
from sqlalchemy.orm import declarative_base, sessionmaker


class ICIJDataLoader:
    """Load the ICIJ offshore leaks CSV files into a SQLite database."""

    def __init__(self, db_path='icij_data.db'):
        """Initialize the data loader with the database path."""
        self.db_path = db_path
        self.table_mappings = {
            'nodes-addresses.csv': 'addresses',
            'nodes-entities.csv': 'entities',
            'nodes-intermediaries.csv': 'intermediaries',
            'nodes-officers.csv': 'officers',
            'nodes-others.csv': 'others',
            'relationships.csv': 'relationships'
        }

    def create_connection(self):
        """Create a database connection."""
        try:
            return sqlite3.connect(self.db_path)
        except sqlite3.Error as e:
            logger.error(f"Error connecting to database: {e}")
            return None

    def create_table_from_csv(self, csv_path, table_name, conn):
        """Create a table based on the CSV structure and load the data."""
        try:
            # Read the first few rows to infer column names and types
            df = pd.read_csv(csv_path, nrows=5)

            # Map each pandas dtype to an SQLite column type
            columns = []
            for col in df.columns:
                dtype = df[col].dtype
                if 'int' in str(dtype):
                    sql_type = 'INTEGER'
                elif 'float' in str(dtype):
                    sql_type = 'REAL'
                else:
                    sql_type = 'TEXT'
                columns.append(f'"{col}" {sql_type}')

            # Create the table
            create_table_sql = (
                f'CREATE TABLE IF NOT EXISTS {table_name} '
                f'({", ".join(columns)})'
            )
            conn.execute(create_table_sql)

            # Load the data in chunks to handle large files
            chunksize = 10000
            for chunk in pd.read_csv(csv_path, chunksize=chunksize):
                chunk.to_sql(table_name, conn, if_exists='append', index=False)

            logger.info(f"Successfully loaded {table_name}")
            return True
        except Exception as e:
            logger.error(f"Error processing {csv_path}: {e}")
            return False

    def load_all_files(self, data_directory):
        """Load all recognized CSV files from the directory into SQLite."""
        conn = self.create_connection()
        if not conn:
            return False
        try:
            data_path = Path(data_directory)
            files_processed = 0
            for csv_file, table_name in self.table_mappings.items():
                file_path = data_path / csv_file
                if file_path.exists():
                    logger.info(f"Processing {csv_file}...")
                    if self.create_table_from_csv(file_path, table_name, conn):
                        files_processed += 1

            # Create indexes for better query performance
            self.create_indexes(conn)
            conn.commit()
            logger.info(f"Successfully processed {files_processed} files")
            return True
        except Exception as e:
            logger.error(f"Error during data loading: {e}")
            return False
        finally:
            conn.close()

    def create_indexes(self, conn):
        """Create indexes for better query performance."""
        index_definitions = [
            'CREATE INDEX IF NOT EXISTS idx_entities_name ON entities(name)',
            'CREATE INDEX IF NOT EXISTS idx_officers_name ON officers(name)',
            'CREATE INDEX IF NOT EXISTS idx_relationships_from ON relationships(node_id_start)',
            'CREATE INDEX IF NOT EXISTS idx_relationships_to ON relationships(node_id_end)'
        ]
        for index_sql in index_definitions:
            try:
                conn.execute(index_sql)
            except sqlite3.Error as e:
                logger.error(f"Error creating index: {e}")


class ICIJDatabaseConnector:
    """Thin SQLAlchemy wrapper around the SQLite database built above."""

    def __init__(self, db_path='icij_leaks.db'):
        # Create the SQLAlchemy engine.
        # Note: ICIJDataLoader writes to 'icij_data.db' by default; pass that
        # path here if the loader was run with its defaults.
        self.engine = create_engine(f'sqlite:///{db_path}', echo=False)
        # Create the declarative base
        self.Base = declarative_base()
        # Create the session factory
        self.Session = sessionmaker(bind=self.engine)
        # Initialize the metadata and reflect the existing tables
        self.metadata = MetaData()
        self.metadata.reflect(bind=self.engine)

    def get_engine(self):
        """Return the SQLAlchemy engine."""
        return self.engine

    def get_session(self):
        """Create and return a new session."""
        return self.Session()

    def get_table(self, table_name):
        """Get a table by name from the metadata."""
        return self.metadata.tables.get(table_name)

    def list_tables(self):
        """List all available tables in the database."""
        return list(self.metadata.tables.keys())

    def get_table_schema(self, table_name):
        """Get column names and their types for a specific table."""
        table = self.get_table(table_name)
        if table is not None:
            return {column.name: str(column.type) for column in table.columns}
        return {}

    def get_full_database_schema(self):
        """Get the schema for all tables in the database."""
        schema = {}
        for table_name in self.list_tables():
            schema[table_name] = self.get_table_schema(table_name)
        return schema

    def get_table_columns(self, table_name):
        """Get column names for a specific table."""
        table = self.get_table(table_name)
        if table is not None:
            return [column.name for column in table.columns]
        return []

    def query_table(self, table_name, limit=1):
        """Execute a simple query on a table."""
        table = self.get_table(table_name)
        if table is not None:
            stmt = select(table).limit(limit)
            with self.engine.connect() as connection:
                result = connection.execute(stmt)
                # Row objects are not directly dict()-able in SQLAlchemy 1.4+;
                # go through the ._mapping accessor instead.
                return [dict(row._mapping) for row in result]
        return []
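
# A minimal usage sketch for ICIJDatabaseConnector (assuming 'icij_data.db'
# is the SQLite file produced by ICIJDataLoader above):
#
#     connector = ICIJDatabaseConnector(db_path='icij_data.db')
#     connector.list_tables()                     # e.g. ['addresses', 'entities', ...]
#     connector.get_table_schema('entities')      # {'name': 'TEXT', ...}
#     connector.query_table('entities', limit=3)  # first three rows as dicts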


class ICIJDatabaseMetadata:
    """Holds detailed documentation about the ICIJ database structure."""

    # Comprehensive table documentation
    TABLE_DOCS = {
        'entities': (
            "Contains information about companies, trusts, and other entities mentioned in the leaks. "
            "These are typically offshore entities created in tax havens."
        ),
        'officers': (
            "Contains information about people or organizations connected to offshore entities. "
            "Officers can be directors, shareholders, beneficiaries, or have other roles."
        ),
        'intermediaries': (
            "Contains information about professional firms that help create and manage offshore entities. "
            "These are typically law firms, banks, or corporate service providers."
        ),
        'addresses': (
            "Contains physical address information connected to entities, officers, or intermediaries. "
            "Addresses can be shared between multiple parties."
        ),
        'others': (
            "Contains information about miscellaneous parties that don't fit into other categories. "
            "This includes vessel names, legal cases, events, and other related parties mentioned "
            "in the leaks that aren't classified as entities, officers, or intermediaries."
        ),
        'relationships': (
            "Defines connections between different nodes (entities, officers, intermediaries) in the database. "
            "Shows how different parties are connected to each other."
        )
    }

    # Detailed column documentation for each table
    COLUMN_DOCS = {
        'entities': {
            'name': "Legal name of the offshore entity",
            'original_name': "Name in original language/character set",
            'former_name': "Previous names of the entity",
            # 'jurisdiction': "Country/region where the entity is registered",
            'jurisdiction_description': "Detailed description of the jurisdiction",
            'company_type': "Legal structure of the entity (e.g., corporation, trust)",
            'address': "Primary registered address",
            'internal_id': "Unique identifier within the leak data",
            'incorporation_date': "Date when the entity was created",
            'inactivation_date': "Date when the entity became inactive",
            'struck_off_date': "Date when the entity was struck from the register",
            'dorm_date': "Date when the entity became dormant",
            'status': "Current status of the entity",
            'service_provider': "Firm that provided offshore services",
            'country_codes': "3-letter abbreviations of the country names",
            'countries': "Full country names",
            'sourceID': "Identifier for the leak source"
        },
        'others': {
            'name': "Name of the miscellaneous party or item",
            'type': "Type of the other party (e.g., vessel, legal case)",
            'incorporation_date': "Date of incorporation or creation, if applicable",
            'jurisdiction': "2-letter code of the jurisdiction associated with the party",
            'jurisdiction-description': "Full name of the jurisdiction",
            'countries': "Countries associated with the party",
            'status': "Current status",
            'internal_id': "Unique identifier within the leak data",
            'address': "Associated address, if available",
            'sourceID': "Identifier for the leak source",
            'valid_until': "Date until which the information is valid"
        },
        'officers': {
            'name': "Name of the individual or organization",
            'countries': "Full names of the countries connected to the officer",
            'country_codes': "3-letter codes of the countries connected to the officer",
            'sourceID': "Identifier for the leak source",
            'valid_until': "Date until which the information is valid"
        },
        'intermediaries': {
            'name': "Name of the professional firm",
            'internal_id': "Unique identifier within the leak data",
            'address': "Business address",
            'status': "Current status",
            'countries': "Countries where the intermediary operates",
            'country_codes': "3-letter abbreviations of the countries where the intermediary operates",
            'sourceID': "Identifier for the leak source"
        },
        'addresses': {
            'address': "Full address text",
            'name': "Name associated with the address",
            'country_codes': "3-letter country codes for the address",
            'countries': "Full country names",
            'sourceID': "Identifier for the leak source",
            'valid_until': "Date until which the address is valid"
        },
        'relationships': {
            'node_id_start': "Internal ID of the source node",
            'node_id_end': "Internal ID of the target node",
            'rel_type': "Type of relationship (e.g., shareholder, director)",
            'link': "Additional details about the relationship",
            'start_date': "When the relationship began",
            'end_date': "When the relationship ended",
            'sourceID': "Identifier for the leak source",
            'status': "Current status of the relationship"
        }
    }

    # Source documentation
    SOURCE_IDS = {
        "PANAMA_PAPERS": "Data from Panama Papers leak (2016)",
        "PARADISE_PAPERS": "Data from Paradise Papers leak (2017)",
        "BAHAMAS_LEAKS": "Data from Bahamas Leaks (2016)",
        "OFFSHORE_LEAKS": "Data from Offshore Leaks (2013)",
        "PANDORA_PAPERS": "Data from Pandora Papers leak (2021)"
    }
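

if __name__ == '__main__':
    # Minimal end-to-end smoke test (a sketch). The "data" directory is an
    # assumption: it should contain the ICIJ offshore leaks CSV files listed
    # in ICIJDataLoader.table_mappings.
    loader = ICIJDataLoader(db_path='icij_data.db')
    if loader.load_all_files('data'):
        connector = ICIJDatabaseConnector(db_path='icij_data.db')
        logger.info(f"Tables: {connector.list_tables()}")
        for name, schema in connector.get_full_database_schema().items():
            logger.info(f"{name}: {schema}")
        logger.info(f"Sample entity: {connector.query_table('entities', limit=1)}")
        logger.info(f"Entities table doc: {ICIJDatabaseMetadata.TABLE_DOCS['entities']}")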