Spaces:
Sleeping
Sleeping
from concurrent.futures import ThreadPoolExecutor | |
from chromadb.api.client import AdminClient, Client | |
from chromadb.config import DEFAULT_TENANT | |
def test_multiple_clients_concurrently(client: Client) -> None: | |
"""Tests running multiple clients, each against their own database, concurrently.""" | |
client.reset() | |
admin_client = AdminClient.from_system(client._system) | |
admin_client.create_database("test_db") | |
CLIENT_COUNT = 50 | |
COLLECTION_COUNT = 10 | |
# Each database will create the same collections by name, with differing metadata | |
databases = [f"db{i}" for i in range(CLIENT_COUNT)] | |
for database in databases: | |
admin_client.create_database(database) | |
collections = [f"collection{i}" for i in range(COLLECTION_COUNT)] | |
# Create N clients, each on a seperate thread, each with their own database | |
def run_target(n: int) -> None: | |
thread_client = Client( | |
tenant=DEFAULT_TENANT, | |
database=databases[n], | |
settings=client._system.settings, | |
) | |
for collection in collections: | |
thread_client.create_collection( | |
collection, metadata={"database": databases[n]} | |
) | |
with ThreadPoolExecutor(max_workers=CLIENT_COUNT) as executor: | |
executor.map(run_target, range(CLIENT_COUNT)) | |
# Create a final client, which will be used to verify the collections were created | |
client = Client(settings=client._system.settings) | |
# Verify that the collections were created | |
for database in databases: | |
client.set_database(database) | |
seen_collections = client.list_collections() | |
assert len(seen_collections) == COLLECTION_COUNT | |
for collection in seen_collections: | |
assert collection.name in collections | |
assert collection.metadata == {"database": database} | |