# CCTV_Assistant / app.py
import gradio as gr
import torch
import cv2
import os
import tempfile
import numpy as np
import pickle
from PIL import Image
from tqdm import tqdm
from torch.utils.data import DataLoader
try:  # MoviePy >= 2.0 exposes VideoFileClip at the package root
    from moviepy import VideoFileClip
except ImportError:  # MoviePy 1.x keeps it in moviepy.editor
    from moviepy.editor import VideoFileClip
from colpali_engine.models import ColQwen2_5Omni, ColQwen2_5OmniProcessor
import spaces
import warnings
warnings.filterwarnings("ignore")
class VideoRAGProcessor:
"""Class to handle model initialization and video processing"""
def __init__(self):
"""Initialize model and processor directly"""
print("Loading ColQwen2.5 Omni model... This may take a few minutes.")
self.model = ColQwen2_5Omni.from_pretrained(
"vidore/colqwen-omni-v0.1",
torch_dtype=torch.bfloat16,
device_map="cuda" if torch.cuda.is_available() else "cpu",
attn_implementation="eager",
).eval()
self.processor = ColQwen2_5OmniProcessor.from_pretrained("manu/colqwen-omni-v0.1")
print("Model loaded successfully!")
def cut_video_into_clips(self, video_path, clip_duration=5):
"""Cut video into clips of specified duration (default 5 seconds)"""
clips = []
clip_paths = []
clip_timestamps = []
try:
clips_dir = "./video_clips"
os.makedirs(clips_dir, exist_ok=True)
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
duration = total_frames / fps
print(f"Video info: {duration:.2f}s total, {fps:.2f} FPS, {total_frames} frames")
frames_per_clip = int(fps * clip_duration)
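            # Step through the video frames_per_clip frames at a time; clip timestamps are
            # derived from the frame index and FPS so they line up with the original footage.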
clip_count = 0
current_frame = 0
while current_frame < total_frames:
clip_filename = f"clip_{clip_count + 1}.mp4"
clip_path = os.path.join(clips_dir, clip_filename)
try:
start_time = current_frame / fps
end_time = min((current_frame + frames_per_clip) / fps, duration)
clip_duration_actual = end_time - start_time
if clip_duration_actual < 1.0:
print(f"Skipping clip {clip_count + 1} - too short ({clip_duration_actual:.1f}s)")
current_frame += frames_per_clip
continue
if end_time >= duration:
end_time = duration - 0.1
clip_duration_actual = end_time - start_time
if clip_duration_actual < 1.0:
print(f"Skipping final clip - too short after adjustment ({clip_duration_actual:.1f}s)")
break
try:
video_clip = VideoFileClip(video_path)
sub_clip = video_clip.subclip(start_time, end_time)
sub_clip.write_videofile(clip_path, verbose=False, logger=None)
video_clip.close()
                    except AttributeError:
                        # MoviePy 2.x renamed subclip -> subclipped and dropped the `verbose` kwarg
                        try:
                            video_clip = VideoFileClip(video_path)
                            sub_clip = video_clip.subclipped(start_time, end_time)
                            sub_clip.write_videofile(clip_path, logger=None)
                            video_clip.close()
                        except Exception:
import subprocess
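                            # Last-resort fallback: cut the segment with the ffmpeg CLI using
                            # stream copy ('-c copy' avoids re-encoding, so cut points may snap
                            # to the nearest keyframe).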
cmd = [
'ffmpeg', '-i', video_path,
'-ss', str(start_time),
'-t', str(clip_duration_actual),
'-c', 'copy',
'-avoid_negative_ts', 'make_zero',
'-y', clip_path
]
subprocess.run(cmd, capture_output=True, check=True)
clip_timestamps.append({
'clip_id': clip_count + 1,
'start_time': start_time,
'end_time': end_time,
'duration': clip_duration_actual
})
if clip_duration_actual < clip_duration:
clips.append(f"Clip {clip_count + 1} ({start_time:.1f}s - {end_time:.1f}s) [Final clip - {clip_duration_actual:.1f}s]")
else:
clips.append(f"Clip {clip_count + 1} ({start_time:.1f}s - {end_time:.1f}s)")
clip_paths.append(clip_path)
clip_count += 1
current_frame += frames_per_clip
except Exception as e:
print(f"Error creating clip {clip_count}: {str(e)}")
current_frame += frames_per_clip
continue
cap.release()
print(f"Successfully created {len(clip_paths)} clips from {duration:.2f}s video")
return clips, clip_paths, clip_timestamps
except Exception as e:
print(f"Error in cut_video_into_clips: {str(e)}")
return [], [], []
def process_and_analyze_video(self, video_file, query=None):
"""Process video and optionally analyze with query in single GPU call"""
if video_file is None:
return "❌ Please upload a video file.", [], ""
try:
status_msg = "🎬 Processing video..."
# Clean up old clips
clips_dir = "./video_clips"
if os.path.exists(clips_dir):
for file in os.listdir(clips_dir):
                    try:
                        os.remove(os.path.join(clips_dir, file))
                    except OSError:
                        pass
# Cut video into clips
status_msg += "\n🎬 Cutting video into 5-second clips..."
clips_info, clip_paths, clip_timestamps = self.cut_video_into_clips(video_file.name, clip_duration=5)
if not clip_paths:
return "❌ Error cutting video into clips.", [], ""
status_msg += f"\nβœ… Created {len(clip_paths)} clips"
# Generate embeddings
status_msg += "\nπŸ”„ Generating embeddings for video clips..."
embeddings = []
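            # One forward pass per clip: the model returns a multi-vector embedding for each
            # clip, moved to CPU so it can be pickled and reused by later GPU calls.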
for i, clip_path in enumerate(tqdm(clip_paths, desc="Processing clips")):
try:
batch_doc = self.processor.process_videos([clip_path])
with torch.no_grad():
device = next(self.model.parameters()).device
batch_doc = {k: v.to(device) for k, v in batch_doc.items()}
embedding = self.model(**batch_doc)
embeddings.extend(list(torch.unbind(embedding.to("cpu"))))
except Exception as e:
print(f"Error processing clip {i+1}: {e}")
continue
status_msg += f"\nβœ… Generated embeddings for {len(embeddings)} clips"
# Save embeddings and metadata to disk for persistence
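            # Each @spaces.GPU call may run in a fresh process, so in-memory state is not
            # reliable; pickling to local disk lets analyze_with_saved_embeddings() re-score
            # new queries without re-embedding the clips.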
embeddings_data = {
'embeddings': embeddings,
'clip_timestamps': clip_timestamps,
'clips_info': clips_info
}
with open('./video_embeddings.pkl', 'wb') as f:
pickle.dump(embeddings_data, f)
status_msg += "\nπŸ’Ύ Embeddings saved to disk"
# If query provided, analyze immediately
analysis_result = ""
if query and query.strip():
status_msg += f"\nπŸ” Analyzing query: '{query}'"
analysis_result = self._analyze_with_embeddings(query, embeddings, clip_timestamps)
else:
status_msg += "\n🎯 Ready for queries!"
return status_msg, clips_info, analysis_result
except Exception as e:
return f"❌ Error processing video: {str(e)}", [], ""
def _analyze_with_embeddings(self, query, embeddings, clip_timestamps):
"""Internal method to analyze with provided embeddings"""
try:
# Process query
batch_queries = self.processor.process_queries([query])
device = next(self.model.parameters()).device
batch_queries = {k: v.to(device) for k, v in batch_queries.items()}
# Generate query embedding
with torch.no_grad():
query_embedding = self.model(**batch_queries)
# Calculate scores
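            # score_multi_vector applies late-interaction (MaxSim) scoring: for each query
            # token, take its best-matching clip vector and sum those similarities per clip.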
scores = self.processor.score_multi_vector(query_embedding, embeddings)
relevance_threshold = 0.5
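            # NOTE: multi-vector scores are sums over query tokens, so their scale varies with
            # query length; treat this threshold and the confidence buckets below as heuristics.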
relevant_clips = []
for idx, score in enumerate(scores[0]):
if score.item() > relevance_threshold:
timestamp_info = clip_timestamps[idx]
relevant_clips.append({
'clip_id': idx + 1,
'score': score.item(),
'start_time': timestamp_info['start_time'],
'end_time': timestamp_info['end_time']
})
relevant_clips.sort(key=lambda x: x['score'], reverse=True)
# Generate response
if relevant_clips:
question_words = ['does', 'do', 'is', 'are', 'was', 'were', 'can', 'could', 'will', 'would', 'has', 'have']
is_question = any(query.lower().strip().startswith(word) for word in question_words) or query.strip().endswith('?')
if is_question:
response = f"βœ… **Yes.** The following moments show activity matching '{query}':\n\n"
else:
response = f"πŸ” **Analysis Results** for '{query}':\n\n"
response += "**πŸ“ Relevant Time Segments:**\n"
for i, clip in enumerate(relevant_clips[:5]):
start_min = int(clip['start_time'] // 60)
start_sec = int(clip['start_time'] % 60)
end_min = int(clip['end_time'] // 60)
end_sec = int(clip['end_time'] % 60)
confidence = "High" if clip['score'] > 0.8 else "Medium" if clip['score'] > 0.65 else "Low"
response += f"β€’ **{start_min:02d}:{start_sec:02d} - {end_min:02d}:{end_sec:02d}** "
response += f"(Confidence: {confidence}, Score: {clip['score']:.3f})\n"
total_duration = sum(clip['end_time'] - clip['start_time'] for clip in relevant_clips)
response += f"\nπŸ“Š **Summary:** {len(relevant_clips)} relevant segment(s) found, "
response += f"totaling {total_duration:.1f} seconds of relevant footage."
else:
is_question = any(query.lower().strip().startswith(word) for word in ['does', 'do', 'is', 'are', 'was', 'were', 'can', 'could', 'will', 'would', 'has', 'have']) or query.strip().endswith('?')
if is_question:
response = f"❌ **No.** No clear evidence found for '{query}' in the analyzed footage."
else:
response = f"πŸ” **No Results** found for '{query}' in the analyzed footage."
response += f"\n\nπŸ’‘ **Suggestion:** Try rephrasing your query or check if the activity occurs in a different time period."
best_score = max(scores[0]).item()
response += f"\n\nπŸ”§ **Technical Details:**\n"
response += f"β€’ Analyzed {len(embeddings)} video segments\n"
response += f"β€’ Highest similarity score: {best_score:.3f}\n"
response += f"β€’ Relevance threshold: {relevance_threshold}\n"
return response
except Exception as e:
return f"❌ Error during analysis: {str(e)}"
# Initialize processor instance (this will be recreated in each GPU call)
def get_video_rag():
return VideoRAGProcessor()
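# A minimal process-level caching sketch (assuming the worker process stays alive between
# calls, which ZeroGPU does not guarantee) could avoid reloading the model on every request:
#
#   _VIDEO_RAG = None
#   def get_video_rag():
#       global _VIDEO_RAG
#       if _VIDEO_RAG is None:
#           _VIDEO_RAG = VideoRAGProcessor()
#       return _VIDEO_RAG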
@spaces.GPU
def process_video_only(video_file):
"""Process video without query"""
video_rag = get_video_rag()
status, clips, _ = video_rag.process_and_analyze_video(video_file)
return status, clips
@spaces.GPU
def process_video_with_query(video_file, query):
"""Process video and analyze query in single GPU call"""
video_rag = get_video_rag()
status, clips, analysis = video_rag.process_and_analyze_video(video_file, query)
return status, clips, analysis
@spaces.GPU
def analyze_with_saved_embeddings(query):
"""Analyze query using saved embeddings"""
try:
# Load saved embeddings
if not os.path.exists('./video_embeddings.pkl'):
return "❌ No video processed. Please upload and process a video first."
with open('./video_embeddings.pkl', 'rb') as f:
data = pickle.load(f)
embeddings = data['embeddings']
clip_timestamps = data['clip_timestamps']
# Initialize processor for analysis
video_rag = get_video_rag()
result = video_rag._analyze_with_embeddings(query, embeddings, clip_timestamps)
return result
except Exception as e:
return f"❌ Error loading embeddings or analyzing: {str(e)}"
# Gradio interface functions
def process_video_interface(video_file):
"""Interface function for processing video only"""
return process_video_only(video_file)
def analyze_query_interface(query):
"""Interface function for analyzing query"""
return analyze_with_saved_embeddings(query)
def process_and_analyze_interface(video_file, query):
"""Interface function for processing video with immediate query"""
if query and query.strip():
return process_video_with_query(video_file, query)
else:
status, clips = process_video_only(video_file)
return status, clips, ""
# Create Gradio interface
def create_interface():
with gr.Blocks(title="Security Camera AI Assistant", theme=gr.themes.Soft()) as demo:
gr.Markdown("# πŸŽ₯ Security Camera AI Assistant")
gr.Markdown("Upload security footage and ask questions about what happened. Get detailed analysis with precise timestamps!")
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("## πŸ“€ Upload Security Footage")
video_input = gr.File(
label="Upload Video File",
file_types=[".mp4", ".avi", ".mov", ".mkv", ".webm"],
type="filepath"
)
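                # type="filepath" hands the handlers a plain path string; the processing code
                # accepts either a str or a file-like object with a .name attribute.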
# Option to process with immediate query
immediate_query = gr.Textbox(
label="Optional: Query to analyze immediately after processing",
placeholder="Leave empty to just process, or enter a query to analyze right away",
lines=2
)
process_btn = gr.Button("🎬 Process Video", variant="primary", size="lg")
processing_status = gr.Textbox(
label="Processing Status",
lines=8,
value="Model is ready! Upload a video and click 'Process Video' to start"
)
clips_list = gr.JSON(
label="Generated Clips",
value=[]
)
with gr.Column(scale=1):
gr.Markdown("## πŸ” Ask Questions About the Footage")
query_input = gr.Textbox(
label="Security Analysis Query",
placeholder="e.g., 'Does a person in grey shirt approach the building?', 'Is there any suspicious activity?', 'Show me when cars are parked'",
lines=3
)
                analyze_btn = gr.Button("🔍 Analyze Footage", variant="secondary", size="lg")
analysis_results = gr.Textbox(
label="Analysis Results",
lines=15,
value="Process a video first, then ask questions about what you want to find in the footage."
)
# Event handlers
process_btn.click(
process_and_analyze_interface,
inputs=[video_input, immediate_query],
outputs=[processing_status, clips_list, analysis_results]
)
analyze_btn.click(
analyze_query_interface,
inputs=[query_input],
outputs=[analysis_results]
)
query_input.submit(
analyze_query_interface,
inputs=[query_input],
outputs=[analysis_results]
)
gr.Markdown("""
## πŸ“ How to Use:
1. **Upload**: Choose your security camera video file (MP4, AVI, MOV, MKV, WebM)
2. **Process**: Click 'Process Video' to analyze the footage (this creates 5-second segments)
3. **Ask**: Type your question about what you want to find in the footage
4. **Analyze**: Click 'Analyze Footage' to get detailed results with timestamps
## πŸ’‘ Pro Tip:
- You can enter a query in the "Optional" field to analyze immediately after processing
- This saves GPU time by doing both operations in a single call
## πŸ” Example Questions:
- "Does a person in red clothing enter the building?"
- "Is there any suspicious activity near the entrance?"
- "Show me when vehicles are present"
- "Are there people walking by during daytime?"
- "Is there movement in the parking area?"
## πŸ”§ Features:
- βœ‚οΈ Automatic video segmentation into 5-second clips
- 🧠 AI-powered semantic video analysis using ColQwen2.5 Omni
- πŸ“ Precise timestamp reporting (MM:SS format)
- πŸ“Š Confidence scoring for each detection
- 🎯 Yes/No question answering for security queries
- ⚑ Smart relevance filtering to show only significant matches
- πŸ’Ύ Persistent embeddings storage for multiple queries
""")
return demo
if __name__ == "__main__":
demo = create_interface()
demo.launch()