File size: 3,540 Bytes
d2726bc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
from fastapi import APIRouter, Depends, HTTPException, status, Security, Body, Query
from fastapi.security import OAuth2PasswordRequestForm
from models.user import UserSignup
from db.supabase_service import get_supabase
from typing import Annotated
from supabase import Client
from utils.auth import get_id
from fastapi.encoders import jsonable_encoder
from models.enums import Role
from gotrue.errors import AuthApiError
from crud.user import update_user
from utils.exceptions import BAD_REQUEST, CONFLICT, NOT_FOUND
router = APIRouter(tags=["Authentication"])
@router.post("/login", description="Login user using email and password")
async def login(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
supabase: Annotated[Client, Depends(get_supabase)],
):
email = form_data.username
password = form_data.password
try:
user = supabase.auth.sign_in_with_password(
{"email": email, "password": password}
)
return {
"access_token": user.session.access_token,
"token_type": "bearer",
}
except:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Incorrect email or password",
)
@router.post(
"/signup", description="Sign up new user", status_code=status.HTTP_201_CREATED
)
async def signup(
user: Annotated[UserSignup, Body()],
supabase: Annotated[Client, Depends(get_supabase)],
):
email = user.email
password = user.password
if len(password) < 6:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Password should be at least 6 characters",
)
try:
user.role = Role.user
res = supabase.auth.sign_up(
{
"email": email,
"password": password,
"options": {
"data": jsonable_encoder(user, exclude=("password", "email"))
},
}
)
return {"detail": "User created"}
except AuthApiError:
raise CONFLICT
except:
raise BAD_REQUEST
@router.post("/reset_password", description="Send reset password email")
async def reset_password(
supabase: Annotated[Client, Depends(get_supabase)],
email: str,
):
try:
if (
len(supabase.rpc("get_user_id_by_email", {"email": email}).execute().data)
> 0
):
supabase.auth.reset_password_email(email)
return {"detail": "Reset password email sent"}
else:
raise NOT_FOUND
except:
raise NOT_FOUND
@router.post("/reset_password_confirm", description="Reset password")
async def reset_password_confirm(
supabase: Annotated[Client, Depends(get_supabase)],
email: Annotated[str, Query(description="Email", title="Email")],
new_password: Annotated[
str, Query(min_length=6, description="New password", title="New password")
],
token: Annotated[
str, Query(description="Reset password token", title="Reset password token")
],
):
try:
supabase.auth.verify_otp(
{
"email": email,
"token": token,
"type": "email",
}
)
await update_user(supabase, password=new_password)
return {"detail": "Your password has been reset"}
except:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Incorrect token",
)
|