Spaces:
Running
Running
| from azure.cosmos import CosmosClient, exceptions | |
| import os | |
| def diagnose_cosmos_connection(): | |
| endpoint = os.environ.get("COSMOS_ENDPOINT") | |
| key = os.environ.get("COSMOS_KEY") | |
| if not all([endpoint, key]): | |
| raise ValueError("Please ensure COSMOS_ENDPOINT and COSMOS_KEY are set.") | |
| client = CosmosClient(endpoint, key) | |
| print("Attempting to connect to Cosmos DB...") | |
| try: | |
| # List databases | |
| print("Listing databases:") | |
| databases = list(client.list_databases()) | |
| for db in databases: | |
| print(f"- {db['id']}") | |
| # Try to list containers for each database | |
| try: | |
| containers = list(client.get_database_client(db['id']).list_containers()) | |
| print(f" Containers in {db['id']}:") | |
| for container in containers: | |
| print(f" - {container['id']}") | |
| except exceptions.CosmosResourceNotFoundError: | |
| print(f" Unable to list containers in {db['id']}") | |
| except Exception as e: | |
| print(f" Error listing containers in {db['id']}: {str(e)}") | |
| print() # Add a blank line for readability | |
| except Exception as e: | |
| print(f"Error: {str(e)}") | |
| if __name__ == "__main__": | |
| diagnose_cosmos_connection() |