|
import os |
|
import sys |
|
import cv2 |
|
import gradio as gr |
|
import numpy as np |
|
import logging |
|
from datetime import datetime |
|
from pathlib import Path |
|
|
|
# Configure root logging once at import time: INFO and above are written both
# to ``app.log`` (relative to the current working directory) and to the
# console stream.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
    handlers=[logging.FileHandler("app.log"), logging.StreamHandler()],
)

# Module-level logger used by everything below.
logger = logging.getLogger(__name__)
|
|
|
# Absolute directory that contains this script. Model weights and sample
# images below are resolved relative to it.
project_root = os.path.split(os.path.abspath(__file__))[0]

# Put that directory on the import path so the packages imported further
# down can be found regardless of the current working directory.
sys.path.append(project_root)
|
|
|
from ANPR_IND.scripts.charExtraction import CharExtraction |
|
from ANPR_IND.scripts.bboxAnnotator import BBOXAnnotator |
|
from ultralytics import YOLO |
|
|
|
# Weight files for the plate detector and the character recogniser; both are
# handed to CharExtraction below.
_anpr_dir = os.path.join(project_root, "ANPR_IND")
wPathPlat = os.path.join(_anpr_dir, "licence_plat.pt")
wPathChar = os.path.join(_anpr_dir, "licence_character.pt")

# Character labels passed to CharExtraction as ``classList``: the 26
# upper-case letters followed by the 10 digits, one single-character string
# per entry.
classList = np.array(list("ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"))

# Passed to CharExtraction as ``sizePlate``.
sizePlat = (416, 200)

# Weight file loaded by the helmet YOLO model.
helmet_model_path = os.path.join(project_root, "Helmet-Detect-model", "best.pt")
|
|
|
# Fail fast at import time if any weight file is missing. Only the first
# missing path (in list order) is reported.
required_files = [wPathPlat, wPathChar, helmet_model_path]
_first_missing = next(
    (path for path in required_files if not os.path.exists(path)),
    None,
)
if _first_missing is not None:
    _missing_message = f"Required model file not found: {_first_missing}"
    logger.error(_missing_message)
    raise FileNotFoundError(_missing_message)
|
|
|
# Load every model once at import time. Any failure is logged and then
# re-raised unchanged, so the module cannot be used half-initialised.
_extractor_settings = {
    "wPlatePath": wPathPlat,
    "wCharacterPath": wPathChar,
    "classList": classList,
    "sizePlate": sizePlat,
    "conf": 0.5,
}

try:
    logger.info("Initializing models...")
    helmet_model = YOLO(helmet_model_path)
    extractor = CharExtraction(**_extractor_settings)
    annotator = BBOXAnnotator()
    logger.info("Models initialized successfully")
except Exception as init_error:
    logger.error(f"Error initializing models: {init_error!s}")
    raise
