import os
import pickle
import subprocess
import warnings

import cv2
import gradio as gr
import spaces
import torch
from tqdm import tqdm
from moviepy import VideoFileClip
from colpali_engine.models import ColQwen2_5Omni, ColQwen2_5OmniProcessor

warnings.filterwarnings("ignore")


class VideoRAGProcessor:
    """Handle model initialization and video processing."""

    def __init__(self):
        """Initialize the model and processor directly."""
        print("Loading ColQwen2.5 Omni model... This may take a few minutes.")
        self.model = ColQwen2_5Omni.from_pretrained(
            "vidore/colqwen-omni-v0.1",
            torch_dtype=torch.bfloat16,
            device_map="cuda" if torch.cuda.is_available() else "cpu",
            attn_implementation="eager",
        ).eval()
        # Load the processor from the same checkpoint as the model.
        self.processor = ColQwen2_5OmniProcessor.from_pretrained("vidore/colqwen-omni-v0.1")
        print("Model loaded successfully!")

    def cut_video_into_clips(self, video_path, clip_duration=5):
        """Cut the video into clips of the given duration (default 5 seconds)."""
        clips = []
        clip_paths = []
        clip_timestamps = []
        try:
            clips_dir = "./video_clips"
            os.makedirs(clips_dir, exist_ok=True)

            cap = cv2.VideoCapture(video_path)
            fps = cap.get(cv2.CAP_PROP_FPS)
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            if fps <= 0 or total_frames <= 0:
                cap.release()
                print("Could not read FPS/frame count from video")
                return [], [], []
            duration = total_frames / fps
            print(f"Video info: {duration:.2f}s total, {fps:.2f} FPS, {total_frames} frames")

            frames_per_clip = int(fps * clip_duration)
            clip_count = 0
            current_frame = 0

            while current_frame < total_frames:
                clip_filename = f"clip_{clip_count + 1}.mp4"
                clip_path = os.path.join(clips_dir, clip_filename)
                try:
                    start_time = current_frame / fps
                    end_time = min((current_frame + frames_per_clip) / fps, duration)
                    clip_duration_actual = end_time - start_time

                    # Skip fragments shorter than one second.
                    if clip_duration_actual < 1.0:
                        print(f"Skipping clip {clip_count + 1} - too short ({clip_duration_actual:.1f}s)")
                        current_frame += frames_per_clip
                        continue

                    # Keep the final clip slightly inside the end of the video.
                    if end_time >= duration:
                        end_time = duration - 0.1
                        clip_duration_actual = end_time - start_time
                        if clip_duration_actual < 1.0:
                            print(f"Skipping final clip - too short after adjustment ({clip_duration_actual:.1f}s)")
                            break

                    # Try MoviePy 1.x (subclip), then MoviePy 2.x (subclipped), then fall back to ffmpeg.
                    try:
                        video_clip = VideoFileClip(video_path)
                        sub_clip = video_clip.subclip(start_time, end_time)
                        sub_clip.write_videofile(clip_path, verbose=False, logger=None)
                        video_clip.close()
                    except AttributeError:
                        try:
                            video_clip = VideoFileClip(video_path)
                            sub_clip = video_clip.subclipped(start_time, end_time)
                            # MoviePy 2.x removed the `verbose` argument.
                            sub_clip.write_videofile(clip_path, logger=None)
                            video_clip.close()
                        except Exception:
                            cmd = [
                                'ffmpeg', '-i', video_path,
                                '-ss', str(start_time),
                                '-t', str(clip_duration_actual),
                                '-c', 'copy',
                                '-avoid_negative_ts', 'make_zero',
                                '-y', clip_path
                            ]
                            subprocess.run(cmd, capture_output=True, check=True)

                    clip_timestamps.append({
                        'clip_id': clip_count + 1,
                        'start_time': start_time,
                        'end_time': end_time,
                        'duration': clip_duration_actual
                    })
                    if clip_duration_actual < clip_duration:
                        clips.append(
                            f"Clip {clip_count + 1} ({start_time:.1f}s - {end_time:.1f}s) "
                            f"[Final clip - {clip_duration_actual:.1f}s]"
                        )
                    else:
                        clips.append(f"Clip {clip_count + 1} ({start_time:.1f}s - {end_time:.1f}s)")
                    clip_paths.append(clip_path)
                    clip_count += 1
                    current_frame += frames_per_clip
                except Exception as e:
                    print(f"Error creating clip {clip_count + 1}: {e}")
                    current_frame += frames_per_clip
                    continue

            cap.release()
            print(f"Successfully created {len(clip_paths)} clips from {duration:.2f}s video")
            return clips, clip_paths, clip_timestamps
        except Exception as e:
            print(f"Error in cut_video_into_clips: {e}")
            return [], [], []

    def process_and_analyze_video(self, video_file, query=None):
        """Process a video and optionally analyze a query in a single GPU call."""
        if video_file is None:
            return "āŒ Please upload a video file.", [], ""
        try:
            status_msg = "šŸŽ¬ Processing video..."

            # Clean up old clips
            clips_dir = "./video_clips"
            if os.path.exists(clips_dir):
                for file in os.listdir(clips_dir):
                    try:
                        os.remove(os.path.join(clips_dir, file))
                    except OSError:
                        pass

            # Accept either a file-like object (with .name) or a plain path string.
            video_path = video_file.name if hasattr(video_file, "name") else video_file

            # Cut video into clips
            status_msg += "\nšŸŽ¬ Cutting video into 5-second clips..."
            clips_info, clip_paths, clip_timestamps = self.cut_video_into_clips(video_path, clip_duration=5)
            if not clip_paths:
                return "āŒ Error cutting video into clips.", [], ""
            status_msg += f"\nāœ… Created {len(clip_paths)} clips"

            # Generate embeddings
            status_msg += "\nšŸ”„ Generating embeddings for video clips..."
            embeddings = []
            for i, clip_path in enumerate(tqdm(clip_paths, desc="Processing clips")):
                try:
                    batch_doc = self.processor.process_videos([clip_path])
                    with torch.no_grad():
                        device = next(self.model.parameters()).device
                        batch_doc = {k: v.to(device) for k, v in batch_doc.items()}
                        embedding = self.model(**batch_doc)
                        embeddings.extend(list(torch.unbind(embedding.to("cpu"))))
                except Exception as e:
                    print(f"Error processing clip {i + 1}: {e}")
                    continue
            status_msg += f"\nāœ… Generated embeddings for {len(embeddings)} clips"

            # Save embeddings and metadata to disk for persistence
            embeddings_data = {
                'embeddings': embeddings,
                'clip_timestamps': clip_timestamps,
                'clips_info': clips_info
            }
            with open('./video_embeddings.pkl', 'wb') as f:
                pickle.dump(embeddings_data, f)
            status_msg += "\nšŸ’¾ Embeddings saved to disk"

            # If a query was provided, analyze it immediately
            analysis_result = ""
            if query and query.strip():
                status_msg += f"\nšŸ” Analyzing query: '{query}'"
                analysis_result = self._analyze_with_embeddings(query, embeddings, clip_timestamps)
            else:
                status_msg += "\nšŸŽÆ Ready for queries!"
            return status_msg, clips_info, analysis_result
        except Exception as e:
            return f"āŒ Error processing video: {str(e)}", [], ""

    def _analyze_with_embeddings(self, query, embeddings, clip_timestamps):
        """Analyze a query against previously computed clip embeddings."""
        try:
            # Process query
            batch_queries = self.processor.process_queries([query])
            device = next(self.model.parameters()).device
            batch_queries = {k: v.to(device) for k, v in batch_queries.items()}

            # Generate query embedding
            with torch.no_grad():
                query_embedding = self.model(**batch_queries)

            # Calculate similarity scores between the query and every clip
            scores = self.processor.score_multi_vector(query_embedding, embeddings)

            relevance_threshold = 0.5
            relevant_clips = []
            for idx, score in enumerate(scores[0]):
                if score.item() > relevance_threshold:
                    timestamp_info = clip_timestamps[idx]
                    relevant_clips.append({
                        'clip_id': idx + 1,
                        'score': score.item(),
                        'start_time': timestamp_info['start_time'],
                        'end_time': timestamp_info['end_time']
                    })
            relevant_clips.sort(key=lambda x: x['score'], reverse=True)

            # Decide whether the query reads like a yes/no question
            question_words = ['does', 'do', 'is', 'are', 'was', 'were', 'can', 'could',
                              'will', 'would', 'has', 'have']
            is_question = (any(query.lower().strip().startswith(word) for word in question_words)
                           or query.strip().endswith('?'))

            # Generate response
            if relevant_clips:
                if is_question:
                    response = f"āœ… **Yes.** The following moments show activity matching '{query}':\n\n"
                else:
                    response = f"šŸ” **Analysis Results** for '{query}':\n\n"
                response += "**šŸ“ Relevant Time Segments:**\n"
                for clip in relevant_clips[:5]:
                    start_min = int(clip['start_time'] // 60)
                    start_sec = int(clip['start_time'] % 60)
                    end_min = int(clip['end_time'] // 60)
                    end_sec = int(clip['end_time'] % 60)
                    confidence = "High" if clip['score'] > 0.8 else "Medium" if clip['score'] > 0.65 else "Low"
                    response += f"• **{start_min:02d}:{start_sec:02d} - {end_min:02d}:{end_sec:02d}** "
                    response += f"(Confidence: {confidence}, Score: {clip['score']:.3f})\n"
                total_duration = sum(clip['end_time'] - clip['start_time'] for clip in relevant_clips)
                response += f"\nšŸ“Š **Summary:** {len(relevant_clips)} relevant segment(s) found, "
                response += f"totaling {total_duration:.1f} seconds of relevant footage."
            else:
                if is_question:
                    response = f"āŒ **No.** No clear evidence found for '{query}' in the analyzed footage."
                else:
                    response = f"šŸ” **No Results** found for '{query}' in the analyzed footage."
                response += "\n\nšŸ’” **Suggestion:** Try rephrasing your query or check if the activity occurs in a different time period."
            best_score = max(scores[0]).item()
            response += "\n\nšŸ”§ **Technical Details:**\n"
            response += f"• Analyzed {len(embeddings)} video segments\n"
            response += f"• Highest similarity score: {best_score:.3f}\n"
            response += f"• Relevance threshold: {relevance_threshold}\n"
            return response
        except Exception as e:
            return f"āŒ Error during analysis: {str(e)}"


# The processor instance is recreated inside each GPU call
def get_video_rag():
    return VideoRAGProcessor()


@spaces.GPU
def process_video_only(video_file):
    """Process a video without a query."""
    video_rag = get_video_rag()
    status, clips, _ = video_rag.process_and_analyze_video(video_file)
    return status, clips


@spaces.GPU
def process_video_with_query(video_file, query):
    """Process a video and analyze a query in a single GPU call."""
    video_rag = get_video_rag()
    status, clips, analysis = video_rag.process_and_analyze_video(video_file, query)
    return status, clips, analysis


@spaces.GPU
def analyze_with_saved_embeddings(query):
    """Analyze a query using embeddings saved to disk."""
    try:
        # Load saved embeddings
        if not os.path.exists('./video_embeddings.pkl'):
            return "āŒ No video processed. Please upload and process a video first."
        with open('./video_embeddings.pkl', 'rb') as f:
            data = pickle.load(f)
        embeddings = data['embeddings']
        clip_timestamps = data['clip_timestamps']

        # Initialize processor for analysis
        video_rag = get_video_rag()
        result = video_rag._analyze_with_embeddings(query, embeddings, clip_timestamps)
        return result
    except Exception as e:
        return f"āŒ Error loading embeddings or analyzing: {str(e)}"


# Gradio interface functions
def process_video_interface(video_file):
    """Interface function for processing a video only."""
    return process_video_only(video_file)


def analyze_query_interface(query):
    """Interface function for analyzing a query."""
    return analyze_with_saved_embeddings(query)


def process_and_analyze_interface(video_file, query):
    """Interface function for processing a video with an optional immediate query."""
    if query and query.strip():
        return process_video_with_query(video_file, query)
    else:
        status, clips = process_video_only(video_file)
        return status, clips, ""


# Create the Gradio interface
def create_interface():
    with gr.Blocks(title="Security Camera AI Assistant", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# šŸŽ„ Security Camera AI Assistant")
        gr.Markdown("Upload security footage and ask questions about what happened. Get detailed analysis with precise timestamps!")

        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("## šŸ“¤ Upload Security Footage")
                video_input = gr.File(
                    label="Upload Video File",
                    file_types=[".mp4", ".avi", ".mov", ".mkv", ".webm"],
                    type="filepath"
                )

                # Option to process with an immediate query
                immediate_query = gr.Textbox(
                    label="Optional: Query to analyze immediately after processing",
                    placeholder="Leave empty to just process, or enter a query to analyze right away",
                    lines=2
                )

                process_btn = gr.Button("šŸŽ¬ Process Video", variant="primary", size="lg")

                processing_status = gr.Textbox(
                    label="Processing Status",
                    lines=8,
                    value="Model is ready! Upload a video and click 'Process Video' to start"
                )

                clips_list = gr.JSON(
                    label="Generated Clips",
                    value=[]
                )

            with gr.Column(scale=1):
                gr.Markdown("## šŸ” Ask Questions About the Footage")
                query_input = gr.Textbox(
                    label="Security Analysis Query",
                    placeholder="e.g., 'Does a person in grey shirt approach the building?', 'Is there any suspicious activity?', 'Show me when cars are parked'",
                    lines=3
                )

                analyze_btn = gr.Button("šŸ” Analyze Footage", variant="secondary", size="lg")

                analysis_results = gr.Textbox(
                    label="Analysis Results",
                    lines=15,
                    value="Process a video first, then ask questions about what you want to find in the footage."
                )

        # Event handlers
        process_btn.click(
            process_and_analyze_interface,
            inputs=[video_input, immediate_query],
            outputs=[processing_status, clips_list, analysis_results]
        )
        analyze_btn.click(
            analyze_query_interface,
            inputs=[query_input],
            outputs=[analysis_results]
        )
        query_input.submit(
            analyze_query_interface,
            inputs=[query_input],
            outputs=[analysis_results]
        )

        gr.Markdown("""
        ## šŸ“ How to Use:
        1. **Upload**: Choose your security camera video file (MP4, AVI, MOV, MKV, WebM)
        2. **Process**: Click 'Process Video' to analyze the footage (this creates 5-second segments)
        3. **Ask**: Type your question about what you want to find in the footage
        4. **Analyze**: Click 'Analyze Footage' to get detailed results with timestamps

        ## šŸ’” Pro Tip:
        - You can enter a query in the "Optional" field to analyze immediately after processing
        - This saves GPU time by doing both operations in a single call

        ## šŸ” Example Questions:
        - "Does a person in red clothing enter the building?"
        - "Is there any suspicious activity near the entrance?"
        - "Show me when vehicles are present"
        - "Are there people walking by during daytime?"
        - "Is there movement in the parking area?"

        ## šŸ”§ Features:
        - āœ‚ļø Automatic video segmentation into 5-second clips
        - 🧠 AI-powered semantic video analysis using ColQwen2.5 Omni
        - šŸ“ Precise timestamp reporting (MM:SS format)
        - šŸ“Š Confidence scoring for each detection
        - šŸŽÆ Yes/No question answering for security queries
        - ⚔ Smart relevance filtering to show only significant matches
        - šŸ’¾ Persistent embeddings storage for multiple queries
        """)

    return demo


if __name__ == "__main__":
    demo = create_interface()
    demo.launch()
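
# Optional headless usage (sketch, not part of the UI flow): the functions above can
# also be called directly from a script or notebook. "sample.mp4" is a placeholder
# path, and analyze_with_saved_embeddings() only works after a video has been
# processed, because it reads ./video_embeddings.pkl from disk.
#
#   status, clips, analysis = process_video_with_query("sample.mp4", "Does anyone enter the building?")
#   print(analysis)
#   print(analyze_with_saved_embeddings("Is there a car near the entrance?"))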