Emmanuel Frimpong Asante
update space
4bbeff4
raw
history blame
9.45 kB
# File: app.py
# Import necessary libraries
import os
import logging
from pymongo import MongoClient, errors
from datetime import datetime
from werkzeug.security import generate_password_hash
from services.utils import PoultryFarmBot
from services.chatbot import build_chatbot_interface
from transformers import AutoModelForCausalLM, AutoTokenizer
from flask import Flask, render_template, request, redirect, url_for, session
from celery import Celery
import time
import threading
import gunicorn.app.base
from gunicorn.six import iteritems
# Setup logging for better monitoring
# Records at INFO and above are emitted as "<timestamp> - <level> - <message>".
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
# Module-level logger used by the rest of this file.
logger = logging.getLogger(__name__)
# Flask app setup
app = Flask(__name__)
# The secret key signs the session cookie that the routes below use to store
# the logged-in username. An unset OR empty SECRET_KEY falls back to the
# built-in default so sessions keep working, but that default is public
# knowledge, so the fallback is reported loudly instead of being used silently.
_secret_key = os.environ.get("SECRET_KEY")
if not _secret_key:
    logger.warning(
        "SECRET_KEY is not set; falling back to an insecure default. "
        "Set SECRET_KEY in the environment for production use."
    )
    _secret_key = "default_secret_key"
app.secret_key = _secret_key
# Celery configuration
# The broker URL comes from the environment (local Redis by default) and is
# reused as the result backend.
CELERY_BROKER_URL = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379/0")
app.config.update(
    CELERY_BROKER_URL=CELERY_BROKER_URL,
    CELERY_RESULT_BACKEND=CELERY_BROKER_URL,
)
# The Celery app is named after the Flask app and then receives the full
# Flask configuration mapping.
celery = Celery(app.name, broker=CELERY_BROKER_URL)
celery.conf.update(app.config)
# MongoDB Setup for logging and audit
MONGO_URI = os.environ.get("MONGO_URI")
if not MONGO_URI:
    logger.error("MONGO_URI is not set in the environment variables.")
    raise ValueError("MONGO_URI environment variable is required but not set.")

# Retry logic for MongoDB connection: up to `max_retries` attempts, pausing
# 5 seconds between them. Only server-selection timeouts are retried; any
# other error propagates immediately.
max_retries = 3
for attempt in range(max_retries):
    logger.info(f"Connecting to MongoDB (Attempt {attempt + 1}/{max_retries}).")
    client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)  # Timeout after 5 seconds
    try:
        client.server_info()  # Trigger exception if cannot connect
    except errors.ServerSelectionTimeoutError as e:
        logger.error(f"Failed to connect to MongoDB (Attempt {attempt + 1}/{max_retries}): {e}")
        # Dispose of the unusable client before retrying so failed attempts
        # do not accumulate open clients.
        client.close()
        if attempt < max_retries - 1:
            time.sleep(5)  # Wait for 5 seconds before retrying
        else:
            raise ConnectionError(
                "Could not connect to MongoDB. Please check the MONGO_URI and ensure the database is running."
            ) from e
    else:
        db = client.poultry_farm  # Connect to the 'poultry_farm' database
        enquiries_collection = db.enquiries  # Collection to store farmer enquiries
        users_collection = db.users  # Collection to store user credentials
        logs_collection = db.logs  # Collection to store application logs
        break
def log_to_db(level, message):
    """Store one log entry in the ``logs_collection`` MongoDB collection.

    Best-effort: a failed insert is reported and swallowed, never raised.

    Args:
        level (str): Severity name stored under the "level" key.
        message (str): Text stored under the "message" key.

    Returns:
        None
    """
    # Function-scope import: the file only imports the `datetime` class.
    from datetime import timezone

    log_entry = {
        "level": level,
        "message": message,
        # Timezone-aware UTC timestamp (replaces the deprecated utcnow()).
        "timestamp": datetime.now(timezone.utc),
    }
    try:
        logs_collection.insert_one(log_entry)
    except Exception as e:
        # Report through a logger that is NOT the module logger: the module
        # logger forwards records back into this function, so using it here
        # would make every database failure trigger another failing insert.
        logging.getLogger("mongo_log_fallback").error(
            "Failed to log to database: %s", e
        )
