|
|
|
|
|
|
|
|
import bcrypt |
|
|
import jwt |
|
|
import datetime |
|
|
from bson.objectid import ObjectId |
|
|
from flask import jsonify, request |
|
|
from functools import wraps |
|
|
from config.db import users_collection |
|
|
from config.settings import SECRET_KEY |
|
|
|
|
|
|
|
|
|
|
|
def token_required(f): |
|
|
@wraps(f) |
|
|
def decorated(*args, **kwargs): |
|
|
token = None |
|
|
if 'x-access-token' in request.headers: |
|
|
token = request.headers['x-access-token'] |
|
|
if not token: |
|
|
return jsonify({'message': 'Token is missing!'}), 401 |
|
|
try: |
|
|
data = jwt.decode(token, SECRET_KEY, algorithms=["HS256"]) |
|
|
current_user = users_collection.find_one({"_id": ObjectId(data["user_id"])}) |
|
|
except jwt.ExpiredSignatureError: |
|
|
return jsonify({'message': 'Token has expired!'}), 401 |
|
|
except jwt.InvalidTokenError: |
|
|
return jsonify({'message': 'Token is invalid!'}), 401 |
|
|
return f(current_user, *args, **kwargs) |
|
|
|
|
|
return decorated |
|
|
|
|
|
|
|
|
|
|
|
def register(): |
|
|
data = request.get_json() |
|
|
if not data or not data.get('username') or not data.get('password'): |
|
|
return jsonify({"message": "Missing username or password"}), 400 |
|
|
|
|
|
|
|
|
if users_collection.find_one({"username": data['username']}): |
|
|
return jsonify({"message": "User already exists"}), 400 |
|
|
|
|
|
|
|
|
hashed_password = bcrypt.hashpw(data['password'].encode('utf-8'), bcrypt.gensalt()).decode('utf-8') |
|
|
|
|
|
|
|
|
new_user = { |
|
|
"username": data['username'], |
|
|
"password": hashed_password, |
|
|
"role": data.get('role', 'user'), |
|
|
"created_at": datetime.datetime.utcnow() |
|
|
} |
|
|
|
|
|
|
|
|
result = users_collection.insert_one(new_user) |
|
|
return jsonify({"message": "User registered successfully", "user_id": str(result.inserted_id)}), 201 |
|
|
|
|
|
|
|
|
|
|
|
def login(): |
|
|
data = request.get_json() |
|
|
if not data or not data.get('username') or not data.get('password'): |
|
|
return jsonify({"message": "Missing username or password"}), 400 |
|
|
|
|
|
|
|
|
user = users_collection.find_one({"username": data['username']}) |
|
|
if not user: |
|
|
return jsonify({"message": "User not found"}), 404 |
|
|
|
|
|
|
|
|
if bcrypt.checkpw(data['password'].encode('utf-8'), user['password'].encode('utf-8')): |
|
|
|
|
|
token = jwt.encode({ |
|
|
'user_id': str(user['_id']), |
|
|
'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=24) |
|
|
}, SECRET_KEY, algorithm="HS256") |
|
|
|
|
|
return jsonify({"token": token}), 200 |
|
|
else: |
|
|
return jsonify({"message": "Invalid password"}), 401 |
|
|
|
|
|
|
|
|
|
|
|
@token_required |
|
|
def get_user_info(current_user): |
|
|
user_info = { |
|
|
"username": current_user['username'], |
|
|
"role": current_user['role'], |
|
|
"created_at": current_user['created_at'] |
|
|
} |
|
|
return jsonify(user_info), 200 |
|
|
|
|
|
|
|
|
|
|
|
@token_required |
|
|
def update_user_info(current_user): |
|
|
if current_user['role'] != 'manager': |
|
|
return jsonify({"message": "Permission denied"}), 403 |
|
|
|
|
|
data = request.get_json() |
|
|
if not data or not data.get('user_id'): |
|
|
return jsonify({"message": "Missing user_id"}), 400 |
|
|
|
|
|
user = users_collection.find_one({"_id": ObjectId(data['user_id'])}) |
|
|
if not user: |
|
|
return jsonify({"message": "User not found"}), 404 |
|
|
|
|
|
|
|
|
update_fields = {} |
|
|
if 'username' in data: |
|
|
update_fields['username'] = data['username'] |
|
|
if 'role' in data: |
|
|
update_fields['role'] = data['role'] |
|
|
|
|
|
users_collection.update_one({"_id": ObjectId(data['user_id'])}, {"$set": update_fields}) |
|
|
return jsonify({"message": "User information updated successfully"}), 200 |
|
|
|
|
|
|
|
|
|
|
|
@token_required |
|
|
def delete_user(current_user): |
|
|
if current_user['role'] != 'manager': |
|
|
return jsonify({"message": "Permission denied"}), 403 |
|
|
|
|
|
data = request.get_json() |
|
|
if not data or not data.get('user_id'): |
|
|
return jsonify({"message": "Missing user_id"}), 400 |
|
|
|
|
|
result = users_collection.delete_one({"_id": ObjectId(data['user_id'])}) |
|
|
if result.deleted_count == 0: |
|
|
return jsonify({"message": "User not found"}), 404 |
|
|
return jsonify({"message": "User deleted successfully"}), 200 |
|
|
|