|
|
|
|
|
|
|
|
import cv2 |
|
|
import numpy as np |
|
|
from fastapi.templating import Jinja2Templates |
|
|
from services.utils import db |
|
|
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, UploadFile, File, Depends, Request |
|
|
from fastapi.responses import HTMLResponse, JSONResponse |
|
|
from datetime import datetime |
|
|
from services.disease_detection_service import bot |
|
|
from models.schemas.inquiry_schema import Inquiry |
|
|
from services.auth_service import get_current_user |
|
|
from typing import List |
|
|
|
|
|
|
|
|
chatbot_router = APIRouter() |
|
|
|
|
|
templates = Jinja2Templates(directory="templates") |
|
|
|
|
|
|
|
|
inquiry_collection = db.get_collection("inquiries") |
|
|
|
|
|
|
|
|
active_connections: List[WebSocket] = [] |
|
|
|
|
|
async def greeting_based_on_time(): |
|
|
current_hour = datetime.now().hour |
|
|
if 5 <= current_hour < 12: |
|
|
return "Good morning" |
|
|
elif 12 <= current_hour < 17: |
|
|
return "Good afternoon" |
|
|
else: |
|
|
return "Good evening" |
|
|
|
|
|
@chatbot_router.get("/", response_class=HTMLResponse) |
|
|
async def chatbot_page(request: Request, current_user: str = Depends(get_current_user)): |
|
|
greeting = await greeting_based_on_time() |
|
|
context = {"request": request, "greeting": f"{greeting}, {current_user}! How can I help you today?"} |
|
|
return templates.TemplateResponse("chatbot.html", context) |
|
|
|
|
|
@chatbot_router.websocket("/ws/chat") |
|
|
async def websocket_endpoint(websocket: WebSocket, current_user: str = Depends(get_current_user)): |
|
|
"""Handle real-time WebSocket connections for chat and sidebar updates.""" |
|
|
await websocket.accept() |
|
|
active_connections.append(websocket) |
|
|
try: |
|
|
while True: |
|
|
data = await websocket.receive_text() |
|
|
response_text, detected_disease, confidence = await handle_inquiry(data, current_user) |
|
|
await websocket.send_json({"response": response_text, "disease": detected_disease, "confidence": confidence}) |
|
|
await broadcast_inquiry_history(current_user) |
|
|
except WebSocketDisconnect: |
|
|
active_connections.remove(websocket) |
|
|
|
|
|
async def handle_inquiry(text: str, current_user: str, file: UploadFile = None): |
|
|
"""Process inquiries asynchronously and update MongoDB.""" |
|
|
detected_disease, disease_confidence = None, None |
|
|
response_text = "" |
|
|
|
|
|
|
|
|
if file: |
|
|
image_data = await file.read() |
|
|
image_np = np.frombuffer(image_data, np.uint8) |
|
|
response_text, detected_disease, status, recommendation, disease_confidence = bot.diagnose_and_respond( |
|
|
cv2.imdecode(image_np, cv2.IMREAD_COLOR) |
|
|
) |
|
|
else: |
|
|
response_text = bot.llama_response(text) |
|
|
|
|
|
|
|
|
inquiry = Inquiry( |
|
|
user_id=current_user, |
|
|
inquiry_text=text, |
|
|
response_text=response_text, |
|
|
attached_image=file.filename if file else None, |
|
|
detected_disease=detected_disease, |
|
|
disease_confidence=disease_confidence |
|
|
) |
|
|
inquiry_collection.insert_one(inquiry.dict(by_alias=True)) |
|
|
return response_text, detected_disease, disease_confidence |
|
|
|
|
|
async def broadcast_inquiry_history(user_id: str): |
|
|
"""Send updated inquiry history to all WebSocket clients.""" |
|
|
history = list(inquiry_collection.find({"user_id": user_id}).sort("timestamp", -1).limit(10)) |
|
|
for connection in active_connections: |
|
|
await connection.send_json({"history": history}) |
|
|
|
|
|
@chatbot_router.post("/inquire") |
|
|
async def inquire(text: str, current_user: str = Depends(get_current_user), file: UploadFile = None): |
|
|
"""Handle inquiries with text and optional image for disease detection.""" |
|
|
response_text = "" |
|
|
detected_disease, disease_confidence = None, None |
|
|
|
|
|
|
|
|
if file: |
|
|
image_data = await file.read() |
|
|
image_np = np.frombuffer(image_data, np.uint8) |
|
|
response, disease_name, status, recommendation, confidence = bot.diagnose_and_respond(cv2.imdecode(image_np, cv2.IMREAD_COLOR)) |
|
|
response_text = response |
|
|
detected_disease, disease_confidence = disease_name, confidence |
|
|
else: |
|
|
|
|
|
response_text = bot.llama_response(text) |
|
|
|
|
|
|
|
|
inquiry = Inquiry( |
|
|
user_id=current_user, |
|
|
inquiry_text=text, |
|
|
response_text=response_text, |
|
|
attached_image=file.filename if file else None, |
|
|
detected_disease=detected_disease, |
|
|
disease_confidence=disease_confidence |
|
|
) |
|
|
inquiry_collection.insert_one(inquiry.dict(by_alias=True)) |
|
|
|
|
|
return JSONResponse(content={"response_text": response_text, "detected_disease": detected_disease, "confidence": disease_confidence}) |
|
|
|
|
|
|