# Override logger methods to also log to MongoDB
class MongoHandler(logging.Handler):
    """Logging handler that forwards each formatted record to ``log_to_db``.

    ``emit`` is guarded against re-entrancy: if storing a record causes
    further logging on the same thread (for example an error message about
    the failed write), those nested records are dropped by this handler
    instead of recursing back into the database write.
    """

    # Per-thread marker: set while the current thread is inside emit().
    _emit_state = threading.local()

    def emit(self, record):
        """Format ``record`` and pass it to ``log_to_db``; never raises."""
        if getattr(self._emit_state, "active", False):
            return  # nested call from inside our own emit(); drop it
        self._emit_state.active = True
        try:
            log_to_db(record.levelname, self.format(record))
        except Exception:
            # Standard logging mechanism for handler failures; unlike logging
            # an error, it cannot route back into this handler.
            self.handleError(record)
        finally:
            self._emit_state.active = False
# Attach a MongoHandler to the module logger: records at INFO and above are
# formatted as "<timestamp> - <level> - <message>" before being handed over.
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
mongo_handler = MongoHandler(level=logging.INFO)
mongo_handler.setFormatter(formatter)
logger.addHandler(mongo_handler)
# Hugging Face token setup
tok = os.environ.get('HF_TOKEN')
if not tok:
    logger.warning("Hugging Face token not found in environment variables.")
else:
    # NOTE(review): only a message is logged here; no login call is made in
    # this block. Confirm the token is picked up elsewhere (e.g. read from
    # the HF_TOKEN environment variable by the libraries themselves).
    logger.info("Logging in to Hugging Face.")
# Initialize the bot instance
logger.info("Initializing PoultryFarmBot instance.")
bot = PoultryFarmBot(db)

# Global model and tokenizer variables: both start out unset and are filled
# in by load_model_and_tokenizer(); the event signals that a load attempt
# has finished.
model, tokenizer = None, None
model_loading_event = threading.Event()
# Celery task to load the model and tokenizer
@celery.task
def load_model_and_tokenizer():
    """Load the Llama 3.2 model and tokenizer into the module-level globals.

    The globals ``model`` and ``tokenizer`` are assigned together, and only
    after both have loaded, so a failure never leaves them half-updated.
    ``model_loading_event`` is always set when the attempt ends, whether it
    succeeded or failed, so waiters are never blocked indefinitely.

    NOTE(review): when executed by a Celery worker this presumably runs in
    the worker's process, so the globals and the event it sets may not be
    visible to the Flask process that waits on them. Confirm the deployment.

    Raises:
        RuntimeError: If the model or tokenizer cannot be loaded; the
            original exception is attached as the cause.
    """
    global model, tokenizer
    model_name = "meta-llama/Llama-3.2-3B"
    try:
        logger.info("Loading Llama 3.2 model and tokenizer.")
        loaded_tokenizer = AutoTokenizer.from_pretrained(model_name)
        loaded_model = AutoModelForCausalLM.from_pretrained(model_name)
        if loaded_tokenizer.pad_token is None:
            logger.info("Adding padding token to tokenizer.")
            loaded_tokenizer.add_special_tokens({'pad_token': '[PAD]'})
            # The vocabulary grew by one token, so the embedding matrix must
            # be resized to match.
            loaded_model.resize_token_embeddings(len(loaded_tokenizer))
        tokenizer, model = loaded_tokenizer, loaded_model
        logger.info("Model and tokenizer loaded successfully.")
    except Exception as e:
        logger.error(f"Failed to load Llama 3.2 model or tokenizer: {e}")
        raise RuntimeError(
            "Could not load the Llama 3.2 model or tokenizer. Please check the configuration."
        ) from e
    finally:
        # Signal completion even on failure to prevent indefinite waiting.
        model_loading_event.set()
# Authentication function
def authenticate_user(username, password):
    """
    Authenticate a user with username and password.

    Delegates the credential check to ``bot.authenticate_user``.

    Args:
        username (str): Username for authentication.
        password (str): Password for authentication.
    Returns:
        tuple: ``(True, user)`` when the bot returns a truthy user object,
        otherwise ``(False, None)``.
    """
    user = bot.authenticate_user(username, password)
    if not user:
        return False, None
    return True, user
# Registration function
def register_user(username, password):
    """
    Register a new user with username and password.

    The password is stored hashed (``generate_password_hash``), never in
    plain text. Empty or missing credentials are rejected before the
    database is touched.

    Args:
        username (str): Username for registration.
        password (str): Password for registration.
    Returns:
        bool: True if the user was inserted; False if a credential is empty,
        the username already exists, or the database operation failed.
    """
    if not username or not password:
        logger.warning("Registration rejected: username and password are required.")
        return False
    try:
        if users_collection.find_one({"username": username}):
            logger.warning("Username already exists: %s", username)
            return False
        hashed_password = generate_password_hash(password)
        users_collection.insert_one({"username": username, "password": hashed_password})
    except Exception as e:
        # Keep the traceback in the log; callers only see the False result.
        logger.exception("Failed to register user: %s", e)
        return False
    logger.info("User registered successfully: %s", username)
    return True
