#!/usr/bin/env python3
"""
Alternative Data Modeling Script for HockeyFood Database
Supports multiple connection methods and provides fallback options
"""
import os
import json
from dotenv import load_dotenv
from typing import Dict, List, Any, Optional
import logging
from datetime import datetime
# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
# Load environment variables
load_dotenv()
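
# Expected .env file (the values below are illustrative placeholders, not real
# credentials; the variable names are exactly the ones read in __init__):
#
#   DB_SERVER=your-server.database.windows.net
#   DB_DATABASE=HockeyFood
#   DB_USER=your_username
#   DB_PASSWORD=your_password
#   DB_ENCRYPT=true
#   DB_TRUST_SERVER_CERTIFICATE=true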
class DatabaseModelerAlternative:
def __init__(self):
self.server = os.getenv("DB_SERVER")
self.database = os.getenv("DB_DATABASE")
self.username = os.getenv("DB_USER")
self.password = os.getenv("DB_PASSWORD")
self.encrypt = os.getenv("DB_ENCRYPT", "true").lower() == "true"
self.trust_cert = os.getenv("DB_TRUST_SERVER_CERTIFICATE", "true").lower() == "true"
if not all([self.server, self.database, self.username, self.password]):
raise ValueError("Missing required database connection parameters in .env file")
        self.connection = None
        self.connection_method = None  # set by connect(); read in execute_query()
        self.engine = None  # only set when the SQLAlchemy path succeeds
        self.data_model = {}
def try_pymssql_connection(self):
"""Try connecting using pymssql (alternative to pyodbc)"""
try:
import pymssql
self.connection = pymssql.connect(
server=self.server,
user=self.username,
password=self.password,
database=self.database,
timeout=30,
as_dict=True
)
logging.info(f"Successfully connected to database using pymssql: {self.database}")
return True
except ImportError:
logging.warning("pymssql not available. Install with: pip install pymssql")
return False
except Exception as e:
logging.error(f"pymssql connection failed: {str(e)}")
return False
def try_sqlalchemy_connection(self):
"""Try connecting using SQLAlchemy with different drivers"""
try:
from sqlalchemy import create_engine, text
import urllib.parse
# URL-encode the password
password_encoded = urllib.parse.quote_plus(self.password)
# Try different connection strings
connection_strings = [
f"mssql+pyodbc://{self.username}:{password_encoded}@{self.server}/{self.database}?driver=ODBC+Driver+17+for+SQL+Server&Encrypt=yes&TrustServerCertificate=yes",
f"mssql+pyodbc://{self.username}:{password_encoded}@{self.server}/{self.database}?driver=ODBC+Driver+18+for+SQL+Server&Encrypt=yes&TrustServerCertificate=yes",
f"mssql+pymssql://{self.username}:{password_encoded}@{self.server}/{self.database}",
]
for conn_str in connection_strings:
try:
engine = create_engine(conn_str)
connection = engine.connect()
# Test the connection
                    connection.execute(text("SELECT 1"))
self.connection = connection
self.engine = engine
logging.info(f"Successfully connected using SQLAlchemy: {self.database}")
return True
except Exception as e:
logging.debug(f"SQLAlchemy connection attempt failed: {str(e)}")
continue
return False
except ImportError:
logging.warning("SQLAlchemy not available. Install with: pip install sqlalchemy")
return False
except Exception as e:
logging.error(f"SQLAlchemy connection failed: {str(e)}")
return False
def connect(self):
"""Try multiple connection methods"""
connection_methods = [
("pymssql", self.try_pymssql_connection),
("sqlalchemy", self.try_sqlalchemy_connection),
]
for method_name, method in connection_methods:
logging.info(f"Trying {method_name} connection...")
if method():
self.connection_method = method_name
return True
logging.error("All connection methods failed")
return False
    def disconnect(self):
        """Close database connection and dispose of any SQLAlchemy engine"""
        if self.connection:
            self.connection.close()
            logging.info("Database connection closed")
        if self.engine:
            self.engine.dispose()
    def execute_query(self, query: str, params=None):
        """Execute a query using the established connection.

        pymssql takes positional %s parameters as a tuple; SQLAlchemy takes
        named :param values as a dict (callers pass the matching style).
        """
        try:
            if self.connection_method == "pymssql":
                cursor = self.connection.cursor()
                cursor.execute(query, params or ())
                return cursor.fetchall()
            elif self.connection_method == "sqlalchemy":
                from sqlalchemy import text
                result = self.connection.execute(text(query), params or {})
                return [dict(row._mapping) for row in result]
            logging.error("No database connection established")
            return []
        except Exception as e:
            logging.error(f"Query execution failed: {str(e)}")
            return []
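
    # Usage sketch (illustrative; "modeler" stands for an instance of this
    # class, and Exercise is one of the focus tables targeted in main()):
    #
    #   # pymssql path: positional %s parameters as a tuple
    #   rows = modeler.execute_query(
    #       "SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = %s",
    #       ("Exercise",))
    #   # SQLAlchemy path: named :params as a dict
    #   rows = modeler.execute_query(
    #       "SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = :t",
    #       {"t": "Exercise"})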
def get_all_tables(self) -> List[str]:
"""Get list of all tables in the database"""
try:
query = """
SELECT TABLE_SCHEMA, TABLE_NAME
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_TYPE = 'BASE TABLE'
ORDER BY TABLE_NAME
"""
results = self.execute_query(query)
            # Both drivers return rows as dicts, so one code path covers them.
            # Store schema info for later use when qualifying table names.
            self.table_schemas = {row['TABLE_NAME']: row['TABLE_SCHEMA'] for row in results}
            tables = [row['TABLE_NAME'] for row in results]
logging.info(f"Found {len(tables)} tables in database")
return tables
except Exception as e:
logging.error(f"Error getting tables: {str(e)}")
return []
def get_table_schema(self, table_name: str) -> Dict[str, Any]:
"""Get detailed schema information for a table"""
try:
# Get column information
query = """
SELECT
COLUMN_NAME,
DATA_TYPE,
IS_NULLABLE,
COLUMN_DEFAULT,
CHARACTER_MAXIMUM_LENGTH,
NUMERIC_PRECISION,
NUMERIC_SCALE,
ORDINAL_POSITION
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = %s
ORDER BY ORDINAL_POSITION
"""
if self.connection_method == "sqlalchemy":
query = query.replace("%s", ":table_name")
results = self.execute_query(query, {"table_name": table_name})
else:
results = self.execute_query(query, (table_name,))
columns = []
for row in results:
columns.append({
'name': row['COLUMN_NAME'],
'data_type': row['DATA_TYPE'],
'nullable': row['IS_NULLABLE'] == 'YES',
'default': row['COLUMN_DEFAULT'],
'max_length': row['CHARACTER_MAXIMUM_LENGTH'],
'precision': row['NUMERIC_PRECISION'],
'scale': row['NUMERIC_SCALE'],
'position': row['ORDINAL_POSITION']
})
            # Get primary keys via TABLE_CONSTRAINTS: this works even when the
            # constraint name does not start with 'PK_', and it avoids the
            # literal '%' in LIKE 'PK_%', which breaks pymssql's %s substitution
            pk_query = """
                SELECT kcu.COLUMN_NAME
                FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
                JOIN INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
                    ON kcu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME
                    AND kcu.TABLE_NAME = tc.TABLE_NAME
                WHERE kcu.TABLE_NAME = %s AND tc.CONSTRAINT_TYPE = 'PRIMARY KEY'
            """
if self.connection_method == "sqlalchemy":
pk_query = pk_query.replace("%s", ":table_name")
pk_results = self.execute_query(pk_query, {"table_name": table_name})
else:
pk_results = self.execute_query(pk_query, (table_name,))
primary_keys = [row['COLUMN_NAME'] for row in pk_results]
# Get foreign keys - Using sys tables for Azure SQL compatibility
fk_query = """
SELECT
c1.name as COLUMN_NAME,
OBJECT_NAME(fk.referenced_object_id) as REFERENCED_TABLE_NAME,
c2.name as REFERENCED_COLUMN_NAME
FROM sys.foreign_keys fk
JOIN sys.foreign_key_columns fkc ON fk.object_id = fkc.constraint_object_id
JOIN sys.columns c1 ON fkc.parent_object_id = c1.object_id AND fkc.parent_column_id = c1.column_id
JOIN sys.columns c2 ON fkc.referenced_object_id = c2.object_id AND fkc.referenced_column_id = c2.column_id
JOIN sys.tables t ON fk.parent_object_id = t.object_id
WHERE t.name = %s
"""
if self.connection_method == "sqlalchemy":
fk_query = fk_query.replace("%s", ":table_name")
fk_results = self.execute_query(fk_query, {"table_name": table_name})
else:
fk_results = self.execute_query(fk_query, (table_name,))
foreign_keys = []
for row in fk_results:
foreign_keys.append({
'column': row['COLUMN_NAME'],
'referenced_table': row['REFERENCED_TABLE_NAME'],
'referenced_column': row['REFERENCED_COLUMN_NAME']
})
return {
'columns': columns,
'primary_keys': primary_keys,
'foreign_keys': foreign_keys
}
except Exception as e:
logging.error(f"Error getting schema for table {table_name}: {str(e)}")
return {}
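
    # Return shape sketch (keys match the dict assembled above; the example
    # values are illustrative, not taken from the live database):
    #
    #   {'columns': [{'name': 'Id', 'data_type': 'int', 'nullable': False,
    #                 'default': None, 'max_length': None, 'precision': 10,
    #                 'scale': 0, 'position': 1}, ...],
    #    'primary_keys': ['Id'],
    #    'foreign_keys': [{'column': 'ExerciseId',
    #                      'referenced_table': 'Exercise',
    #                      'referenced_column': 'Id'}]}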
def get_table_data_insights(self, table_name: str, sample_size: int = 100) -> Dict[str, Any]:
"""Get data insights for a table including sample data and statistics"""
try:
# Get the correct schema for the table
schema_name = getattr(self, 'table_schemas', {}).get(table_name, 'dbo')
full_table_name = f"[{schema_name}].[{table_name}]"
# Get row count with proper schema qualification
count_query = f"SELECT COUNT(*) as row_count FROM {full_table_name}"
count_results = self.execute_query(count_query)
row_count = count_results[0]['row_count'] if count_results else 0
# Get sample data with proper schema qualification
sample_query = f"SELECT TOP {sample_size} * FROM {full_table_name}"
sample_results = self.execute_query(sample_query)
# Convert sample data to list of dictionaries
sample_data = []
for row in sample_results[:10]: # Limit to first 10 rows
sample_data.append({k: str(v) if v is not None else None for k, v in row.items()})
# Get basic column statistics (simplified)
data_analysis = {}
if sample_results:
columns = list(sample_results[0].keys())
for column in columns:
try:
# Basic analysis for each column with proper schema qualification
col_query = f"""
SELECT
COUNT(*) as total_count,
COUNT([{column}]) as non_null_count,
COUNT(DISTINCT [{column}]) as distinct_count
FROM {full_table_name}
"""
col_results = self.execute_query(col_query)
if col_results:
result = col_results[0]
data_analysis[column] = {
'total_count': result['total_count'],
'non_null_count': result['non_null_count'],
'distinct_count': result['distinct_count'],
'null_percentage': round((result['total_count'] - result['non_null_count']) / result['total_count'] * 100, 2) if result['total_count'] > 0 else 0
}
except Exception as e:
logging.warning(f"Could not analyze column {column} in table {table_name}: {str(e)}")
data_analysis[column] = {'error': str(e)}
return {
'row_count': row_count,
'sample_data': sample_data,
'data_analysis': data_analysis
}
except Exception as e:
logging.error(f"Error getting data insights for table {table_name}: {str(e)}")
return {}
def analyze_relationships(self) -> Dict[str, Any]:
"""Analyze relationships between tables"""
try:
# Simplified query for Azure SQL - use sys tables instead of INFORMATION_SCHEMA
query = """
SELECT
OBJECT_NAME(fk.parent_object_id) as source_table,
c1.name as source_column,
OBJECT_NAME(fk.referenced_object_id) as target_table,
c2.name as target_column,
fk.name as constraint_name
FROM sys.foreign_keys fk
JOIN sys.foreign_key_columns fkc ON fk.object_id = fkc.constraint_object_id
JOIN sys.columns c1 ON fkc.parent_object_id = c1.object_id AND fkc.parent_column_id = c1.column_id
JOIN sys.columns c2 ON fkc.referenced_object_id = c2.object_id AND fkc.referenced_column_id = c2.column_id
ORDER BY source_table
"""
results = self.execute_query(query)
relationships = []
for row in results:
relationships.append({
'source_table': row['source_table'],
'source_column': row['source_column'],
'target_table': row['target_table'],
'target_column': row['target_column'],
'constraint_name': row['constraint_name']
})
return {'relationships': relationships}
except Exception as e:
logging.error(f"Error analyzing relationships: {str(e)}")
return {}
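
    # Each relationship record mirrors the SELECT list above; the table and
    # column names in this sketch are illustrative:
    #
    #   {'source_table': 'Serie', 'source_column': 'ExerciseId',
    #    'target_table': 'Exercise', 'target_column': 'Id',
    #    'constraint_name': 'FK_Serie_Exercise'}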
    def build_comprehensive_data_model(self, focus_tables: Optional[List[str]] = None):
"""Build comprehensive data model for the entire database"""
if not self.connect():
return
try:
tables = self.get_all_tables()
if focus_tables:
# Filter to focus tables if they exist
available_focus_tables = [t for t in focus_tables if t in tables]
if available_focus_tables:
tables = available_focus_tables
logging.info(f"Focusing on tables: {available_focus_tables}")
else:
logging.warning(f"Focus tables {focus_tables} not found. Analyzing all tables.")
logging.info(f"Available tables: {tables}")
self.data_model = {
'database_info': {
'name': self.database,
'server': self.server,
'connection_method': self.connection_method,
'analysis_date': datetime.now().isoformat(),
'total_tables': len(tables),
'analyzed_tables': tables
},
'tables': {},
'relationships': self.analyze_relationships()
}
# Analyze each table
for table_name in tables:
logging.info(f"Analyzing table: {table_name}")
self.data_model['tables'][table_name] = {
'schema': self.get_table_schema(table_name),
'data_insights': self.get_table_data_insights(table_name)
}
logging.info("Data model analysis completed successfully")
except Exception as e:
logging.error(f"Error building data model: {str(e)}")
finally:
self.disconnect()
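
    # Resulting self.data_model layout (top-level keys as assembled above):
    #
    #   {'database_info': {'name': ..., 'server': ..., 'connection_method': ...,
    #                      'analysis_date': ..., 'total_tables': ...,
    #                      'analyzed_tables': [...]},
    #    'tables': {'<table_name>': {'schema': {...}, 'data_insights': {...}}},
    #    'relationships': {'relationships': [...]}}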
    def export_data_model(self, filename: Optional[str] = None):
"""Export data model to JSON file"""
if not filename:
filename = f"hockey_db_data_model_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
try:
with open(filename, 'w', encoding='utf-8') as f:
json.dump(self.data_model, f, indent=2, ensure_ascii=False)
logging.info(f"Data model exported to: {filename}")
return filename
except Exception as e:
logging.error(f"Error exporting data model: {str(e)}")
return None
def generate_summary_report(self) -> str:
"""Generate a human-readable summary report"""
if not self.data_model:
return "No data model available. Run build_comprehensive_data_model() first."
report = []
report.append("=" * 60)
report.append("HOCKEY DATABASE DATA MODEL SUMMARY")
report.append("=" * 60)
report.append(f"Database: {self.data_model['database_info']['name']}")
report.append(f"Server: {self.data_model['database_info']['server']}")
report.append(f"Connection Method: {self.data_model['database_info']['connection_method']}")
report.append(f"Analysis Date: {self.data_model['database_info']['analysis_date']}")
report.append(f"Total Tables Analyzed: {self.data_model['database_info']['total_tables']}")
report.append(f"Tables: {', '.join(self.data_model['database_info']['analyzed_tables'])}")
report.append("")
# Table summaries
report.append("TABLE SUMMARIES:")
report.append("-" * 40)
for table_name, table_data in self.data_model['tables'].items():
            report.append(f"\n📊 TABLE: {table_name}")
schema = table_data.get('schema', {})
insights = table_data.get('data_insights', {})
if schema.get('columns'):
report.append(f" Columns: {len(schema['columns'])}")
report.append(f" Primary Keys: {', '.join(schema.get('primary_keys', []))}")
if schema.get('foreign_keys'):
report.append(" Foreign Keys:")
for fk in schema['foreign_keys']:
                    report.append(f"      - {fk['column']} → {fk['referenced_table']}.{fk['referenced_column']}")
if insights.get('row_count') is not None:
report.append(f" Total Rows: {insights['row_count']:,}")
# Column details
if schema.get('columns'):
report.append(" Column Details:")
for col in schema['columns'][:5]: # Show first 5 columns
nullable = "NULL" if col['nullable'] else "NOT NULL"
report.append(f" - {col['name']}: {col['data_type']} {nullable}")
if len(schema['columns']) > 5:
report.append(f" ... and {len(schema['columns']) - 5} more columns")
# Sample data
if insights.get('sample_data'):
report.append(" Sample Data (first 3 rows):")
for i, row in enumerate(insights['sample_data'][:3]):
report.append(f" Row {i+1}: {row}")
# Relationships
relationships = self.data_model.get('relationships', {}).get('relationships', [])
if relationships:
            report.append(f"\n🔗 DATABASE RELATIONSHIPS ({len(relationships)} total):")
report.append("-" * 40)
for rel in relationships:
                report.append(f"  {rel['source_table']}.{rel['source_column']} → {rel['target_table']}.{rel['target_column']}")
return "\n".join(report)
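
# Ad-hoc usage sketch (assumes a populated .env; unlike main(), this skips the
# automatic dependency install, and the focus table name is illustrative):
#
#   modeler = DatabaseModelerAlternative()
#   modeler.build_comprehensive_data_model(focus_tables=['Exercise'])
#   print(modeler.generate_summary_report())
#   modeler.export_data_model()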
def install_dependencies():
"""Install required dependencies"""
import subprocess
import sys
packages = ['pymssql', 'sqlalchemy']
for package in packages:
try:
subprocess.check_call([sys.executable, '-m', 'pip', 'install', package])
print(f"Successfully installed {package}")
except subprocess.CalledProcessError:
print(f"Failed to install {package}")
def main():
"""Main function to run the data modeling analysis"""
try:
# Try to install dependencies
install_dependencies()
# Focus on specific tables mentioned in HockeyFood_DB.md
focus_tables = ['Exercise', 'Multimedia', 'Serie']
modeler = DatabaseModelerAlternative()
logging.info("Starting comprehensive database analysis...")
# Build the data model
modeler.build_comprehensive_data_model(focus_tables=focus_tables)
# Export to JSON
json_file = modeler.export_data_model()
# Generate and save summary report
summary = modeler.generate_summary_report()
summary_file = f"hockey_db_summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
with open(summary_file, 'w', encoding='utf-8') as f:
f.write(summary)
print(summary)
        print("\n📁 Files generated:")
print(f" - JSON Data Model: {json_file}")
print(f" - Summary Report: {summary_file}")
except Exception as e:
logging.error(f"Main execution error: {str(e)}")
print(f"Error: {str(e)}")
if __name__ == "__main__":
main()