Emmanuel Frimpong Asante commited on
Commit
658e692
·
1 Parent(s): 990e191

update space

Browse files
app.py CHANGED
@@ -1,17 +1,27 @@
1
  # app.py
2
 
3
- from fastapi import FastAPI, Request
4
  from fastapi.templating import Jinja2Templates
5
  from fastapi.responses import HTMLResponse
6
- from routes.authentication import auth_router
7
  from fastapi.staticfiles import StaticFiles
 
 
8
 
 
9
  app = FastAPI()
10
  templates = Jinja2Templates(directory="templates")
11
- # app.mount("/static", StaticFiles(directory="static"), name="static")
12
- app.include_router(auth_router, prefix="/auth", tags=["Authentication"])
13
 
 
 
 
 
 
 
 
 
 
14
 
15
  @app.get("/", response_class=HTMLResponse)
16
  async def landing_page(request: Request):
17
- return templates.TemplateResponse("landing.html", {"request": request})
 
 
1
  # app.py
2
 
3
+ from fastapi import FastAPI, Request, HTTPException
4
  from fastapi.templating import Jinja2Templates
5
  from fastapi.responses import HTMLResponse
 
6
  from fastapi.staticfiles import StaticFiles
7
+ from routes.authentication import auth_router
8
+ import os
9
 
10
+ # Initialize FastAPI app and template directory
11
  app = FastAPI()
12
  templates = Jinja2Templates(directory="templates")
 
 
13
 
14
+ # Mount static files if the directory exists
15
+ static_dir = "static"
16
+ if os.path.isdir(static_dir):
17
+ app.mount("/static", StaticFiles(directory=static_dir), name="static")
18
+ else:
19
+ raise HTTPException(status_code=500, detail="Static directory not found.")
20
+
21
+ # Include authentication routes
22
+ app.include_router(auth_router, prefix="/auth", tags=["Authentication"])
23
 
24
  @app.get("/", response_class=HTMLResponse)
25
  async def landing_page(request: Request):
26
+ """Render the landing page template."""
27
+ return templates.TemplateResponse("landing.html", {"request": request})
requirements.txt CHANGED
@@ -1,10 +1,7 @@
1
  fastapi~=0.115.4
2
- passlib[bcrypt]
3
- pydantic[email]
4
  pymongo~=4.9.2
5
- pyjwt~=2.9.0
6
  python-dotenv~=1.0.1
7
- uvicorn~=0.32.0
8
- python-jose[cryptography]
9
- jinja2
10
- python-multipart
 
1
  fastapi~=0.115.4
2
+ passlib[bcrypt]~=1.7.4
3
+ pydantic[email]~=2.9.2
4
  pymongo~=4.9.2
 
5
  python-dotenv~=1.0.1
6
+ python-jose[cryptography]~=3.3.0
7
+ starlette~=0.41.2
 
 
routes/authentication.py CHANGED
@@ -1,10 +1,12 @@
1
  # routes/authentication.py
2
 
3
  from datetime import timedelta, datetime
4
- from fastapi import APIRouter, Depends, status, Request
5
  from fastapi.responses import RedirectResponse, JSONResponse
6
  from fastapi.security import OAuth2PasswordRequestForm
7
  from fastapi.templating import Jinja2Templates
 
 
8
  from models.schemas.user_schema import UserCreate, UserResponse
