atalaydenknalbant committed on
Commit
5bec47d
·
verified ·
1 Parent(s): 951e2b3

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +119 -44
app.py CHANGED
@@ -1,19 +1,22 @@
1
- import spaces
2
  import os
3
  import io
4
  import time
5
  import json
6
- from typing import List, Tuple
7
  import tempfile
8
  from uuid import uuid4
 
 
9
  import gradio as gr
10
  from PIL import Image
11
  import numpy as np
12
  import torch
13
  from transformers import AutoImageProcessor, AutoModel
 
14
 
15
 
16
- # Models
 
 
17
  MODELS = {
18
  "ViT-B/16 LVD-1689M": "facebook/dinov3-vitb16-pretrain-lvd1689m",
19
  "ViT-L/16 LVD-1689M": "facebook/dinov3-vitl16-pretrain-lvd1689m",
@@ -22,20 +25,27 @@ MODELS = {
22
  }
23
  DEFAULT_MODEL = "ViT-B/16 LVD-1689M"
24
 
25
- # Tokens
26
- HF_TOKEN = os.getenv("HF_TOKEN", None) # set in Space Secrets
27
 
28
- def _gpu_duration_single(image: Image.Image, *_, **__) -> int:
29
- # simple 120s booking for single image
 
 
 
 
30
  return 120
31
 
32
- def _gpu_duration_gallery(images: List[Image.Image], *_, **__) -> int:
33
- # 45s per image plus buffer
34
- n = max(1, len(images) if images else 1)
35
- return min(600, 45 * n + 60)
 
36
 
 
 
 
37
  def _load(model_id: str):
38
- # token works on current Transformers
39
  processor = AutoImageProcessor.from_pretrained(
40
  model_id,
41
  use_fast=True,
@@ -46,27 +56,38 @@ def _load(model_id: str):
46
  torch_dtype=torch.float16,
47
  low_cpu_mem_usage=True,
48
  token=HF_TOKEN if HF_TOKEN else None,
49
- ).to("cuda").eval()
 
50
  return processor, model
51
 
 
52
  def _extract_core(image: Image.Image, model_id: str, pooling: str, want_overlay: bool):
 
 
 
53
  t0 = time.time()
54
  processor, model = _load(model_id)
55
 
56
- # safer move to cuda for BatchFeature
57
- inputs = processor(images=image, return_tensors="pt")
58
- inputs = {k: v.to("cuda") for k, v in inputs.items()}
 
 
 
 
 
59
 
60
  with torch.amp.autocast("cuda", dtype=torch.float16), torch.inference_mode():
61
- out = model(**inputs)
62
 
 
63
  if pooling == "CLS":
64
  if getattr(out, "pooler_output", None) is not None:
65
  emb = out.pooler_output[0]
66
  else:
67
  emb = out.last_hidden_state[0, 0]
68
  else:
69
- # mean of patch tokens
70
  if out.last_hidden_state.ndim == 3:
71
  num_regs = getattr(model.config, "num_register_tokens", 0)
72
  patch_tokens = out.last_hidden_state[0, 1 + num_regs :]
@@ -76,12 +97,22 @@ def _extract_core(image: Image.Image, model_id: str, pooling: str, want_overlay:
76
  emb = feat.mean(dim=(1, 2))
77
  emb = emb.float().cpu().numpy()
78
 
 
79
  overlay = None
80
  if want_overlay and out.last_hidden_state.ndim == 3:
81
  num_regs = getattr(model.config, "num_register_tokens", 0)
82
- patch_tokens = out.last_hidden_state[0, 1 + num_regs :]
83
- H = W = inputs.pixel_values.shape[-1] // getattr(model.config, "patch_size", 16)
84
- mags = patch_tokens.norm(dim=1).reshape(H, W)
 
 
 
 
 
 
 
 
 
85
  mags = (mags - mags.min()) / max(1e-8, (mags.max() - mags.min()))
86
  m = (mags.cpu().numpy() * 255).astype(np.uint8)
87
  heat = Image.fromarray(m).resize(image.size, Image.BILINEAR).convert("RGB")
@@ -95,47 +126,77 @@ def _extract_core(image: Image.Image, model_id: str, pooling: str, want_overlay:
95
  }
96
  return emb, overlay, meta
97
 
 
 
 
 
98
  @spaces.GPU(duration=_gpu_duration_single)
99
  def extract_embedding(image: Image.Image, model_name: str, pooling: str, want_overlay: bool):
100
  if image is None:
101
  return None, "[]", {"error": "No image"}, None
102
  if not torch.cuda.is_available():
103
- raise RuntimeError("CUDA not available. Ensure ZeroGPU is active.")
104
 
105
  model_id = MODELS[model_name]
106
  emb, overlay, meta = _extract_core(image, model_id, pooling, want_overlay)
107
 
108
- # preview string
109
  head = ", ".join(f"{x:.4f}" for x in emb[:16])
110
  preview = f"[{head}{', ...' if emb.size > 16 else ''}]"
111
-
112
- # write to a temp file and return the path for gr.File
113
- tmp_path = os.path.join(tempfile.gettempdir(), f"embedding_{uuid4().hex}.npy")
114
- np.save(tmp_path, emb.astype(np.float32), allow_pickle=False)
115
-
116
- # gr.Image, gr.Textbox, gr.JSON, gr.File
117
- return overlay if overlay else image, preview, meta, tmp_path
 
 
 
 
 
 
 
 
 
 
 
 
118
 
119
  @spaces.GPU(duration=_gpu_duration_gallery)
120
- def batch_similarity(images: List[Image.Image], model_name: str, pooling: str):
121
- if not images or len(images) < 2:
 
 
122
  return "Upload at least 2 images", None
 
123
  if not torch.cuda.is_available():
124
- raise RuntimeError("CUDA not available. Ensure ZeroGPU is selected.")
 
125
  model_id = MODELS[model_name]
126
  embs = []
127
- for img in images:
128
  e, _, _ = _extract_core(img, model_id, pooling, want_overlay=False)
129
  embs.append(e)
 
 
 
 
130
  X = np.vstack(embs).astype(np.float32)
131
  Xn = X / np.clip(np.linalg.norm(X, axis=1, keepdims=True), 1e-8, None)
132
  S = Xn @ Xn.T
133
- path = "cosine_similarity_matrix.csv"
134
- np.savetxt(path, S, delimiter=",", fmt="%.6f")
135
- return f"Computed {len(embs)} embeddings", path
136
 
 
 
 
 
 
 
 
 
137
  with gr.Blocks() as app:
138
- gr.Markdown("# DINOv3 on ZeroGPU")
 
139
  with gr.Tab("Single"):
140
  with gr.Row():
141
  with gr.Column():
@@ -152,18 +213,32 @@ with gr.Blocks() as app:
152
  run_btn.click(extract_embedding, [img, model_dd, pooling, overlay], [out_img, preview, meta, download])
153
 
154
  with gr.Tab("Gallery"):
155
- gal = gr.Gallery(label="Images", columns=4, height=360, allow_preview=True)
 
 
 
156
  model_dd2 = gr.Dropdown(choices=list(MODELS.keys()), value=DEFAULT_MODEL, label="Backbone")
157
  pooling2 = gr.Radio(["CLS", "Mean of patch tokens"], value="CLS", label="Pooling")
158
  go = gr.Button("Compute cosine on GPU")
159
  status = gr.Textbox(label="Status", interactive=False)
160
  csv = gr.File(label="cosine_similarity_matrix.csv")
161
- go.click(batch_similarity, [gal, model_dd2, pooling2], [status, csv])
162
 
163
- gr.Markdown(
164
- "ViT 7B is gated. Accept the terms on the model page and set `HF_TOKEN` as a Space secret. "
165
- "This app always runs on GPU via ZeroGPU decorated calls."
166
- )
 
 
 
 
 
 
 
 
 
 
 
 
167
 
168
  if __name__ == "__main__":
169
  app.queue().launch()
 
 
1
  import os
2
  import io
3
  import time
4
  import json
 
5
  import tempfile
6
  from uuid import uuid4
7
+ from typing import List, Tuple
8
+
9
  import gradio as gr
10
  from PIL import Image
11
  import numpy as np
12
  import torch
13
  from transformers import AutoImageProcessor, AutoModel
14
+ import spaces
15
 
16
 
17
+ # ---------------------------
18
+ # Models and config
19
+ # ---------------------------
20
  MODELS = {
21
  "ViT-B/16 LVD-1689M": "facebook/dinov3-vitb16-pretrain-lvd1689m",
22
  "ViT-L/16 LVD-1689M": "facebook/dinov3-vitl16-pretrain-lvd1689m",
 
25
  }
26
  DEFAULT_MODEL = "ViT-B/16 LVD-1689M"
27
 
28
+ HF_TOKEN = os.getenv("HF_TOKEN", None) # set in Space Secrets after requesting gated access
 
29
 
30
+
31
+ # ---------------------------
32
+ # ZeroGPU booking helpers
33
+ # ---------------------------
34
def _gpu_duration_single(image: Image.Image, *_args, **_kwargs) -> int:
    """Return the ZeroGPU booking, in seconds, for a single-image request.

    Used as the ``duration=`` callable of ``@spaces.GPU`` on
    ``extract_embedding``. Every argument is ignored: the booking is a flat
    120 seconds whatever the input is.
    """
    booking_seconds = 120
    return booking_seconds
37
 
38
+ def _gpu_duration_gallery(files: List[str], *_args, **_kwargs) -> int:
39
+ # 35s per image + 30s buffer capped at 10 minutes
40
+ n = max(1, len(files) if files else 1)
41
+ return min(600, 35 * n + 30)
42
+
43
 
44
+ # ---------------------------
45
+ # Model loading and core logic
46
+ # ---------------------------
47
  def _load(model_id: str):
48
+ # Use token for gated checkpoints
49
  processor = AutoImageProcessor.from_pretrained(
50
  model_id,
51
  use_fast=True,
 
56
  torch_dtype=torch.float16,
57
  low_cpu_mem_usage=True,
58
  token=HF_TOKEN if HF_TOKEN else None,
59
+ )
60
+ model.to("cuda").eval()
61
  return processor, model
62
 
63
+
64
  def _extract_core(image: Image.Image, model_id: str, pooling: str, want_overlay: bool):
65
+ """
66
+ Returns (embedding np.ndarray, optional overlay PIL.Image, meta dict)
67
+ """
68
  t0 = time.time()
69
  processor, model = _load(model_id)
70
 
71
+ # Keep BatchFeature when possible, but handle dict too
72
+ bf = processor(images=image, return_tensors="pt")
73
+ if hasattr(bf, "to"):
74
+ bf = bf.to("cuda")
75
+ pixel_values = bf["pixel_values"]
76
+ else:
77
+ bf = {k: v.to("cuda") for k, v in bf.items()}
78
+ pixel_values = bf["pixel_values"]
79
 
80
  with torch.amp.autocast("cuda", dtype=torch.float16), torch.inference_mode():
81
+ out = model(**bf)
82
 
83
+ # Embedding pooling
84
  if pooling == "CLS":
85
  if getattr(out, "pooler_output", None) is not None:
86
  emb = out.pooler_output[0]
87
  else:
88
  emb = out.last_hidden_state[0, 0]
89
  else:
90
+ # mean of patch tokens or mean over H,W for conv features
91
  if out.last_hidden_state.ndim == 3:
92
  num_regs = getattr(model.config, "num_register_tokens", 0)
93
  patch_tokens = out.last_hidden_state[0, 1 + num_regs :]
 
97
  emb = feat.mean(dim=(1, 2))
98
  emb = emb.float().cpu().numpy()
99
 
100
+ # Optional simple heat overlay for ViT
101
  overlay = None
102
  if want_overlay and out.last_hidden_state.ndim == 3:
103
  num_regs = getattr(model.config, "num_register_tokens", 0)
104
+ patch_tokens = out.last_hidden_state[0, 1 + num_regs :] # [N_patches, D]
105
+ num_patches = patch_tokens.shape[0]
106
+
107
+ # Prefer square grid from token count, else fall back to pixel/patch size
108
+ h = int(num_patches ** 0.5)
109
+ w = h
110
+ if h * w != num_patches:
111
+ patch = getattr(model.config, "patch_size", 16)
112
+ h = int(pixel_values.shape[-2] // patch)
113
+ w = int(pixel_values.shape[-1] // patch)
114
+
115
+ mags = patch_tokens.norm(dim=1).reshape(h, w)
116
  mags = (mags - mags.min()) / max(1e-8, (mags.max() - mags.min()))
117
  m = (mags.cpu().numpy() * 255).astype(np.uint8)
118
  heat = Image.fromarray(m).resize(image.size, Image.BILINEAR).convert("RGB")
 
126
  }
127
  return emb, overlay, meta
128
 
129
+
130
# ---------------------------
# Single image API (ZeroGPU)
# ---------------------------
@spaces.GPU(duration=_gpu_duration_single)
def extract_embedding(image: Image.Image, model_name: str, pooling: str, want_overlay: bool):
    """Embed one image and build the four outputs of the "Single" tab.

    Args:
        image: Input image, or None when nothing was uploaded.
        model_name: Key into ``MODELS`` selecting the checkpoint.
        pooling: Pooling mode, forwarded unchanged to ``_extract_core``.
        want_overlay: Forwarded to ``_extract_core`` to request an overlay.

    Returns:
        A 4-tuple for (gr.Image, gr.Textbox, gr.JSON, gr.File): the overlay
        if one was produced, otherwise the input image; a text preview of
        the first 16 embedding values; the meta dict from ``_extract_core``;
        and the path of a float32 ``.npy`` file holding the embedding.
        With no image, returns ``(None, "[]", {"error": "No image"}, None)``.

    Raises:
        RuntimeError: If CUDA is not available.
    """
    if image is None:
        return None, "[]", {"error": "No image"}, None
    if not torch.cuda.is_available():
        raise RuntimeError("CUDA not available. Ensure Space hardware is ZeroGPU.")

    model_id = MODELS[model_name]
    emb, overlay, meta = _extract_core(image, model_id, pooling, want_overlay)

    # Preview: first 16 values at 4 decimals, with an ellipsis if truncated.
    head = ", ".join(f"{x:.4f}" for x in emb[:16])
    preview = f"[{head}{', ...' if emb.size > 16 else ''}]"

    # A random file name keeps concurrent requests from overwriting each other.
    out_path = os.path.join(tempfile.gettempdir(), f"embedding_{uuid4().hex}.npy")
    np.save(out_path, emb.astype(np.float32), allow_pickle=False)

    # Test for None explicitly instead of relying on the image's truthiness.
    shown = overlay if overlay is not None else image
    return shown, preview, meta, out_path
151
+
152
+
153
# ---------------------------
# Multi image similarity (ZeroGPU)
# ---------------------------
def _open_images_from_paths(paths: List[str]) -> List[Image.Image]:
    """Open each path as an RGB image, skipping files that cannot be read.

    Args:
        paths: File paths to open; None or an empty list yields ``[]``.

    Returns:
        The successfully opened images, in the same relative order as
        ``paths``. Unreadable files are logged and left out, so the result
        may be shorter than the input.
    """
    # Imported locally: the module header does not import logging.
    import logging

    logger = logging.getLogger(__name__)
    imgs: List[Image.Image] = []
    for p in paths or []:
        try:
            # The context manager closes the underlying file; convert()
            # returns a separate image that stays usable afterwards.
            with Image.open(p) as im:
                imgs.append(im.convert("RGB"))
        except Exception:
            # Best effort: one bad upload must not abort the whole batch,
            # but the skip is recorded instead of being dropped silently.
            logger.warning("Skipping unreadable image %r", p, exc_info=True)
    return imgs


@spaces.GPU(duration=_gpu_duration_gallery)
def batch_similarity(files: List[str], model_name: str, pooling: str):
    """Embed the uploaded images and write their cosine similarity matrix.

    Args:
        files: File paths from the ``gr.Files`` input; may be None.
        model_name: Key into ``MODELS`` selecting the checkpoint.
        pooling: Pooling mode, forwarded unchanged to ``_extract_core``.

    Returns:
        ``(status message, CSV path)``. The CSV path is None when fewer
        than two files were given or fewer than two could be embedded.
        When some files could not be read, the status says how many were
        skipped, because the matrix then has fewer rows than uploads.

    Raises:
        RuntimeError: If CUDA is not available.
    """
    paths = files or []
    if len(paths) < 2:
        return "Upload at least 2 images", None

    if not torch.cuda.is_available():
        raise RuntimeError("CUDA not available. Ensure Space hardware is ZeroGPU.")

    model_id = MODELS[model_name]
    images = _open_images_from_paths(paths)
    skipped = len(paths) - len(images)
    embs = [
        _extract_core(img, model_id, pooling, want_overlay=False)[0]
        for img in images
    ]

    if len(embs) < 2:
        return "Failed to read or embed images", None

    # L2-normalise the rows (clipped to avoid division by zero); the Gram
    # matrix of the normalised rows is the pairwise cosine similarity.
    X = np.vstack(embs).astype(np.float32)
    Xn = X / np.clip(np.linalg.norm(X, axis=1, keepdims=True), 1e-8, None)
    S = Xn @ Xn.T

    csv_path = os.path.join(tempfile.gettempdir(), f"cosine_{uuid4().hex}.csv")
    np.savetxt(csv_path, S, delimiter=",", fmt="%.6f")

    status = f"Computed {len(embs)} embeddings"
    if skipped:
        status += (
            f" (skipped {skipped} unreadable file(s); "
            "matrix rows follow the remaining upload order)"
        )
    return status, csv_path
192
+
193
+
194
+ # ---------------------------
195
+ # UI
196
+ # ---------------------------
197
  with gr.Blocks() as app:
198
+ gr.Markdown("# DINOv3 on ZeroGPU — Embeddings and Cosine Similarity")
199
+
200
  with gr.Tab("Single"):
201
  with gr.Row():
202
  with gr.Column():
 
213
  run_btn.click(extract_embedding, [img, model_dd, pooling, overlay], [out_img, preview, meta, download])
214
 
215
  with gr.Tab("Gallery"):
216
+ gr.Markdown("Upload multiple images. We compute a cosine similarity matrix on GPU and return a CSV.")
217
+ # Input as Files so you can multi-upload, plus a Gallery preview
218
+ files_in = gr.Files(label="Upload images", file_types=["image"], file_count="multiple", type="filepath")
219
+ gallery_preview = gr.Gallery(label="Preview", columns=4, height=300)
220
  model_dd2 = gr.Dropdown(choices=list(MODELS.keys()), value=DEFAULT_MODEL, label="Backbone")
221
  pooling2 = gr.Radio(["CLS", "Mean of patch tokens"], value="CLS", label="Pooling")
222
  go = gr.Button("Compute cosine on GPU")
223
  status = gr.Textbox(label="Status", interactive=False)
224
  csv = gr.File(label="cosine_similarity_matrix.csv")
 
225
 
226
+ # Simple preview hook
227
def _preview(paths: List[str]):
    """Return the images to show in the gallery preview for the uploads.

    Delegates to ``_open_images_from_paths`` so the preview and the
    similarity computation open files the same way. Files that cannot be
    opened are left out; no paths yields an empty list.
    """
    if not paths:
        return []
    return _open_images_from_paths(paths)
237
+
238
+ files_in.change(_preview, inputs=files_in, outputs=gallery_preview)
239
+ go.click(batch_similarity, [files_in, model_dd2, pooling2], [status, csv])
240
+
241
+
242
 
243
  if __name__ == "__main__":
244
  app.queue().launch()