Spaces:
Running
Running
from datetime import timedelta, datetime | |
import anyio | |
from fastapi import Depends, HTTPException | |
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer | |
from jose import jwt, JWTError | |
from passlib.context import CryptContext | |
from trauma.api.account.model import AccountModel | |
from trauma.core.config import settings | |
def verify_password(plain_password, hashed_password) -> bool: | |
result = CryptContext(schemes=["bcrypt"], deprecated="auto").verify(plain_password, hashed_password) | |
return result | |
def create_access_token(email: str, account_id: str): | |
payload = { | |
"http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name": email, | |
"http://schemas.xmlsoap.org/ws/2005/05/identity/claims/nameidentifier": account_id, | |
"accountId": account_id, | |
"iss": settings.Issuer, | |
"aud": settings.Audience, | |
"exp": datetime.utcnow() + timedelta(days=30) | |
} | |
encoded_jwt = jwt.encode(payload, settings.SECRET_KEY, algorithm="HS256") | |
return encoded_jwt | |
class PermissionDependency: | |
def __init__(self, is_public: bool = False): | |
self.is_public = is_public | |
def __call__( | |
self, credentials: HTTPAuthorizationCredentials | None = Depends(HTTPBearer(auto_error=False)) | |
) -> AccountModel | None: | |
if credentials is None: | |
if self.is_public: | |
return None | |
else: | |
raise HTTPException(status_code=403, detail="Permission denied") | |
try: | |
account_id = self.authenticate_jwt_token(credentials.credentials) | |
account_data = anyio.from_thread.run(self.get_account_by_id, account_id) | |
self.check_account_health(account_data) | |
return account_data | |
except JWTError: | |
raise HTTPException(status_code=403, detail="Permission denied") | |
async def get_account_by_id(self, account_id: str) -> AccountModel: | |
account = await settings.DB_CLIENT.accounts.find_one({"id": account_id}) | |
return AccountModel.from_mongo(account) | |
def check_account_health(account: AccountModel): | |
if not account: | |
raise HTTPException(status_code=403, detail="Permission denied") | |
def authenticate_jwt_token(self, token: str) -> str: | |
payload = jwt.decode(token, | |
settings.SECRET_KEY, | |
algorithms="HS256", | |
audience=settings.Audience) | |
email: str = payload.get("http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name") | |
account_id = payload.get("accountId") | |
if email is None or account_id is None: | |
raise HTTPException(status_code=403, detail="Permission denied") | |
return account_id | |