import gradio as gr
from huggingface_hub import InferenceClient
import cv2
import os
from datetime import datetime
from ultralytics import YOLO
from transformers import BlipProcessor, BlipForQuestionAnswering
import torch
"""
For more information on `huggingface_hub` Inference API support, please check the docs: https://huggingface.co/docs/huggingface_hub/v0.22.2/en/guides/inference
"""
client = InferenceClient("HuggingFaceH4/zephyr-7b-beta")
# Load YOLO-World model
model = YOLO('yolov8x-world.pt')
# Load BLIP model for VQA
processor = BlipProcessor.from_pretrained("Salesforce/blip-vqa-base")
vqa_model = BlipForQuestionAnswering.from_pretrained("Salesforce/blip-vqa-base")
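
# Optional device placement: a minimal sketch assuming a GPU may be present
# (the Space could equally be CPU-only). The Ultralytics model selects its own
# device at inference time, so only the BLIP VQA model is moved explicitly.
device = "cuda" if torch.cuda.is_available() else "cpu"
vqa_model = vqa_model.to(device)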

def analyze_fire_scene(frame):
    # Set the open-vocabulary prompts for YOLO-World, then run inference.
    # Ultralytics world models take their prompts via set_classes() rather
    # than a keyword argument on the call. (This could be hoisted to module
    # level to avoid re-embedding the prompts on every frame.)
    model.set_classes(["fire", "flame", "smoke", "burning", "wildfire"])
    results = model(frame)

    # Initialize detection flags and details
    fire_detected = False
    smoke_detected = False
    fire_details = []

    # Process results
    for result in results:
        boxes = result.boxes
        for box in boxes:
            confidence = float(box.conf[0])
            if confidence > 0.5:
                class_name = result.names[int(box.cls[0])]
                if class_name in ['fire', 'flame', 'burning', 'wildfire']:
                    fire_detected = True
                    # Get bounding box coordinates
                    x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                    # Extract the region of interest
                    roi = frame[int(y1):int(y2), int(x1):int(x2)]
                    fire_details.append({
                        'type': class_name,
                        'confidence': confidence,
                        'location': (x1, y1, x2, y2),
                        'roi': roi
                    })
                elif class_name == 'smoke':
                    smoke_detected = True

    return fire_detected, smoke_detected, fire_details
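
# Illustrative helper, not wired into the Gradio app: one way to visualize
# the detections returned by analyze_fire_scene. It assumes the 'location'
# tuples are xyxy pixel coordinates, as produced above.
def draw_fire_detections(frame, fire_details):
    annotated = frame.copy()
    for det in fire_details:
        x1, y1, x2, y2 = (int(v) for v in det['location'])
        # Red box plus a "type confidence" label just above it
        cv2.rectangle(annotated, (x1, y1), (x2, y2), (0, 0, 255), 2)
        label = f"{det['type']} {det['confidence']:.2f}"
        cv2.putText(annotated, label, (x1, max(y1 - 5, 0)),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
    return annotated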

def get_fire_analysis(frame, fire_details):
    # fire_details is currently unused but kept for the caller's signature.
    # OpenCV frames are BGR; BLIP expects RGB
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # Generate questions about the fire
    questions = [
        "Is there a fire in this image?",
        "Is there smoke in this image?",
        "Are there any people near the fire?",
        "Is the fire spreading?",
        "What is the size of the fire?"
    ]

    analysis = []
    for question in questions:
        # The BLIP processor takes the image and the question together and
        # returns the tensors that generate() expects; move them to whatever
        # device the model is on
        inputs = processor(rgb_frame, question, return_tensors="pt").to(vqa_model.device)

        # Generate answer (beam search; sampling parameters like top_p and
        # temperature have no effect without do_sample, so they are omitted)
        with torch.no_grad():
            outputs = vqa_model.generate(
                **inputs,
                max_length=20,
                num_beams=3,
                min_length=1,
                repetition_penalty=1.5,
                length_penalty=1.0,
            )

        answer = processor.decode(outputs[0], skip_special_tokens=True)
        analysis.append(f"Q: {question}\nA: {answer}")

    return analysis

def check_for_fire():
    # Request webcam access
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        return "Error: Could not access webcam"

    # Read a frame
    ret, frame = cap.read()
    if not ret:
        cap.release()
        return "Error: Could not read from webcam"

    # Detect fire and smoke
    fire_detected, smoke_detected, fire_details = analyze_fire_scene(frame)

    # Release webcam
    cap.release()

    # Get location (you might want to implement a more sophisticated location detection)
    location = "Webcam Location"  # Replace with actual location detection

    if fire_detected:
        # Get detailed analysis of the fire
        analysis = get_fire_analysis(frame, fire_details)
        return f"Fire detected at {location}!\n\nAnalysis:\n" + "\n".join(analysis)
    elif smoke_detected:
        return f"Smoke detected at {location}!"
    else:
        return "No fire or smoke detected"

def respond(
    message,
    history: list[tuple[str, str]],
    system_message,
    max_tokens,
    temperature,
    top_p,
):
    # Check if user wants to detect fire. This function is a generator, so
    # the result must be yielded; a plain `return value` would be swallowed
    # and the fire report would never reach the chat UI.
    if "detect fire" in message.lower():
        yield check_for_fire()
        return

    messages = [{"role": "system", "content": system_message}]

    for val in history:
        if val[0]:
            messages.append({"role": "user", "content": val[0]})
        if val[1]:
            messages.append({"role": "assistant", "content": val[1]})

    messages.append({"role": "user", "content": message})

    response = ""
    # Stream tokens from the Inference API (the loop variable is named chunk
    # so it does not shadow the user's message)
    for chunk in client.chat_completion(
        messages,
        max_tokens=max_tokens,
        stream=True,
        temperature=temperature,
        top_p=top_p,
    ):
        if chunk.choices[0].delta.content is not None:
            token = chunk.choices[0].delta.content
            response += token
            yield response
"""
For information on how to customize the ChatInterface, peruse the gradio docs: https://www.gradio.app/docs/chatinterface
"""

demo = gr.ChatInterface(
    respond,
    additional_inputs=[
        gr.Textbox(value="You are a friendly Chatbot.", label="System message"),
        gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max new tokens"),
        gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature"),
        gr.Slider(
            minimum=0.1,
            maximum=1.0,
            value=0.95,
            step=0.05,
            label="Top-p (nucleus sampling)",
        ),
    ],
)
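
# Note: mcp_server=True assumes a Gradio build with MCP support installed
# (the gradio[mcp] extra); on older Gradio releases, drop the flag.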
if __name__ == "__main__":
    demo.launch(mcp_server=True)