Spaces:
Runtime error
Runtime error
| from flask import Flask, request, jsonify, render_template | |
| from PIL import Image | |
| import io | |
| import os | |
| import requests | |
| from roboflow import Roboflow | |
| import supervision as sv | |
| import cv2 | |
| import tempfile | |
| import gdown | |
| import os | |
| import requests | |
| import requests | |
| import cloudinary | |
| import model | |
| import cloudinary.uploader | |
| from a import main | |
| # Initialize Flask app | |
| app = Flask(__name__) | |
| GDRIVE_MODEL_URL = "https://drive.google.com/uc?id=1fzKneepaRt_--dzamTcDBM-9d3_dLX7z" | |
| LOCAL_MODEL_PATH = "checkpoint32.pth" | |
| print(GDRIVE_MODEL_URL) | |
| def download_file_from_google_drive(): | |
| gdown.download(GDRIVE_MODEL_URL, LOCAL_MODEL_PATH, quiet=False) | |
| file_id = "1fzKneepaRt_--dzamTcDBM-9d3_dLX7z" | |
| destination = "checkpoint32.pth" | |
| def download_model(): | |
| if not os.path.exists(LOCAL_MODEL_PATH): | |
| response = requests.get(GDRIVE_MODEL_URL, stream=True) | |
| if response.status_code == 200: | |
| with open(LOCAL_MODEL_PATH, "wb") as f: | |
| f.write(response.content) | |
| else: | |
| raise Exception( | |
| f"Failed to download model from Google Drive: {response.status_code}" | |
| ) | |
| download_file_from_google_drive() | |
| def home(): | |
| return render_template("index.html") | |
| def fetchImage(): | |
| file = None | |
| url = "" | |
| if "url" in request.form: | |
| url = request.form["url"] | |
| response = requests.get(url) | |
| file = io.BytesIO(response.content) | |
| elif "file" in request.files: | |
| file = request.files["file"] | |
| # url = "https://firebasestorage.googleapis.com/v0/b/car-damage-detector-s34rrz.firebasestorage.app/o/users%2FYMd99dt33HaktTWpYp5MM5oYeBE3%2Fuploads%2F1737454072124000.jpg?alt=media&token=9eae79fa-4c06-41a5-9f58-236c39efaac0" | |
| # File name for saving | |
| file_name = "downloaded_image.jpg" | |
| # Download the image | |
| response = requests.get(url) | |
| # Save the image to the current directory | |
| if response.status_code == 200: | |
| file_name = "downloaded_image.jpg" | |
| image = Image.open(io.BytesIO(response.content)) | |
| if image.mode == "RGBA": | |
| image = image.convert("RGB") | |
| image.save(file_name, "JPEG", quality=100) | |
| print(f"Image downloaded and saved as {file_name}") | |
| else: | |
| print(f"Failed to download image. Status code: {response.status_code}") | |
| # Load image | |
| image = cv2.imread(file_name) | |
| rf = Roboflow(api_key="LqD8Cs4OsoK8seO3CPkf") | |
| project_parts = rf.workspace().project("car-parts-segmentation") | |
| model_parts = project_parts.version(2).model | |
| project_damage = rf.workspace().project("car-damage-detection-ha5mm") | |
| model_damage = project_damage.version(1).model | |
| # Run the damage detection model | |
| result_damage = model_damage.predict( | |
| file_name, | |
| confidence=40, | |
| ).json() | |
| # Extract detections from the result | |
| detections_damage = sv.Detections.from_inference(result_damage) | |
| # Read the input image | |
| # Annotate damaged areas of the car | |
| mask_annotator = sv.MaskAnnotator() | |
| annotated_image_damage = mask_annotator.annotate( | |
| scene=image, detections=detections_damage | |
| ) | |
| # Create a temporary directory to save outputs | |
| temp_dir = tempfile.mkdtemp() | |
| # Define a repair cost dictionary (per part) | |
| repair_cost_dict = { | |
| "wheel": 100, # Base cost for wheel | |
| "door": 200, # Base cost for door | |
| "hood": 300, # Base cost for hood | |
| "front_bumper": 250, # Base cost for bumper | |
| "trunk": 200, | |
| "front_glass": 150, | |
| "back_left_door": 200, | |
| "left_mirror": 20, | |
| "back_glass": 150, | |
| } | |
| # Initialize total cost | |
| total_cost = 0 | |
| # Ensure coordinate processing is done in chunks of 4 | |
| coordinates = list(map(int, detections_damage.xyxy.flatten())) | |
| num_damages = ( | |
| len(coordinates) // 4 | |
| ) # Each damage has 4 coordinates (x1, y1, x2, y2) | |
| # Iterate through damages | |
| for i in range(num_damages): | |
| x1, y1, x2, y2 = coordinates[i * 4: (i + 1) * 4] | |
| # Ensure the coordinates are within image bounds | |
| x1, y1 = max(0, x1), max(0, y1) | |
| x2, y2 = min(image.shape[1], x2), min(image.shape[0], y2) | |
| # Crop the damaged region | |
| cropped_damage = image[y1:y2, x1:x2] | |
| # Check if the cropped region is valid | |
| if cropped_damage.size == 0: | |
| print(f"Skipping empty crop for damage region {i + 1}") | |
| continue | |
| # Save the cropped damaged area | |
| damage_image_path = os.path.join(temp_dir, f"damage_image_{i}.png") | |
| cv2.imwrite(damage_image_path, cropped_damage) | |
| # Run the parts detection model on the cropped damage | |
| result_parts = model_parts.predict( | |
| damage_image_path, confidence=15).json() | |
| detections_parts = sv.Detections.from_inference(result_parts) | |
| # Calculate repair cost for each detected part | |
| for part in result_parts["predictions"]: | |
| part_name = part["class"] | |
| damage_area = part["width"] * part["height"] | |
| cropped_area = (x2 - x1) * (y2 - y1) | |
| damage_percentage = (damage_area / cropped_area) * 100 | |
| # Lookup cost and add to total | |
| base_cost = repair_cost_dict.get( | |
| part_name, 0 | |
| ) # Default to 0 if part not in dict | |
| repair_cost = (damage_percentage / 100) * 10 * base_cost | |
| total_cost += round(repair_cost, ndigits=1) | |
| print( | |
| f"Damage {i + 1} - {part_name}: {damage_percentage:.2f}% damaged, Cost: ${repair_cost:.2f}" | |
| ) | |
| # Annotate and save the result | |
| part_annotator = sv.LabelAnnotator() | |
| annotated_parts_image = part_annotator.annotate( | |
| scene=cropped_damage, detections=detections_parts | |
| ) | |
| annotated_parts_path = os.path.join( | |
| temp_dir, f"annotated_parts_{i}.png") | |
| cv2.imwrite(annotated_parts_path, annotated_parts_image) | |
| # Save the overall annotated image | |
| annotated_image_path = os.path.join(temp_dir, "annotated_image_damage.png") | |
| cv2.imwrite(annotated_image_path, annotated_image_damage) | |
| # Return the total cost in the specified format | |
| result = {"total_cost": total_cost} | |
| print(result) | |
| return jsonify(result) | |
| def generate_report(): | |
| file = None | |
| if "report_url" in request.form: | |
| report_url = request.form["report_url"] | |
| insurance_url = request.form["insurance_url"] | |
| url = main(report_url, insurance_url, "output.pdf") | |
| result = {"url": url} | |
| return jsonify(result), 200 | |
| elif "file" in request.files: | |
| file = request.files["file"] | |
| with open("uploaded_report.pdf", "wb") as f: | |
| f.write(file.read()) | |
| return jsonify({"message": "Something happened!."}), 404 | |
| def predict(): | |
| file = request.files["file"] | |
| if not file: | |
| return jsonify({"error": "file not uploaded"}), 400 | |
| # Save file temporarily | |
| temp_path = os.path.join(tempfile.gettempdir(), file.filename) | |
| file.save(temp_path) | |
| if file.filename.lower().endswith((".png", ".jpg", ".jpeg")): | |
| image = Image.open(temp_path) | |
| image_save_path = os.path.join( | |
| tempfile.gettempdir(), file.filename.lower()) | |
| image.save(image_save_path) | |
| def is_mri_image(image_path): | |
| img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE) | |
| if img is None: | |
| return False # Invalid image | |
| # Apply Canny edge detection | |
| edges = cv2.Canny(img, 50, 150) | |
| # Calculate edge density (MRI images have high edge presence) | |
| edge_density = np.sum(edges > 0) / edges.size | |
| print(edge_density) | |
| return edge_density > 0.05 | |
| if (not is_mri_image(temp_path)): | |
| return jsonify({"message": "Not an mri image", "confidence": 0.95, "saved_path": image_save_path}) | |
| a, b = model.check_file(temp_path) | |
| return jsonify({"message": a, "confidence": b, "saved_path": image_save_path}) | |
| if __name__ == "__main__": | |
| app.run(host="0.0.0.0", port=7860) | |