File size: 5,701 Bytes
ca069cd 40a6300 ca069cd 03cff7d c75d2db 03cff7d ca069cd 03cff7d ca069cd a4bd236 ca069cd cc7270d ca069cd cc7270d ca069cd cc7270d ca069cd cc7270d ca069cd cc7270d ca069cd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
import os
import time
import gradio as gr
import numpy as np
import requests
import spaces
import supervision as sv
import torch
from PIL import Image
from tqdm import tqdm
from transformers import AutoModelForObjectDetection, AutoProcessor
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
processor = AutoProcessor.from_pretrained("PekingU/rtdetr_r50vd_coco_o365")
model = AutoModelForObjectDetection.from_pretrained(
"PekingU/rtdetr_r50vd_coco_o365",
disable_custom_kernels=False,
torch_dtype=torch.float16,
).to(device)
model_compiled = torch.compile(
model,
mode="reduce-overhead",
)
@spaces.GPU
def init_compiled_model():
print("Compiling model...")
start_time = time.time()
with torch.no_grad():
for _ in range(10):
outputs = model_compiled(**inputs)
_ = outputs[0].cpu()
print(f"Model compiled in {time.time() - start_time:.2f} seconds.")
url = "http://images.cocodataset.org/val2017/000000039769.jpg"
image = Image.open(requests.get(url, stream=True).raw)
inputs = processor(images=image, return_tensors="pt").to(device).to(torch.float16)
init_compiled_model()
BOUNDING_BOX_ANNOTATOR = sv.BoundingBoxAnnotator()
MASK_ANNOTATOR = sv.MaskAnnotator()
LABEL_ANNOTATOR = sv.LabelAnnotator()
TRACKER = sv.ByteTrack()
def calculate_end_frame_index(source_video_path):
video_info = sv.VideoInfo.from_video_path(source_video_path)
return min(video_info.total_frames, video_info.fps * 5)
def annotate_image(input_image, detections, labels) -> np.ndarray:
output_image = MASK_ANNOTATOR.annotate(input_image, detections)
output_image = BOUNDING_BOX_ANNOTATOR.annotate(output_image, detections)
output_image = LABEL_ANNOTATOR.annotate(output_image, detections, labels=labels)
return output_image
@spaces.GPU
def process_video(
input_video,
confidence_threshold,
progress=gr.Progress(track_tqdm=True),
):
video_info = sv.VideoInfo.from_video_path(input_video)
total = calculate_end_frame_index(input_video)
frame_generator = sv.get_video_frames_generator(source_path=input_video, end=total)
result_file_name = "output.mp4"
result_file_path = os.path.join(os.getcwd(), result_file_name)
all_fps = []
with sv.VideoSink(result_file_path, video_info=video_info) as sink:
for _ in tqdm(range(total), desc="Processing video.."):
try:
frame = next(frame_generator)
except StopIteration:
break
results, fps = query(frame, confidence_threshold)
all_fps.append(fps)
final_labels = []
detections = []
detections = sv.Detections.from_transformers(results[0])
detections = TRACKER.update_with_detections(detections)
for label in detections.class_id.tolist():
final_labels.append(model.config.id2label[label])
frame = annotate_image(
input_image=frame,
detections=detections,
labels=final_labels,
)
sink.write_frame(frame)
avg_fps = np.mean(all_fps)
return result_file_path, gr.Markdown(
f'<h3 style="text-align: center;">Model inference FPS: {avg_fps:.2f}</h3>',
visible=True,
)
def query(frame, confidence_threshold):
image = Image.fromarray(frame)
inputs = processor(images=image, return_tensors="pt").to(device, torch.float16)
with torch.no_grad():
start = time.time()
outputs = model_compiled(**inputs)
outputs[0].cpu()
fps = 1 / (time.time() - start)
target_sizes = torch.tensor([frame.shape[:2]]).to(device)
results = processor.post_process_object_detection(
outputs=outputs,
threshold=confidence_threshold,
target_sizes=target_sizes,
)
return results, fps
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown("## Real Time Object Detection with compiled RT-DETR")
gr.Markdown(
"""
This is a demo for real-time object detection using RT-DETR compiled.<br>
It runs on ZeroGPU which captures GPU every first time you infer.<br>
This combined with video processing time means that the demo inference time is slower than the model's actual inference time.<br>
The actual model average inference FPS is displayed under the processed video after inference.
"""
)
gr.Markdown(
"Simply upload a video! You can also play with confidence threshold or try the examples below. 👇"
)
with gr.Row():
with gr.Column():
input_video = gr.Video(label="Input Video")
with gr.Column():
output_video = gr.Video(label="Output Video (5s max)")
actual_fps = gr.Markdown("", visible=False)
with gr.Row():
conf = gr.Slider(
label="Confidence Threshold",
minimum=0.1,
maximum=1.0,
value=0.3,
step=0.05,
)
with gr.Row():
submit = gr.Button(variant="primary")
example = gr.Examples(
examples=[
["./football.mp4", 0.3, 640],
["./cat.mp4", 0.3, 640],
["./safari2.mp4", 0.3, 640],
],
inputs=[input_video, conf],
outputs=output_video,
)
submit.click(
fn=process_video,
inputs=[input_video, conf],
outputs=[output_video, actual_fps],
)
if __name__ == "__main__":
demo.launch(show_error=True)
|