Emmanuel Frimpong Asante
update space
33a55fb
raw
history blame
4.75 kB
# routes/chatbot.py
import cv2
import numpy as np
from fastapi.templating import Jinja2Templates
from services.utils import db
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, UploadFile, File, Depends, Request
from fastapi.responses import HTMLResponse, JSONResponse
from datetime import datetime
from services.disease_detection_service import bot
from models.schemas.inquiry_schema import Inquiry
from services.auth_service import get_current_user
from typing import List
chatbot_router = APIRouter()
templates = Jinja2Templates(directory="templates")
# MongoDB collection for inquiries
inquiry_collection = db.get_collection("inquiries")
# Store active WebSocket connections
active_connections: List[WebSocket] = []
async def greeting_based_on_time():
current_hour = datetime.now().hour
if 5 <= current_hour < 12:
return "Good morning"
elif 12 <= current_hour < 17:
return "Good afternoon"
else:
return "Good evening"
@chatbot_router.get("/", response_class=HTMLResponse)
async def chatbot_page(request: Request, current_user: str = Depends(get_current_user)):
greeting = await greeting_based_on_time()
context = {"request": request, "greeting": f"{greeting}, {current_user}! How can I help you today?"}
return templates.TemplateResponse("chatbot.html", context)
@chatbot_router.websocket("/ws/chat")
async def websocket_endpoint(websocket: WebSocket, current_user: str = Depends(get_current_user)):
"""Handle real-time WebSocket connections for chat and sidebar updates."""
await websocket.accept()
active_connections.append(websocket)
try:
while True:
data = await websocket.receive_text()
response_text, detected_disease, confidence = await handle_inquiry(data, current_user)
await websocket.send_json({"response": response_text, "disease": detected_disease, "confidence": confidence})
await broadcast_inquiry_history(current_user) # Update sidebar for all users
except WebSocketDisconnect:
active_connections.remove(websocket)
async def handle_inquiry(text: str, current_user: str, file: UploadFile = None):
"""Process inquiries asynchronously and update MongoDB."""
detected_disease, disease_confidence = None, None
response_text = ""
# Handle image inquiries
if file:
image_data = await file.read()
image_np = np.frombuffer(image_data, np.uint8)
response_text, detected_disease, status, recommendation, disease_confidence = bot.diagnose_and_respond(
cv2.imdecode(image_np, cv2.IMREAD_COLOR)
)
else:
response_text = bot.llama_response(text)
# Save inquiry to MongoDB
inquiry = Inquiry(
user_id=current_user,
inquiry_text=text,
response_text=response_text,
attached_image=file.filename if file else None,
detected_disease=detected_disease,
disease_confidence=disease_confidence
)
inquiry_collection.insert_one(inquiry.dict(by_alias=True))
return response_text, detected_disease, disease_confidence
async def broadcast_inquiry_history(user_id: str):
"""Send updated inquiry history to all WebSocket clients."""
history = list(inquiry_collection.find({"user_id": user_id}).sort("timestamp", -1).limit(10))
for connection in active_connections:
await connection.send_json({"history": history})
@chatbot_router.post("/inquire")
async def inquire(text: str, current_user: str = Depends(get_current_user), file: UploadFile = None):
"""Handle inquiries with text and optional image for disease detection."""
response_text = ""
detected_disease, disease_confidence = None, None
# Handle image inquiries with disease detection
if file:
image_data = await file.read()
image_np = np.frombuffer(image_data, np.uint8) # Convert to NumPy array
response, disease_name, status, recommendation, confidence = bot.diagnose_and_respond(cv2.imdecode(image_np, cv2.IMREAD_COLOR))
response_text = response
detected_disease, disease_confidence = disease_name, confidence
else:
# General inquiries using Llama model
response_text = bot.llama_response(text)
# Log inquiry into MongoDB
inquiry = Inquiry(
user_id=current_user,
inquiry_text=text,
response_text=response_text,
attached_image=file.filename if file else None,
detected_disease=detected_disease,
disease_confidence=disease_confidence
)
inquiry_collection.insert_one(inquiry.dict(by_alias=True))
return JSONResponse(content={"response_text": response_text, "detected_disease": detected_disease, "confidence": disease_confidence})