Spaces:
Sleeping
Sleeping
| import os | |
| import sys | |
| import cv2 | |
| import gradio as gr | |
| import numpy as np | |
| import logging | |
| from datetime import datetime | |
| from pathlib import Path | |
# Configure root logging at INFO level with two handlers: a file handler
# writing to ``app.log`` and a stream handler for the console.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
_log_handlers = [logging.FileHandler('app.log'), logging.StreamHandler()]
logging.basicConfig(
    handlers=_log_handlers,
    format=_LOG_FORMAT,
    level=logging.INFO,
)

# Module-level logger used by the rest of this file.
logger = logging.getLogger(__name__)
# Put the directory containing this file on the import path so the project
# packages imported right below can be resolved.
project_root = os.path.split(os.path.abspath(__file__))[0]
sys.path.append(project_root)
| from ANPR_IND.scripts.charExtraction import CharExtraction | |
| from ANPR_IND.scripts.bboxAnnotator import BBOXAnnotator | |
| from ultralytics import YOLO | |
# ANPR settings: weight files for plate and character detection live in the
# ANPR_IND directory under the project root.
_anpr_dir = os.path.join(project_root, "ANPR_IND")
wPathPlat = os.path.join(_anpr_dir, "licence_plat.pt")
wPathChar = os.path.join(_anpr_dir, "licence_character.pt")

# Character labels handed to CharExtraction: A-Z followed by 0-9.
classList = np.array(list("ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"))

# Plate size handed to CharExtraction as ``sizePlate``.
sizePlat = (416, 200)

# Helmet detection weight file.
helmet_model_path = os.path.join(project_root, "Helmet-Detect-model", "best.pt")
# Fail fast: stop at the first model weight file that is missing on disk,
# logging the path before raising.
required_files = [wPathPlat, wPathChar, helmet_model_path]
for file_path in required_files:
    if os.path.exists(file_path):
        continue
    _missing_message = f"Required model file not found: {file_path}"
    logger.error(_missing_message)
    raise FileNotFoundError(_missing_message)
# Load the helmet detector, the plate/character extractor and the annotator
# once at import time. Any failure is logged and re-raised unchanged.
try:
    logger.info("Initializing models...")
    helmet_model = YOLO(helmet_model_path)
    extractor = CharExtraction(
        wPlatePath=wPathPlat,
        wCharacterPath=wPathChar,
        classList=classList,
        sizePlate=sizePlat,
        conf=0.5,
    )
    annotator = BBOXAnnotator()
except Exception as init_error:
    logger.error(f"Error initializing models: {init_error}")
    raise
else:
    logger.info("Models initialized successfully")
def _load_bgr_image(image):
    """Return *image* as a BGR array for OpenCV.

    A ``str`` is treated as a file path and read with ``cv2.imread``. Any
    other value is converted with ``np.array`` and treated as RGB.

    Raises:
        FileNotFoundError: a path was given and it does not exist.
        ValueError: ``cv2.imread`` returned ``None`` for the given path.
    """
    if isinstance(image, str):
        if not os.path.exists(image):
            raise FileNotFoundError(f"Image file not found: {image}")
        frame = cv2.imread(image)
        if frame is None:
            # The decoded result is kept in its own variable so this message
            # names the offending path instead of printing "None".
            raise ValueError(f"Failed to read image: {image}")
        return frame
    return cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)


def process_image(image, conf=0.45):
    """Run plate recognition and helmet detection on one image.

    Args:
        image: ``None``, a file path (``str``), or an RGB image that
            ``np.array`` can convert (for example a PIL image).
        conf: Confidence threshold forwarded to ``extractor.predict``. The
            helmet model is called without it.

    Returns:
        A ``(image, plate_text, helmet_status)`` tuple.

        * Success: the annotated image converted to RGB, the detected plate
          strings joined by ``", "`` (or ``"No plate detected"``), and
          ``"Helmet Detected"`` / ``"No Helmet Detected"``.
        * No input: ``(None, "No image provided", "No image provided")``.
        * Failure: the loaded input converted to RGB (``None`` if it could
          not be loaded), ``"Error: <message>"`` and
          ``"Error processing image"``. Exceptions are logged, not raised.
    """
    start_time = datetime.now()
    logger.info(f"Processing image with confidence threshold: {conf}")

    if image is None:
        logger.warning("No image provided")
        return None, "No image provided", "No image provided"

    frame = None  # BGR working copy; stays None when loading fails
    try:
        frame = _load_bgr_image(image)

        # ANPR detection
        logger.info("Running ANPR detection")
        bbox, plateNum, _ = extractor.predict(image=frame, conf=conf)
        anpr_image, plateNum = annotator.draw_bbox(frame.copy(), bbox, plateNum)
        plate_text = ", ".join(plateNum) if plateNum else "No plate detected"
        logger.info(f"ANPR result: {plate_text}")

        # Helmet detection
        logger.info("Running helmet detection")
        results = helmet_model(frame)
        # NOTE(review): any detected box counts as a helmet, whatever its
        # class - confirm the helmet model has no "no helmet" class.
        helmet_detected = len(results[0].boxes) > 0
        helmet_status = "Helmet Detected" if helmet_detected else "No Helmet Detected"
        logger.info(f"Helmet detection result: {helmet_status}")

        # Annotated image produced by the helmet model
        helmet_image = results[0].plot()

        # Blend both annotated images 50/50; if blending fails, fall back to
        # the helmet annotations alone.
        try:
            combined_image = cv2.addWeighted(anpr_image, 0.5, helmet_image, 0.5, 0)
        except Exception as e:
            logger.warning(f"Failed to combine annotations: {str(e)}")
            combined_image = helmet_image

        # Convert BGR to RGB for display
        if isinstance(combined_image, np.ndarray):
            combined_image = cv2.cvtColor(combined_image, cv2.COLOR_BGR2RGB)

        processing_time = (datetime.now() - start_time).total_seconds()
        logger.info(f"Processing completed in {processing_time:.2f} seconds")
        return combined_image, plate_text, helmet_status
    except Exception as e:
        # logger.exception keeps the traceback in the log alongside the message.
        logger.exception(f"Error processing image: {str(e)}")
        # Hand back the input in RGB: the working copy is BGR and would be
        # displayed with red and blue swapped if returned as-is.
        fallback = None
        if frame is not None:
            fallback = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        return fallback, f"Error: {str(e)}", "Error processing image"
