#!/usr/bin/env python3
"""
Alternative Data Modeling Script for HockeyFood Database

Supports multiple connection methods and provides fallback options.
"""

import json
import logging
import os
from datetime import datetime
from typing import Any, Dict, List, Optional

from dotenv import load_dotenv

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# Load environment variables
load_dotenv()


class DatabaseModelerAlternative:
    def __init__(self):
        self.server = os.getenv("DB_SERVER")
        self.database = os.getenv("DB_DATABASE")
        self.username = os.getenv("DB_USER")
        self.password = os.getenv("DB_PASSWORD")
        self.encrypt = os.getenv("DB_ENCRYPT", "true").lower() == "true"
        self.trust_cert = os.getenv("DB_TRUST_SERVER_CERTIFICATE", "true").lower() == "true"

        if not all([self.server, self.database, self.username, self.password]):
            raise ValueError("Missing required database connection parameters in .env file")

        self.connection = None
        self.connection_method: Optional[str] = None
        self.data_model: Dict[str, Any] = {}

    def try_pymssql_connection(self) -> bool:
        """Try connecting using pymssql (alternative to pyodbc)."""
        try:
            import pymssql

            self.connection = pymssql.connect(
                server=self.server,
                user=self.username,
                password=self.password,
                database=self.database,
                timeout=30,
                as_dict=True,
            )
            logging.info(f"Successfully connected to database using pymssql: {self.database}")
            return True
        except ImportError:
            logging.warning("pymssql not available. Install with: pip install pymssql")
            return False
        except Exception as e:
            logging.error(f"pymssql connection failed: {str(e)}")
            return False

    def try_sqlalchemy_connection(self) -> bool:
        """Try connecting using SQLAlchemy with different drivers."""
        try:
            import urllib.parse

            from sqlalchemy import create_engine, text

            # URL-encode the password so special characters survive the connection URL
            password_encoded = urllib.parse.quote_plus(self.password)

            # Try different connection strings in order of preference
            connection_strings = [
                f"mssql+pyodbc://{self.username}:{password_encoded}@{self.server}/{self.database}?driver=ODBC+Driver+17+for+SQL+Server&Encrypt=yes&TrustServerCertificate=yes",
                f"mssql+pyodbc://{self.username}:{password_encoded}@{self.server}/{self.database}?driver=ODBC+Driver+18+for+SQL+Server&Encrypt=yes&TrustServerCertificate=yes",
                f"mssql+pymssql://{self.username}:{password_encoded}@{self.server}/{self.database}",
            ]

            for conn_str in connection_strings:
                try:
                    engine = create_engine(conn_str)
                    connection = engine.connect()
                    # Test the connection before accepting it
                    connection.execute(text("SELECT 1"))
                    self.connection = connection
                    self.engine = engine
                    logging.info(f"Successfully connected using SQLAlchemy: {self.database}")
                    return True
                except Exception as e:
                    logging.debug(f"SQLAlchemy connection attempt failed: {str(e)}")
                    continue

            return False
        except ImportError:
            logging.warning("SQLAlchemy not available. Install with: pip install sqlalchemy")
            return False
        except Exception as e:
            logging.error(f"SQLAlchemy connection failed: {str(e)}")
            return False

    def connect(self) -> bool:
        """Try multiple connection methods until one succeeds."""
        connection_methods = [
            ("pymssql", self.try_pymssql_connection),
            ("sqlalchemy", self.try_sqlalchemy_connection),
        ]

        for method_name, method in connection_methods:
            logging.info(f"Trying {method_name} connection...")
            if method():
                self.connection_method = method_name
                return True

        logging.error("All connection methods failed")
        return False

    def disconnect(self):
        """Close database connection."""
        if self.connection:
            self.connection.close()
            logging.info("Database connection closed")

    def execute_query(self, query: str, params=None) -> List[Dict[str, Any]]:
        """Execute a query using the established connection; return rows as dicts.

        Note: params is a tuple for pymssql (%s placeholders) and a dict for
        SQLAlchemy (:name placeholders).
        """
        try:
            if self.connection_method == "pymssql":
                cursor = self.connection.cursor()
                cursor.execute(query, params or ())
                return cursor.fetchall()
            elif self.connection_method == "sqlalchemy":
                from sqlalchemy import text

                result = self.connection.execute(text(query), params or {})
                return [dict(row._mapping) for row in result]
            return []
        except Exception as e:
            logging.error(f"Query execution failed: {str(e)}")
            return []

    def get_all_tables(self) -> List[str]:
        """Get list of all tables in the database."""
        try:
            query = """
                SELECT TABLE_SCHEMA, TABLE_NAME
                FROM INFORMATION_SCHEMA.TABLES
                WHERE TABLE_TYPE = 'BASE TABLE'
                ORDER BY TABLE_NAME
            """
            results = self.execute_query(query)

            # Store schema info for later use when qualifying table names
            # (both connection methods return dict rows, so one code path suffices)
            self.table_schemas = {row['TABLE_NAME']: row['TABLE_SCHEMA'] for row in results}
            tables = [row['TABLE_NAME'] for row in results]

            logging.info(f"Found {len(tables)} tables in database")
            return tables
        except Exception as e:
            logging.error(f"Error getting tables: {str(e)}")
            return []

    def get_table_schema(self, table_name: str) -> Dict[str, Any]:
        """Get detailed schema information for a table."""
        try:
            # Get column information
            query = """
                SELECT COLUMN_NAME, DATA_TYPE, IS_NULLABLE, COLUMN_DEFAULT,
                       CHARACTER_MAXIMUM_LENGTH, NUMERIC_PRECISION, NUMERIC_SCALE,
                       ORDINAL_POSITION
                FROM INFORMATION_SCHEMA.COLUMNS
                WHERE TABLE_NAME = %s
                ORDER BY ORDINAL_POSITION
            """
            if self.connection_method == "sqlalchemy":
                query = query.replace("%s", ":table_name")
                results = self.execute_query(query, {"table_name": table_name})
            else:
                results = self.execute_query(query, (table_name,))

            columns = []
            for row in results:
                columns.append({
                    'name': row['COLUMN_NAME'],
                    'data_type': row['DATA_TYPE'],
                    'nullable': row['IS_NULLABLE'] == 'YES',
                    'default': row['COLUMN_DEFAULT'],
                    'max_length': row['CHARACTER_MAXIMUM_LENGTH'],
                    'precision': row['NUMERIC_PRECISION'],
                    'scale': row['NUMERIC_SCALE'],
                    'position': row['ORDINAL_POSITION'],
                })

            # Get primary keys. Filtering on CONSTRAINT_TYPE via TABLE_CONSTRAINTS is
            # more reliable than matching constraint names against a 'PK_' prefix, and
            # it avoids a literal '%' that pymssql would misread as a format placeholder.
            pk_query = """
                SELECT kcu.COLUMN_NAME
                FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
                JOIN INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
                    ON tc.CONSTRAINT_NAME = kcu.CONSTRAINT_NAME
                   AND tc.TABLE_NAME = kcu.TABLE_NAME
                WHERE kcu.TABLE_NAME = %s
                  AND tc.CONSTRAINT_TYPE = 'PRIMARY KEY'
            """
            if self.connection_method == "sqlalchemy":
                pk_query = pk_query.replace("%s", ":table_name")
                pk_results = self.execute_query(pk_query, {"table_name": table_name})
            else:
                pk_results = self.execute_query(pk_query, (table_name,))

            primary_keys = [row['COLUMN_NAME'] for row in pk_results]

            # Get foreign keys - using sys tables for Azure SQL compatibility
            fk_query = """
                SELECT
                    c1.name AS COLUMN_NAME,
                    OBJECT_NAME(fk.referenced_object_id) AS REFERENCED_TABLE_NAME,
                    c2.name AS REFERENCED_COLUMN_NAME
                FROM sys.foreign_keys fk
                JOIN sys.foreign_key_columns fkc ON fk.object_id = fkc.constraint_object_id
                JOIN sys.columns c1 ON fkc.parent_object_id = c1.object_id AND fkc.parent_column_id = c1.column_id
                JOIN sys.columns c2 ON fkc.referenced_object_id = c2.object_id AND fkc.referenced_column_id = c2.column_id
                JOIN sys.tables t ON fk.parent_object_id = t.object_id
                WHERE t.name = %s
            """
            if self.connection_method == "sqlalchemy":
                fk_query = fk_query.replace("%s", ":table_name")
                fk_results = self.execute_query(fk_query, {"table_name": table_name})
            else:
                fk_results = self.execute_query(fk_query, (table_name,))

            foreign_keys = []
            for row in fk_results:
                foreign_keys.append({
                    'column': row['COLUMN_NAME'],
                    'referenced_table': row['REFERENCED_TABLE_NAME'],
                    'referenced_column': row['REFERENCED_COLUMN_NAME'],
                })

            return {
                'columns': columns,
                'primary_keys': primary_keys,
                'foreign_keys': foreign_keys,
            }
        except Exception as e:
            logging.error(f"Error getting schema for table {table_name}: {str(e)}")
            return {}

    def get_table_data_insights(self, table_name: str, sample_size: int = 100) -> Dict[str, Any]:
        """Get data insights for a table, including sample data and statistics."""
        try:
            # Qualify the table with its schema (default to dbo)
            schema_name = getattr(self, 'table_schemas', {}).get(table_name, 'dbo')
            full_table_name = f"[{schema_name}].[{table_name}]"

            # Get row count
            count_query = f"SELECT COUNT(*) as row_count FROM {full_table_name}"
            count_results = self.execute_query(count_query)
            row_count = count_results[0]['row_count'] if count_results else 0

            # Get sample data
            sample_query = f"SELECT TOP {sample_size} * FROM {full_table_name}"
            sample_results = self.execute_query(sample_query)

            # Convert sample data to JSON-friendly dictionaries (first 10 rows)
            sample_data = []
            for row in sample_results[:10]:
                sample_data.append({k: str(v) if v is not None else None for k, v in row.items()})

            # Get basic per-column statistics (simplified)
            data_analysis = {}
            if sample_results:
                columns = list(sample_results[0].keys())
                for column in columns:
                    try:
                        col_query = f"""
                            SELECT
                                COUNT(*) as total_count,
                                COUNT([{column}]) as non_null_count,
                                COUNT(DISTINCT [{column}]) as distinct_count
                            FROM {full_table_name}
                        """
                        col_results = self.execute_query(col_query)
                        if col_results:
                            result = col_results[0]
                            data_analysis[column] = {
                                'total_count': result['total_count'],
                                'non_null_count': result['non_null_count'],
                                'distinct_count': result['distinct_count'],
                                'null_percentage': round(
                                    (result['total_count'] - result['non_null_count'])
                                    / result['total_count'] * 100, 2
                                ) if result['total_count'] > 0 else 0,
                            }
                    except Exception as e:
                        logging.warning(f"Could not analyze column {column} in table {table_name}: {str(e)}")
                        data_analysis[column] = {'error': str(e)}

            return {
                'row_count': row_count,
                'sample_data': sample_data,
                'data_analysis': data_analysis,
            }
        except Exception as e:
            logging.error(f"Error getting data insights for table {table_name}: {str(e)}")
            return {}

    def analyze_relationships(self) -> Dict[str, Any]:
        """Analyze foreign-key relationships between tables."""
        try:
            # Use sys tables instead of INFORMATION_SCHEMA for Azure SQL compatibility
            query = """
                SELECT
                    OBJECT_NAME(fk.parent_object_id) as source_table,
                    c1.name as source_column,
                    OBJECT_NAME(fk.referenced_object_id) as target_table,
                    c2.name as target_column,
                    fk.name as constraint_name
                FROM sys.foreign_keys fk
                JOIN sys.foreign_key_columns fkc ON fk.object_id = fkc.constraint_object_id
                JOIN sys.columns c1 ON fkc.parent_object_id = c1.object_id AND fkc.parent_column_id = c1.column_id
                JOIN sys.columns c2 ON fkc.referenced_object_id = c2.object_id AND fkc.referenced_column_id = c2.column_id
                ORDER BY source_table
            """
            results = self.execute_query(query)

            relationships = []
            for row in results:
                relationships.append({
                    'source_table': row['source_table'],
                    'source_column': row['source_column'],
                    'target_table': row['target_table'],
                    'target_column': row['target_column'],
                    'constraint_name': row['constraint_name'],
                })

            return {'relationships': relationships}
        except Exception as e:
            logging.error(f"Error analyzing relationships: {str(e)}")
            return {}

    def build_comprehensive_data_model(self, focus_tables: Optional[List[str]] = None):
        """Build a comprehensive data model for the entire database."""
        if not self.connect():
            return

        try:
            tables = self.get_all_tables()

            if focus_tables:
                # Restrict the analysis to the focus tables that actually exist
                available_focus_tables = [t for t in focus_tables if t in tables]
                if available_focus_tables:
                    tables = available_focus_tables
                    logging.info(f"Focusing on tables: {available_focus_tables}")
                else:
                    logging.warning(f"Focus tables {focus_tables} not found. Analyzing all tables.")
                    logging.info(f"Available tables: {tables}")

            self.data_model = {
                'database_info': {
                    'name': self.database,
                    'server': self.server,
                    'connection_method': self.connection_method,
                    'analysis_date': datetime.now().isoformat(),
                    'total_tables': len(tables),
                    'analyzed_tables': tables,
                },
                'tables': {},
                'relationships': self.analyze_relationships(),
            }

            # Analyze each table
            for table_name in tables:
                logging.info(f"Analyzing table: {table_name}")
                self.data_model['tables'][table_name] = {
                    'schema': self.get_table_schema(table_name),
                    'data_insights': self.get_table_data_insights(table_name),
                }

            logging.info("Data model analysis completed successfully")
        except Exception as e:
            logging.error(f"Error building data model: {str(e)}")
        finally:
            self.disconnect()

    def export_data_model(self, filename: Optional[str] = None):
        """Export the data model to a JSON file."""
        if not filename:
            filename = f"hockey_db_data_model_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"

        try:
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(self.data_model, f, indent=2, ensure_ascii=False)
            logging.info(f"Data model exported to: {filename}")
            return filename
        except Exception as e:
            logging.error(f"Error exporting data model: {str(e)}")
            return None

    def generate_summary_report(self) -> str:
        """Generate a human-readable summary report."""
        if not self.data_model:
            return "No data model available. Run build_comprehensive_data_model() first."
        report = []
        report.append("=" * 60)
        report.append("HOCKEY DATABASE DATA MODEL SUMMARY")
        report.append("=" * 60)
        report.append(f"Database: {self.data_model['database_info']['name']}")
        report.append(f"Server: {self.data_model['database_info']['server']}")
        report.append(f"Connection Method: {self.data_model['database_info']['connection_method']}")
        report.append(f"Analysis Date: {self.data_model['database_info']['analysis_date']}")
        report.append(f"Total Tables Analyzed: {self.data_model['database_info']['total_tables']}")
        report.append(f"Tables: {', '.join(self.data_model['database_info']['analyzed_tables'])}")
        report.append("")

        # Table summaries
        report.append("TABLE SUMMARIES:")
        report.append("-" * 40)

        for table_name, table_data in self.data_model['tables'].items():
            report.append(f"\nšŸ“Š TABLE: {table_name}")

            schema = table_data.get('schema', {})
            insights = table_data.get('data_insights', {})

            if schema.get('columns'):
                report.append(f"   Columns: {len(schema['columns'])}")
                report.append(f"   Primary Keys: {', '.join(schema.get('primary_keys', []))}")

                if schema.get('foreign_keys'):
                    report.append("   Foreign Keys:")
                    for fk in schema['foreign_keys']:
                        report.append(f"     - {fk['column']} → {fk['referenced_table']}.{fk['referenced_column']}")

            if insights.get('row_count') is not None:
                report.append(f"   Total Rows: {insights['row_count']:,}")

            # Column details (show first 5 columns)
            if schema.get('columns'):
                report.append("   Column Details:")
                for col in schema['columns'][:5]:
                    nullable = "NULL" if col['nullable'] else "NOT NULL"
                    report.append(f"     - {col['name']}: {col['data_type']} {nullable}")
                if len(schema['columns']) > 5:
                    report.append(f"     ... and {len(schema['columns']) - 5} more columns")

            # Sample data (first 3 rows)
            if insights.get('sample_data'):
                report.append("   Sample Data (first 3 rows):")
                for i, row in enumerate(insights['sample_data'][:3]):
                    report.append(f"     Row {i + 1}: {row}")

        # Relationships
        relationships = self.data_model.get('relationships', {}).get('relationships', [])
        if relationships:
            report.append(f"\nšŸ”— DATABASE RELATIONSHIPS ({len(relationships)} total):")
            report.append("-" * 40)
            for rel in relationships:
                report.append(f"   {rel['source_table']}.{rel['source_column']} → {rel['target_table']}.{rel['target_column']}")

        return "\n".join(report)


def install_dependencies():
    """Install required dependencies."""
    import subprocess
    import sys

    packages = ['pymssql', 'sqlalchemy']
    for package in packages:
        try:
            subprocess.check_call([sys.executable, '-m', 'pip', 'install', package])
            print(f"Successfully installed {package}")
        except subprocess.CalledProcessError:
            print(f"Failed to install {package}")


def main():
    """Main function to run the data modeling analysis."""
    try:
        # Try to install the optional drivers before connecting
        install_dependencies()

        # Focus on specific tables mentioned in HockeyFood_DB.md
        focus_tables = ['Exercise', 'Multimedia', 'Serie']

        modeler = DatabaseModelerAlternative()
        logging.info("Starting comprehensive database analysis...")

        # Build the data model
        modeler.build_comprehensive_data_model(focus_tables=focus_tables)

        # Export to JSON
        json_file = modeler.export_data_model()

        # Generate and save the summary report
        summary = modeler.generate_summary_report()
        summary_file = f"hockey_db_summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
        with open(summary_file, 'w', encoding='utf-8') as f:
            f.write(summary)

        print(summary)
        print("\nšŸ“ Files generated:")
        print(f"   - JSON Data Model: {json_file}")
        print(f"   - Summary Report: {summary_file}")
    except Exception as e:
        logging.error(f"Main execution error: {str(e)}")
        print(f"Error: {str(e)}")
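# The class above reads its connection settings from a .env file via python-dotenv.
# A minimal sketch of the expected file is shown below; every value is an
# illustrative placeholder (the Azure-style server name is an assumption based on
# the Azure SQL notes in the queries), and DB_ENCRYPT / DB_TRUST_SERVER_CERTIFICATE
# default to "true" when omitted:
#
#     DB_SERVER=your-server.database.windows.net
#     DB_DATABASE=HockeyFood
#     DB_USER=your_username
#     DB_PASSWORD=your_password
#     DB_ENCRYPT=true
#     DB_TRUST_SERVER_CERTIFICATE=true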
if __name__ == "__main__":
    main()
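# Illustrative sketch of driving the modeler directly (e.g. from a REPL or another
# script) instead of via main(); the focus tables are the ones main() uses, while
# the output filename is a hypothetical example:
#
#     modeler = DatabaseModelerAlternative()
#     modeler.build_comprehensive_data_model(focus_tables=["Exercise", "Multimedia", "Serie"])
#     modeler.export_data_model("hockey_model.json")  # hypothetical filename
#     print(modeler.generate_summary_report())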