import gradio as gr
import torch
import cv2
import tempfile
from tqdm import tqdm
from torch.utils.data import DataLoader
from moviepy.editor import VideoFileClip
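# NOTE: this is the moviepy 1.x API; moviepy 2.x removed the `moviepy.editor`
# module and renamed `subclip` to `subclipped`, so pin `moviepy<2` in
# requirements.txt if this import fails.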
from colpali_engine.models import ColQwen2_5Omni, ColQwen2_5OmniProcessor
import warnings

warnings.filterwarnings("ignore")
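
# Pipeline: a video is uploaded, cut into 10-second clips, and each clip is
# embedded with ColQwen-Omni (a multi-vector, ColBERT-style embedding per
# clip); text queries are then ranked against the clips via late interaction.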
# Global variables to store model, processor, and embeddings
model = None
processor = None
video_embeddings = []
video_clips = []

def initialize_model():
    """Initialize the ColQwen2.5 Omni model and processor"""
    global model, processor
    try:
        # Load model with eager attention (no flash-attn)
        model = ColQwen2_5Omni.from_pretrained(
            "vidore/colqwen-omni-v0.1",
            torch_dtype=torch.bfloat16,
            device_map="cuda" if torch.cuda.is_available() else "cpu",
            attn_implementation="eager",  # Use eager instead of flash-attn
        ).eval()
        # Load the processor from the same repo as the model
        processor = ColQwen2_5OmniProcessor.from_pretrained("vidore/colqwen-omni-v0.1")
        return "✅ Model loaded successfully!"
    except Exception as e:
        return f"❌ Error loading model: {str(e)}"

def cut_video_into_clips(video_path, clip_duration=10):
    """Cut video into clips of specified duration (default 10 seconds)"""
    clips = []
    clip_paths = []
    try:
        # Use OpenCV to read video metadata (reliable on HF Spaces)
        cap = cv2.VideoCapture(video_path)
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        duration = total_frames / fps
        # Calculate frames per clip
        frames_per_clip = int(fps * clip_duration)
        clip_count = 0
        current_frame = 0
        while current_frame < total_frames:
            # Create a temporary file for this clip
            temp_clip = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
            temp_clip_path = temp_clip.name
            temp_clip.close()
            # Use moviepy for the actual cutting (more reliable for output)
            try:
                start_time = current_frame / fps
                end_time = min((current_frame + frames_per_clip) / fps, duration)
                video_clip = VideoFileClip(video_path).subclip(start_time, end_time)
                # Explicit codecs so the .mp4 container is always valid
                video_clip.write_videofile(
                    temp_clip_path, codec="libx264", audio_codec="aac",
                    verbose=False, logger=None,
                )
                video_clip.close()
                clips.append(f"Clip {clip_count + 1} ({start_time:.1f}s - {end_time:.1f}s)")
                clip_paths.append(temp_clip_path)
                clip_count += 1
            except Exception as e:
                print(f"Error creating clip {clip_count}: {str(e)}")
            # Advance even when a clip fails, otherwise the loop never terminates
            current_frame += frames_per_clip
        cap.release()
        return clips, clip_paths
    except Exception as e:
        print(f"Error cutting video: {str(e)}")
        return [], []
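
# The 10-second default for clip_duration is a granularity/index-size trade-off:
# shorter clips localize a query more precisely but mean more clips to embed
# and score per search.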

def process_video(video_file):
    """Process uploaded video: cut into clips and generate embeddings"""
    global model, processor, video_embeddings, video_clips
    if model is None:
        return "❌ Model not loaded. Please wait for initialization to complete.", []
    if video_file is None:
        return "❌ Please upload a video file.", []
    try:
        # Reset previous data
        video_embeddings = []
        video_clips = []
        # Cut video into 10-second clips
        status_msg = "🎬 Cutting video into 10-second clips..."
        # gr.File(type="filepath") passes a path string, not a file object
        clips_info, clip_paths = cut_video_into_clips(video_file, clip_duration=10)
        if not clip_paths:
            return "❌ Error cutting video into clips.", []
        status_msg += f"\n✅ Created {len(clip_paths)} clips"
        # Process each clip with the model
        status_msg += "\n🔄 Generating embeddings for video clips..."
        # Create dataloader for batch processing
        dataloader = DataLoader(
            dataset=clip_paths,
            batch_size=1,
            shuffle=False,
            collate_fn=lambda x: processor.process_videos(x),
        )
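        # A plain list of file paths works as the Dataset here; the collate_fn
        # hands each batch of paths straight to processor.process_videos, which
        # returns the tensor batch the model expects.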
        embeddings = []
        for batch_doc in tqdm(dataloader, desc="Processing clips"):
            with torch.no_grad():
                # Move the batch to the model's device
                device = next(model.parameters()).device
                batch_doc = {k: v.to(device) for k, v in batch_doc.items()}
                # Generate embeddings
                embedding = model(**batch_doc)
            embeddings.extend(list(torch.unbind(embedding.to("cpu"))))
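        # Embeddings are kept as a list of per-clip tensors rather than one
        # stacked tensor: clips can yield different numbers of token vectors
        # (e.g. a shorter final clip), so the shapes are ragged.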
        video_embeddings = embeddings
        video_clips = clip_paths
        status_msg += f"\n✅ Generated embeddings for {len(embeddings)} clips"
        status_msg += "\n🎯 Ready for queries!"
        return status_msg, clips_info
    except Exception as e:
        return f"❌ Error processing video: {str(e)}", []

def search_video_clips(query):
    """Search through video clips using a text query"""
    global model, processor, video_embeddings, video_clips
    if model is None:
        return "❌ Model not loaded.", None
    if not video_embeddings:
        return "❌ No video processed. Please upload and process a video first.", None
    if not query.strip():
        return "❌ Please enter a search query.", None
    try:
        # Process query
        batch_queries = processor.process_queries([query])
        device = next(model.parameters()).device
        batch_queries = {k: v.to(device) for k, v in batch_queries.items()}
        # Generate query embedding
        with torch.no_grad():
            query_embedding = model(**batch_queries)
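        # Late-interaction (MaxSim) scoring: for each query token vector, take
        # the max similarity over a clip's token vectors, then sum over query
        # tokens; score_multi_vector computes this for all clips at once.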
        # Calculate scores
        scores = processor.score_multi_vector(query_embedding, video_embeddings)
        # Find best match
        best_clip_idx = scores[0].argmax().item()
        best_score = scores[0][best_clip_idx].item()
        # Get the best matching clip
        best_clip_path = video_clips[best_clip_idx]
        result_text = f"🎯 Best match: Clip {best_clip_idx + 1}\n"
        result_text += f"📊 Similarity score: {best_score:.4f}\n"
        result_text += f"🔍 Query: '{query}'"
        # Append the top 3 results
        top_3 = torch.topk(scores[0], min(3, len(scores[0])))
        rankings = "\n\n🏆 Top 3 Results:\n"
        for i, (score, idx) in enumerate(zip(top_3.values.tolist(), top_3.indices.tolist())):
            rankings += f"{i + 1}. Clip {idx + 1} (Score: {score:.4f})\n"
        return result_text + rankings, best_clip_path
    except Exception as e:
        return f"❌ Error during search: {str(e)}", None

# Create Gradio interface
def create_interface():
    with gr.Blocks(title="Video RAG with ColQwen2.5 Omni", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# 🎬 Video RAG with ColQwen2.5 Omni")
        gr.Markdown("Upload a video, and it will be automatically cut into 10-second clips. Then search through the clips using natural language queries!")
        # Initialize model on startup
        with gr.Row():
            init_btn = gr.Button("🚀 Initialize Model", variant="primary")
            init_status = gr.Textbox(label="Initialization Status", value="Click 'Initialize Model' to start")
        init_btn.click(initialize_model, outputs=[init_status])
        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("## 📤 Upload Video")
                video_input = gr.File(
                    label="Upload Video File",
                    file_types=[".mp4", ".avi", ".mov", ".mkv", ".webm"],
                    type="filepath",
                )
                process_btn = gr.Button("🎬 Process Video", variant="secondary")
                processing_status = gr.Textbox(
                    label="Processing Status",
                    lines=6,
                    value="Upload a video and click 'Process Video' to start",
                )
                clips_list = gr.JSON(
                    label="Generated Clips",
                    value=[],
                )
            with gr.Column(scale=1):
                gr.Markdown("## 🔍 Search Clips")
                query_input = gr.Textbox(
                    label="Search Query",
                    placeholder="e.g., 'a dragon spitting fire', 'person running', 'car driving'",
                    lines=2,
                )
                search_btn = gr.Button("🎯 Search", variant="primary")
                search_results = gr.Textbox(
                    label="Search Results",
                    lines=8,
                )
        with gr.Row():
            result_video = gr.Video(
                label="Best Matching Clip",
                visible=True,
            )
        # Event handlers
        process_btn.click(
            process_video,
            inputs=[video_input],
            outputs=[processing_status, clips_list],
        )
        # search_video_clips returns (results text, clip path), so each output
        # component is listed exactly once
        search_btn.click(
            search_video_clips,
            inputs=[query_input],
            outputs=[search_results, result_video],
        )
        # Auto-search on Enter
        query_input.submit(
            search_video_clips,
            inputs=[query_input],
            outputs=[search_results, result_video],
        )
gr.Markdown(""" | |
## π Instructions: | |
1. **Initialize**: Click 'Initialize Model' and wait for completion | |
2. **Upload**: Choose a video file (MP4, AVI, MOV, MKV, WebM) | |
3. **Process**: Click 'Process Video' to cut it into 10-second clips | |
4. **Search**: Enter a query describing what you're looking for | |
5. **Results**: View the best matching clip and similarity scores | |
## π§ Features: | |
- βοΈ Automatic video segmentation into 10-second clips | |
- π§ AI-powered semantic video search using ColQwen2.5 Omni | |
- π― Real-time similarity scoring and ranking | |
- π± OpenCV-based video processing for HF Spaces compatibility | |
- β‘ Eager attention implementation (no flash-attn dependency) | |
""") | |
    return demo


if __name__ == "__main__":
    demo = create_interface()
    demo.launch()