import gradio as gr
from huggingface_hub import InferenceClient
import cv2
import numpy as np
import time
import os
from datetime import datetime
from ultralytics import YOLO
from transformers import BlipProcessor, BlipForQuestionAnswering
import torch
"""
For more information on `huggingface_hub` Inference API support, please check the docs: https://huggingface.co/docs/huggingface_hub/v0.22.2/en/guides/inference
"""
client = InferenceClient("HuggingFaceH4/zephyr-7b-beta")
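
# A minimal sketch of a one-off, non-streaming call with this client (the
# streaming form used in respond() below yields incremental deltas instead):
#   client.chat_completion(
#       [{"role": "user", "content": "Hello"}],
#       max_tokens=32,
#   )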

# Load YOLO-World model and set the open-vocabulary classes to detect;
# YOLO-World takes its prompt classes via set_classes(), not per-call arguments
model = YOLO('yolov8x-world.pt')
model.set_classes(["fire", "flame", "smoke", "burning", "wildfire"])
# Load BLIP model for VQA
processor = BlipProcessor.from_pretrained("Salesforce/blip-vqa-base")
vqa_model = BlipForQuestionAnswering.from_pretrained("Salesforce/blip-vqa-base")
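
# BLIP VQA pairs an RGB image with a natural-language question; the processor
# encodes both together and generate() decodes a short free-form answer
# (see get_fire_analysis below).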

def analyze_fire_scene(frame):
    # Run YOLO-World inference; the custom classes were set via set_classes() above
    results = model(frame)
    # Initialize detection flags and details
    fire_detected = False
    smoke_detected = False
    fire_details = []
    # Process results
    for result in results:
        boxes = result.boxes
        for box in boxes:
            confidence = float(box.conf[0])
            if confidence > 0.5:
                class_name = result.names[int(box.cls[0])]
                if class_name in ['fire', 'flame', 'burning', 'wildfire']:
                    fire_detected = True
                    # Get bounding box coordinates
                    x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                    # Extract the region of interest
                    roi = frame[int(y1):int(y2), int(x1):int(x2)]
                    fire_details.append({
                        'type': class_name,
                        'confidence': confidence,
                        'location': (x1, y1, x2, y2),
                        'roi': roi
                    })
                elif class_name == 'smoke':
                    smoke_detected = True
    return fire_detected, smoke_detected, fire_details
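
# Usage sketch (assumes a hypothetical local test image "fire_test.jpg"):
#   frame = cv2.imread("fire_test.jpg")
#   fire, smoke, details = analyze_fire_scene(frame)
#   print(fire, smoke, [d['type'] for d in details])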

def get_fire_analysis(frame, fire_details):
    # BLIP expects RGB input; OpenCV frames are BGR (fire_details is kept in
    # the signature for future per-region analysis of each detected ROI)
    image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    # Generate questions about the fire
    questions = [
        "Is there a fire in this image?",
        "Is there smoke in this image?",
        "Are there any people near the fire?",
        "Is the fire spreading?",
        "What is the size of the fire?"
    ]
    analysis = []
    for question in questions:
        # Encode the image and the question together for BLIP VQA
        inputs = processor(image, question, return_tensors="pt")
        # Generate answer with beam search (sampling parameters are omitted
        # since they are ignored when do_sample is False)
        with torch.no_grad():
            outputs = vqa_model.generate(
                **inputs,
                max_length=20,
                num_beams=3,
                min_length=1,
                repetition_penalty=1.5,
                length_penalty=1.0,
            )
        answer = processor.decode(outputs[0], skip_special_tokens=True)
        analysis.append(f"Q: {question}\nA: {answer}")
    return analysis

def check_for_fire():
    # Request webcam access
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        return "Error: Could not access webcam"
    # Read a frame
    ret, frame = cap.read()
    if not ret:
        cap.release()
        return "Error: Could not read from webcam"
    # Detect fire and smoke
    fire_detected, smoke_detected, fire_details = analyze_fire_scene(frame)
    # Release webcam
    cap.release()
    # Get location (you might want to implement a more sophisticated location detection)
    location = "Webcam Location"  # Replace with actual location detection
    if fire_detected:
        # Get detailed analysis of the fire
        analysis = get_fire_analysis(frame, fire_details)
        return f"Fire detected at {location}!\n\nAnalysis:\n" + "\n".join(analysis)
    elif smoke_detected:
        return f"Smoke detected at {location}!"
    else:
        return "No fire or smoke detected"

def respond(
    message,
    history: list[tuple[str, str]],
    system_message,
    max_tokens,
    temperature,
    top_p,
):
    # Check if user wants to detect fire; respond() is a generator, so the
    # result must be yielded (a plain return value would never reach the UI)
    if "detect fire" in message.lower():
        yield check_for_fire()
        return
    messages = [{"role": "system", "content": system_message}]
    for val in history:
        if val[0]:
            messages.append({"role": "user", "content": val[0]})
        if val[1]:
            messages.append({"role": "assistant", "content": val[1]})
    messages.append({"role": "user", "content": message})
    response = ""
    # Stream tokens (the loop variable is named chunk so it does not shadow
    # the user's message)
    for chunk in client.chat_completion(
        messages,
        max_tokens=max_tokens,
        stream=True,
        temperature=temperature,
        top_p=top_p,
    ):
        if chunk.choices[0].delta.content is not None:
            token = chunk.choices[0].delta.content
            response += token
            yield response
"""
For information on how to customize the ChatInterface, peruse the gradio docs: https://www.gradio.app/docs/chatinterface
"""

demo = gr.ChatInterface(
    respond,
    additional_inputs=[
        gr.Textbox(value="You are a friendly Chatbot.", label="System message"),
        gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max new tokens"),
        gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature"),
        gr.Slider(
            minimum=0.1,
            maximum=1.0,
            value=0.95,
            step=0.05,
            label="Top-p (nucleus sampling)",
        ),
    ],
)

if __name__ == "__main__":
    demo.launch(mcp_server=True)