TraumaBackend / trauma /core /security.py
brestok's picture
add auth
8e611c3
raw
history blame
2.89 kB
from datetime import timedelta, datetime
import anyio
from fastapi import Depends, HTTPException
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import jwt, JWTError
from passlib.context import CryptContext
from trauma.api.account.dto import AccountType
from trauma.api.account.model import AccountModel
from trauma.core.config import settings
def verify_password(plain_password, hashed_password) -> bool:
result = CryptContext(schemes=["bcrypt"], deprecated="auto").verify(plain_password, hashed_password)
return result
def create_access_token(email: str, account_id: str):
payload = {
"http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name": email,
"http://schemas.xmlsoap.org/ws/2005/05/identity/claims/nameidentifier": account_id,
"accountId": account_id,
"iss": settings.Issuer,
"aud": settings.Audience,
"exp": datetime.utcnow() + timedelta(days=30)
}
encoded_jwt = jwt.encode(payload, settings.SECRET_KEY, algorithm="HS256")
return encoded_jwt
class PermissionDependency:
def __init__(self, account_types: list[AccountType]):
self.account_types = account_types
def __call__(
self, credentials: HTTPAuthorizationCredentials | None = Depends(HTTPBearer(auto_error=False))
) -> AccountModel | None:
try:
account_id = self.authenticate_jwt_token(credentials.credentials)
account_data = anyio.from_thread.run(self.get_account_by_id, account_id)
self.check_account_health(account_data)
return AccountModel.from_mongo(account_data)
except JWTError:
raise HTTPException(status_code=403, detail="Permission denied")
except Exception:
raise HTTPException(status_code=403, detail="Permission denied")
@staticmethod
async def get_account_by_id(account_id: str) -> dict:
account = await settings.DB_CLIENT.accounts.find_one({"id": account_id})
return account
def check_account_health(self, account: dict):
if not account:
raise HTTPException(status_code=403, detail="Permission denied")
if account['accountType'] not in [i.value for i in self.account_types]:
raise HTTPException(status_code=403, detail="Permission denied")
@staticmethod
def authenticate_jwt_token(token: str) -> str:
payload = jwt.decode(token,
settings.SECRET_KEY,
algorithms="HS256",
audience=settings.Audience)
email: str = payload.get("http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name")
account_id = payload.get("accountId")
if email is None or account_id is None:
raise HTTPException(status_code=403, detail="Permission denied")
return account_id