Spaces:
Running
Running
import asyncio | |
from trauma.api.account.dto import AccountType | |
from trauma.api.account.model import AccountModel | |
from trauma.api.account.schemas import CreateAccountRequest | |
from trauma.api.common.db_requests import check_unique_fields_existence | |
from trauma.core.config import settings | |
async def get_all_model_obj(page_size: int, page_index: int) -> tuple[list[AccountModel], int]:
    """Fetch one page of accounts together with the total number of accounts.

    Args:
        page_size: Maximum number of documents requested for the page.
        page_index: Page number; the number of skipped documents is
            ``page_size * page_index``, so index 0 starts at the beginning.

    Returns:
        A ``(objects, total_count)`` pair: the page contents exactly as the
        cursor's ``to_list`` produced them, and the count of all documents
        in the accounts collection (empty filter).
    """
    accounts = settings.DB_CLIENT.accounts
    offset = page_size * page_index

    # NOTE(review): no sort is applied, so page order is whatever the
    # database returns -- confirm callers do not rely on stable paging.
    # NOTE(review): the items are returned as-is from ``to_list``; verify
    # they really are AccountModel instances as the annotation says.
    page_request = (
        accounts.find()
        .skip(offset)
        .limit(page_size)
        .to_list(length=page_size)
    )
    count_request = accounts.count_documents({})

    # Both awaitables are handed to gather so they are awaited together.
    page, total = await asyncio.gather(page_request, count_request)
    return page, total
async def create_account_obj(data: CreateAccountRequest) -> AccountModel:
    """Build an account from the request and insert it into the accounts collection.

    Args:
        data: Request payload; its dumped fields become the keyword
            arguments of ``AccountModel``.

    Returns:
        The ``AccountModel`` instance that was inserted.
    """
    # Runs before anything is built or written.
    # NOTE(review): presumably raises when the email is already taken --
    # confirm against check_unique_fields_existence.
    await check_unique_fields_existence("email", data.email)

    fields = data.model_dump()
    new_account = AccountModel(**fields)
    document = new_account.to_mongo()
    await settings.DB_CLIENT.accounts.insert_one(document)
    return new_account
async def delete_account(account_id: str) -> None:
    """Delete the account whose ``id`` field equals ``account_id``.

    The result of the delete call is discarded, so the caller gets no
    indication of whether a document was actually removed.

    Args:
        account_id: Value matched against the document's ``id`` field.
    """
    # NOTE(review): the filter uses "id", not "_id" -- verify this matches
    # the field that AccountModel.to_mongo() stores.
    selector = {"id": account_id}
    await settings.DB_CLIENT.accounts.delete_one(selector)