9
  from services.auth_service import (
10
  create_user,
@@ -20,53 +22,66 @@ auth_router = APIRouter()
20
  templates = Jinja2Templates(directory="templates")
21
 
22
 
23
-
24
- @auth_router.get("/register", response_class=JSONResponse)
25
  async def register_page(request: Request):
26
- """Renders the registration form."""
27
  return templates.TemplateResponse("auth/register.html", {"request": request})
28
 
29
 
30
  @auth_router.post("/register", response_model=UserResponse)
31
  async def register(request: Request, user: UserCreate):
32
- """Handles user registration."""
33
  result = create_user(user)
34
  if result["status"] == "error":
35
- return templates.TemplateResponse("auth/register.html", {"request": request, "error": result["message"]})
 
 
 
36
  return RedirectResponse(url="/auth/login", status_code=status.HTTP_302_FOUND)
37
 
38
 
39
- @auth_router.get("/login", response_class=JSONResponse)
40
  async def login_page(request: Request):
41
- """Renders the login form."""
42
  return templates.TemplateResponse("auth/login.html", {"request": request})
43
 
44
 
45
  @auth_router.post("/login", response_model=Token)
46
  async def login(request: Request, form_data: OAuth2PasswordRequestForm = Depends()):
47
- """Handles user login and JWT issuance."""
48
  user = authenticate_user(form_data.username, form_data.password)
49
  if not user:
50
- return templates.TemplateResponse("auth/login.html",
51
- {"request": request, "error": "Invalid username or password"})
 
 
52
 
53
  access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
54
  access_token = create_access_token(data={"sub": user["username"]}, expires_delta=access_token_expires)
55
 
56
  response = RedirectResponse(url="/", status_code=status.HTTP_302_FOUND)
57
- response.set_cookie(key="access_token", value=f"Bearer {access_token}", httponly=True)
 
 
 
 
 
 
 
58
  return response
59
 
60
 
61
  @auth_router.get("/me", response_model=UserResponse)
62
  async def read_users_me(current_user: str = Depends(get_current_user)):
63
- """Fetches the current authenticated user information."""
 
 
64
  return {"username": current_user}
65
 
66
 
67
  @auth_router.get("/logout")
68
  async def logout(request: Request, token: str = Depends(oauth2_scheme)):
69
- """Logs out the user by blacklisting the token and clearing cookies."""
70
  expiration = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
71
  blacklist_token(token, expiration)
72
 
 
1
  # routes/authentication.py
2
 
3
  from datetime import timedelta, datetime
4
+ from fastapi import APIRouter, Depends, status, Request, HTTPException
5
  from fastapi.responses import RedirectResponse, JSONResponse
6
  from fastapi.security import OAuth2PasswordRequestForm
7
  from fastapi.templating import Jinja2Templates
8
+ from starlette.responses import HTMLResponse
9
+
10
  from models.schemas.user_schema import UserCreate, UserResponse
11
  from services.auth_service import (
12
  create_user,
 
22
  templates = Jinja2Templates(directory="templates")
23
 
24
 
25
+ @auth_router.get("/register", response_class=HTMLResponse)
 
26
  async def register_page(request: Request):
27
+ """Render the registration form."""
28
  return templates.TemplateResponse("auth/register.html", {"request": request})
29
 
30
 
31
  @auth_router.post("/register", response_model=UserResponse)
32
  async def register(request: Request, user: UserCreate):
33
+ """Handle user registration and redirect to login upon success."""
34
  result = create_user(user)
35
  if result["status"] == "error":
36
+ return templates.TemplateResponse("auth/register.html", {
37
+ "request": request,
38
+ "error": result["message"]
39
+ })
40
  return RedirectResponse(url="/auth/login", status_code=status.HTTP_302_FOUND)
41
 
42
 
43
+ @auth_router.get("/login", response_class=HTMLResponse)
44
  async def login_page(request: Request):
45
+ """Render the login form."""
46
  return templates.TemplateResponse("auth/login.html", {"request": request})
47
 
48
 
49
  @auth_router.post("/login", response_model=Token)
50
  async def login(request: Request, form_data: OAuth2PasswordRequestForm = Depends()):
51
+ """Authenticate user, issue JWT token, and set it in a cookie."""
52
  user = authenticate_user(form_data.username, form_data.password)
53
  if not user:
54
+ return templates.TemplateResponse("auth/login.html", {
55
+ "request": request,
56
+ "error": "Invalid username or password"
57
+ })
58
 
59
  access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
60
  access_token = create_access_token(data={"sub": user["username"]}, expires_delta=access_token_expires)
61
 
62
  response = RedirectResponse(url="/", status_code=status.HTTP_302_FOUND)
63
+ response.set_cookie(
64
+ key="access_token",
65
+ value=f"Bearer {access_token}",
66
+ httponly=True,
67
+ max_age=ACCESS_TOKEN_EXPIRE_MINUTES * 60,
68
+ secure=True,
69
+ samesite="Lax"
70
+ )
71
  return response
72
 
73
 
74
  @auth_router.get("/me", response_model=UserResponse)
75
  async def read_users_me(current_user: str = Depends(get_current_user)):
76
+ """Retrieve the current authenticated user information."""
77
+ if not current_user:
78
+ raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated")
79
  return {"username": current_user}
80
 
81
 
82
  @auth_router.get("/logout")
83
  async def logout(request: Request, token: str = Depends(oauth2_scheme)):
84
+ """Logout the user by blacklisting the token and clearing cookies."""
85
  expiration = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
86
  blacklist_token(token, expiration)
87
 
services/auth_service.py CHANGED
@@ -2,12 +2,12 @@
2
 
3
  import os
4
  from datetime import datetime, timedelta
5
- from typing import Optional
6
  from fastapi.security import OAuth2PasswordBearer
7
  from passlib.context import CryptContext
8
  from pymongo.errors import DuplicateKeyError
9
  from models.schemas.user_schema import UserCreate
10
- from fastapi import Depends, HTTPException, status
11
  from pydantic import BaseModel
12
  from services.utils import mongo_instance
13
  from jose import JWTError, jwt
@@ -15,45 +15,42 @@ from jose import JWTError, jwt
15
  # OAuth2 scheme for FastAPI
16
  oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/login")
17
 
18
-
19
  # Password hashing context
20
  pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
21
 
22
  # JWT configuration
23
- SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key") # Replace with a secure key in production
24
  ALGORITHM = "HS256"
25
  ACCESS_TOKEN_EXPIRE_MINUTES = 30
26
 
27
  # MongoDB Collection
28
  user_collection = mongo_instance.get_collection("users")
 
29
 
30
 
31
  class Token(BaseModel):
32
  access_token: str
33
  token_type: str
34
 
35
- blacklist_collection = mongo_instance.get_collection("token_blacklist")
36
 
37
  def blacklist_token(token: str, expiration: datetime):
38
- """Add token to blacklist."""
39
  blacklist_collection.insert_one({"token": token, "expires": expiration})
40
 
 
41
  def is_token_blacklisted(token: str) -> bool:
42
- """Check if a token is in the blacklist."""
43
  entry = blacklist_collection.find_one({"token": token})
44
  return entry is not None
45
 
46
- # Modify decode_access_token to check blacklist
47
- def decode_access_token(token: str):
 
48
  if is_token_blacklisted(token):
49
  return None
50
- """Decode and validate JWT token."""
51
  try:
52
  payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
53
- username: str = payload.get("sub")
54
- if username is None:
55
- return None
56
- return username
57
  except JWTError:
58
  return None
59
 
@@ -63,8 +60,8 @@ def hash_password(password: str) -> str:
63
  return pwd_context.hash(password)
64
 
65
 
66
- def create_user(user_data: UserCreate):
67
- """Registers a new user with hashed password."""
68
  try:
69
  hashed_password = hash_password(user_data.password)
70
  user_document = {
@@ -78,7 +75,7 @@ def create_user(user_data: UserCreate):
78
  except DuplicateKeyError:
79
  return {"status": "error", "message": "Username or email already exists."}
80
  except Exception as e:
81
- return {"status": "error", "message": f"Error registering user: {e}"}
82
 
83
 
84
  def verify_password(plain_password: str, hashed_password: str) -> bool:
@@ -86,19 +83,18 @@ def verify_password(plain_password: str, hashed_password: str) -> bool:
86
  return pwd_context.verify(plain_password, hashed_password)
87
 
88
 
89
- def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
90
- """Creates a JWT access token."""
91
  to_encode = data.copy()
92
- if expires_delta:
93
- expire = datetime.utcnow() + expires_delta
94
- else:
95
- expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
96
  to_encode.update({"exp": expire})
97
  return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
98
 
99
- def get_current_user(token: str = Depends(oauth2_scheme)):
 
 
100
  username = decode_access_token(token)
101
- if not username:
102
  raise HTTPException(
103
  status_code=status.HTTP_401_UNAUTHORIZED,
104
  detail="Could not validate credentials",
@@ -107,8 +103,8 @@ def get_current_user(token: str = Depends(oauth2_scheme)):
107
  return username
108
 
109
 
110
- def authenticate_user(username: str, password: str):
111
- """Authenticate user credentials."""
112
  user = user_collection.find_one({"username": username})
113
  if user and verify_password(password, user["password"]):
114
  return user
 
2
 
3
  import os
4
  from datetime import datetime, timedelta
5
+ from typing import Optional, Union
6
  from fastapi.security import OAuth2PasswordBearer
7
  from passlib.context import CryptContext
8
  from pymongo.errors import DuplicateKeyError
9
  from models.schemas.user_schema import UserCreate
10
+ from fastapi import Depends, HTTPException, status
11
  from pydantic import BaseModel
12
  from services.utils import mongo_instance
13
  from jose import JWTError, jwt
 
15
  # OAuth2 scheme for FastAPI
16
  oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/login")
17
 
 
18
  # Password hashing context
19
  pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
20
 
21
  # JWT configuration
22
+ SECRET_KEY = os.getenv("SECRET_KEY", "your-secure-secret-key") # Replace in production
23
  ALGORITHM = "HS256"
24
  ACCESS_TOKEN_EXPIRE_MINUTES = 30
25
 
26
  # MongoDB Collection
27
  user_collection = mongo_instance.get_collection("users")
28
+ blacklist_collection = mongo_instance.get_collection("token_blacklist")
29
 
30
 
31
  class Token(BaseModel):
32
  access_token: str
33
  token_type: str
34
 
 
35
 
36
  def blacklist_token(token: str, expiration: datetime):
37
+ """Adds a token to the blacklist with its expiration time."""
38
  blacklist_collection.insert_one({"token": token, "expires": expiration})
39
 
40
+
41
  def is_token_blacklisted(token: str) -> bool:
42
+ """Checks if a token is blacklisted."""
43
  entry = blacklist_collection.find_one({"token": token})
44
  return entry is not None
45
 
46
+
47
+ def decode_access_token(token: str) -> Optional[str]:
48
+ """Decodes and validates a JWT token, checking against the blacklist."""
49
  if is_token_blacklisted(token):
50
  return None
 
51
  try:
52
  payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
53
+ return payload.get("sub")
 
 
 
54
  except JWTError:
55
  return None
56
 
 
60
  return pwd_context.hash(password)
61
 
62
 
63
+ def create_user(user_data: UserCreate) -> dict:
64
+ """Registers a new user with hashed password and stores in MongoDB."""
65
  try:
66
  hashed_password = hash_password(user_data.password)
67
  user_document = {
 
75
  except DuplicateKeyError:
76
  return {"status": "error", "message": "Username or email already exists."}
77
  except Exception as e:
78
+ return {"status": "error", "message": f"Error registering user: {str(e)}"}
79
 
80
 
81
  def verify_password(plain_password: str, hashed_password: str) -> bool:
 
83
  return pwd_context.verify(plain_password, hashed_password)
84
 
85
 
86
+ def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
87
+ """Creates a JWT access token with an expiration time."""
88
  to_encode = data.copy()
89
+ expire = datetime.utcnow() + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
 
 
 
90
  to_encode.update({"exp": expire})
91
  return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
92
 
93
+
94
+ def get_current_user(token: str = Depends(oauth2_scheme)) -> Union[str, None]:
95
+ """Retrieves the current user by decoding the access token."""
96
  username = decode_access_token(token)
97
+ if username is None:
98
  raise HTTPException(
99
  status_code=status.HTTP_401_UNAUTHORIZED,
100
  detail="Could not validate credentials",
 
103
  return username
104
 
105
 
106
+ def authenticate_user(username: str, password: str) -> Optional[dict]:
107
+ """Authenticates user credentials by checking the MongoDB records."""
108
  user = user_collection.find_one({"username": username})
109
  if user and verify_password(password, user["password"]):
110
  return user
services/utils.py CHANGED
@@ -1,6 +1,7 @@
1
  # services/utils.py
2
 
3
  import os
 
4
  from dotenv import load_dotenv
5
  from pymongo import MongoClient
6
  from pymongo.errors import ConnectionFailure, OperationFailure
@@ -11,28 +12,38 @@ load_dotenv()
11
  # MongoDB Connection URI
12
  MONGO_URI = os.getenv("MONGO_URI")
13
 
14
- # Initialize MongoDB client and database connection
 
 
 
15
  class MongoDB:
16
  def __init__(self, uri: str, db_name: str):
 
17
  self.client = MongoClient(uri)
18
  self.db = self.client[db_name]
 
19
 
20
  def test_connection(self) -> str:
21
- """Tests MongoDB connection by performing a basic operation."""
22
  try:
23
- self.db.test_collection.insert_one({"status": "connection_test"})
24
- return "Connection successful: Test document inserted!"
 
 
 
25
  except (ConnectionFailure, OperationFailure) as e:
 
26
  return f"Database connection error: {e}"
27
 
28
  def get_collection(self, collection_name: str):
29
- """Fetches a MongoDB collection."""
30
  return self.db[collection_name]
31
 
32
  # Instantiate MongoDB with URI and database name
33
  mongo_instance = MongoDB(uri=MONGO_URI, db_name="poultry_management")
34
 
 
35
  def test_db_connection():
36
- """Tests the database connection and prints result."""
37
  result = mongo_instance.test_connection()
38
  print(result)
 
1
  # services/utils.py
2
 
3
  import os
4
+ import logging
5
  from dotenv import load_dotenv
6
  from pymongo import MongoClient
7
  from pymongo.errors import ConnectionFailure, OperationFailure
 
12
  # MongoDB Connection URI
13
  MONGO_URI = os.getenv("MONGO_URI")
14
 
15
+ # Configure logging
16
+ logging.basicConfig(level=logging.INFO)
17
+ logger = logging.getLogger(__name__)
18
+
19
  class MongoDB:
20
  def __init__(self, uri: str, db_name: str):
21
+ """Initializes the MongoDB client and connects to the specified database."""
22
  self.client = MongoClient(uri)
23
  self.db = self.client[db_name]
24
+ logger.info(f"Connected to MongoDB database: {db_name}")
25
 
26
  def test_connection(self) -> str:
27
+ """Tests MongoDB connection by inserting and then deleting a test document."""
28
  try:
29
+ test_doc = {"status": "connection_test"}
30
+ self.db.test_collection.insert_one(test_doc)
31
+ self.db.test_collection.delete_many({"status": "connection_test"}) # Cleanup
32
+ logger.info("Connection successful: Test document inserted and removed!")
33
+ return "Connection successful!"
34
  except (ConnectionFailure, OperationFailure) as e:
35
+ logger.error(f"Database connection error: {e}")
36
  return f"Database connection error: {e}"
37
 
38
  def get_collection(self, collection_name: str):
39
+ """Fetches a MongoDB collection with the given name."""
40
  return self.db[collection_name]
41
 
42
  # Instantiate MongoDB with URI and database name
43
  mongo_instance = MongoDB(uri=MONGO_URI, db_name="poultry_management")
44
 
45
+
46
  def test_db_connection():
47
+ """Tests the database connection and logs the result."""
48
  result = mongo_instance.test_connection()
49
  print(result)