Spaces:
Sleeping
Sleeping
| from datetime import timedelta, datetime | |
| import anyio | |
| from fastapi import Depends, HTTPException | |
| from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer | |
| from jose import jwt, JWTError | |
| from passlib.context import CryptContext | |
| from trauma.api.account.model import AccountModel | |
| from trauma.core.config import settings | |
| def verify_password(plain_password, hashed_password) -> bool: | |
| result = CryptContext(schemes=["bcrypt"], deprecated="auto").verify(plain_password, hashed_password) | |
| return result | |
| def create_access_token(email: str, account_id: str): | |
| payload = { | |
| "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name": email, | |
| "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/nameidentifier": account_id, | |
| "accountId": account_id, | |
| "iss": settings.Issuer, | |
| "aud": settings.Audience, | |
| "exp": datetime.utcnow() + timedelta(days=30) | |
| } | |
| encoded_jwt = jwt.encode(payload, settings.SECRET_KEY, algorithm="HS256") | |
| return encoded_jwt | |
| class PermissionDependency: | |
| def __init__(self, is_public: bool = False): | |
| self.is_public = is_public | |
| def __call__( | |
| self, credentials: HTTPAuthorizationCredentials | None = Depends(HTTPBearer(auto_error=False)) | |
| ) -> AccountModel | None: | |
| if credentials is None: | |
| if self.is_public: | |
| return None | |
| else: | |
| raise HTTPException(status_code=403, detail="Permission denied") | |
| try: | |
| account_id = self.authenticate_jwt_token(credentials.credentials) | |
| account_data = anyio.from_thread.run(self.get_account_by_id, account_id) | |
| self.check_account_health(account_data) | |
| return account_data | |
| except JWTError: | |
| raise HTTPException(status_code=403, detail="Permission denied") | |
| async def get_account_by_id(self, account_id: str) -> AccountModel: | |
| account = await settings.DB_CLIENT.accounts.find_one({"id": account_id}) | |
| return AccountModel.from_mongo(account) | |
| def check_account_health(account: AccountModel): | |
| if not account: | |
| raise HTTPException(status_code=403, detail="Permission denied") | |
| def authenticate_jwt_token(self, token: str) -> str: | |
| payload = jwt.decode(token, | |
| settings.SECRET_KEY, | |
| algorithms="HS256", | |
| audience=settings.Audience) | |
| email: str = payload.get("http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name") | |
| account_id = payload.get("accountId") | |
| if email is None or account_id is None: | |
| raise HTTPException(status_code=403, detail="Permission denied") | |
| return account_id | |