File size: 6,549 Bytes
ef53a2d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
77939b9
 
 
 
 
 
ef53a2d
 
 
 
 
77939b9
ef53a2d
 
 
 
 
77939b9
 
 
 
 
ef53a2d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import os
import io
import time
import json
from typing import List, Tuple

import gradio as gr
from PIL import Image
import numpy as np
import torch
from transformers import AutoImageProcessor, AutoModel
import spaces  # ZeroGPU

# Backbones offered in the UI: dropdown label -> Hugging Face Hub repository id.
# The dropdowns list the labels in this dict's insertion order.
MODELS = {
    "ViT-B/16 LVD-1689M": "facebook/dinov3-vitb16-pretrain-lvd1689m",
    "ViT-L/16 LVD-1689M": "facebook/dinov3-vitl16-pretrain-lvd1689m",
    "ConvNeXt-Tiny LVD-1689M": "facebook/dinov3-convnext-tiny-pretrain-lvd1689m",
    "ViT-7B/16 LVD-1689M gated": "facebook/dinov3-vit7b16-pretrain-lvd1689m",
}
# Label that is preselected in the backbone dropdowns; must be a key of MODELS.
DEFAULT_MODEL = "ViT-B/16 LVD-1689M"

# Hub access token taken from the environment (None when the variable is unset).
# Configure it as a secret of the Space.
HF_TOKEN = os.environ.get("HF_TOKEN")

def _gpu_duration_single(image: Image.Image, *_, **__) -> int:
    # simple 120s booking for single image
    return 120

def _gpu_duration_gallery(images: List[Image.Image], *_, **__) -> int:
    # 45s per image plus buffer
    n = max(1, len(images) if images else 1)
    return min(600, 45 * n + 60)

def _load(model_id: str):
    """Instantiate the image processor and the fp16 CUDA model for a Hub repo.

    Args:
        model_id: Hugging Face Hub repository id to load from.

    Returns:
        ``(processor, model)``; the model has been moved to ``"cuda"`` and
        switched to eval mode.

    Nothing is cached at this level: every call goes through
    ``from_pretrained`` again for both objects.
    """
    # An empty token string is treated the same as "no token".
    # The `token` keyword is the one accepted by current Transformers.
    auth_token = HF_TOKEN or None

    processor = AutoImageProcessor.from_pretrained(
        model_id,
        use_fast=True,
        token=auth_token,
    )

    model = AutoModel.from_pretrained(
        model_id,
        torch_dtype=torch.float16,
        low_cpu_mem_usage=True,
        token=auth_token,
    )
    model = model.to("cuda")
    model = model.eval()
    return processor, model

