import os import time import gradio as gr import numpy as np import requests import spaces import supervision as sv import torch from PIL import Image from tqdm import tqdm from transformers import AutoModelForObjectDetection, AutoProcessor device = torch.device("cuda" if torch.cuda.is_available() else "cpu") processor = AutoProcessor.from_pretrained("PekingU/rtdetr_r50vd_coco_o365") model = AutoModelForObjectDetection.from_pretrained( "PekingU/rtdetr_r50vd_coco_o365", disable_custom_kernels=False, torch_dtype=torch.float16, ).to(device) model_compiled = torch.compile( model, mode="reduce-overhead", ) @spaces.GPU def init_compiled_model(): print("Compiling model...") start_time = time.time() with torch.no_grad(): for _ in range(10): outputs = model_compiled(**inputs) _ = outputs[0].cpu() print(f"Model compiled in {time.time() - start_time:.2f} seconds.") url = "http://images.cocodataset.org/val2017/000000039769.jpg" image = Image.open(requests.get(url, stream=True).raw) inputs = processor(images=image, return_tensors="pt").to(device).to(torch.float16) init_compiled_model() BOUNDING_BOX_ANNOTATOR = sv.BoundingBoxAnnotator() MASK_ANNOTATOR = sv.MaskAnnotator() LABEL_ANNOTATOR = sv.LabelAnnotator() TRACKER = sv.ByteTrack() def calculate_end_frame_index(source_video_path): video_info = sv.VideoInfo.from_video_path(source_video_path) return min(video_info.total_frames, video_info.fps * 5) def annotate_image(input_image, detections, labels) -> np.ndarray: output_image = MASK_ANNOTATOR.annotate(input_image, detections) output_image = BOUNDING_BOX_ANNOTATOR.annotate(output_image, detections) output_image = LABEL_ANNOTATOR.annotate(output_image, detections, labels=labels) return output_image @spaces.GPU def process_video( input_video, confidence_threshold, progress=gr.Progress(track_tqdm=True), ): video_info = sv.VideoInfo.from_video_path(input_video) total = calculate_end_frame_index(input_video) frame_generator = sv.get_video_frames_generator(source_path=input_video, end=total) result_file_name = "output.mp4" result_file_path = os.path.join(os.getcwd(), result_file_name) all_fps = [] with sv.VideoSink(result_file_path, video_info=video_info) as sink: for _ in tqdm(range(total), desc="Processing video.."): try: frame = next(frame_generator) except StopIteration: break results, fps = query(frame, confidence_threshold) all_fps.append(fps) final_labels = [] detections = [] detections = sv.Detections.from_transformers(results[0]) detections = TRACKER.update_with_detections(detections) for label in detections.class_id.tolist(): final_labels.append(model.config.id2label[label]) frame = annotate_image( input_image=frame, detections=detections, labels=final_labels, ) sink.write_frame(frame) avg_fps = np.mean(all_fps) return result_file_path, gr.Markdown( f'