Emmanuel Frimpong Asante
"Update space"
25fb6a0
raw
history blame
4.7 kB
# auth/auth_controller.py
import bcrypt
import jwt
import datetime
from bson.objectid import ObjectId
from flask import jsonify, request
from functools import wraps
from config.db import users_collection # Importing the users collection from the database config
from config.settings import SECRET_KEY # Import the secret key from settings
# JWT decorator to protect routes
def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
token = None
if 'x-access-token' in request.headers:
token = request.headers['x-access-token']
if not token:
return jsonify({'message': 'Token is missing!'}), 401
try:
data = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
current_user = users_collection.find_one({"_id": ObjectId(data["user_id"])})
except jwt.ExpiredSignatureError:
return jsonify({'message': 'Token has expired!'}), 401
except jwt.InvalidTokenError:
return jsonify({'message': 'Token is invalid!'}), 401
return f(current_user, *args, **kwargs)
return decorated
# Register a new user
def register():
data = request.get_json()
if not data or not data.get('username') or not data.get('password'):
return jsonify({"message": "Missing username or password"}), 400
# Check if user already exists
if users_collection.find_one({"username": data['username']}):
return jsonify({"message": "User already exists"}), 400
# Hash the password
hashed_password = bcrypt.hashpw(data['password'].encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
# Create user document
new_user = {
"username": data['username'],
"password": hashed_password,
"role": data.get('role', 'user'), # Default role is 'user', could be 'manager'
"created_at": datetime.datetime.utcnow()
}
# Insert user into MongoDB
result = users_collection.insert_one(new_user)
return jsonify({"message": "User registered successfully", "user_id": str(result.inserted_id)}), 201
# User login
def login():
data = request.get_json()
if not data or not data.get('username') or not data.get('password'):
return jsonify({"message": "Missing username or password"}), 400
# Find user in MongoDB
user = users_collection.find_one({"username": data['username']})
if not user:
return jsonify({"message": "User not found"}), 404
# Verify password
if bcrypt.checkpw(data['password'].encode('utf-8'), user['password'].encode('utf-8')):
# Create JWT token
token = jwt.encode({
'user_id': str(user['_id']),
'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=24)
}, SECRET_KEY, algorithm="HS256")
return jsonify({"token": token}), 200
else:
return jsonify({"message": "Invalid password"}), 401
# Get user info (protected route)
@token_required
def get_user_info(current_user):
user_info = {
"username": current_user['username'],
"role": current_user['role'],
"created_at": current_user['created_at']
}
return jsonify(user_info), 200
# Update user info (for manager/admin use)
@token_required
def update_user_info(current_user):
if current_user['role'] != 'manager': # Only managers can update user info
return jsonify({"message": "Permission denied"}), 403
data = request.get_json()
if not data or not data.get('user_id'):
return jsonify({"message": "Missing user_id"}), 400
user = users_collection.find_one({"_id": ObjectId(data['user_id'])})
if not user:
return jsonify({"message": "User not found"}), 404
# Update user information (only certain fields)
update_fields = {}
if 'username' in data:
update_fields['username'] = data['username']
if 'role' in data:
update_fields['role'] = data['role']
users_collection.update_one({"_id": ObjectId(data['user_id'])}, {"$set": update_fields})
return jsonify({"message": "User information updated successfully"}), 200
# Delete a user (for manager/admin use)
@token_required
def delete_user(current_user):
if current_user['role'] != 'manager': # Only managers can delete users
return jsonify({"message": "Permission denied"}), 403
data = request.get_json()
if not data or not data.get('user_id'):
return jsonify({"message": "Missing user_id"}), 400
result = users_collection.delete_one({"_id": ObjectId(data['user_id'])})
if result.deleted_count == 0:
return jsonify({"message": "User not found"}), 404
return jsonify({"message": "User deleted successfully"}), 200