# routes/chatbot.py
"""Chatbot routes: HTML page, real-time WebSocket chat and an HTTP inquiry endpoint."""
from datetime import datetime
from typing import List

import cv2
import numpy as np
from fastapi import (
    APIRouter,
    Depends,
    File,
    HTTPException,
    Request,
    UploadFile,
    WebSocket,
    WebSocketDisconnect,
)
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.templating import Jinja2Templates

from services.utils import db
from services.disease_detection_service import bot
from models.schemas.inquiry_schema import Inquiry
from services.auth_service import get_current_user

chatbot_router = APIRouter()
templates = Jinja2Templates(directory="templates")

# MongoDB collection for inquiries
inquiry_collection = db.get_collection("inquiries")

# Store active WebSocket connections
active_connections: List[WebSocket] = []


class InvalidImageError(ValueError):
    """Raised when an uploaded file cannot be decoded as an image."""


def _drop_connection(websocket: WebSocket) -> None:
    """Remove *websocket* from the active list; a no-op if it is already gone."""
    if websocket in active_connections:
        active_connections.remove(websocket)


def _decode_image(image_data: bytes):
    """Decode raw upload bytes into a colour image array.

    Raises:
        InvalidImageError: if the buffer is empty or OpenCV cannot decode it
            (``cv2.imdecode`` signals failure by returning ``None``).
    """
    image_np = np.frombuffer(image_data, np.uint8)
    # An empty buffer is rejected up front instead of being handed to OpenCV.
    image = cv2.imdecode(image_np, cv2.IMREAD_COLOR) if image_np.size else None
    if image is None:
        raise InvalidImageError("Uploaded file is not a decodable image")
    return image


def _to_jsonable(value):
    """Recursively convert a stored document into JSON-encodable values.

    Datetimes become ISO-8601 strings; any other non-primitive leaf (for
    example a database-generated id object) is rendered with ``str()``.
    """
    if value is None or isinstance(value, (str, int, float, bool)):
        return value
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, dict):
        return {str(key): _to_jsonable(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [_to_jsonable(item) for item in value]
    return str(value)


async def greeting_based_on_time() -> str:
    """Return a greeting chosen from the server's current local hour.

    05:00-11:59 -> "Good morning", 12:00-16:59 -> "Good afternoon",
    any other hour -> "Good evening".
    """
    current_hour = datetime.now().hour
    if 5 <= current_hour < 12:
        return "Good morning"
    if 12 <= current_hour < 17:
        return "Good afternoon"
    return "Good evening"


@chatbot_router.get("/", response_class=HTMLResponse)
async def chatbot_page(request: Request, current_user: str = Depends(get_current_user)):
    """Render ``chatbot.html`` with a time-of-day greeting for the current user."""
    greeting = await greeting_based_on_time()
    context = {
        "request": request,
        "greeting": f"{greeting}, {current_user}! How can I help you today?",
    }
    return templates.TemplateResponse("chatbot.html", context)


@chatbot_router.websocket("/ws/chat")
async def websocket_endpoint(websocket: WebSocket, current_user: str = Depends(get_current_user)):
    """Handle real-time WebSocket connections for chat and sidebar updates.

    Each received text message is answered with a JSON object carrying
    ``response``, ``disease`` and ``confidence``; afterwards the user's
    recent inquiry history is broadcast to the connected clients.
    """
    await websocket.accept()
    active_connections.append(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            response_text, detected_disease, confidence = await handle_inquiry(data, current_user)
            await websocket.send_json(
                {"response": response_text, "disease": detected_disease, "confidence": confidence}
            )
            await broadcast_inquiry_history(current_user)  # Update sidebar for all users
    except WebSocketDisconnect:
        # Normal client hang-up; cleanup happens in ``finally``.
        pass
    finally:
        # Also runs when an unexpected error escapes the loop, so a dead
        # socket never lingers in ``active_connections``.
        _drop_connection(websocket)


async def handle_inquiry(text: str, current_user: str, file: UploadFile = None):
    """Process an inquiry and store it in the inquiries collection.

    Args:
        text: The inquiry text; sent to the language model when no file is given.
        current_user: Identifier stored as ``user_id`` on the inquiry record.
        file: Optional uploaded image; when present it is decoded and passed
            to the disease-detection model instead of the language model.

    Returns:
        A ``(response_text, detected_disease, disease_confidence)`` tuple; the
        last two are ``None`` for text-only inquiries.

    Raises:
        InvalidImageError: if *file* is given but cannot be decoded as an image.
    """
    detected_disease, disease_confidence = None, None

    # NOTE(review): the model calls and insert_one below are synchronous and run
    # on the event loop; consider offloading them to a thread pool once the
    # thread-safety of ``bot`` has been confirmed.
    if file:
        image = _decode_image(await file.read())
        (
            response_text,
            detected_disease,
            _status,
            _recommendation,
            disease_confidence,
        ) = bot.diagnose_and_respond(image)
    else:
        response_text = bot.llama_response(text)

    # Save inquiry to MongoDB
    inquiry = Inquiry(
        user_id=current_user,
        inquiry_text=text,
        response_text=response_text,
        attached_image=file.filename if file else None,
        detected_disease=detected_disease,
        disease_confidence=disease_confidence,
    )
    inquiry_collection.insert_one(inquiry.dict(by_alias=True))

    return response_text, detected_disease, disease_confidence


async def broadcast_inquiry_history(user_id: str):
    """Send the ten most recent inquiries of *user_id* to all WebSocket clients.

    Documents are converted to JSON-safe values before sending. A client whose
    socket can no longer be written to is dropped from the active list instead
    of aborting the broadcast for everyone else.
    """
    # NOTE(review): this sends one user's history to every connected client;
    # confirm that is intended, otherwise filter connections by owner.
    cursor = inquiry_collection.find({"user_id": user_id}).sort("timestamp", -1).limit(10)
    history = [_to_jsonable(document) for document in cursor]

    # Iterate over a snapshot: the list may change while we await each send.
    for connection in list(active_connections):
        try:
            await connection.send_json({"history": history})
        except (WebSocketDisconnect, RuntimeError):
            _drop_connection(connection)


@chatbot_router.post("/inquire")
async def inquire(text: str, current_user: str = Depends(get_current_user), file: UploadFile = None):
    """Handle inquiries with text and optional image for disease detection.

    Delegates to ``handle_inquiry`` so the HTTP and WebSocket paths share one
    implementation. Responds with ``response_text``, ``detected_disease`` and
    ``confidence``; an undecodable image yields HTTP 400.
    """
    try:
        response_text, detected_disease, disease_confidence = await handle_inquiry(
            text, current_user, file
        )
    except InvalidImageError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc

    return JSONResponse(
        content={
            "response_text": response_text,
            "detected_disease": detected_disease,
            "confidence": disease_confidence,
        }
    )