#!/usr/bin/env python3
"""
Alternative Data Modeling Script for HockeyFood Database
Supports multiple connection methods and provides fallback options
"""
import os
import json
import logging
from datetime import datetime
from typing import Dict, List, Any, Optional

from dotenv import load_dotenv

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# Load environment variables
load_dotenv()
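# The script reads the following keys from a local .env file (see __init__
# below). The values here are illustrative placeholders, not real credentials:
#
#   DB_SERVER=your-server.database.windows.net
#   DB_DATABASE=HockeyFood
#   DB_USER=your_user
#   DB_PASSWORD=your_password
#   DB_ENCRYPT=true
#   DB_TRUST_SERVER_CERTIFICATE=true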
class DatabaseModelerAlternative:
    def __init__(self):
        self.server = os.getenv("DB_SERVER")
        self.database = os.getenv("DB_DATABASE")
        self.username = os.getenv("DB_USER")
        self.password = os.getenv("DB_PASSWORD")
        self.encrypt = os.getenv("DB_ENCRYPT", "true").lower() == "true"
        self.trust_cert = os.getenv("DB_TRUST_SERVER_CERTIFICATE", "true").lower() == "true"

        if not all([self.server, self.database, self.username, self.password]):
            raise ValueError("Missing required database connection parameters in .env file")

        self.connection = None
        self.connection_method = None  # Set by connect(); guards use before connecting
        self.data_model = {}
    def try_pymssql_connection(self):
        """Try connecting using pymssql (alternative to pyodbc)"""
        try:
            import pymssql

            self.connection = pymssql.connect(
                server=self.server,
                user=self.username,
                password=self.password,
                database=self.database,
                timeout=30,
                as_dict=True,  # Return rows as dicts keyed by column name
            )
            logging.info(f"Successfully connected to database using pymssql: {self.database}")
            return True
        except ImportError:
            logging.warning("pymssql not available. Install with: pip install pymssql")
            return False
        except Exception as e:
            logging.error(f"pymssql connection failed: {str(e)}")
            return False
    def try_sqlalchemy_connection(self):
        """Try connecting using SQLAlchemy with different drivers"""
        try:
            import urllib.parse

            from sqlalchemy import create_engine, text

            # URL-encode the password so special characters survive the URL
            password_encoded = urllib.parse.quote_plus(self.password)

            # Try different connection strings in fallback order
            connection_strings = [
                f"mssql+pyodbc://{self.username}:{password_encoded}@{self.server}/{self.database}?driver=ODBC+Driver+17+for+SQL+Server&Encrypt=yes&TrustServerCertificate=yes",
                f"mssql+pyodbc://{self.username}:{password_encoded}@{self.server}/{self.database}?driver=ODBC+Driver+18+for+SQL+Server&Encrypt=yes&TrustServerCertificate=yes",
                f"mssql+pymssql://{self.username}:{password_encoded}@{self.server}/{self.database}",
            ]
            for conn_str in connection_strings:
                try:
                    engine = create_engine(conn_str)
                    connection = engine.connect()
                    # Test the connection before committing to it
                    connection.execute(text("SELECT 1"))
                    self.connection = connection
                    self.engine = engine
                    logging.info(f"Successfully connected using SQLAlchemy: {self.database}")
                    return True
                except Exception as e:
                    logging.debug(f"SQLAlchemy connection attempt failed: {str(e)}")
                    continue
            return False
        except ImportError:
            logging.warning("SQLAlchemy not available. Install with: pip install sqlalchemy")
            return False
        except Exception as e:
            logging.error(f"SQLAlchemy connection failed: {str(e)}")
            return False
    def connect(self):
        """Try multiple connection methods"""
        connection_methods = [
            ("pymssql", self.try_pymssql_connection),
            ("sqlalchemy", self.try_sqlalchemy_connection),
        ]
        for method_name, method in connection_methods:
            logging.info(f"Trying {method_name} connection...")
            if method():
                self.connection_method = method_name
                return True
        logging.error("All connection methods failed")
        return False

    def disconnect(self):
        """Close database connection"""
        if self.connection:
            self.connection.close()
            logging.info("Database connection closed")
    def execute_query(self, query: str, params: tuple = None):
        """Execute a query using the established connection"""
        try:
            if self.connection_method == "pymssql":
                # pymssql uses the %s ("format") paramstyle with a tuple of values
                cursor = self.connection.cursor()
                cursor.execute(query, params or ())
                return cursor.fetchall()
            elif self.connection_method == "sqlalchemy":
                # SQLAlchemy uses the :name paramstyle with a dict of values
                from sqlalchemy import text

                result = self.connection.execute(text(query), params or {})
                return [dict(row._mapping) for row in result]
            return []  # No connection established
        except Exception as e:
            logging.error(f"Query execution failed: {str(e)}")
            return []
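    # Usage sketch for execute_query (illustrative only): the same logical
    # query must be phrased per driver, because the two paramstyles differ.
    #
    #   # pymssql: positional %s placeholders, parameters as a tuple
    #   rows = modeler.execute_query(
    #       "SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = %s",
    #       ("Exercise",),
    #   )
    #
    #   # SQLAlchemy: named :placeholders, parameters as a dict
    #   rows = modeler.execute_query(
    #       "SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = :table_name",
    #       {"table_name": "Exercise"},
    #   )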
    def get_all_tables(self) -> List[str]:
        """Get list of all tables in the database"""
        try:
            query = """
                SELECT TABLE_SCHEMA, TABLE_NAME
                FROM INFORMATION_SCHEMA.TABLES
                WHERE TABLE_TYPE = 'BASE TABLE'
                ORDER BY TABLE_NAME
            """
            results = self.execute_query(query)
            # Store schema info for later use; both drivers return dict-like rows here
            self.table_schemas = {row['TABLE_NAME']: row['TABLE_SCHEMA'] for row in results}
            tables = [row['TABLE_NAME'] for row in results]
            logging.info(f"Found {len(tables)} tables in database")
            return tables
        except Exception as e:
            logging.error(f"Error getting tables: {str(e)}")
            return []
    def get_table_schema(self, table_name: str) -> Dict[str, Any]:
        """Get detailed schema information for a table"""
        try:
            # Get column information
            query = """
                SELECT
                    COLUMN_NAME,
                    DATA_TYPE,
                    IS_NULLABLE,
                    COLUMN_DEFAULT,
                    CHARACTER_MAXIMUM_LENGTH,
                    NUMERIC_PRECISION,
                    NUMERIC_SCALE,
                    ORDINAL_POSITION
                FROM INFORMATION_SCHEMA.COLUMNS
                WHERE TABLE_NAME = %s
                ORDER BY ORDINAL_POSITION
            """
            if self.connection_method == "sqlalchemy":
                query = query.replace("%s", ":table_name")
                results = self.execute_query(query, {"table_name": table_name})
            else:
                results = self.execute_query(query, (table_name,))

            columns = []
            for row in results:
                columns.append({
                    'name': row['COLUMN_NAME'],
                    'data_type': row['DATA_TYPE'],
                    'nullable': row['IS_NULLABLE'] == 'YES',
                    'default': row['COLUMN_DEFAULT'],
                    'max_length': row['CHARACTER_MAXIMUM_LENGTH'],
                    'precision': row['NUMERIC_PRECISION'],
                    'scale': row['NUMERIC_SCALE'],
                    'position': row['ORDINAL_POSITION']
                })

            # Get primary keys. Join TABLE_CONSTRAINTS rather than matching
            # CONSTRAINT_NAME LIKE 'PK_%': the join also catches primary keys
            # that do not follow the PK_ naming convention, and a literal %
            # in the query would clash with pymssql's %s paramstyle.
            pk_query = """
                SELECT kcu.COLUMN_NAME
                FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
                JOIN INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
                    ON kcu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME
                WHERE kcu.TABLE_NAME = %s AND tc.CONSTRAINT_TYPE = 'PRIMARY KEY'
            """
            if self.connection_method == "sqlalchemy":
                pk_query = pk_query.replace("%s", ":table_name")
                pk_results = self.execute_query(pk_query, {"table_name": table_name})
            else:
                pk_results = self.execute_query(pk_query, (table_name,))
            primary_keys = [row['COLUMN_NAME'] for row in pk_results]

            # Get foreign keys - using sys tables for Azure SQL compatibility
            fk_query = """
                SELECT
                    c1.name as COLUMN_NAME,
                    OBJECT_NAME(fk.referenced_object_id) as REFERENCED_TABLE_NAME,
                    c2.name as REFERENCED_COLUMN_NAME
                FROM sys.foreign_keys fk
                JOIN sys.foreign_key_columns fkc ON fk.object_id = fkc.constraint_object_id
                JOIN sys.columns c1 ON fkc.parent_object_id = c1.object_id AND fkc.parent_column_id = c1.column_id
                JOIN sys.columns c2 ON fkc.referenced_object_id = c2.object_id AND fkc.referenced_column_id = c2.column_id
                JOIN sys.tables t ON fk.parent_object_id = t.object_id
                WHERE t.name = %s
            """
            if self.connection_method == "sqlalchemy":
                fk_query = fk_query.replace("%s", ":table_name")
                fk_results = self.execute_query(fk_query, {"table_name": table_name})
            else:
                fk_results = self.execute_query(fk_query, (table_name,))

            foreign_keys = []
            for row in fk_results:
                foreign_keys.append({
                    'column': row['COLUMN_NAME'],
                    'referenced_table': row['REFERENCED_TABLE_NAME'],
                    'referenced_column': row['REFERENCED_COLUMN_NAME']
                })

            return {
                'columns': columns,
                'primary_keys': primary_keys,
                'foreign_keys': foreign_keys
            }
        except Exception as e:
            logging.error(f"Error getting schema for table {table_name}: {str(e)}")
            return {}
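    # Illustrative return shape of get_table_schema (values are made up):
    # {
    #     'columns': [{'name': 'Id', 'data_type': 'int', 'nullable': False,
    #                  'default': None, 'max_length': None, 'precision': 10,
    #                  'scale': 0, 'position': 1}, ...],
    #     'primary_keys': ['Id'],
    #     'foreign_keys': [{'column': 'ExerciseId', 'referenced_table': 'Exercise',
    #                       'referenced_column': 'Id'}],
    # }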
    def get_table_data_insights(self, table_name: str, sample_size: int = 100) -> Dict[str, Any]:
        """Get data insights for a table including sample data and statistics"""
        try:
            # Get the correct schema for the table, defaulting to dbo
            schema_name = getattr(self, 'table_schemas', {}).get(table_name, 'dbo')
            full_table_name = f"[{schema_name}].[{table_name}]"

            # Get row count with proper schema qualification
            count_query = f"SELECT COUNT(*) as row_count FROM {full_table_name}"
            count_results = self.execute_query(count_query)
            row_count = count_results[0]['row_count'] if count_results else 0

            # Get sample data with proper schema qualification
            sample_query = f"SELECT TOP {sample_size} * FROM {full_table_name}"
            sample_results = self.execute_query(sample_query)

            # Convert sample data to a list of dicts, limited to the first 10 rows
            sample_data = []
            for row in sample_results[:10]:
                sample_data.append({k: str(v) if v is not None else None for k, v in row.items()})

            # Get basic column statistics (simplified)
            data_analysis = {}
            if sample_results:
                columns = list(sample_results[0].keys())
                for column in columns:
                    try:
                        # Basic analysis for each column with proper schema qualification
                        col_query = f"""
                            SELECT
                                COUNT(*) as total_count,
                                COUNT([{column}]) as non_null_count,
                                COUNT(DISTINCT [{column}]) as distinct_count
                            FROM {full_table_name}
                        """
                        col_results = self.execute_query(col_query)
                        if col_results:
                            result = col_results[0]
                            data_analysis[column] = {
                                'total_count': result['total_count'],
                                'non_null_count': result['non_null_count'],
                                'distinct_count': result['distinct_count'],
                                'null_percentage': round((result['total_count'] - result['non_null_count']) / result['total_count'] * 100, 2) if result['total_count'] > 0 else 0
                            }
                    except Exception as e:
                        logging.warning(f"Could not analyze column {column} in table {table_name}: {str(e)}")
                        data_analysis[column] = {'error': str(e)}

            return {
                'row_count': row_count,
                'sample_data': sample_data,
                'data_analysis': data_analysis
            }
        except Exception as e:
            logging.error(f"Error getting data insights for table {table_name}: {str(e)}")
            return {}
    def analyze_relationships(self) -> Dict[str, Any]:
        """Analyze relationships between tables"""
        try:
            # Simplified query for Azure SQL - use sys tables instead of INFORMATION_SCHEMA
            query = """
                SELECT
                    OBJECT_NAME(fk.parent_object_id) as source_table,
                    c1.name as source_column,
                    OBJECT_NAME(fk.referenced_object_id) as target_table,
                    c2.name as target_column,
                    fk.name as constraint_name
                FROM sys.foreign_keys fk
                JOIN sys.foreign_key_columns fkc ON fk.object_id = fkc.constraint_object_id
                JOIN sys.columns c1 ON fkc.parent_object_id = c1.object_id AND fkc.parent_column_id = c1.column_id
                JOIN sys.columns c2 ON fkc.referenced_object_id = c2.object_id AND fkc.referenced_column_id = c2.column_id
                ORDER BY source_table
            """
            results = self.execute_query(query)
            relationships = []
            for row in results:
                relationships.append({
                    'source_table': row['source_table'],
                    'source_column': row['source_column'],
                    'target_table': row['target_table'],
                    'target_column': row['target_column'],
                    'constraint_name': row['constraint_name']
                })
            return {'relationships': relationships}
        except Exception as e:
            logging.error(f"Error analyzing relationships: {str(e)}")
            return {}
    def build_comprehensive_data_model(self, focus_tables: Optional[List[str]] = None):
        """Build comprehensive data model for the entire database"""
        if not self.connect():
            return
        try:
            tables = self.get_all_tables()
            if focus_tables:
                # Filter to focus tables if they exist
                available_focus_tables = [t for t in focus_tables if t in tables]
                if available_focus_tables:
                    tables = available_focus_tables
                    logging.info(f"Focusing on tables: {available_focus_tables}")
                else:
                    logging.warning(f"Focus tables {focus_tables} not found. Analyzing all tables.")
                    logging.info(f"Available tables: {tables}")

            self.data_model = {
                'database_info': {
                    'name': self.database,
                    'server': self.server,
                    'connection_method': self.connection_method,
                    'analysis_date': datetime.now().isoformat(),
                    'total_tables': len(tables),
                    'analyzed_tables': tables
                },
                'tables': {},
                'relationships': self.analyze_relationships()
            }

            # Analyze each table
            for table_name in tables:
                logging.info(f"Analyzing table: {table_name}")
                self.data_model['tables'][table_name] = {
                    'schema': self.get_table_schema(table_name),
                    'data_insights': self.get_table_data_insights(table_name)
                }
            logging.info("Data model analysis completed successfully")
        except Exception as e:
            logging.error(f"Error building data model: {str(e)}")
        finally:
            self.disconnect()
    def export_data_model(self, filename: Optional[str] = None):
        """Export data model to JSON file"""
        if not filename:
            filename = f"hockey_db_data_model_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        try:
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(self.data_model, f, indent=2, ensure_ascii=False)
            logging.info(f"Data model exported to: {filename}")
            return filename
        except Exception as e:
            logging.error(f"Error exporting data model: {str(e)}")
            return None
    def generate_summary_report(self) -> str:
        """Generate a human-readable summary report"""
        if not self.data_model:
            return "No data model available. Run build_comprehensive_data_model() first."

        report = []
        report.append("=" * 60)
        report.append("HOCKEY DATABASE DATA MODEL SUMMARY")
        report.append("=" * 60)
        report.append(f"Database: {self.data_model['database_info']['name']}")
        report.append(f"Server: {self.data_model['database_info']['server']}")
        report.append(f"Connection Method: {self.data_model['database_info']['connection_method']}")
        report.append(f"Analysis Date: {self.data_model['database_info']['analysis_date']}")
        report.append(f"Total Tables Analyzed: {self.data_model['database_info']['total_tables']}")
        report.append(f"Tables: {', '.join(self.data_model['database_info']['analyzed_tables'])}")
        report.append("")

        # Table summaries
        report.append("TABLE SUMMARIES:")
        report.append("-" * 40)
        for table_name, table_data in self.data_model['tables'].items():
            report.append(f"\nTABLE: {table_name}")
            schema = table_data.get('schema', {})
            insights = table_data.get('data_insights', {})
            if schema.get('columns'):
                report.append(f"  Columns: {len(schema['columns'])}")
                report.append(f"  Primary Keys: {', '.join(schema.get('primary_keys', []))}")
            if schema.get('foreign_keys'):
                report.append("  Foreign Keys:")
                for fk in schema['foreign_keys']:
                    report.append(f"    - {fk['column']} -> {fk['referenced_table']}.{fk['referenced_column']}")
            if insights.get('row_count') is not None:
                report.append(f"  Total Rows: {insights['row_count']:,}")
            # Column details (first 5 columns only)
            if schema.get('columns'):
                report.append("  Column Details:")
                for col in schema['columns'][:5]:
                    nullable = "NULL" if col['nullable'] else "NOT NULL"
                    report.append(f"    - {col['name']}: {col['data_type']} {nullable}")
                if len(schema['columns']) > 5:
                    report.append(f"    ... and {len(schema['columns']) - 5} more columns")
            # Sample data (first 3 rows only)
            if insights.get('sample_data'):
                report.append("  Sample Data (first 3 rows):")
                for i, row in enumerate(insights['sample_data'][:3]):
                    report.append(f"    Row {i+1}: {row}")

        # Relationships
        relationships = self.data_model.get('relationships', {}).get('relationships', [])
        if relationships:
            report.append(f"\nDATABASE RELATIONSHIPS ({len(relationships)} total):")
            report.append("-" * 40)
            for rel in relationships:
                report.append(f"  {rel['source_table']}.{rel['source_column']} -> {rel['target_table']}.{rel['target_column']}")
        return "\n".join(report)
def install_dependencies():
    """Install required dependencies"""
    import subprocess
    import sys

    packages = ['pymssql', 'sqlalchemy']
    for package in packages:
        try:
            subprocess.check_call([sys.executable, '-m', 'pip', 'install', package])
            print(f"Successfully installed {package}")
        except subprocess.CalledProcessError:
            print(f"Failed to install {package}")
def main():
    """Main function to run the data modeling analysis"""
    try:
        # Try to install dependencies
        install_dependencies()

        # Focus on specific tables mentioned in HockeyFood_DB.md
        focus_tables = ['Exercise', 'Multimedia', 'Serie']

        modeler = DatabaseModelerAlternative()
        logging.info("Starting comprehensive database analysis...")

        # Build the data model
        modeler.build_comprehensive_data_model(focus_tables=focus_tables)

        # Export to JSON
        json_file = modeler.export_data_model()

        # Generate and save summary report
        summary = modeler.generate_summary_report()
        summary_file = f"hockey_db_summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
        with open(summary_file, 'w', encoding='utf-8') as f:
            f.write(summary)

        print(summary)
        print("\nFiles generated:")
        print(f"  - JSON Data Model: {json_file}")
        print(f"  - Summary Report: {summary_file}")
    except Exception as e:
        logging.error(f"Main execution error: {str(e)}")
        print(f"Error: {str(e)}")


if __name__ == "__main__":
    main()
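# Usage sketch (assumes a populated .env alongside this script; the file name
# below is a guess -- run whatever this script is saved as):
#
#   python data_model_alternative.py
#
# On success it writes two timestamped files to the working directory:
#   hockey_db_data_model_YYYYMMDD_HHMMSS.json  (full machine-readable model)
#   hockey_db_summary_YYYYMMDD_HHMMSS.txt      (human-readable report)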