# Candidate example images for the Gradio UI
_example_candidates = [
    os.path.join(project_root, "ANPR_IND", file_name)
    for file_name in (
        "sample_image2.jpg",
        "sample_image3.jpg",
        "sample_image5.jpg",
        "sample_image6.jpg",
    )
]

# Keep only the examples that exist on disk. The result is built as a new
# list: removing entries from a list while iterating over it skips the entry
# that follows each removal, so consecutive missing files were left in place.
example_images = []
for img_path in _example_candidates:
    if os.path.exists(img_path):
        example_images.append(img_path)
    else:
        logger.warning(f"Example image not found: {img_path}")
# Create Gradio interface
def create_interface():
    """Build the Gradio Blocks UI and return it without launching it.

    The layout is a two-column row: image input, confidence slider and
    "Detect" button in the first column; annotated image, licence plate text
    and helmet status in the second. Both the button and the optional examples
    (added only when ``example_images`` is non-empty) call ``process_image``.
    """
    demo = gr.Blocks(title="Traffic Violation Detection System", theme=gr.themes.Soft())
    with demo:
        gr.Markdown("# Combined ANPR and Helmet Detection System")
        gr.Markdown("Upload an image to detect license plates and check for helmet usage.")

        with gr.Row():
            with gr.Column():
                image_in = gr.Image(label="Input Image", type="pil")
                threshold = gr.Slider(
                    minimum=0.1,
                    maximum=1.0,
                    value=0.45,
                    label="Confidence Threshold",
                )
                run_button = gr.Button("Detect", variant="primary")
            with gr.Column():
                image_out = gr.Image(label="Annotated Image")
                plate_box = gr.Textbox(label="License Plate")
                helmet_box = gr.Textbox(label="Helmet Status")

        # The examples and the button share the same inputs and outputs.
        handler_inputs = [image_in, threshold]
        handler_outputs = [image_out, plate_box, helmet_box]

        # Example images, each paired with the default threshold
        if example_images:
            example_rows = [[path, 0.45] for path in example_images]
            gr.Examples(
                examples=example_rows,
                inputs=handler_inputs,
                outputs=handler_outputs,
                fn=process_image,
                cache_examples=True,
            )

        # Run detection when the button is clicked
        run_button.click(
            fn=process_image,
            inputs=handler_inputs,
            outputs=handler_outputs,
        )
    return demo
if __name__ == "__main__":
    try:
        logger.info("Starting application...")
        demo = create_interface()
        demo.queue()  # Enable queue separately

        # Launch settings, spelled out explicitly and passed as keywords.
        launch_options = {
            "share": False,  # Disable share link by default
            "server_name": "127.0.0.1",  # Use localhost instead of 0.0.0.0
            "server_port": 7860,
            "favicon_path": None,
            "auth": None,
            "ssl_keyfile": None,
            "ssl_certfile": None,
            "ssl_verify": True,
            "quiet": False,
            "show_api": True,
            "root_path": "",
            "_frontend": True,
            "prevent_thread_lock": False,
            "allowed_paths": None,
            "blocked_paths": None,
            "max_threads": 40,
            "debug": True,  # Enable debug mode for better error reporting
        }
        demo.launch(**launch_options)
    except Exception as startup_error:
        # Log the failure and exit with a non-zero status.
        logger.error(f"Failed to start application: {startup_error}")
        sys.exit(1)