#!/usr/bin/env python3
"""
Alternative Data Modeling Script for the HockeyFood Database

Supports multiple connection methods and provides fallback options.
"""
import os
import json
import logging
from datetime import datetime
from typing import Any, Dict, List

from dotenv import load_dotenv

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# Load environment variables from a local .env file
load_dotenv()
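
# Expected .env layout (keys taken from the os.getenv calls below; the
# values shown here are placeholders, not real credentials):
#
#   DB_SERVER=your-server.database.windows.net
#   DB_DATABASE=HockeyFood
#   DB_USER=your_username
#   DB_PASSWORD=your_password
#   DB_ENCRYPT=true
#   DB_TRUST_SERVER_CERTIFICATE=true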

class DatabaseModelerAlternative:
    def __init__(self):
        self.server = os.getenv("DB_SERVER")
        self.database = os.getenv("DB_DATABASE")
        self.username = os.getenv("DB_USER")
        self.password = os.getenv("DB_PASSWORD")
        self.encrypt = os.getenv("DB_ENCRYPT", "true").lower() == "true"
        self.trust_cert = os.getenv("DB_TRUST_SERVER_CERTIFICATE", "true").lower() == "true"

        if not all([self.server, self.database, self.username, self.password]):
            raise ValueError("Missing required database connection parameters in .env file")

        self.connection = None
        # Initialize these here so methods called before connect() fail
        # cleanly instead of raising AttributeError.
        self.connection_method = None
        self.engine = None
        self.data_model = {}
    def try_pymssql_connection(self):
        """Try connecting using pymssql (alternative to pyodbc)."""
        try:
            import pymssql

            self.connection = pymssql.connect(
                server=self.server,
                user=self.username,
                password=self.password,
                database=self.database,
                timeout=30,
                as_dict=True,  # return rows as dicts, matching the SQLAlchemy path
            )
            logging.info(f"Successfully connected to database using pymssql: {self.database}")
            return True
        except ImportError:
            logging.warning("pymssql not available. Install with: pip install pymssql")
            return False
        except Exception as e:
            logging.error(f"pymssql connection failed: {str(e)}")
            return False
    def try_sqlalchemy_connection(self):
        """Try connecting using SQLAlchemy with different drivers."""
        try:
            import urllib.parse

            from sqlalchemy import create_engine, text

            # URL-encode the password so special characters survive the URL
            password_encoded = urllib.parse.quote_plus(self.password)

            # Try different connection strings in order of preference
            connection_strings = [
                f"mssql+pyodbc://{self.username}:{password_encoded}@{self.server}/{self.database}?driver=ODBC+Driver+17+for+SQL+Server&Encrypt=yes&TrustServerCertificate=yes",
                f"mssql+pyodbc://{self.username}:{password_encoded}@{self.server}/{self.database}?driver=ODBC+Driver+18+for+SQL+Server&Encrypt=yes&TrustServerCertificate=yes",
                f"mssql+pymssql://{self.username}:{password_encoded}@{self.server}/{self.database}",
            ]

            for conn_str in connection_strings:
                try:
                    engine = create_engine(conn_str)
                    connection = engine.connect()
                    # Test the connection before accepting it
                    connection.execute(text("SELECT 1"))
                    self.connection = connection
                    self.engine = engine
                    logging.info(f"Successfully connected using SQLAlchemy: {self.database}")
                    return True
                except Exception as e:
                    logging.debug(f"SQLAlchemy connection attempt failed: {str(e)}")
                    continue
            return False
        except ImportError:
            logging.warning("SQLAlchemy not available. Install with: pip install sqlalchemy")
            return False
        except Exception as e:
            logging.error(f"SQLAlchemy connection failed: {str(e)}")
            return False
    def connect(self):
        """Try each connection method in turn until one succeeds."""
        connection_methods = [
            ("pymssql", self.try_pymssql_connection),
            ("sqlalchemy", self.try_sqlalchemy_connection),
        ]
        for method_name, method in connection_methods:
            logging.info(f"Trying {method_name} connection...")
            if method():
                self.connection_method = method_name
                return True
        logging.error("All connection methods failed")
        return False
    def disconnect(self):
        """Close the database connection (and dispose the SQLAlchemy engine, if any)."""
        if self.connection:
            self.connection.close()
            logging.info("Database connection closed")
        if self.engine is not None:
            self.engine.dispose()
    def execute_query(self, query: str, params=None):
        """Execute a query using the established connection.

        pymssql expects %s-style (format) parameters as a tuple; the
        SQLAlchemy path expects :name-style bind parameters as a dict.
        """
        try:
            if self.connection_method == "pymssql":
                cursor = self.connection.cursor()
                cursor.execute(query, params or ())
                return cursor.fetchall()
            elif self.connection_method == "sqlalchemy":
                from sqlalchemy import text

                result = self.connection.execute(text(query), params or {})
                return [dict(row._mapping) for row in result]
            else:
                logging.error("No active connection; call connect() first")
                return []
        except Exception as e:
            logging.error(f"Query execution failed: {str(e)}")
            return []
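
    # Example calls (hypothetical table name, illustrating the two param styles):
    #   pymssql:     modeler.execute_query("SELECT * FROM T WHERE id = %s", (1,))
    #   sqlalchemy:  modeler.execute_query("SELECT * FROM T WHERE id = :id", {"id": 1})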
    def get_all_tables(self) -> List[str]:
        """Get a list of all base tables in the database."""
        try:
            query = """
                SELECT TABLE_SCHEMA, TABLE_NAME
                FROM INFORMATION_SCHEMA.TABLES
                WHERE TABLE_TYPE = 'BASE TABLE'
                ORDER BY TABLE_NAME
            """
            results = self.execute_query(query)
            # Both connection methods return rows as dicts, so one path suffices.
            # Remember each table's schema for building qualified names later.
            self.table_schemas = {row['TABLE_NAME']: row['TABLE_SCHEMA'] for row in results}
            tables = [row['TABLE_NAME'] for row in results]
            logging.info(f"Found {len(tables)} tables in database")
            return tables
        except Exception as e:
            logging.error(f"Error getting tables: {str(e)}")
            return []
    def get_table_schema(self, table_name: str) -> Dict[str, Any]:
        """Get detailed schema information for a table."""
        try:
            # Get column information
            query = """
                SELECT
                    COLUMN_NAME,
                    DATA_TYPE,
                    IS_NULLABLE,
                    COLUMN_DEFAULT,
                    CHARACTER_MAXIMUM_LENGTH,
                    NUMERIC_PRECISION,
                    NUMERIC_SCALE,
                    ORDINAL_POSITION
                FROM INFORMATION_SCHEMA.COLUMNS
                WHERE TABLE_NAME = %s
                ORDER BY ORDINAL_POSITION
            """
            if self.connection_method == "sqlalchemy":
                query = query.replace("%s", ":table_name")
                results = self.execute_query(query, {"table_name": table_name})
            else:
                results = self.execute_query(query, (table_name,))

            columns = []
            for row in results:
                columns.append({
                    'name': row['COLUMN_NAME'],
                    'data_type': row['DATA_TYPE'],
                    'nullable': row['IS_NULLABLE'] == 'YES',
                    'default': row['COLUMN_DEFAULT'],
                    'max_length': row['CHARACTER_MAXIMUM_LENGTH'],
                    'precision': row['NUMERIC_PRECISION'],
                    'scale': row['NUMERIC_SCALE'],
                    'position': row['ORDINAL_POSITION'],
                })

            # Get primary keys. Filtering on CONSTRAINT_TYPE is more reliable
            # than matching names LIKE 'PK_%', which breaks under pymssql's
            # %-style parameter formatting and assumes a naming convention.
            pk_query = """
                SELECT kcu.COLUMN_NAME
                FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
                JOIN INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
                    ON kcu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME
                WHERE kcu.TABLE_NAME = %s AND tc.CONSTRAINT_TYPE = 'PRIMARY KEY'
            """
            if self.connection_method == "sqlalchemy":
                pk_query = pk_query.replace("%s", ":table_name")
                pk_results = self.execute_query(pk_query, {"table_name": table_name})
            else:
                pk_results = self.execute_query(pk_query, (table_name,))
            primary_keys = [row['COLUMN_NAME'] for row in pk_results]

            # Get foreign keys - using sys catalog views for Azure SQL compatibility
            fk_query = """
                SELECT
                    c1.name AS COLUMN_NAME,
                    OBJECT_NAME(fk.referenced_object_id) AS REFERENCED_TABLE_NAME,
                    c2.name AS REFERENCED_COLUMN_NAME
                FROM sys.foreign_keys fk
                JOIN sys.foreign_key_columns fkc ON fk.object_id = fkc.constraint_object_id
                JOIN sys.columns c1 ON fkc.parent_object_id = c1.object_id AND fkc.parent_column_id = c1.column_id
                JOIN sys.columns c2 ON fkc.referenced_object_id = c2.object_id AND fkc.referenced_column_id = c2.column_id
                JOIN sys.tables t ON fk.parent_object_id = t.object_id
                WHERE t.name = %s
            """
            if self.connection_method == "sqlalchemy":
                fk_query = fk_query.replace("%s", ":table_name")
                fk_results = self.execute_query(fk_query, {"table_name": table_name})
            else:
                fk_results = self.execute_query(fk_query, (table_name,))

            foreign_keys = []
            for row in fk_results:
                foreign_keys.append({
                    'column': row['COLUMN_NAME'],
                    'referenced_table': row['REFERENCED_TABLE_NAME'],
                    'referenced_column': row['REFERENCED_COLUMN_NAME'],
                })

            return {
                'columns': columns,
                'primary_keys': primary_keys,
                'foreign_keys': foreign_keys,
            }
        except Exception as e:
            logging.error(f"Error getting schema for table {table_name}: {str(e)}")
            return {}
    def get_table_data_insights(self, table_name: str, sample_size: int = 100) -> Dict[str, Any]:
        """Get data insights for a table, including sample data and per-column statistics."""
        try:
            # Qualify the table with its schema (cached by get_all_tables)
            schema_name = getattr(self, 'table_schemas', {}).get(table_name, 'dbo')
            full_table_name = f"[{schema_name}].[{table_name}]"

            # Row count
            count_query = f"SELECT COUNT(*) as row_count FROM {full_table_name}"
            count_results = self.execute_query(count_query)
            row_count = count_results[0]['row_count'] if count_results else 0

            # Sample data
            sample_query = f"SELECT TOP {sample_size} * FROM {full_table_name}"
            sample_results = self.execute_query(sample_query)

            # Convert sample data to JSON-safe dictionaries (first 10 rows only)
            sample_data = []
            for row in sample_results[:10]:
                sample_data.append({k: str(v) if v is not None else None for k, v in row.items()})

            # Basic per-column statistics (simplified)
            data_analysis = {}
            if sample_results:
                columns = list(sample_results[0].keys())
                for column in columns:
                    try:
                        col_query = f"""
                            SELECT
                                COUNT(*) as total_count,
                                COUNT([{column}]) as non_null_count,
                                COUNT(DISTINCT [{column}]) as distinct_count
                            FROM {full_table_name}
                        """
                        col_results = self.execute_query(col_query)
                        if col_results:
                            result = col_results[0]
                            data_analysis[column] = {
                                'total_count': result['total_count'],
                                'non_null_count': result['non_null_count'],
                                'distinct_count': result['distinct_count'],
                                'null_percentage': round((result['total_count'] - result['non_null_count']) / result['total_count'] * 100, 2) if result['total_count'] > 0 else 0,
                            }
                    except Exception as e:
                        logging.warning(f"Could not analyze column {column} in table {table_name}: {str(e)}")
                        data_analysis[column] = {'error': str(e)}

            return {
                'row_count': row_count,
                'sample_data': sample_data,
                'data_analysis': data_analysis,
            }
        except Exception as e:
            logging.error(f"Error getting data insights for table {table_name}: {str(e)}")
            return {}
    def analyze_relationships(self) -> Dict[str, Any]:
        """Analyze foreign-key relationships between tables."""
        try:
            # Use sys catalog views instead of INFORMATION_SCHEMA for Azure SQL compatibility
            query = """
                SELECT
                    OBJECT_NAME(fk.parent_object_id) as source_table,
                    c1.name as source_column,
                    OBJECT_NAME(fk.referenced_object_id) as target_table,
                    c2.name as target_column,
                    fk.name as constraint_name
                FROM sys.foreign_keys fk
                JOIN sys.foreign_key_columns fkc ON fk.object_id = fkc.constraint_object_id
                JOIN sys.columns c1 ON fkc.parent_object_id = c1.object_id AND fkc.parent_column_id = c1.column_id
                JOIN sys.columns c2 ON fkc.referenced_object_id = c2.object_id AND fkc.referenced_column_id = c2.column_id
                ORDER BY source_table
            """
            results = self.execute_query(query)
            relationships = []
            for row in results:
                relationships.append({
                    'source_table': row['source_table'],
                    'source_column': row['source_column'],
                    'target_table': row['target_table'],
                    'target_column': row['target_column'],
                    'constraint_name': row['constraint_name'],
                })
            return {'relationships': relationships}
        except Exception as e:
            logging.error(f"Error analyzing relationships: {str(e)}")
            return {}
    def build_comprehensive_data_model(self, focus_tables: List[str] = None):
        """Build a comprehensive data model for the database (or a subset of tables)."""
        if not self.connect():
            return
        try:
            tables = self.get_all_tables()
            if focus_tables:
                # Restrict analysis to the focus tables that actually exist
                available_focus_tables = [t for t in focus_tables if t in tables]
                if available_focus_tables:
                    tables = available_focus_tables
                    logging.info(f"Focusing on tables: {available_focus_tables}")
                else:
                    logging.warning(f"Focus tables {focus_tables} not found. Analyzing all tables.")
                    logging.info(f"Available tables: {tables}")

            self.data_model = {
                'database_info': {
                    'name': self.database,
                    'server': self.server,
                    'connection_method': self.connection_method,
                    'analysis_date': datetime.now().isoformat(),
                    'total_tables': len(tables),
                    'analyzed_tables': tables,
                },
                'tables': {},
                'relationships': self.analyze_relationships(),
            }

            # Analyze each table
            for table_name in tables:
                logging.info(f"Analyzing table: {table_name}")
                self.data_model['tables'][table_name] = {
                    'schema': self.get_table_schema(table_name),
                    'data_insights': self.get_table_data_insights(table_name),
                }
            logging.info("Data model analysis completed successfully")
        except Exception as e:
            logging.error(f"Error building data model: {str(e)}")
        finally:
            self.disconnect()
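
    # Shape of the resulting data model / exported JSON (abridged sketch,
    # derived from build_comprehensive_data_model above):
    #
    #   {
    #     "database_info": {"name": ..., "server": ..., "connection_method": ...,
    #                       "analysis_date": ..., "total_tables": ..., "analyzed_tables": [...]},
    #     "tables": {"<table>": {"schema": {"columns": [...], "primary_keys": [...], "foreign_keys": [...]},
    #                            "data_insights": {"row_count": ..., "sample_data": [...], "data_analysis": {...}}}},
    #     "relationships": {"relationships": [...]}
    #   }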
    def export_data_model(self, filename: str = None):
        """Export the data model to a JSON file."""
        if not filename:
            filename = f"hockey_db_data_model_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        try:
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(self.data_model, f, indent=2, ensure_ascii=False)
            logging.info(f"Data model exported to: {filename}")
            return filename
        except Exception as e:
            logging.error(f"Error exporting data model: {str(e)}")
            return None
    def generate_summary_report(self) -> str:
        """Generate a human-readable summary report."""
        if not self.data_model:
            return "No data model available. Run build_comprehensive_data_model() first."

        report = []
        report.append("=" * 60)
        report.append("HOCKEY DATABASE DATA MODEL SUMMARY")
        report.append("=" * 60)
        report.append(f"Database: {self.data_model['database_info']['name']}")
        report.append(f"Server: {self.data_model['database_info']['server']}")
        report.append(f"Connection Method: {self.data_model['database_info']['connection_method']}")
        report.append(f"Analysis Date: {self.data_model['database_info']['analysis_date']}")
        report.append(f"Total Tables Analyzed: {self.data_model['database_info']['total_tables']}")
        report.append(f"Tables: {', '.join(self.data_model['database_info']['analyzed_tables'])}")
        report.append("")

        # Table summaries
        report.append("TABLE SUMMARIES:")
        report.append("-" * 40)
        for table_name, table_data in self.data_model['tables'].items():
            report.append(f"\nTABLE: {table_name}")
            schema = table_data.get('schema', {})
            insights = table_data.get('data_insights', {})
            if schema.get('columns'):
                report.append(f"  Columns: {len(schema['columns'])}")
                report.append(f"  Primary Keys: {', '.join(schema.get('primary_keys', []))}")
            if schema.get('foreign_keys'):
                report.append("  Foreign Keys:")
                for fk in schema['foreign_keys']:
                    report.append(f"    - {fk['column']} → {fk['referenced_table']}.{fk['referenced_column']}")
            if insights.get('row_count') is not None:
                report.append(f"  Total Rows: {insights['row_count']:,}")
            # Column details
            if schema.get('columns'):
                report.append("  Column Details:")
                for col in schema['columns'][:5]:  # Show first 5 columns
                    nullable = "NULL" if col['nullable'] else "NOT NULL"
                    report.append(f"    - {col['name']}: {col['data_type']} {nullable}")
                if len(schema['columns']) > 5:
                    report.append(f"    ... and {len(schema['columns']) - 5} more columns")
            # Sample data
            if insights.get('sample_data'):
                report.append("  Sample Data (first 3 rows):")
                for i, row in enumerate(insights['sample_data'][:3]):
                    report.append(f"    Row {i + 1}: {row}")

        # Relationships
        relationships = self.data_model.get('relationships', {}).get('relationships', [])
        if relationships:
            report.append(f"\nDATABASE RELATIONSHIPS ({len(relationships)} total):")
            report.append("-" * 40)
            for rel in relationships:
                report.append(f"  {rel['source_table']}.{rel['source_column']} → {rel['target_table']}.{rel['target_column']}")
        return "\n".join(report)

def install_dependencies():
    """Install required dependencies via pip."""
    import subprocess
    import sys

    packages = ['pymssql', 'sqlalchemy']
    for package in packages:
        try:
            subprocess.check_call([sys.executable, '-m', 'pip', 'install', package])
            print(f"Successfully installed {package}")
        except subprocess.CalledProcessError:
            print(f"Failed to install {package}")

def main():
    """Run the full data modeling analysis."""
    try:
        # Try to install dependencies first
        install_dependencies()

        # Focus on specific tables mentioned in HockeyFood_DB.md
        focus_tables = ['Exercise', 'Multimedia', 'Serie']

        modeler = DatabaseModelerAlternative()
        logging.info("Starting comprehensive database analysis...")

        # Build the data model
        modeler.build_comprehensive_data_model(focus_tables=focus_tables)

        # Export to JSON
        json_file = modeler.export_data_model()

        # Generate and save the summary report
        summary = modeler.generate_summary_report()
        summary_file = f"hockey_db_summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
        with open(summary_file, 'w', encoding='utf-8') as f:
            f.write(summary)

        print(summary)
        print("\nFiles generated:")
        print(f"  - JSON Data Model: {json_file}")
        print(f"  - Summary Report: {summary_file}")
    except Exception as e:
        logging.error(f"Main execution error: {str(e)}")
        print(f"Error: {str(e)}")

if __name__ == "__main__":
    main()