|
from fastapi import APIRouter, Depends, HTTPException, status, Security, Body, Query |
|
from fastapi.security import OAuth2PasswordRequestForm |
|
from models.user import UserSignup |
|
from db.supabase_service import get_supabase |
|
from typing import Annotated |
|
from supabase import Client |
|
from utils.auth import get_id |
|
from fastapi.encoders import jsonable_encoder |
|
from models.enums import Role |
|
from gotrue.errors import AuthApiError |
|
from crud.user import update_user |
|
from utils.exceptions import BAD_REQUEST, CONFLICT, NOT_FOUND |
|
|
|
router = APIRouter(tags=["Authentication"]) |
|
|
|
|
|
@router.post("/login", description="Login user using email and password") |
|
async def login( |
|
form_data: Annotated[OAuth2PasswordRequestForm, Depends()], |
|
supabase: Annotated[Client, Depends(get_supabase)], |
|
): |
|
email = form_data.username |
|
password = form_data.password |
|
try: |
|
user = supabase.auth.sign_in_with_password( |
|
{"email": email, "password": password} |
|
) |
|
return { |
|
"access_token": user.session.access_token, |
|
"token_type": "bearer", |
|
} |
|
except: |
|
raise HTTPException( |
|
status_code=status.HTTP_400_BAD_REQUEST, |
|
detail="Incorrect email or password", |
|
) |
|
|
|
|
|
@router.post( |
|
"/signup", description="Sign up new user", status_code=status.HTTP_201_CREATED |
|
) |
|
async def signup( |
|
user: Annotated[UserSignup, Body()], |
|
supabase: Annotated[Client, Depends(get_supabase)], |
|
): |
|
email = user.email |
|
password = user.password |
|
if len(password) < 6: |
|
raise HTTPException( |
|
status_code=status.HTTP_400_BAD_REQUEST, |
|
detail="Password should be at least 6 characters", |
|
) |
|
try: |
|
user.role = Role.user |
|
res = supabase.auth.sign_up( |
|
{ |
|
"email": email, |
|
"password": password, |
|
"options": { |
|
"data": jsonable_encoder(user, exclude=("password", "email")) |
|
}, |
|
} |
|
) |
|
return {"detail": "User created"} |
|
except AuthApiError: |
|
raise CONFLICT |
|
except: |
|
raise BAD_REQUEST |
|
|
|
|
|
@router.post("/reset_password", description="Send reset password email") |
|
async def reset_password( |
|
supabase: Annotated[Client, Depends(get_supabase)], |
|
email: str, |
|
): |
|
try: |
|
if ( |
|
len(supabase.rpc("get_user_id_by_email", {"email": email}).execute().data) |
|
> 0 |
|
): |
|
supabase.auth.reset_password_email(email) |
|
return {"detail": "Reset password email sent"} |
|
else: |
|
raise NOT_FOUND |
|
except: |
|
raise NOT_FOUND |
|
|
|
|
|
@router.post("/reset_password_confirm", description="Reset password") |
|
async def reset_password_confirm( |
|
supabase: Annotated[Client, Depends(get_supabase)], |
|
email: Annotated[str, Query(description="Email", title="Email")], |
|
new_password: Annotated[ |
|
str, Query(min_length=6, description="New password", title="New password") |
|
], |
|
token: Annotated[ |
|
str, Query(description="Reset password token", title="Reset password token") |
|
], |
|
): |
|
try: |
|
supabase.auth.verify_otp( |
|
{ |
|
"email": email, |
|
"token": token, |
|
"type": "email", |
|
} |
|
) |
|
await update_user(supabase, password=new_password) |
|
return {"detail": "Your password has been reset"} |
|
except: |
|
raise HTTPException( |
|
status_code=status.HTTP_400_BAD_REQUEST, |
|
detail="Incorrect token", |
|
) |
|
|