import os
import pickle
import subprocess
import warnings

import cv2
import gradio as gr
import spaces
import torch
from tqdm import tqdm
from colpali_engine.models import ColQwen2_5Omni, ColQwen2_5OmniProcessor

# MoviePy 2.x exposes VideoFileClip at the top level; 1.x used moviepy.editor
try:
    from moviepy import VideoFileClip
except ImportError:
    from moviepy.editor import VideoFileClip

warnings.filterwarnings("ignore")

class VideoRAGProcessor:
    """Handles model initialization and video processing."""

    def __init__(self):
        """Load the ColQwen2.5 Omni model and its processor."""
        print("Loading ColQwen2.5 Omni model... This may take a few minutes.")
        self.model = ColQwen2_5Omni.from_pretrained(
            "vidore/colqwen-omni-v0.1",
            torch_dtype=torch.bfloat16,
            device_map="cuda" if torch.cuda.is_available() else "cpu",
            attn_implementation="eager",
        ).eval()
        # Load the processor from the same checkpoint as the model
        self.processor = ColQwen2_5OmniProcessor.from_pretrained("vidore/colqwen-omni-v0.1")
        print("Model loaded successfully!")

    def cut_video_into_clips(self, video_path, clip_duration=5):
        """Cut a video into clips of the specified duration (default 5 seconds)."""
        clips = []
        clip_paths = []
        clip_timestamps = []
        try:
            clips_dir = "./video_clips"
            os.makedirs(clips_dir, exist_ok=True)
            # OpenCV is used only to read metadata; the cutting itself is done
            # by MoviePy (or the ffmpeg fallback) below
            cap = cv2.VideoCapture(video_path)
            fps = cap.get(cv2.CAP_PROP_FPS)
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            cap.release()
            if fps <= 0 or total_frames <= 0:
                print("Could not read video metadata (FPS / frame count).")
                return [], [], []
            duration = total_frames / fps
            print(f"Video info: {duration:.2f}s total, {fps:.2f} FPS, {total_frames} frames")
            # e.g. 30 FPS * 5 s = 150 frames per clip
            frames_per_clip = int(fps * clip_duration)
            clip_count = 0
            current_frame = 0
            while current_frame < total_frames:
                clip_filename = f"clip_{clip_count + 1}.mp4"
                clip_path = os.path.join(clips_dir, clip_filename)
                try:
                    start_time = current_frame / fps
                    end_time = min((current_frame + frames_per_clip) / fps, duration)
                    clip_duration_actual = end_time - start_time
                    if clip_duration_actual < 1.0:
                        print(f"Skipping clip {clip_count + 1} - too short ({clip_duration_actual:.1f}s)")
                        current_frame += frames_per_clip
                        continue
                    if end_time >= duration:
                        # Trim slightly to avoid reading past the end of the file
                        end_time = duration - 0.1
                        clip_duration_actual = end_time - start_time
                        if clip_duration_actual < 1.0:
                            print(f"Skipping final clip - too short after adjustment ({clip_duration_actual:.1f}s)")
                            break
                    try:
                        # MoviePy 1.x API
                        video_clip = VideoFileClip(video_path)
                        sub_clip = video_clip.subclip(start_time, end_time)
                        sub_clip.write_videofile(clip_path, verbose=False, logger=None)
                        video_clip.close()
                    except AttributeError:
                        try:
                            # MoviePy 2.x renamed subclip -> subclipped and
                            # removed the verbose argument
                            video_clip = VideoFileClip(video_path)
                            sub_clip = video_clip.subclipped(start_time, end_time)
                            sub_clip.write_videofile(clip_path, logger=None)
                            video_clip.close()
                        except Exception:
                            # Last resort: stream-copy the segment with ffmpeg
                            cmd = [
                                'ffmpeg', '-i', video_path,
                                '-ss', str(start_time),
                                '-t', str(clip_duration_actual),
                                '-c', 'copy',
                                '-avoid_negative_ts', 'make_zero',
                                '-y', clip_path
                            ]
                            subprocess.run(cmd, capture_output=True, check=True)
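                            # For reference, the command above is equivalent to e.g.:
                            #   ffmpeg -i input.mp4 -ss 5.0 -t 5.0 -c copy \
                            #     -avoid_negative_ts make_zero -y clip_2.mp4
                            # '-c copy' cuts on keyframes, so clip boundaries can be
                            # slightly off; re-encoding (e.g. '-c:v libx264') would be
                            # frame-accurate but slower.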
                    clip_timestamps.append({
                        'clip_id': clip_count + 1,
                        'start_time': start_time,
                        'end_time': end_time,
                        'duration': clip_duration_actual
                    })
                    if clip_duration_actual < clip_duration:
                        clips.append(f"Clip {clip_count + 1} ({start_time:.1f}s - {end_time:.1f}s) [Final clip - {clip_duration_actual:.1f}s]")
                    else:
                        clips.append(f"Clip {clip_count + 1} ({start_time:.1f}s - {end_time:.1f}s)")
                    clip_paths.append(clip_path)
                    clip_count += 1
                    current_frame += frames_per_clip
                except Exception as e:
                    print(f"Error creating clip {clip_count + 1}: {str(e)}")
                    current_frame += frames_per_clip
                    continue
            print(f"Successfully created {len(clip_paths)} clips from {duration:.2f}s video")
            return clips, clip_paths, clip_timestamps
        except Exception as e:
            print(f"Error in cut_video_into_clips: {str(e)}")
            return [], [], []

    def process_and_analyze_video(self, video_file, query=None):
        """Process a video and optionally analyze a query in a single GPU call."""
        if video_file is None:
            return "❌ Please upload a video file.", [], ""
        try:
            status_msg = "🎬 Processing video..."
            # Clean up clips from a previous run
            clips_dir = "./video_clips"
            if os.path.exists(clips_dir):
                for file in os.listdir(clips_dir):
                    try:
                        os.remove(os.path.join(clips_dir, file))
                    except OSError:
                        pass
            # Cut the video into clips. gr.File(type="filepath") passes a plain
            # path string, so use it directly rather than a .name attribute
            status_msg += "\n✂️ Cutting video into 5-second clips..."
            clips_info, clip_paths, clip_timestamps = self.cut_video_into_clips(video_file, clip_duration=5)
            if not clip_paths:
                return "❌ Error cutting video into clips.", [], ""
            status_msg += f"\n✅ Created {len(clip_paths)} clips"
            # Generate embeddings
            status_msg += "\n🔄 Generating embeddings for video clips..."
            embeddings = []
            for i, clip_path in enumerate(tqdm(clip_paths, desc="Processing clips")):
                try:
                    batch_doc = self.processor.process_videos([clip_path])
                    with torch.no_grad():
                        device = next(self.model.parameters()).device
                        batch_doc = {k: v.to(device) for k, v in batch_doc.items()}
                        embedding = self.model(**batch_doc)
                    embeddings.extend(list(torch.unbind(embedding.to("cpu"))))
                except Exception as e:
                    print(f"Error processing clip {i + 1}: {e}")
                    continue
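            # Clips are embedded one at a time here, which keeps memory use
            # bounded. For speed, they could be batched, e.g. with a torch
            # DataLoader whose collate_fn calls processor.process_videos on a
            # list of paths (assuming the processor accepts multiple paths, as
            # in the colpali-engine examples).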
status_msg += f"\nβ Generated embeddings for {len(embeddings)} clips" | |
# Save embeddings and metadata to disk for persistence | |
embeddings_data = { | |
'embeddings': embeddings, | |
'clip_timestamps': clip_timestamps, | |
'clips_info': clips_info | |
} | |
with open('./video_embeddings.pkl', 'wb') as f: | |
pickle.dump(embeddings_data, f) | |
status_msg += "\nπΎ Embeddings saved to disk" | |
            # If a query was provided, analyze it immediately
            analysis_result = ""
            if query and query.strip():
                status_msg += f"\n🔍 Analyzing query: '{query}'"
                analysis_result = self._analyze_with_embeddings(query, embeddings, clip_timestamps)
            else:
                status_msg += "\n🎯 Ready for queries!"
            return status_msg, clips_info, analysis_result
        except Exception as e:
            return f"❌ Error processing video: {str(e)}", [], ""

    def _analyze_with_embeddings(self, query, embeddings, clip_timestamps):
        """Internal method to analyze a query against precomputed embeddings."""
        try:
            # Process the query
            batch_queries = self.processor.process_queries([query])
            device = next(self.model.parameters()).device
            batch_queries = {k: v.to(device) for k, v in batch_queries.items()}
            # Generate the query embedding
            with torch.no_grad():
                query_embedding = self.model(**batch_queries)
            # Calculate scores
            scores = self.processor.score_multi_vector(query_embedding, embeddings)
            relevance_threshold = 0.5
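            # Note: score_multi_vector returns unnormalized late-interaction
            # scores (for each query token, the max similarity over clip
            # tokens, summed over query tokens), so the absolute scale depends
            # on query length and is not bounded to [0, 1]. The threshold and
            # the confidence bands below are heuristics that may need tuning.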
            relevant_clips = []
            for idx, score in enumerate(scores[0]):
                if score.item() > relevance_threshold:
                    timestamp_info = clip_timestamps[idx]
                    relevant_clips.append({
                        'clip_id': idx + 1,
                        'score': score.item(),
                        'start_time': timestamp_info['start_time'],
                        'end_time': timestamp_info['end_time']
                    })
            relevant_clips.sort(key=lambda x: x['score'], reverse=True)
            # Generate the response; detect yes/no questions once so both
            # branches share the same heuristic
            question_words = ['does', 'do', 'is', 'are', 'was', 'were', 'can',
                              'could', 'will', 'would', 'has', 'have']
            is_question = (any(query.lower().strip().startswith(word) for word in question_words)
                           or query.strip().endswith('?'))
            if relevant_clips:
                if is_question:
                    response = f"✅ **Yes.** The following moments show activity matching '{query}':\n\n"
                else:
                    response = f"📊 **Analysis Results** for '{query}':\n\n"
                response += "**📍 Relevant Time Segments:**\n"
                for clip in relevant_clips[:5]:
                    start_min = int(clip['start_time'] // 60)
                    start_sec = int(clip['start_time'] % 60)
                    end_min = int(clip['end_time'] // 60)
                    end_sec = int(clip['end_time'] % 60)
                    confidence = "High" if clip['score'] > 0.8 else "Medium" if clip['score'] > 0.65 else "Low"
                    response += f"• **{start_min:02d}:{start_sec:02d} - {end_min:02d}:{end_sec:02d}** "
                    response += f"(Confidence: {confidence}, Score: {clip['score']:.3f})\n"
                total_duration = sum(clip['end_time'] - clip['start_time'] for clip in relevant_clips)
                response += f"\n📊 **Summary:** {len(relevant_clips)} relevant segment(s) found, "
                response += f"totaling {total_duration:.1f} seconds of relevant footage."
            else:
                if is_question:
                    response = f"❌ **No.** No clear evidence found for '{query}' in the analyzed footage."
                else:
                    response = f"🔍 **No Results** found for '{query}' in the analyzed footage."
                response += "\n\n💡 **Suggestion:** Try rephrasing your query or check if the activity occurs in a different time period."
            best_score = max(scores[0]).item()
            response += "\n\n🔧 **Technical Details:**\n"
            response += f"• Analyzed {len(embeddings)} video segments\n"
            response += f"• Highest similarity score: {best_score:.3f}\n"
            response += f"• Relevance threshold: {relevance_threshold}\n"
            return response
        except Exception as e:
            return f"❌ Error during analysis: {str(e)}"

# The processor is constructed inside each GPU call: on ZeroGPU the worker can
# be recycled between calls, so nothing is assumed to persist in memory.
def get_video_rag():
    return VideoRAGProcessor()
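
# Reloading the model on every call is slow. While a worker process stays
# alive, a module-level cache avoids the reload; this is a minimal sketch
# (the _CACHED_RAG name is our own and is not used by the handlers below):
_CACHED_RAG = None

def get_cached_video_rag():
    global _CACHED_RAG
    if _CACHED_RAG is None:
        _CACHED_RAG = VideoRAGProcessor()
    return _CACHED_RAG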

# On ZeroGPU Spaces, functions that touch the GPU must be wrapped with
# @spaces.GPU so a GPU is attached for the duration of the call (the `spaces`
# import above is otherwise unused).
@spaces.GPU
def process_video_only(video_file):
    """Process a video without a query."""
    video_rag = get_video_rag()
    status, clips, _ = video_rag.process_and_analyze_video(video_file)
    return status, clips

@spaces.GPU
def process_video_with_query(video_file, query):
    """Process a video and analyze a query in a single GPU call."""
    video_rag = get_video_rag()
    status, clips, analysis = video_rag.process_and_analyze_video(video_file, query)
    return status, clips, analysis

@spaces.GPU
def analyze_with_saved_embeddings(query):
    """Analyze a query using embeddings saved by a previous processing run."""
    try:
        # Load the saved embeddings
        if not os.path.exists('./video_embeddings.pkl'):
            return "❌ No video processed. Please upload and process a video first."
        with open('./video_embeddings.pkl', 'rb') as f:
            data = pickle.load(f)
        embeddings = data['embeddings']
        clip_timestamps = data['clip_timestamps']
        # Initialize the processor for analysis
        video_rag = get_video_rag()
        result = video_rag._analyze_with_embeddings(query, embeddings, clip_timestamps)
        return result
    except Exception as e:
        return f"❌ Error loading embeddings or analyzing: {str(e)}"

# Gradio interface functions
def process_video_interface(video_file):
    """Interface wrapper for processing a video only."""
    return process_video_only(video_file)

def analyze_query_interface(query):
    """Interface wrapper for analyzing a query."""
    return analyze_with_saved_embeddings(query)

def process_and_analyze_interface(video_file, query):
    """Interface wrapper for processing a video with an optional immediate query."""
    if query and query.strip():
        return process_video_with_query(video_file, query)
    else:
        status, clips = process_video_only(video_file)
        return status, clips, ""

# Create the Gradio interface
def create_interface():
    with gr.Blocks(title="Security Camera AI Assistant", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# 🎥 Security Camera AI Assistant")
        gr.Markdown("Upload security footage and ask questions about what happened. Get detailed analysis with precise timestamps!")
        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("## 📤 Upload Security Footage")
                video_input = gr.File(
                    label="Upload Video File",
                    file_types=[".mp4", ".avi", ".mov", ".mkv", ".webm"],
                    type="filepath"
                )
                # Option to process with an immediate query
                immediate_query = gr.Textbox(
                    label="Optional: Query to analyze immediately after processing",
                    placeholder="Leave empty to just process, or enter a query to analyze right away",
                    lines=2
                )
                process_btn = gr.Button("🎬 Process Video", variant="primary", size="lg")
                processing_status = gr.Textbox(
                    label="Processing Status",
                    lines=8,
                    value="Model is ready! Upload a video and click 'Process Video' to start"
                )
                clips_list = gr.JSON(
                    label="Generated Clips",
                    value=[]
                )
            with gr.Column(scale=1):
                gr.Markdown("## 🔍 Ask Questions About the Footage")
                query_input = gr.Textbox(
                    label="Security Analysis Query",
                    placeholder="e.g., 'Does a person in a grey shirt approach the building?', 'Is there any suspicious activity?', 'Show me when cars are parked'",
                    lines=3
                )
                analyze_btn = gr.Button("🔍 Analyze Footage", variant="secondary", size="lg")
                analysis_results = gr.Textbox(
                    label="Analysis Results",
                    lines=15,
                    value="Process a video first, then ask questions about what you want to find in the footage."
                )
        # Event handlers
        process_btn.click(
            process_and_analyze_interface,
            inputs=[video_input, immediate_query],
            outputs=[processing_status, clips_list, analysis_results]
        )
        analyze_btn.click(
            analyze_query_interface,
            inputs=[query_input],
            outputs=[analysis_results]
        )
        query_input.submit(
            analyze_query_interface,
            inputs=[query_input],
            outputs=[analysis_results]
        )
gr.Markdown(""" | |
## π How to Use: | |
1. **Upload**: Choose your security camera video file (MP4, AVI, MOV, MKV, WebM) | |
2. **Process**: Click 'Process Video' to analyze the footage (this creates 5-second segments) | |
3. **Ask**: Type your question about what you want to find in the footage | |
4. **Analyze**: Click 'Analyze Footage' to get detailed results with timestamps | |
## π‘ Pro Tip: | |
- You can enter a query in the "Optional" field to analyze immediately after processing | |
- This saves GPU time by doing both operations in a single call | |
## π Example Questions: | |
- "Does a person in red clothing enter the building?" | |
- "Is there any suspicious activity near the entrance?" | |
- "Show me when vehicles are present" | |
- "Are there people walking by during daytime?" | |
- "Is there movement in the parking area?" | |
## π§ Features: | |
- βοΈ Automatic video segmentation into 5-second clips | |
- π§ AI-powered semantic video analysis using ColQwen2.5 Omni | |
- π Precise timestamp reporting (MM:SS format) | |
- π Confidence scoring for each detection | |
- π― Yes/No question answering for security queries | |
- β‘ Smart relevance filtering to show only significant matches | |
- πΎ Persistent embeddings storage for multiple queries | |
""") | |
    return demo

if __name__ == "__main__":
    demo = create_interface()
    demo.launch()
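
# A possible local run (dependency names are our assumption; they are not
# pinned by this file):
#   pip install gradio spaces torch opencv-python moviepy colpali-engine tqdm
#   python app.py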