import os
import shutil

# =====================================================
# SETUP TEMP DIRS AND ENV
# =====================================================
# Clear stale caches and point every library at writable /tmp locations
# before the heavy imports below read these environment variables.
for d in ["/tmp/huggingface", "/tmp/Ultralytics", "/tmp/matplotlib", "/tmp/torch", "/root/.cache"]:
    shutil.rmtree(d, ignore_errors=True)

os.environ["HF_HOME"] = "/tmp/huggingface"
os.environ["HUGGINGFACE_HUB_CACHE"] = "/tmp/huggingface"
os.environ["TORCH_HOME"] = "/tmp/torch"
os.environ["MPLCONFIGDIR"] = "/tmp/matplotlib"
os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics"

import json
import uuid
import datetime
import numpy as np
import torch
import cv2
import joblib
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.models as models
from io import BytesIO
from PIL import Image as PILImage
from fastapi import FastAPI, File, UploadFile, Form
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, FileResponse
import tensorflow as tf
from model_histo import BreastCancerClassifier
from fastapi.staticfiles import StaticFiles
import uvicorn

try:
    from reportlab.lib.pagesizes import letter
    from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image as ReportLabImage
    from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
    from reportlab.lib.enums import TA_CENTER, TA_JUSTIFY
    from reportlab.lib.units import inch
    from reportlab.lib.colors import navy, black
    REPORTLAB_AVAILABLE = True
except ImportError:
    REPORTLAB_AVAILABLE = False

from ultralytics import YOLO
from sklearn.preprocessing import MinMaxScaler
from model import MWT as create_model
from augmentations import Augmentations
from huggingface_hub import InferenceClient

# =====================================================
# HUGGING FACE CLIENT SETUP
# =====================================================
HF_MODEL_ID = "mistralai/Mistral-7B-v0.1"
hf_token = os.getenv("HF_TOKEN")

client = None
if hf_token:
    try:
        client = InferenceClient(model=HF_MODEL_ID, token=hf_token)
        print(f"✅ Hugging Face InferenceClient initialized for {HF_MODEL_ID}")
    except Exception as e:
        print("⚠️ Failed to initialize Hugging Face client:", e)
else:
    print("⚠️ Warning: No HF_TOKEN found — summaries will be skipped.")


def generate_ai_summary(abnormal_cells, normal_cells, avg_confidence):
    """Generate a brief medical interpretation using Mistral."""
    if not client:
        return "⚠️ Hugging Face client not initialized — skipping summary."
    try:
        prompt = f"""Act as a cytopathology expert providing a brief diagnostic interpretation.

Observed Cell Counts:
- {abnormal_cells} Abnormal Cells
- {normal_cells} Normal Cells
- Detection Confidence: {avg_confidence:.1f}%

Write a 2-3 sentence professional medical assessment focusing on:
1. Cell count analysis
2. Abnormality ratio ({abnormal_cells / (abnormal_cells + normal_cells) * 100:.1f}%)
3. Clinical significance

Use objective, scientific language suitable for a pathology report."""

        # Single, non-streaming generation request.
        response = client.text_generation(
            prompt,
            max_new_tokens=200,
            temperature=0.7,
            stream=False,
            details=True,
            stop_sequences=["\n\n", "###"],
        )

        # Handle different response formats
        if hasattr(response, "generated_text"):
            return response.generated_text.strip()
        elif isinstance(response, dict):
            return response.get("generated_text", "").strip()
        elif isinstance(response, str):
            return response.strip()

        # Fallback summary if the response format is unexpected
        ratio = abnormal_cells / (abnormal_cells + normal_cells) * 100 if (abnormal_cells + normal_cells) > 0 else 0
        return (
            f"Analysis shows {abnormal_cells} abnormal cells ({ratio:.1f}%) and {normal_cells} normal cells, "
            f"with average detection confidence of {avg_confidence:.1f}%."
        )
    except Exception:
        # Provide a structured fallback summary instead of an error message
        total = abnormal_cells + normal_cells
        if total == 0:
            return "No cells were detected in the sample. Consider re-scanning or adjusting detection parameters."
        ratio = (abnormal_cells / total) * 100
        severity = "high" if ratio > 70 else "moderate" if ratio > 30 else "low"
        return (
            f"Quantitative analysis detected {abnormal_cells} abnormal cells ({ratio:.1f}%) among {total} total cells, "
            f"indicating a {severity} abnormality ratio. Average detection confidence: {avg_confidence:.1f}%."
        )


def generate_mwt_summary(predicted_label, confidences, avg_confidence):
    """Generate a short MWT-specific interpretation using the HF client when available."""
    if not client:
        return "⚠️ Hugging Face client not initialized — skipping AI interpretation."
    try:
        prompt = f"""
You are a concise cytopathology expert. Given an MWT classifier result, write a 1-2 sentence
professional interpretation suitable for embedding in a diagnostic report.

Result:
- Predicted label: {predicted_label}
- Confidence (average): {avg_confidence:.1f}%
- Class probabilities: {json.dumps(confidences)}

Provide guidance on the significance of the result and any suggested next steps in plain, objective language.
"""
        response = client.text_generation(
            prompt,
            max_new_tokens=120,
            temperature=0.2,
            stream=False,
            details=True,
            stop_sequences=["\n\n", "###"],
        )
        if hasattr(response, "generated_text"):
            return response.generated_text.strip()
        elif isinstance(response, dict):
            return response.get("generated_text", "").strip()
        elif isinstance(response, str):
            return response.strip()
        return f"Result: {predicted_label} (avg confidence {avg_confidence:.1f}%)."
    except Exception:
        return f"Quantitative result: {predicted_label} with average confidence {avg_confidence:.1f}%."


def generate_cin_summary(predicted_grade, confidences, avg_confidence):
    """Generate a short CIN-specific interpretation using the HF client when available."""
    if not client:
        return "⚠️ Hugging Face client not initialized — skipping AI interpretation."
    try:
        prompt = f"""
You are a concise gynecologic pathology expert. Given a CIN classifier result, write a 1-2 sentence
professional interpretation suitable for a diagnostic report.

Result:
- Predicted grade: {predicted_grade}
- Confidence (average): {avg_confidence:.1f}%
- Class probabilities: {json.dumps(confidences)}

Provide a brief statement about clinical significance and suggested next steps (e.g., further
colposcopic evaluation) in objective, clinical language.
"""
        response = client.text_generation(
            prompt,
            max_new_tokens=140,
            temperature=0.2,
            stream=False,
            details=True,
            stop_sequences=["\n\n", "###"],
        )
        if hasattr(response, "generated_text"):
            return response.generated_text.strip()
        elif isinstance(response, dict):
            return response.get("generated_text", "").strip()
        elif isinstance(response, str):
            return response.strip()
        return f"Result: {predicted_grade} (avg confidence {avg_confidence:.1f}%)."
    except Exception:
        return f"Quantitative result: {predicted_grade} with average confidence {avg_confidence:.1f}%."


# =====================================================
# FASTAPI SETUP
# =====================================================
app = FastAPI(title="Pathora Medical Diagnostic API")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*", "http://localhost:5173", "http://127.0.0.1:5173"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
    expose_headers=["*"],  # Allow access to response headers
)

# Use /tmp for outputs in Hugging Face Spaces (writable directory)
OUTPUT_DIR = os.environ.get("OUTPUT_DIR", "/tmp/outputs")
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Create image outputs dir
IMAGES_DIR = os.path.join(OUTPUT_DIR, "images")
os.makedirs(IMAGES_DIR, exist_ok=True)

app.mount("/outputs", StaticFiles(directory=OUTPUT_DIR), name="outputs")

# Mount public sample images from frontend dist (Vite copies public/ to dist/ root)
# Check both possible locations: frontend/dist (Docker) and ../frontend/dist (local dev)
FRONTEND_DIST_CHECK = os.path.join(os.path.dirname(__file__), "frontend/dist")
if not os.path.isdir(FRONTEND_DIST_CHECK):
    FRONTEND_DIST_CHECK = os.path.abspath(os.path.join(os.path.dirname(__file__), "../frontend/dist"))

for sample_dir in ["cyto", "colpo", "histo"]:
    sample_path = os.path.join(FRONTEND_DIST_CHECK, sample_dir)
    if os.path.isdir(sample_path):
        app.mount(f"/{sample_dir}", StaticFiles(directory=sample_path), name=sample_dir)
        print(f"✅ Mounted /{sample_dir} from {sample_path}")
    else:
        print(f"⚠️ Sample directory not found: {sample_path}")

# Check that other static assets (logos, banners) are present in the dist root
for static_file in ["banner.jpeg", "white_logo.png", "black_logo.png", "manalife_LOGO.jpg"]:
    static_path = os.path.join(FRONTEND_DIST_CHECK, static_file)
    if os.path.isfile(static_path):
        print(f"✅ Static file available: /{static_file}")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# =====================================================
# MODEL LOADS
# =====================================================
print("🔹 Loading YOLO model...")
yolo_model = YOLO("best2.pt")

print("🔹 Loading MWT model...")
mwt_model = create_model(num_classes=2).to(device)
mwt_model.load_state_dict(torch.load("MWTclass2.pth", map_location=device))
mwt_model.eval()
mwt_class_names = ["Negative", "Positive"]

print("🔹 Loading CIN model...")
try:
    clf = joblib.load("logistic_regression_model.pkl")
except Exception as e:
    print(f"⚠️ CIN classifier not available (logistic_regression_model.pkl missing or invalid): {e}")
    clf = None

yolo_colposcopy = YOLO("yolo_colposcopy.pt")


# =====================================================
# RESNET FEATURE EXTRACTORS FOR CIN
# =====================================================
def build_resnet(model_name="resnet50"):
    if model_name == "resnet50":
        model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
    elif model_name == "resnet101":
        model = models.resnet101(weights=models.ResNet101_Weights.DEFAULT)
    elif model_name == "resnet152":
        model = models.resnet152(weights=models.ResNet152_Weights.DEFAULT)
    else:
        raise ValueError(f"Unsupported backbone: {model_name}")
    model.eval().to(device)
    return (
        nn.Sequential(model.conv1, model.bn1, model.relu, model.maxpool),
        model.layer1,
        model.layer2,
        model.layer3,
        model.layer4,
    )
# Global pooling layers shared by all cross-block feature extractors
gap = nn.AdaptiveAvgPool2d((1, 1))
gmp = nn.AdaptiveMaxPool2d((1, 1))

resnet50_blocks = build_resnet("resnet50")
resnet101_blocks = build_resnet("resnet101")
resnet152_blocks = build_resnet("resnet152")

transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])


def preprocess_for_mwt(image_np):
    img = cv2.resize(image_np, (224, 224))
    img = Augmentations.Normalization((0, 1))(img)
    img = np.array(img, np.float32)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = img.transpose(2, 0, 1)
    img = np.expand_dims(img, axis=0)
    return torch.Tensor(img)


def extract_cbf_features(blocks, img_t):
    """Pool the output of each ResNet stage and concatenate them into one feature vector."""
    block1, block2, block3, block4, block5 = blocks
    with torch.no_grad():
        f1 = block1(img_t)
        f2 = block2(f1)
        f3 = block3(f2)
        f4 = block4(f3)
        f5 = block5(f4)
        p1 = gmp(f1).view(-1)
        p2 = gmp(f2).view(-1)
        p3 = gap(f3).view(-1)
        p4 = gap(f4).view(-1)
        p5 = gap(f5).view(-1)
    # For torchvision ResNet-50/101/152 this is a 64+256+512+1024+2048 = 3904-dim vector per backbone.
    return torch.cat([p1, p2, p3, p4, p5], dim=0).cpu().numpy()


# =====================================================
# Model 4: Histopathology Classifier (TensorFlow)
# =====================================================
print("🔹 Attempting to load Breast Cancer Histopathology model...")
try:
    classifier = BreastCancerClassifier(fine_tune=False)

    # Safely handle Hugging Face token auth
    hf_token = os.getenv("HF_TOKEN")
    if hf_token:
        if classifier.authenticate_huggingface():
            print("✅ Hugging Face authentication successful.")
        else:
            print("⚠️ Warning: Hugging Face authentication failed, using local model only.")
    else:
        print("⚠️ HF_TOKEN not found in environment — skipping authentication.")

    # Load Path Foundation model
    if classifier.load_path_foundation():
        print("✅ Loaded Path Foundation base model.")
    else:
        print("⚠️ Could not load Path Foundation base model, continuing with local weights only.")

    # Load trained histopathology model
    model_path = "histopathology_trained_model.keras"
    if os.path.exists(model_path):
        classifier.model = tf.keras.models.load_model(model_path)
        print(f"✅ Loaded local histopathology model: {model_path}")
    else:
        print(f"⚠️ Model file not found: {model_path}")
except Exception as e:
    classifier = None
    print(f"❌ Error initializing histopathology model: {e}")


def predict_histopathology(image):
    if classifier is None:
        return {"error": "Histopathology model not available."}
    try:
        if image.mode != "RGB":
            image = image.convert("RGB")
        image = image.resize((224, 224))
        img_array = np.expand_dims(np.array(image).astype("float32") / 255.0, axis=0)

        embeddings = classifier.extract_embeddings(img_array)
        prediction_proba = classifier.model.predict(embeddings, verbose=0)[0]
        predicted_class = int(np.argmax(prediction_proba))
        class_names = ["Benign", "Malignant"]

        # Return confidence as a dictionary with both class probabilities (like MWT/CIN)
        confidences = {class_names[i]: float(prediction_proba[i]) for i in range(len(class_names))}
        avg_confidence = float(np.max(prediction_proba)) * 100

        return {
            "model_used": "Histopathology Classifier",
            "prediction": class_names[predicted_class],
            "confidence": confidences,
            "summary": {
                "avg_confidence": round(avg_confidence, 2),
                "ai_interpretation": (
                    f"Histopathological analysis indicates {class_names[predicted_class].lower()} tissue "
                    f"with {avg_confidence:.1f}% confidence."
                ),
            },
        }
    except Exception as e:
        return {"error": f"Histopathology prediction failed: {e}"}
# =====================================================
# MAIN ENDPOINT
# =====================================================
@app.post("/predict/")
async def predict(model_name: str = Form(...), file: UploadFile = File(...)):
    print(f"Received prediction request - model: {model_name}, file: {file.filename}")

    # Validate model name
    if model_name not in ["yolo", "mwt", "cin", "histopathology"]:
        return JSONResponse(
            content={
                "error": f"Invalid model_name: {model_name}. Must be one of: yolo, mwt, cin, histopathology"
            },
            status_code=400,
        )

    # Validate and read file
    if not file.filename:
        return JSONResponse(content={"error": "No file provided"}, status_code=400)

    contents = await file.read()
    if len(contents) == 0:
        return JSONResponse(content={"error": "Empty file provided"}, status_code=400)

    # Attempt to open and validate image
    try:
        image = PILImage.open(BytesIO(contents)).convert("RGB")
        image_np = np.array(image)
        if image_np.size == 0:
            raise ValueError("Empty image array")
        print(f"Successfully loaded image, shape: {image_np.shape}")
    except Exception as e:
        return JSONResponse(content={"error": f"Invalid image file: {str(e)}"}, status_code=400)

    if model_name == "yolo":
        results = yolo_model(image)
        detections_json = results[0].to_json()
        detections = json.loads(detections_json)

        abnormal_cells = sum(1 for d in detections if d["name"] == "abnormal")
        normal_cells = sum(1 for d in detections if d["name"] == "normal")
        avg_confidence = np.mean([d.get("confidence", 0) for d in detections]) * 100 if detections else 0

        ai_summary = generate_ai_summary(abnormal_cells, normal_cells, avg_confidence)

        output_filename = f"detected_{uuid.uuid4().hex[:8]}.jpg"
        output_path = os.path.join(IMAGES_DIR, output_filename)
        results[0].save(filename=output_path)

        return {
            "model_used": "YOLO Detection",
            "detections": detections,
            "annotated_image_url": f"/outputs/images/{output_filename}",
            "summary": {
                "abnormal_cells": abnormal_cells,
                "normal_cells": normal_cells,
                "avg_confidence": round(float(avg_confidence), 2),
                "ai_interpretation": ai_summary,
            },
        }

    elif model_name == "mwt":
        tensor = preprocess_for_mwt(image_np)
        with torch.no_grad():
            output = mwt_model(tensor.to(device)).cpu()
        probs = torch.softmax(output, dim=1)[0]
        confidences = {mwt_class_names[i]: float(probs[i]) for i in range(2)}
        predicted_label = mwt_class_names[int(torch.argmax(probs).item())]

        # Average / primary confidence for display
        avg_confidence = float(torch.max(probs).item()) * 100

        # Generate a brief AI interpretation using the Mistral client (if available)
        ai_interp = generate_mwt_summary(predicted_label, confidences, avg_confidence)

        return {
            "model_used": "MWT Classifier",
            "prediction": predicted_label,
            "confidence": confidences,
            "summary": {
                "avg_confidence": round(avg_confidence, 2),
                "ai_interpretation": ai_interp,
            },
        }

    elif model_name == "cin":
        if clf is None:
            return JSONResponse(
                content={"error": "CIN classifier not available on server."},
                status_code=503,
            )

        nparr = np.frombuffer(contents, np.uint8)
        img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)

        results = yolo_colposcopy.predict(source=img, conf=0.7, save=False, verbose=False)
        if len(results[0].boxes) == 0:
            return {"error": "No cervix detected"}

        x1, y1, x2, y2 = map(int, results[0].boxes.xyxy[0].cpu().numpy())
        crop = img[y1:y2, x1:x2]
        crop = cv2.resize(crop, (224, 224))

        img_t = transform(crop).unsqueeze(0).to(device)
        f50 = extract_cbf_features(resnet50_blocks, img_t)
        f101 = extract_cbf_features(resnet101_blocks, img_t)
        f152 = extract_cbf_features(resnet152_blocks, img_t)
        features = np.concatenate([f50, f101, f152]).reshape(1, -1)

        # NOTE: fitting MinMaxScaler on a single sample gives degenerate scaling;
        # the scaler fitted at training time should ideally be persisted and reused here.
        X_scaled = MinMaxScaler().fit_transform(features)

        pred = clf.predict(X_scaled)[0]
        proba = clf.predict_proba(X_scaled)[0]

        # Binary CIN classification
        classes = ["Low-grade", "High-grade"]
        predicted_label = classes[pred]
        confidences = {classes[i]: float(proba[i]) for i in range(len(classes))}

        # Map to a more detailed classification based on confidence
        if predicted_label == "High-grade" and confidences["High-grade"] > 0.8:
            detailed_class = "CIN3"
        elif predicted_label == "High-grade":
            detailed_class = "CIN2"
        else:
            detailed_class = "CIN1"

        # Average / primary confidence for display
        avg_confidence = float(np.max(proba)) * 100

        # Generate a brief AI interpretation using the Mistral client (if available)
        ai_interp = generate_cin_summary(predicted_label, confidences, avg_confidence)

        return {
            "model_used": "CIN Classifier",
            "prediction": detailed_class,
            "grade": predicted_label,
            "confidence": confidences,
            "summary": {
                "avg_confidence": round(avg_confidence, 2),
                "ai_interpretation": ai_interp,
            },
        }

    elif model_name == "histopathology":
        result = predict_histopathology(image)
        return result

    else:
        return JSONResponse(content={"error": "Invalid model name"}, status_code=400)
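# ---------------------------------------------------------------------------
# Illustrative client sketch (not used by the server). It assumes the API is
# reachable at http://localhost:8000 and that `requests` is installed in the
# calling environment; the helper name `_example_predict_request` is
# hypothetical and not part of the API surface.
# ---------------------------------------------------------------------------
def _example_predict_request(image_path, model_name="yolo"):
    """Send a local image to /predict/ and return the parsed JSON response."""
    import requests  # local import: only needed if this example is actually run

    with open(image_path, "rb") as fh:
        resp = requests.post(
            "http://localhost:8000/predict/",
            data={"model_name": model_name},  # one of: yolo, mwt, cin, histopathology
            files={"file": (os.path.basename(image_path), fh, "image/jpeg")},
            timeout=120,
        )
    resp.raise_for_status()
    return resp.json()
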
# =====================================================
# ROUTES
# =====================================================
def create_designed_pdf(pdf_path, report_data, analysis_summary_json):
    doc = SimpleDocTemplate(
        pdf_path,
        pagesize=letter,
        rightMargin=72,
        leftMargin=72,
        topMargin=72,
        bottomMargin=18,
    )
    styles = getSampleStyleSheet()
    story = []

    # getSampleStyleSheet() already defines a 'Title' style, so register the
    # custom title style under a different name to avoid a duplicate-style error.
    styles.add(ParagraphStyle(name='ReportTitle', fontSize=20, fontName='Helvetica-Bold', alignment=TA_CENTER, textColor=navy))
    styles.add(ParagraphStyle(name='Section', fontSize=14, fontName='Helvetica-Bold', spaceBefore=10, spaceAfter=6))
    styles.add(ParagraphStyle(name='NormalSmall', fontSize=10, leading=12))
    styles.add(ParagraphStyle(name='Heading', fontSize=16, fontName='Helvetica-Bold', textColor=navy, spaceBefore=6, spaceAfter=4))

    patient = report_data['patient']
    analysis = report_data.get('analysis', {})

    # Safely parse analysis_summary_json
    try:
        ai_summary = json.loads(analysis_summary_json) if analysis_summary_json else {}
    except (json.JSONDecodeError, TypeError):
        ai_summary = {}

    # Determine report type based on the model used
    model_used = ai_summary.get('model_used', '')
    if 'YOLO' in model_used or 'yolo' in str(analysis.get('id', '')).lower():
        report_type = "CYTOLOGY"
        report_title = "Cytology Report"
    elif 'CIN' in model_used or 'cin' in str(analysis.get('id', '')).lower() or 'colpo' in str(analysis.get('id', '')).lower():
        report_type = "COLPOSCOPY"
        report_title = "Colposcopy Report"
    elif 'histo' in str(analysis.get('id', '')).lower() or 'histopathology' in model_used.lower():
        report_type = "HISTOPATHOLOGY"
        report_title = "Histopathology Report"
    else:
        report_type = "CYTOLOGY"
        report_title = "Medical Analysis Report"

    # Header
    story.append(Paragraph("MANALIFE AI", styles['ReportTitle']))
    story.append(Paragraph("Advanced Medical Analysis", styles['NormalSmall']))
    story.append(Spacer(1, 0.3*inch))
    story.append(Paragraph(f"MEDICAL ANALYSIS REPORT OF {report_type}", styles['Heading']))
    story.append(Paragraph(report_title, styles['Section']))
    story.append(Spacer(1, 0.2*inch))

    # Report ID and Date
    story.append(Paragraph(f"Report ID: {report_data.get('report_id', 'N/A')}", styles['NormalSmall']))
    story.append(Paragraph(f"Generated: {datetime.datetime.now().strftime('%b %d, %Y, %I:%M %p')}", styles['NormalSmall']))
    story.append(Spacer(1, 0.2*inch))

    # Patient Information Section
    story.append(Paragraph("Patient Information", styles['Section']))
    story.append(Paragraph(f"Patient ID: {patient.get('id', 'N/A')}", styles['NormalSmall']))
    story.append(Paragraph(f"Exam Date: {patient.get('exam_date', 'N/A')}", styles['NormalSmall']))
    story.append(Paragraph(f"Physician: {patient.get('physician', 'N/A')}", styles['NormalSmall']))
    story.append(Paragraph(f"Facility: {patient.get('facility', 'N/A')}", styles['NormalSmall']))
    story.append(Spacer(1, 0.2*inch))

    # Sample Information Section
    story.append(Paragraph("Sample Information", styles['Section']))
    story.append(Paragraph(f"Specimen Type: {patient.get('specimen_type', 'Cervical Cytology')}", styles['NormalSmall']))
    story.append(Paragraph(f"Clinical History: {patient.get('clinical_history', 'N/A')}", styles['NormalSmall']))
    story.append(Spacer(1, 0.2*inch))

    # AI Analysis Section
    story.append(Paragraph("AI-ASSISTED ANALYSIS", styles['Section']))
    story.append(Paragraph("System: Manalife AI System — Automated Analysis", styles['NormalSmall']))
    story.append(Paragraph(f"Confidence Score: {ai_summary.get('avg_confidence', 'N/A')}%", styles['NormalSmall']))

    # Add metrics based on report type
    if report_type == "HISTOPATHOLOGY":
        # For histopathology, show Benign/Malignant confidence
        confidence_dict = ai_summary.get('confidence', {})
        if isinstance(confidence_dict, dict):
            benign_conf = confidence_dict.get('Benign', 0) * 100
            malignant_conf = confidence_dict.get('Malignant', 0) * 100
            story.append(Paragraph(f"Benign Confidence: {benign_conf:.2f}%", styles['NormalSmall']))
            story.append(Paragraph(f"Malignant Confidence: {malignant_conf:.2f}%", styles['NormalSmall']))
    elif report_type == "CYTOLOGY":
        # For cytology (YOLO), show abnormal/normal cell counts
        if 'abnormal_cells' in ai_summary:
            story.append(Paragraph(f"Abnormal Cells: {ai_summary.get('abnormal_cells', 'N/A')}", styles['NormalSmall']))
        if 'normal_cells' in ai_summary:
            story.append(Paragraph(f"Normal Cells: {ai_summary.get('normal_cells', 'N/A')}", styles['NormalSmall']))
    else:
        # For CIN/Colposcopy, show class confidences
        confidence_dict = ai_summary.get('confidence', {})
        if isinstance(confidence_dict, dict):
            for cls, val in confidence_dict.items():
                conf_pct = val * 100 if isinstance(val, (int, float)) else 0
                story.append(Paragraph(f"{cls} Confidence: {conf_pct:.2f}%", styles['NormalSmall']))

    story.append(Spacer(1, 0.1*inch))
    story.append(Paragraph("AI Interpretation:", styles['NormalSmall']))
    story.append(Paragraph(ai_summary.get('ai_interpretation', 'Not available.'), styles['NormalSmall']))
    story.append(Spacer(1, 0.2*inch))

    # Doctor's Notes
    story.append(Paragraph("Doctor's Notes", styles['Section']))
    story.append(Paragraph(report_data.get('doctor_notes') or 'No additional notes provided.', styles['NormalSmall']))
    story.append(Spacer(1, 0.2*inch))

    # Recommendations
    story.append(Paragraph("RECOMMENDATIONS", styles['Section']))
    story.append(Paragraph("Continue routine screening as per standard guidelines. Follow up as directed by your physician.", styles['NormalSmall']))
    story.append(Spacer(1, 0.3*inch))

    # Signatures
    story.append(Paragraph("Signatures", styles['Section']))
    story.append(Paragraph("Dr. Emily Roberts, MD (Cytopathologist)", styles['NormalSmall']))
    story.append(Paragraph("Dr. James Wilson, MD (Pathologist)", styles['NormalSmall']))
    story.append(Spacer(1, 0.1*inch))
    story.append(Paragraph(f"Generated on: {datetime.datetime.now().strftime('%b %d, %Y, %I:%M %p')}", styles['NormalSmall']))

    doc.build(story)
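# ---------------------------------------------------------------------------
# Illustrative sketch of the report_data structure create_designed_pdf expects.
# All values below are placeholder demo data, and the helper name
# `_example_build_pdf` is hypothetical; the server never calls it.
# ---------------------------------------------------------------------------
def _example_build_pdf(pdf_path="/tmp/example_report.pdf"):
    example_report_data = {
        "report_id": "DEMO_12345678",
        "patient": {
            "id": "DEMO",
            "exam_date": "2024-01-01",
            "physician": "N/A",
            "facility": "N/A",
            "specimen_type": "Cervical Cytology",
            "clinical_history": "N/A",
        },
        "analysis": {"id": "yolo"},
        "doctor_notes": "Example only.",
    }
    example_summary = json.dumps({
        "model_used": "YOLO Detection",
        "avg_confidence": 91.2,
        "abnormal_cells": 3,
        "normal_cells": 42,
        "ai_interpretation": "Example interpretation.",
    })
    if REPORTLAB_AVAILABLE:
        create_designed_pdf(pdf_path, example_report_data, example_summary)
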
@app.post("/reports/")
async def generate_report(
    patient_id: str = Form(...),
    exam_date: str = Form(...),
    metadata: str = Form(...),
    notes: str = Form(None),
    analysis_id: str = Form(None),
    analysis_summary: str = Form(None),
):
    """Generate a structured medical report from analysis results and metadata."""
    try:
        # Create reports directory if it doesn't exist
        reports_dir = os.path.join(OUTPUT_DIR, "reports")
        os.makedirs(reports_dir, exist_ok=True)

        # Generate unique report ID
        report_id = f"{patient_id}_{uuid.uuid4().hex[:8]}"
        report_dir = os.path.join(reports_dir, report_id)
        os.makedirs(report_dir, exist_ok=True)

        # Parse metadata
        metadata_dict = json.loads(metadata)

        # Get analysis results - assumed stored in memory or retrievable
        # TODO: Implement analysis results storage/retrieval

        # Construct report data
        report_data = {
            "report_id": report_id,
            "generated_at": datetime.datetime.now().isoformat(),
            "patient": {
                "id": patient_id,
                "exam_date": exam_date,
                **metadata_dict,
            },
            "analysis": {
                "id": analysis_id,
                # If the analysis_id is actually an annotated image URL, store it for report embedding
                "annotated_image_url": analysis_id,
                # TODO: Add actual analysis results
            },
            "doctor_notes": notes,
        }

        # Save report data
        report_json = os.path.join(report_dir, "report.json")
        with open(report_json, "w", encoding="utf-8") as f:
            json.dump(report_data, f, indent=2, ensure_ascii=False)

        # Attempt to create a PDF version if reportlab is available
        pdf_url = None
        if REPORTLAB_AVAILABLE:
            try:
                pdf_path = os.path.join(report_dir, "report.pdf")
                create_designed_pdf(pdf_path, report_data, analysis_summary)
                pdf_url = f"/outputs/reports/{report_id}/report.pdf"
            except Exception as e:
                print(f"Error creating designed PDF: {e}")
                pdf_url = None

        # Parse analysis_summary to get AI results
        try:
            ai_summary = json.loads(analysis_summary) if analysis_summary else {}
        except (json.JSONDecodeError, TypeError):
            ai_summary = {}

        # Determine report type based on analysis summary or model used
        model_used = ai_summary.get('model_used', '')
        if 'YOLO' in model_used or 'yolo' in str(analysis_id).lower():
            report_type = "Cytology"
            report_title = "Cytology Report"
        elif 'CIN' in model_used or 'cin' in str(analysis_id).lower() or 'colpo' in str(analysis_id).lower():
            report_type = "Colposcopy"
            report_title = "Colposcopy Report"
        elif 'histo' in str(analysis_id).lower() or 'histopathology' in model_used.lower():
            report_type = "Histopathology"
            report_title = "Histopathology Report"
        else:
            # Default fallback
            report_type = "Cytology"
            report_title = "Medical Analysis Report"

        # Timestamp interpolated into the report body below
        generated_time = datetime.datetime.now().strftime('%b %d, %Y, %I:%M %p')

        # Build analysis metrics HTML based on report type
        if report_type == "Histopathology":
            # For histopathology, show Benign/Malignant confidence from the confidence dict
            confidence_dict = ai_summary.get('confidence', {})
            benign_conf = confidence_dict.get('Benign', 0) * 100 if isinstance(confidence_dict, dict) else 0
            malignant_conf = confidence_dict.get('Malignant', 0) * 100 if isinstance(confidence_dict, dict) else 0
            analysis_metrics_html = f"""
| Patient ID | {patient_id} |
|---|---|
| Exam Date | {exam_date} |
| Physician | {metadata_dict.get('physician', 'N/A')} |
| Facility | {metadata_dict.get('facility', 'N/A')} |
| Specimen Type | {metadata_dict.get('specimen_type', 'N/A')} |
|---|---|
| Clinical History | {metadata_dict.get('clinical_history', 'N/A')} |
| Collected | {exam_date} |
| Reported | {generated_time} |
{notes or 'No additional notes provided.'}
Continue routine screening as per standard guidelines. Follow up as directed by your physician.