"""Password verification, JWT issuing and a FastAPI permission dependency."""

import logging
from datetime import datetime, timedelta, timezone

import anyio
from fastapi import Depends, HTTPException
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from passlib.context import CryptContext

from trauma.api.account.dto import AccountType
from trauma.api.account.model import AccountModel
from trauma.core.config import settings

logger = logging.getLogger(__name__)

# Signing algorithm shared by token creation and verification.
_JWT_ALGORITHM = "HS256"
# Lifetime of an issued access token.
_ACCESS_TOKEN_LIFETIME = timedelta(days=30)
# Claim names written into (and read back from) the token payload.
_NAME_CLAIM = "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name"
_NAME_IDENTIFIER_CLAIM = (
    "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/nameidentifier"
)
_ACCOUNT_ID_CLAIM = "accountId"

# Built once at import time instead of on every verification call.
_pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def _permission_denied() -> HTTPException:
    """Return the uniform 403 response used for every authorization failure."""
    return HTTPException(status_code=403, detail="Permission denied")


def verify_password(plain_password, hashed_password) -> bool:
    """Return whether *plain_password* matches the bcrypt *hashed_password*."""
    return _pwd_context.verify(plain_password, hashed_password)


def create_access_token(email: str, account_id: str):
    """Build a signed HS256 JWT for the given account.

    The payload carries the e-mail and account id under the claim names
    defined above, the issuer/audience from ``settings`` and an expiry
    30 days from now. Returns the encoded token.
    """
    payload = {
        _NAME_CLAIM: email,
        _NAME_IDENTIFIER_CLAIM: account_id,
        _ACCOUNT_ID_CLAIM: account_id,
        "iss": settings.Issuer,
        "aud": settings.Audience,
        # Timezone-aware "now"; datetime.utcnow() is deprecated and naive.
        "exp": datetime.now(timezone.utc) + _ACCESS_TOKEN_LIFETIME,
    }
    return jwt.encode(payload, settings.SECRET_KEY, algorithm=_JWT_ALGORITHM)


class PermissionDependency:
    """FastAPI dependency that admits only accounts of the configured types.

    Instantiate with the allowed account types and use the instance inside
    ``Depends(...)``. Every failure is reported as HTTP 403
    "Permission denied".
    """

    def __init__(self, account_types: list[AccountType]):
        self.account_types = account_types

    def __call__(
        self,
        credentials: HTTPAuthorizationCredentials | None = Depends(
            HTTPBearer(auto_error=False)
        ),
    ) -> AccountModel | None:
        """Authenticate the bearer token and return the matching account.

        NOTE: this must stay a plain ``def``. ``anyio.from_thread.run`` only
        works from a worker thread, which is where FastAPI executes
        synchronous dependencies.

        Raises:
            HTTPException: 403 when the credentials are missing, the token is
                invalid, the account is unknown or has a disallowed type, or
                any unexpected error occurs during the lookup.
        """
        # HTTPBearer(auto_error=False) yields None when no bearer token is sent.
        if credentials is None:
            raise _permission_denied()

        try:
            account_id = self.authenticate_jwt_token(credentials.credentials)
            account_data = anyio.from_thread.run(self.get_account_by_id, account_id)
            self.check_account_health(account_data)
            return AccountModel.from_mongo(account_data)
        except (JWTError, HTTPException) as exc:
            # Expected rejections: bad/expired token or failed account checks.
            raise _permission_denied() from exc
        except Exception as exc:
            # Still answer 403 (unchanged contract), but leave a trace so that
            # infrastructure failures are not silently hidden.
            logger.exception("Unexpected error while authorizing request")
            raise _permission_denied() from exc

    @staticmethod
    async def get_account_by_id(account_id: str) -> dict:
        """Fetch the account document whose ``id`` equals *account_id*.

        Returns whatever ``find_one`` yields (presumably ``None`` when no
        document matches - confirm against the DB client in use).
        """
        return await settings.DB_CLIENT.accounts.find_one({"id": account_id})

    def check_account_health(self, account: dict):
        """Raise 403 unless *account* exists and has an allowed account type."""
        if not account:
            raise _permission_denied()

        allowed_values = [account_type.value for account_type in self.account_types]
        if account["accountType"] not in allowed_values:
            raise _permission_denied()

    @staticmethod
    def authenticate_jwt_token(token: str) -> str:
        """Decode and validate *token*, returning its ``accountId`` claim.

        Raises:
            JWTError: if decoding or validation fails.
            HTTPException: 403 when the e-mail or account id claim is absent.
        """
        payload = jwt.decode(
            token,
            settings.SECRET_KEY,
            # A list, so the allowed-algorithm check is an exact membership
            # test rather than a substring match against a bare string.
            algorithms=[_JWT_ALGORITHM],
            audience=settings.Audience,
        )
        email = payload.get(_NAME_CLAIM)
        account_id = payload.get(_ACCOUNT_ID_CLAIM)
        if email is None or account_id is None:
            raise _permission_denied()
        return account_id