# CCTV_Assistant / app.py
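"""Gradio demo: video RAG with ColQwen2.5 Omni.

Cuts an uploaded video into 10-second clips, embeds each clip with the
ColQwen2.5 Omni multi-vector model, and retrieves the clip that best matches
a natural-language query.
"""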
import gradio as gr
import torch
import cv2
import os
import tempfile
import numpy as np
from PIL import Image
from tqdm import tqdm
from torch.utils.data import DataLoader
from moviepy.editor import VideoFileClip
from colpali_engine.models import ColQwen2_5Omni, ColQwen2_5OmniProcessor
import warnings
warnings.filterwarnings("ignore")
# Global variables to store model, processor, and embeddings
model = None
processor = None
video_embeddings = []
video_clips = []
def initialize_model():
"""Initialize the ColQwen2.5 Omni model and processor"""
global model, processor
try:
# Load model with eager attention (no flash-attn)
model = ColQwen2_5Omni.from_pretrained(
"vidore/colqwen-omni-v0.1",
torch_dtype=torch.bfloat16,
device_map="cuda" if torch.cuda.is_available() else "cpu",
attn_implementation="eager", # Use eager instead of flash-attn
).eval()
processor = ColQwen2_5OmniProcessor.from_pretrained("manu/colqwen-omni-v0.1")
        return "✅ Model loaded successfully!"
except Exception as e:
return f"❌ Error loading model: {str(e)}"
def cut_video_into_clips(video_path, clip_duration=10):
"""Cut video into clips of specified duration (default 10 seconds)"""
clips = []
clip_paths = []
try:
        # Use OpenCV to read video metadata (fps, frame count); MoviePy does the actual cutting below
        cap = cv2.VideoCapture(video_path)
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if fps <= 0 or total_frames <= 0:
            cap.release()
            return [], []
        duration = total_frames / fps
        # Calculate frames per clip
        frames_per_clip = int(fps * clip_duration)
clip_count = 0
current_frame = 0
while current_frame < total_frames:
# Create temporary file for this clip
temp_clip = tempfile.NamedTemporaryFile(suffix='.mp4', delete=False)
temp_clip_path = temp_clip.name
temp_clip.close()
# Use moviepy for the actual cutting (more reliable for output)
try:
start_time = current_frame / fps
end_time = min((current_frame + frames_per_clip) / fps, duration)
video_clip = VideoFileClip(video_path).subclip(start_time, end_time)
video_clip.write_videofile(temp_clip_path, verbose=False, logger=None)
video_clip.close()
clips.append(f"Clip {clip_count + 1} ({start_time:.1f}s - {end_time:.1f}s)")
clip_paths.append(temp_clip_path)
clip_count += 1
current_frame += frames_per_clip
            except Exception as e:
                print(f"Error creating clip {clip_count}: {str(e)}")
                # Advance to the next segment even on failure, otherwise this loop never terminates
                current_frame += frames_per_clip
                continue
cap.release()
return clips, clip_paths
    except Exception as e:
        print(f"Error cutting video: {str(e)}")
        return [], []
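# The clips written above are plain temp files that are never deleted. Below is a minimal,
# optional helper for freeing them between uploads; it is not wired into the app, so call it
# manually (e.g. at the start of process_video) if disk usage on the Space matters.
def cleanup_clip_files(paths):
    """Delete temporary clip files written by cut_video_into_clips."""
    for path in paths:
        try:
            if os.path.exists(path):
                os.remove(path)
        except OSError as e:
            print(f"Could not remove {path}: {str(e)}")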
def process_video(video_file):
"""Process uploaded video: cut into clips and generate embeddings"""
global model, processor, video_embeddings, video_clips
if model is None:
return "❌ Model not loaded. Please wait for initialization to complete.", []
if video_file is None:
return "❌ Please upload a video file.", []
try:
# Reset previous data
video_embeddings = []
video_clips = []
        # Cut video into 10-second clips
        status_msg = "🎬 Cutting video into 10-second clips..."
        # gr.File(type="filepath") passes a plain path string; older Gradio versions pass a file object
        video_path = video_file if isinstance(video_file, str) else video_file.name
        clips_info, clip_paths = cut_video_into_clips(video_path, clip_duration=10)
if not clip_paths:
return "❌ Error cutting video into clips.", []
status_msg += f"\nβœ… Created {len(clip_paths)} clips"
# Process each clip with the model
status_msg += "\nπŸ”„ Generating embeddings for video clips..."
# Create dataloader for batch processing
dataloader = DataLoader(
dataset=clip_paths,
batch_size=1,
shuffle=False,
collate_fn=lambda x: processor.process_videos(x),
)
embeddings = []
for i, batch_doc in enumerate(tqdm(dataloader, desc="Processing clips")):
with torch.no_grad():
# Move to device
device = next(model.parameters()).device
batch_doc = {k: v.to(device) for k, v in batch_doc.items()}
# Generate embeddings
embedding = model(**batch_doc)
embeddings.extend(list(torch.unbind(embedding.to("cpu"))))
video_embeddings = embeddings
video_clips = clip_paths
status_msg += f"\nβœ… Generated embeddings for {len(embeddings)} clips"
status_msg += "\n🎯 Ready for queries!"
return status_msg, clips_info
except Exception as e:
return f"❌ Error processing video: {str(e)}", []
def search_video_clips(query):
"""Search through video clips using text query"""
global model, processor, video_embeddings, video_clips
    if model is None:
        return "❌ Model not loaded.", None
    if not video_embeddings:
        return "❌ No video processed. Please upload and process a video first.", None
    if not query.strip():
        return "❌ Please enter a search query.", None
try:
# Process query
batch_queries = processor.process_queries([query])
device = next(model.parameters()).device
batch_queries = {k: v.to(device) for k, v in batch_queries.items()}
# Generate query embedding
with torch.no_grad():
query_embedding = model(**batch_queries)
# Calculate scores
scores = processor.score_multi_vector(query_embedding, video_embeddings)
# Find best match
best_clip_idx = scores[0].argmax().item()
best_score = scores[0][best_clip_idx].item()
# Get the best matching clip
best_clip_path = video_clips[best_clip_idx]
result_text = f"🎯 Best match: Clip {best_clip_idx + 1}\n"
result_text += f"πŸ“Š Similarity score: {best_score:.4f}\n"
result_text += f"πŸ” Query: '{query}'"
# Return top 3 results text
top_3_scores = torch.topk(scores[0], min(3, len(scores[0])))
rankings = "\n\nπŸ“‹ Top 3 Results:\n"
for i, (score, idx) in enumerate(zip(top_3_scores.values, top_3_scores.indices)):
rankings += f"{i+1}. Clip {idx+1} (Score: {score:.4f})\n"
        return result_text + rankings, best_clip_path
except Exception as e:
return f"❌ Error during search: {str(e)}", None, ""
# Create Gradio interface
def create_interface():
with gr.Blocks(title="Video RAG with ColQwen2.5 Omni", theme=gr.themes.Soft()) as demo:
gr.Markdown("# 🎬 Video RAG with ColQwen2.5 Omni")
gr.Markdown("Upload a video, and it will be automatically cut into 10-second clips. Then search through the clips using natural language queries!")
# Initialize model on startup
with gr.Row():
            init_btn = gr.Button("🚀 Initialize Model", variant="primary")
init_status = gr.Textbox(label="Initialization Status", value="Click 'Initialize Model' to start")
init_btn.click(initialize_model, outputs=[init_status])
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("## πŸ“€ Upload Video")
video_input = gr.File(
label="Upload Video File",
file_types=[".mp4", ".avi", ".mov", ".mkv", ".webm"],
type="filepath"
)
process_btn = gr.Button("🎬 Process Video", variant="secondary")
processing_status = gr.Textbox(
label="Processing Status",
lines=6,
value="Upload a video and click 'Process Video' to start"
)
clips_list = gr.JSON(
label="Generated Clips",
value=[]
)
with gr.Column(scale=1):
gr.Markdown("## πŸ” Search Clips")
query_input = gr.Textbox(
label="Search Query",
placeholder="e.g., 'a dragon spitting fire', 'person running', 'car driving'",
lines=2
)
search_btn = gr.Button("🎯 Search", variant="primary")
search_results = gr.Textbox(
label="Search Results",
lines=8
)
with gr.Row():
result_video = gr.Video(
label="Best Matching Clip",
visible=True
)
# Event handlers
process_btn.click(
process_video,
inputs=[video_input],
outputs=[processing_status, clips_list]
)
search_btn.click(
search_video_clips,
inputs=[query_input],
            outputs=[search_results, result_video]
)
# Auto-search on Enter
query_input.submit(
search_video_clips,
inputs=[query_input],
            outputs=[search_results, result_video]
)
gr.Markdown("""
        ## 📝 Instructions:
1. **Initialize**: Click 'Initialize Model' and wait for completion
2. **Upload**: Choose a video file (MP4, AVI, MOV, MKV, WebM)
3. **Process**: Click 'Process Video' to cut it into 10-second clips
4. **Search**: Enter a query describing what you're looking for
5. **Results**: View the best matching clip and similarity scores
        ## 🔧 Features:
        - ✂️ Automatic video segmentation into 10-second clips
        - 🧠 AI-powered semantic video search using ColQwen2.5 Omni
        - 🎯 Real-time similarity scoring and ranking
        - 📱 OpenCV + MoviePy video handling for HF Spaces compatibility
        - ⚡ Eager attention implementation (no flash-attn dependency)
""")
return demo
if __name__ == "__main__":
demo = create_interface()
demo.launch()
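# Likely dependencies, inferred from the imports above (exact pins on the Space may differ):
#   gradio, torch, opencv-python, numpy, pillow, tqdm, colpali-engine,
#   moviepy<2.0 (the moviepy.editor module and VideoFileClip.subclip used here were removed in 2.x)
# Run locally with: python app.py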