TraumaBackend / trauma /core /security.py
brestok's picture
init
50553ea
raw
history blame
2.76 kB
from datetime import timedelta, datetime
import anyio
from fastapi import Depends, HTTPException
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import jwt, JWTError
from passlib.context import CryptContext
from trauma.api.account.model import AccountModel
from trauma.core.config import settings
def verify_password(plain_password, hashed_password) -> bool:
    """Check a plaintext password against a stored password hash.

    A passlib ``CryptContext`` configured for the ``bcrypt`` scheme is built
    on each call and asked to verify the pair.

    Args:
        plain_password: The password as supplied by the user.
        hashed_password: The previously stored hash to compare against.

    Returns:
        The result of ``CryptContext.verify`` for the given pair.
    """
    bcrypt_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
    return bcrypt_context.verify(plain_password, hashed_password)
def create_access_token(
    email: str,
    account_id: str,
    expires_delta: timedelta = timedelta(days=30),
) -> str:
    """Build a signed HS256 JWT access token for an account.

    The payload carries the e-mail and account id under the XML-SOAP identity
    claim URIs, the account id again under ``accountId``, and the issuer and
    audience taken from ``settings``.

    Args:
        email: E-mail address stored in the ``.../claims/name`` claim.
        account_id: Account identifier stored in the ``.../claims/nameidentifier``
            and ``accountId`` claims.
        expires_delta: Lifetime of the token, counted from now. Defaults to
            30 days, which is the lifetime this function always used.

    Returns:
        The encoded JWT, signed with ``settings.SECRET_KEY``.
    """
    # Imported locally so this function does not depend on the module-level
    # import list, which only brings in ``datetime`` and ``timedelta``.
    from datetime import timezone

    # ``datetime.utcnow()`` is deprecated and yields a naive value; use an
    # explicitly UTC-aware timestamp for the expiry instead.
    expires_at = datetime.now(timezone.utc) + expires_delta

    payload = {
        "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name": email,
        "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/nameidentifier": account_id,
        "accountId": account_id,
        "iss": settings.Issuer,
        "aud": settings.Audience,
        "exp": expires_at,
    }
    return jwt.encode(payload, settings.SECRET_KEY, algorithm="HS256")
class PermissionDependency:
    """Callable dependency that turns a bearer token into an account.

    An instance is meant to be used as a FastAPI dependency. When called it
    reads the ``Authorization: Bearer`` credentials, decodes the JWT, loads
    the matching account from the database and returns it.

    Attributes:
        is_public: When true, a request without credentials is allowed and
            the dependency resolves to ``None`` instead of raising.
    """

    def __init__(self, is_public: bool = False):
        """Store whether requests without credentials are allowed."""
        self.is_public = is_public

    def __call__(
        self,
        credentials: HTTPAuthorizationCredentials | None = Depends(HTTPBearer(auto_error=False)),
    ) -> AccountModel | None:
        """Resolve the request credentials to an account.

        Args:
            credentials: Bearer credentials extracted by ``HTTPBearer``;
                ``None`` when the request carried none (``auto_error=False``).

        Returns:
            The account for the token, or ``None`` for a credential-less
            request on a public dependency.

        Raises:
            HTTPException: 403 when credentials are missing on a non-public
                dependency, the token cannot be decoded, required claims are
                absent, or no account is found.
        """
        if credentials is None:
            if self.is_public:
                return None
            raise HTTPException(status_code=403, detail="Permission denied")

        try:
            account_id = self.authenticate_jwt_token(credentials.credentials)
        except JWTError as exc:
            # Keep the decode failure as the cause for easier debugging.
            raise HTTPException(status_code=403, detail="Permission denied") from exc

        # This method is synchronous, so the async DB lookup is dispatched to
        # the event loop. NOTE(review): ``anyio.from_thread.run`` presumably
        # relies on this running in an anyio worker thread - confirm callers.
        account_data = anyio.from_thread.run(self.get_account_by_id, account_id)
        self.check_account_health(account_data)
        return account_data

    async def get_account_by_id(self, account_id: str) -> AccountModel:
        """Fetch the account document whose ``id`` equals ``account_id``.

        The raw document returned by ``find_one`` is passed to
        ``AccountModel.from_mongo``.
        NOTE(review): ``find_one`` may yield no document; how ``from_mongo``
        handles that is not visible here - confirm.
        """
        account = await settings.DB_CLIENT.accounts.find_one({"id": account_id})
        return AccountModel.from_mongo(account)

    @staticmethod
    def check_account_health(account: AccountModel | None) -> None:
        """Reject the request with 403 when ``account`` is falsy."""
        if not account:
            raise HTTPException(status_code=403, detail="Permission denied")

    def authenticate_jwt_token(self, token: str) -> str:
        """Decode ``token`` and return the account id it carries.

        Args:
            token: The encoded JWT taken from the bearer credentials.

        Returns:
            The value of the ``accountId`` claim.

        Raises:
            JWTError: Propagated from ``jwt.decode`` when decoding fails.
            HTTPException: 403 when the name claim or ``accountId`` is missing.
        """
        # A list makes the allowed-algorithm check an exact membership test;
        # with a bare string the library might fall back to substring matching.
        # NOTE(review): create_access_token sets "iss", but the issuer is not
        # verified here - consider passing issuer=settings.Issuer.
        payload = jwt.decode(
            token,
            settings.SECRET_KEY,
            algorithms=["HS256"],
            audience=settings.Audience,
        )
        email = payload.get("http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name")
        account_id = payload.get("accountId")
        if email is None or account_id is None:
            raise HTTPException(status_code=403, detail="Permission denied")
        return account_id