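"""Fire Detection MCP Server.

A Gradio app that monitors a video source (uploaded file, webcam, or RTSP
stream), asks a BLIP visual question answering model about fire and smoke
every 10 seconds, and overlays the detection status on a live preview.
"""
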
import gradio as gr
import cv2
import numpy as np
import threading
import time
from datetime import datetime

from PIL import Image
from transformers import BlipProcessor, BlipForQuestionAnswering
import torch

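# Load the BLIP VQA model and processor once at import time so every frame
# analysis reuses the same weights instead of reloading them.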
print("Loading BLIP-2 model...") |
|
vqa_processor = BlipProcessor.from_pretrained("Salesforce/blip-vqa-base") |
|
vqa_model = BlipForQuestionAnswering.from_pretrained("Salesforce/blip-vqa-base", torch_dtype=torch.float16) |
|
device = "cuda" if torch.cuda.is_available() else "cpu" |
|
vqa_model = vqa_model.to(device) |
|
print(f"Model loaded on {device}") |
|
|
|
class FireDetectionMCP:
    def __init__(self):
        self.running = False
        self.current_frame = None
        self.status = "No video source"
        self.status_color = "#808080"
        self.last_analysis_time = 0
        self.frame_count = 0
        self.last_detection_time = None
        self.display_status = "No video source"

    def analyze_frame(self, frame):
        """Analyze a frame for fire/smoke; returns (status, display status, hex color)."""
        try:
            # OpenCV frames are BGR; convert to RGB and downscale for the model.
            image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            image = image.resize((224, 224))

            fire_question = "Is there fire or flames in this image?"
            smoke_question = "Is there smoke in this image?"

            # Ask about fire. The top-token softmax probability of the first
            # generated token is used as a rough confidence score.
            fire_inputs = vqa_processor(image, fire_question, return_tensors="pt").to(device)
            with torch.no_grad():
                fire_outputs = vqa_model.generate(**fire_inputs, max_length=10, return_dict_in_generate=True, output_scores=True)
            fire_answer = vqa_processor.decode(fire_outputs.sequences[0], skip_special_tokens=True).lower()
            fire_confidence = torch.softmax(fire_outputs.scores[0][0], dim=0).max().item() * 100

            # Ask about smoke the same way.
            smoke_inputs = vqa_processor(image, smoke_question, return_tensors="pt").to(device)
            with torch.no_grad():
                smoke_outputs = vqa_model.generate(**smoke_inputs, max_length=10, return_dict_in_generate=True, output_scores=True)
            smoke_answer = vqa_processor.decode(smoke_outputs.sequences[0], skip_special_tokens=True).lower()
            smoke_confidence = torch.softmax(smoke_outputs.scores[0][0], dim=0).max().item() * 100

            has_fire = 'yes' in fire_answer or 'fire' in fire_answer or 'flame' in fire_answer
            has_smoke = 'yes' in smoke_answer or 'smoke' in smoke_answer

            if has_fire and has_smoke:
                status_with_emoji = f"🔥💨 FIRE & SMOKE DETECTED (F:{fire_confidence:.0f}% S:{smoke_confidence:.0f}%)"
                status_no_emoji = f"FIRE & SMOKE DETECTED (F:{fire_confidence:.0f}% S:{smoke_confidence:.0f}%)"
                return status_with_emoji, status_no_emoji, "#FF0000"
            elif has_fire:
                status_with_emoji = f"🔥 FIRE DETECTED ({fire_confidence:.0f}%)"
                status_no_emoji = f"FIRE DETECTED ({fire_confidence:.0f}%)"
                return status_with_emoji, status_no_emoji, "#FF4500"
            elif has_smoke:
                status_with_emoji = f"💨 SMOKE DETECTED ({smoke_confidence:.0f}%)"
                status_no_emoji = f"SMOKE DETECTED ({smoke_confidence:.0f}%)"
                return status_with_emoji, status_no_emoji, "#696969"
            else:
                status_with_emoji = f"✅ ALL CLEAR (F:{fire_confidence:.0f}% S:{smoke_confidence:.0f}%)"
                status_no_emoji = f"ALL CLEAR (F:{fire_confidence:.0f}% S:{smoke_confidence:.0f}%)"
                return status_with_emoji, status_no_emoji, "#32CD32"

        except Exception as e:
            # Callers unpack three values, so errors must also return a triple.
            return f"❌ ERROR: {e}", f"ERROR: {e}", "#FF0000"

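    # Background worker: reads frames continuously, runs the VQA analysis at
    # most once every 10 seconds, and publishes an annotated preview frame.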
    def monitor_video(self, video_source):
        """Monitor a video source until stopped."""
        if video_source.isdigit():
            cap = cv2.VideoCapture(int(video_source))
        else:
            cap = cv2.VideoCapture(video_source)

        if not cap.isOpened():
            self.status = "❌ Cannot open video source"
            self.status_color = "#FF0000"
            return

        # Uploaded files are looped from the start when they end.
        is_video_file = isinstance(video_source, str) and video_source.lower().endswith(('.mp4', '.avi', '.mov'))

        self.running = True
        self.frame_count = 0

        while self.running:
            ret, frame = cap.read()

            # Restart uploaded video files from the first frame when they end.
            if not ret and is_video_file:
                cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
                ret, frame = cap.read()
                self.frame_count = 0

            if not ret:
                break

            self.frame_count += 1
            current_time = time.time()

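            # Draw on a resized copy for display; the original full-resolution
            # frame is what gets sent to the model.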
            display_frame = cv2.resize(frame, (640, 480))

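            # Throttle the expensive VQA call to once every 10 seconds.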
            if self.running and current_time - self.last_analysis_time >= 10.0:
                print(f"[{datetime.now().strftime('%H:%M:%S')}] Analyzing frame {self.frame_count}...")
                self.status, self.display_status, self.status_color = self.analyze_frame(frame)
                self.last_analysis_time = current_time
                self.last_detection_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                print(f"[{datetime.now().strftime('%H:%M:%S')}] Result: {self.status}")

            # Black banner across the top for the status text.
            cv2.rectangle(display_frame, (0, 0), (640, 80), (0, 0, 0), -1)

            # Map the hex status colors to BGR for OpenCV drawing.
            if self.status_color == "#32CD32":
                color = (50, 205, 50)
            elif self.status_color == "#FF4500":
                color = (0, 69, 255)
            elif self.status_color == "#696969":
                color = (105, 105, 105)
            elif self.status_color == "#808080":
                color = (128, 128, 128)
            else:
                color = (0, 0, 255)

            cv2.putText(display_frame, self.display_status, (10, 60),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)

            timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            cv2.putText(display_frame, f"Time: {timestamp}", (10, 460),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)

            # Publish the annotated frame (RGB) for the Gradio image component.
            self.current_frame = cv2.cvtColor(display_frame, cv2.COLOR_BGR2RGB)

            time.sleep(0.04)

        cap.release()
        self.status = "Monitoring stopped"
        self.status_color = "#808080"

    def start_monitoring(self, video_source):
        """Start monitoring in a background thread."""
        if self.running:
            return "Already monitoring"

        if not video_source or (isinstance(video_source, str) and not video_source.strip()):
            return "Please provide a video source"

        thread = threading.Thread(target=self.monitor_video, args=(video_source,), daemon=True)
        thread.start()

        return f"✅ Started monitoring: {video_source}"

    def stop_monitoring(self):
        """Stop monitoring."""
        self.running = False
        self.current_frame = None
        self.status = "🛑 Monitoring stopped"
        self.display_status = "Monitoring stopped"
        self.status_color = "#808080"
        return "🛑 Monitoring stopped"

    def get_frame(self):
        """Get the current frame."""
        if self.current_frame is not None:
            return self.current_frame
        else:
            placeholder = np.zeros((480, 640, 3), dtype=np.uint8)
            cv2.putText(placeholder, "Waiting for video stream...", (150, 240),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2)
            return placeholder

    def get_status(self):
        """Get the current status."""
        if self.last_detection_time:
            return f"{self.status} (Last check: {self.last_detection_time})"
        return self.status


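# Single shared detector instance; the Gradio callbacks below start and stop
# its monitoring thread and poll it for the latest frame and status.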
mcp_server = FireDetectionMCP()


def create_interface():
    """Create the Gradio interface."""

    with gr.Blocks(title="🔥 Fire Detection MCP Server", theme=gr.themes.Soft()) as interface:
        gr.Markdown("# 🔥 Fire Detection MCP Server")
        gr.Markdown("Real-time fire and smoke detection from video streams (analyzes every 10 seconds)")
        gr.Markdown("⚠️ **Usage**: Upload your own video file or use live sources (webcam/RTSP). It may take a few seconds to load the stream and show the analysis. Webcam may not work on HF Spaces.")
        gr.Markdown("📹 **Sample Videos**: [Fire Test Video](https://www.pexels.com/video/a-man-carrying-gear-walking-away-from-a-controlled-fire-8552246/) | [Smoke Test Video](https://www.pexels.com/video/aerial-view-of-controlled-forest-fire-in-spring-31361444/)")

        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("### Video Source Options")

                with gr.Tabs():
                    with gr.Tab("📁 Upload Video"):
                        video_upload = gr.File(
                            label="Upload Video",
                            file_types=[".mp4", ".avi", ".mov"],
                            type="filepath"
                        )
                        upload_btn = gr.Button("🚀 Start Monitoring", variant="primary")

                    with gr.Tab("📹 Live Sources"):
                        video_input = gr.Textbox(
                            label="Video Source",
                            placeholder="0 (webcam), rtsp://url, or path/to/video.mp4",
                            value="0"
                        )
                        live_btn = gr.Button("🚀 Start Monitoring", variant="primary")

                stop_btn = gr.Button("🛑 Stop Monitoring", variant="secondary")

                control_output = gr.Textbox(label="Control Status", interactive=False)

                gr.Markdown("### Detection Status")
                status_display = gr.Textbox(label="Current Status", interactive=False)

                gr.Markdown("### Status Legend")
                gr.Markdown("🟢 Clear | 🟠 Fire | ⚫ Smoke | 🔴 Error")

            with gr.Column(scale=2):
                gr.Markdown("### Live Video Stream")
                video_display = gr.Image(
                    label="Video Feed",
                    height=480,
                    width=640,
                    interactive=False
                )

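        # UI wiring: the buttons start/stop the background monitor, and the
        # timer polls the detector twice a second for the frame and status.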
        def update_display():
            frame = mcp_server.get_frame()
            status = mcp_server.get_status()
            return frame, status

        def start_from_upload(video_file):
            mcp_server.stop_monitoring()
            if video_file is None:
                return "❌ Please upload a video file first"
            return mcp_server.start_monitoring(video_file)

        def start_live_source(video_source):
            mcp_server.stop_monitoring()
            return mcp_server.start_monitoring(video_source)

        upload_btn.click(
            fn=start_from_upload,
            inputs=video_upload,
            outputs=control_output
        )

        live_btn.click(
            fn=start_live_source,
            inputs=video_input,
            outputs=control_output
        )

        stop_btn.click(
            fn=mcp_server.stop_monitoring,
            outputs=control_output
        )

        timer = gr.Timer(0.5)
        timer.tick(
            fn=update_display,
            outputs=[video_display, status_display]
        )

    return interface

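# mcp_server=True additionally exposes the app as an MCP server; this assumes
# a Gradio version with MCP support is installed.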
if __name__ == "__main__": |
|
interface = create_interface() |
|
interface.launch(mcp_server=True, server_port=7860, share=False) |