def _extract_core(image: Image.Image, model_id: str, pooling: str, want_overlay: bool):
    """Embed one image and optionally build a patch-norm heat-map overlay.

    Args:
        image: Input PIL image.
        model_id: Hub repository id handed to ``_load``.
        pooling: ``"CLS"`` selects ``pooler_output`` when the model provides
            one, otherwise the first token. Any other value averages the
            patch tokens (3-D output) or the spatial map (other ranks).
        want_overlay: When true and the model returns a 3-D token sequence,
            blend a heat-map of per-patch token norms over the image.

    Returns:
        ``(emb, overlay, meta)`` where ``emb`` is a float32 numpy array,
        ``overlay`` is a PIL image or ``None``, and ``meta`` is a dict with
        the keys ``model_id``, ``dtype``, ``dim`` and ``seconds``.
    """
    t0 = time.time()
    processor, model = _load(model_id)

    # Move every tensor of the processor output to the GPU. The result is a
    # plain dict, so later lookups must use item access, not attributes.
    inputs = processor(images=image, return_tensors="pt")
    inputs = {name: tensor.to("cuda") for name, tensor in inputs.items()}

    # torch.autocast replaces the deprecated torch.cuda.amp.autocast.
    with torch.autocast(device_type="cuda", dtype=torch.float16), torch.inference_mode():
        out = model(**inputs)

    hidden = out.last_hidden_state
    is_token_sequence = hidden.ndim == 3
    # Tokens are skipped as 1 leading token plus `num_regs` register tokens.
    num_regs = getattr(model.config, "num_register_tokens", 0)

    if pooling == "CLS":
        pooled = getattr(out, "pooler_output", None)
        emb = pooled[0] if pooled is not None else hidden[0, 0]
    elif is_token_sequence:
        # mean of patch tokens
        emb = hidden[0, 1 + num_regs:].mean(dim=0)
    else:
        # feature map laid out as [C, H, W]
        emb = hidden[0].mean(dim=(1, 2))
    emb = emb.float().cpu().numpy()

    overlay = None
    if want_overlay and is_token_sequence:
        patch_tokens = hidden[0, 1 + num_regs:]
        patch = getattr(model.config, "patch_size", 16)
        pixel_values = inputs["pixel_values"]
        # Derive the grid per axis so non-square inputs are handled too.
        grid_h = pixel_values.shape[-2] // patch
        grid_w = pixel_values.shape[-1] // patch
        # Only draw the heat-map when the tokens really form that grid;
        # otherwise the reshape would fail, so the overlay is left as None.
        if grid_h * grid_w == patch_tokens.shape[0]:
            # Normalise in float32 to avoid fp16 precision issues.
            mags = patch_tokens.float().norm(dim=1).reshape(grid_h, grid_w)
            lowest = mags.min()
            span = (mags.max() - lowest).clamp_min(1e-8)
            mags = (mags - lowest) / span
            m = (mags.cpu().numpy() * 255).astype(np.uint8)
            heat = Image.fromarray(m).resize(image.size, Image.BILINEAR).convert("RGB")
            overlay = Image.blend(image.convert("RGB"), heat, 0.35)

    meta = {
        "model_id": model_id,
        "dtype": "fp16 on cuda",
        "dim": int(emb.shape[0]),
        "seconds": round(time.time() - t0, 3),
    }
    return emb, overlay, meta

@spaces.GPU(duration=_gpu_duration_single)
def extract_embedding(image: Image.Image, model_name: str, pooling: str, want_overlay: bool):
    """Gradio handler: embed one image and prepare the four UI outputs.

    Args:
        image: Uploaded PIL image, or ``None`` when nothing was uploaded.
        model_name: Key of ``MODELS`` selecting the backbone.
        pooling: Pooling choice forwarded to ``_extract_core``.
        want_overlay: Whether to request the heat-map overlay.

    Returns:
        A 4-tuple ``(image, preview, meta_json, npy_path)``: the overlay (or
        the input image when no overlay was produced), a text preview of the
        first 16 embedding values, the metadata as a JSON string, and the
        path of an ``embedding.npy`` file holding the float32 embedding.

    Raises:
        RuntimeError: If CUDA is not available.
        KeyError: If ``model_name`` is not a key of ``MODELS``.
    """
    import tempfile

    if image is None:
        # One value per wired output (four), and valid JSON for the meta slot.
        return None, "[]", json.dumps({"error": "No image"}), None
    if not torch.cuda.is_available():
        raise RuntimeError("CUDA not available. Ensure this Space uses ZeroGPU and the function is decorated with @spaces.GPU.")

    model_id = MODELS[model_name]
    emb, overlay, meta = _extract_core(image, model_id, pooling, want_overlay)

    # The file output needs a path on disk. A fresh temp directory keeps the
    # download name "embedding.npy" without clashing with other requests.
    npy_path = os.path.join(tempfile.mkdtemp(prefix="dinov3_"), "embedding.npy")
    np.save(npy_path, emb.astype(np.float32), allow_pickle=False)

    head = ", ".join(f"{x:.4f}" for x in emb[:16])
    preview = f"[{head}{', ...' if emb.size > 16 else ''}]"
    # Compare against None explicitly instead of relying on object truthiness.
    shown = overlay if overlay is not None else image
    return shown, preview, json.dumps(meta, indent=2), npy_path