|
|
|
def process_image(image, conf=0.45):
    """Run plate recognition and helmet detection on a single image.

    Args:
        image: Either a filesystem path to an image, or an RGB image object
            that ``np.array`` can convert (the UI passes a PIL image).
            ``None`` is accepted and reported as missing input.
        conf: Confidence threshold forwarded to ``extractor.predict``. It is
            not passed to the helmet model.

    Returns:
        A 3-tuple ``(annotated_image, plate_text, helmet_status)``. On
        success ``annotated_image`` is the blended annotation image converted
        to RGB. If processing fails, the caller's original input is returned
        together with an ``"Error: ..."`` message and a generic status, so
        this function does not raise for processing errors.
    """
    start_time = datetime.now()
    logger.info(f"Processing image with confidence threshold: {conf}")

    if image is None:
        logger.warning("No image provided")
        return None, "No image provided", "No image provided"

    # Keep the caller's input so the error path can hand it back unchanged
    # instead of the BGR working copy, which would display with swapped
    # colour channels.
    original_input = image

    try:
        if isinstance(image, str):
            if not os.path.exists(image):
                raise FileNotFoundError(f"Image file not found: {image}")
            frame = cv2.imread(image)
            if frame is None:
                raise ValueError("Failed to read image from the provided path.")
        else:
            # Non-path inputs are treated as RGB; OpenCV works in BGR.
            frame = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)

        logger.info("Running ANPR detection")
        # The third value returned by predict() is not used here.
        bbox, plateNum, _ = extractor.predict(image=frame, conf=conf)
        anpr_image, plateNum = annotator.draw_bbox(frame.copy(), bbox, plateNum)
        plate_text = ", ".join(plateNum) if plateNum else "No plate detected"
        logger.info(f"ANPR result: {plate_text}")

        logger.info("Running helmet detection")
        # Calling an Ultralytics model returns a list with one Results object
        # per input image; a single frame was passed, so take the first.
        helmet_result = helmet_model(frame)[0]
        boxes = helmet_result.boxes
        # NOTE(review): any detected box counts as "helmet"; confirm the
        # model has no "no helmet" class that should be excluded.
        helmet_detected = boxes is not None and len(boxes) > 0
        helmet_status = "Helmet Detected" if helmet_detected else "No Helmet Detected"
        logger.info(f"Helmet detection result: {helmet_status}")

        helmet_image = helmet_result.plot()

        # Blend the two annotated copies 50/50; if they cannot be combined,
        # fall back to the helmet annotations alone (deliberate best effort).
        try:
            combined_image = cv2.addWeighted(anpr_image, 0.5, helmet_image, 0.5, 0)
        except Exception as e:
            logger.warning(f"Failed to combine annotations: {str(e)}")
            combined_image = helmet_image

        # Convert back to RGB for display.
        if isinstance(combined_image, np.ndarray):
            combined_image = cv2.cvtColor(combined_image, cv2.COLOR_BGR2RGB)

        processing_time = (datetime.now() - start_time).total_seconds()
        logger.info(f"Processing completed in {processing_time:.2f} seconds")

        return combined_image, plate_text, helmet_status

    except Exception as e:
        # Boundary handler: report the failure as text rather than raising,
        # and keep the traceback in the log.
        logger.exception(f"Error processing image: {str(e)}")
        return original_input, f"Error: {str(e)}", "Error processing image"
|
|
|
# Sample images offered as clickable examples in the UI. Only files that
# actually exist are kept; each missing one is logged as a warning.
example_images = []
for sample_name in (
    "sample_image2.jpg",
    "sample_image3.jpg",
    "sample_image5.jpg",
    "sample_image6.jpg",
):
    img_path = os.path.join(project_root, "ANPR_IND", sample_name)
    if os.path.exists(img_path):
        example_images.append(img_path)
    else:
        logger.warning(f"Example image not found: {img_path}")
|
|
|
def create_interface():
    """Assemble the Gradio Blocks UI for the detector.

    The layout is a two-column row: the left column holds the image input,
    the confidence slider and the "Detect" button; the right column holds the
    annotated image and the two result text boxes. When example images are
    available, an examples panel wired to ``process_image`` is added.

    Returns:
        The constructed ``gr.Blocks`` object; it is not launched here.
    """
    default_conf = 0.45

    with gr.Blocks(
        title="Traffic Violation Detection System",
        theme=gr.themes.Soft(),
    ) as demo:
        gr.Markdown("# Combined ANPR and Helmet Detection System")
        gr.Markdown("Upload an image to detect license plates and check for helmet usage.")

        with gr.Row():
            with gr.Column():
                input_image = gr.Image(label="Input Image", type="pil")
                conf_slider = gr.Slider(
                    label="Confidence Threshold",
                    minimum=0.1,
                    maximum=1.0,
                    value=default_conf,
                )
                detect_button = gr.Button("Detect", variant="primary")
            with gr.Column():
                output_image = gr.Image(label="Annotated Image")
                plate_output = gr.Textbox(label="License Plate")
                helmet_output = gr.Textbox(label="Helmet Status")

        # The examples panel and the button share the same wiring.
        wired_inputs = (input_image, conf_slider)
        wired_outputs = (output_image, plate_output, helmet_output)

        if example_images:
            example_rows = [[sample, default_conf] for sample in example_images]
            gr.Examples(
                examples=example_rows,
                inputs=list(wired_inputs),
                outputs=list(wired_outputs),
                fn=process_image,
                cache_examples=True,
            )

        detect_button.click(
            fn=process_image,
            inputs=list(wired_inputs),
            outputs=list(wired_outputs),
        )

    return demo
|
|
|
if __name__ == "__main__":
    # Script entry point: build the UI, enable queuing, and serve it on port
    # 7860 with host 0.0.0.0. Any startup failure is logged and the process
    # exits with status 1.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "debug": True,
    }
    try:
        logger.info("Starting application...")
        demo = create_interface()
        demo.queue()
        demo.launch(**launch_options)
    except Exception as startup_error:
        logger.error(f"Failed to start application: {startup_error!s}")
        sys.exit(1)
|
|