# Flask routes for AdminLTE authentication
@app.route('/', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or process a login attempt (POST).

    On success the username is stored in the session, model loading is
    queued, and the user is redirected to the chatbot page. On failure,
    including a missing or empty form field, the login form is shown again
    with an error message.
    """
    if request.method != 'POST':
        return render_template('login.html')

    # .get() instead of [...]: a missing field yields the normal error page
    # rather than a bare 400 response.
    username = request.form.get('username', '')
    password = request.form.get('password', '')

    success = False
    if username and password:
        success, _user = authenticate_user(username, password)
    if not success:
        logger.warning("Authentication failed for user: %s", username)
        return render_template('login.html', error="Invalid username or password.")

    logger.info("Authentication successful for user: %s", username)
    session['username'] = username
    # Start loading the model asynchronously after successful login. A
    # failure to enqueue (e.g. broker unreachable) must not fail the login.
    try:
        load_model_and_tokenizer.apply_async()
    except Exception:
        logger.exception("Could not enqueue the model loading task.")
    return redirect(url_for('chatbot'))
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form (GET) or create an account (POST).

    A successful registration redirects to the login page; any failure
    shows the registration form again with an error message.
    """
    if request.method != 'POST':
        return render_template('register.html')

    # .get() instead of [...]: a missing field yields a readable error
    # rather than a bare 400 response.
    username = request.form.get('username', '')
    password = request.form.get('password', '')
    if not username or not password:
        logger.warning("Registration failed for user: %s", username)
        return render_template('register.html', error="Username and password are required.")

    if register_user(username, password):
        logger.info("Registration successful for user: %s", username)
        return redirect(url_for('login'))

    logger.warning("Registration failed for user: %s", username)
    return render_template('register.html', error="Username already exists. Please choose a different one.")
@app.route('/chatbot')
def chatbot():
    """Serve the chatbot page to a logged-in user.

    Redirects to the login page when no user is in the session. Waits up to
    100 seconds for ``model_loading_event``; if the wait times out or the
    model/tokenizer globals are still unset, a plain-text message is
    returned with HTTP 503 so clients can tell the failure from a page.
    """
    if 'username' not in session:
        return redirect(url_for('login'))
    username = session['username']

    # Wait until the model is loaded with a timeout mechanism
    if not model_loading_event.wait(timeout=100):  # Timeout after 100 seconds
        return "Model loading timed out. Please try again later.", 503
    if model is None or tokenizer is None:
        return "Model failed to load. Please try again later.", 503

    logger.info("Launching Gradio chatbot interface for user: %s", username)
    chatbot_interface = build_chatbot_interface({'username': username})
    # NOTE(review): the value returned by launch() is passed straight back
    # as the Flask response; confirm it is something Flask can serve.
    return chatbot_interface.launch(inline=True, share=True)
# Custom Gunicorn application to run Flask safely in a production environment like Hugging Face Space Docker
class StandaloneApplication(gunicorn.app.base.BaseApplication):
    """Gunicorn application that serves an already-constructed WSGI app.

    Args:
        app: The WSGI application object to serve.
        options (dict | None): Gunicorn settings keyed by setting name
            (e.g. ``bind``, ``workers``); unknown keys and ``None`` values
            are ignored.
    """

    def __init__(self, app, options=None):
        # Both attributes must exist before super().__init__() runs, because
        # the base initialiser is expected to invoke load_config().
        self.options = options or {}
        self.application = app
        super().__init__()

    def load_config(self):
        """Copy the recognised, non-None options into gunicorn's config."""
        # Plain dict.items() avoids relying on gunicorn.six.iteritems, a
        # compatibility helper that newer gunicorn releases no longer ship.
        for key, value in self.options.items():
            if key in self.cfg.settings and value is not None:
                self.cfg.set(key.lower(), value)

    def load(self):
        """Return the WSGI application for gunicorn workers to serve."""
        return self.application
# Launch the Flask application using Gunicorn in a production-safe manner
if __name__ == "__main__":
    options = {
        'bind': '0.0.0.0:5000',  # Listen on all interfaces, port 5000
        'workers': 2,  # Set number of worker processes
        'threads': 4,  # Set number of threads per worker
        'timeout': 120,  # Set timeout for workers
    }
    try:
        StandaloneApplication(app, options).run()
    except Exception as e:
        logger.error(f"Failed to launch Flask server: {e}")
        # Chain the original exception so the root cause stays in the traceback.
        raise RuntimeError(
            "Could not launch the Flask server. Please check the application setup."
        ) from e