@spaces.GPU(duration=_gpu_duration_gallery)
def batch_similarity(images: List[Image.Image], model_name: str, pooling: str):
    """Gradio handler: embed every gallery image and export cosine similarities.

    Args:
        images: Gallery value. Items may be PIL images, numpy arrays, file
            paths, or ``(media, caption)`` pairs wrapping any of those.
        model_name: Key of ``MODELS`` selecting the backbone.
        pooling: Pooling choice forwarded to ``_extract_core``.

    Returns:
        ``(status, csv_path)``. ``csv_path`` is ``None`` when fewer than two
        images were supplied; otherwise it is the path of a CSV file holding
        the N x N cosine similarity matrix.

    Raises:
        RuntimeError: If CUDA is not available.
        KeyError: If ``model_name`` is not a key of ``MODELS``.
    """
    import tempfile

    def _as_pil(item):
        # Gallery items arrive as (media, caption) pairs; keep the media part.
        if isinstance(item, (tuple, list)):
            item = item[0]
        if isinstance(item, Image.Image):
            return item
        if isinstance(item, np.ndarray):
            return Image.fromarray(item)
        # Anything else is treated as a path; convert() loads the pixel data
        # so the file handle can be closed right away.
        with Image.open(item) as opened:
            return opened.convert("RGB")

    if not images or len(images) < 2:
        return "Upload at least 2 images", None
    if not torch.cuda.is_available():
        raise RuntimeError("CUDA not available. Ensure ZeroGPU is selected.")

    model_id = MODELS[model_name]
    embs = [
        _extract_core(_as_pil(item), model_id, pooling, want_overlay=False)[0]
        for item in images
    ]

    X = np.vstack(embs).astype(np.float32)
    # Row-normalise, guarding against zero-length vectors, then take dot products.
    Xn = X / np.clip(np.linalg.norm(X, axis=1, keepdims=True), 1e-8, None)
    S = Xn @ Xn.T

    # A per-request temp directory keeps the download name stable while
    # preventing concurrent requests from overwriting each other's file.
    path = os.path.join(tempfile.mkdtemp(prefix="dinov3_"), "cosine_similarity_matrix.csv")
    np.savetxt(path, S, delimiter=",", fmt="%.6f")
    return f"Computed {len(embs)} embeddings", path

# UI definition: a two-tab Gradio Blocks app bound to the module-level name `app`.
with gr.Blocks() as app:
    gr.Markdown("# DINOv3 on ZeroGPU")
    # Tab 1: embed a single image and optionally show a heat-map overlay.
    with gr.Tab("Single"):
        with gr.Row():
            # Left column: inputs.
            with gr.Column():
                img = gr.Image(type="pil", label="Image", height=360)
                model_dd = gr.Dropdown(choices=list(MODELS.keys()), value=DEFAULT_MODEL, label="Backbone")
                pooling = gr.Radio(["CLS", "Mean of patch tokens"], value="CLS", label="Pooling")
                overlay = gr.Checkbox(value=False, label="Show overlay")
                run_btn = gr.Button("Extract on GPU")
            # Right column: outputs.
            with gr.Column():
                out_img = gr.Image(type="pil", label="Overlay or input", height=360)
                preview = gr.Textbox(label="Embedding head", max_lines=2)
                meta = gr.JSON(label="Meta")
                download = gr.File(label="embedding.npy")
        # The handler receives four inputs and must return one value for each
        # of the four outputs, in this order.
        run_btn.click(extract_embedding, [img, model_dd, pooling, overlay], [out_img, preview, meta, download])

    # Tab 2: pairwise cosine similarity over a gallery of images.
    with gr.Tab("Gallery"):
        # NOTE(review): the gallery is created without a `type` argument;
        # confirm which element type batch_similarity actually receives.
        gal = gr.Gallery(label="Images", columns=4, height=360, allow_preview=True)
        model_dd2 = gr.Dropdown(choices=list(MODELS.keys()), value=DEFAULT_MODEL, label="Backbone")
        pooling2 = gr.Radio(["CLS", "Mean of patch tokens"], value="CLS", label="Pooling")
        go = gr.Button("Compute cosine on GPU")
        status = gr.Textbox(label="Status", interactive=False)
        csv = gr.File(label="cosine_similarity_matrix.csv")
        # The handler returns (status text, csv file) for the two outputs.
        go.click(batch_similarity, [gal, model_dd2, pooling2], [status, csv])

    # Footer note shown below both tabs.
    gr.Markdown(
        "ViT 7B is gated. Accept the terms on the model page and set `HF_TOKEN` as a Space secret. "
        "This app always runs on GPU via ZeroGPU decorated calls."
    )

# Start the app with request queuing enabled only when run as a script.
if __name__ == "__main__":
    app.queue().launch()