import os, time, json, hashlib, glob

import cv2, numpy as np
import torch, gradio as gr
from transformers import AutoProcessor, AutoModelForImageTextToText

# ----------------------------------------------------------------------------------
# Config
# ----------------------------------------------------------------------------------
MODEL_PATH = "AnasKAN/SmolVLM2-500M-Video-Instruct-video-feedback"
PROCESSOR_ID = "HuggingFaceTB/SmolVLM2-500M-Video-Instruct"

# Directories (relative to repo root when running in HF Spaces)
REPO_ROOT = os.path.dirname(__file__)
PRESET_CACHE_DIR = os.path.join(REPO_ROOT, "preset_cache")
PRELOADED_CLEAN_DIR = os.path.join(REPO_ROOT, "assets", "videos_clean")
RUNTIME_CACHE_DIR = "/tmp/moraqeb_cache"
RUNTIME_CLEAN_DIR = "/tmp/clean_videos"
os.makedirs(RUNTIME_CACHE_DIR, exist_ok=True)
os.makedirs(RUNTIME_CLEAN_DIR, exist_ok=True)

# Device / dtype
has_cuda = torch.cuda.is_available()
device = torch.device("cuda" if has_cuda else "cpu")
dtype = torch.bfloat16 if has_cuda else torch.float32

# Video cleaning config (used only for uploaded videos)
target_size = (640, 360)  # (w, h)
target_frames = 128
output_fps = 24
fourcc = cv2.VideoWriter_fourcc(*"XVID")


def sample_indices(n_frames_src, n_target):
    """Return up to n_target frame indices spread uniformly across the source."""
    if n_frames_src <= 0:
        return []
    idxs = np.linspace(0, n_frames_src - 1, min(n_target, n_frames_src), dtype=int)
    return idxs.tolist()


def clean_single_video(video_path, save_dir=RUNTIME_CLEAN_DIR, overwrite=True):
    os.makedirs(save_dir, exist_ok=True)
    base = os.path.splitext(os.path.basename(video_path))[0]
    dst_path = os.path.join(save_dir, f"{base}_clean.avi")
    if os.path.exists(dst_path) and not overwrite:
        print(f"[SKIP] {dst_path} exists")
        return dst_path

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"[ERROR] Cannot open {video_path}")
        return None

    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    idxs = sample_indices(total_frames, target_frames)
    frames, grabbed, cur = [], 0, 0
    next_targets = set(idxs)
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        if cur in next_targets:
            frame = cv2.resize(frame, target_size, interpolation=cv2.INTER_AREA)
            frames.append(frame)
            grabbed += 1
            if grabbed == len(idxs):
                break
        cur += 1
    cap.release()

    if not frames:
        print(f"[ERROR] No frames from {video_path}")
        return None
    # Pad by repeating the last frame so every clip has exactly target_frames.
    while len(frames) < target_frames:
        frames.append(frames[-1])

    out = cv2.VideoWriter(dst_path, fourcc, output_fps, target_size)
    for f in frames:
        out.write(f)
    out.release()
    print(f"[OK] Cleaned {video_path} -> {dst_path} ({len(frames)} frames @ {output_fps} FPS)")
    return dst_path


# ----------------------------------------------------------------------------------
# Caching helpers
# ----------------------------------------------------------------------------------
def _sha1_file(path, chunk=1024 * 1024):
    h = hashlib.sha1()
    with open(path, "rb") as f:
        while True:
            b = f.read(chunk)
            if not b:
                break
            h.update(b)
    return h.hexdigest()


def _make_cache_key(cleaned_path, question, gen_kwargs):
    vid_hash = _sha1_file(cleaned_path)
    payload = {
        "model": MODEL_PATH,
        "video_hash": vid_hash,
        "question": (question or "").strip(),
        "gen": {
            "do_sample": bool(gen_kwargs.get("do_sample", False)),
            "max_new_tokens": int(gen_kwargs.get("max_new_tokens", 128)),
            "temperature": float(gen_kwargs.get("temperature", 0.7)) if gen_kwargs.get("do_sample") else None,
            "top_p": float(gen_kwargs.get("top_p", 0.9)) if gen_kwargs.get("do_sample") else None,
        },
    }
    raw = json.dumps(payload, sort_keys=True).encode("utf-8")
    return hashlib.sha1(raw).hexdigest()


def _cache_path(dir_, key):
    return os.path.join(dir_, f"{key}.json")
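
# Illustrative check (not part of the app flow; assumes a local file
# "sample_clean.avi" exists): identical video bytes, question, and generation
# settings always hash to the same key, so repeat requests hit the cache.
#
#   k1 = _make_cache_key("sample_clean.avi", "Describe the clip.", {"max_new_tokens": 128})
#   k2 = _make_cache_key("sample_clean.avi", "Describe the clip.", {"max_new_tokens": 128})
#   assert k1 == k2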

def preset_cache_get(key):
    p = _cache_path(PRESET_CACHE_DIR, key)
    if os.path.exists(p):
        try:
            with open(p, "r", encoding="utf-8") as f:
                print("[PRESET] HIT", os.path.basename(p))
                return json.load(f)
        except Exception as e:
            print(f"[PRESET] Failed to read {p}: {e}")
    return None


def runtime_cache_get(key):
    p = _cache_path(RUNTIME_CACHE_DIR, key)
    if os.path.exists(p):
        try:
            with open(p, "r", encoding="utf-8") as f:
                print("[CACHE] HIT", os.path.basename(p))
                return json.load(f)
        except Exception as e:
            print(f"[CACHE] Failed to read {p}: {e}")
    return None


def runtime_cache_put(key, data):
    p = _cache_path(RUNTIME_CACHE_DIR, key)
    try:
        with open(p, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False)
        return True
    except Exception as e:
        print(f"[CACHE] Failed to write {p}: {e}")
        return False


def cache_clear():
    n = 0
    for fp in glob.glob(os.path.join(RUNTIME_CACHE_DIR, "*.json")):
        try:
            os.remove(fp)
            n += 1
        except OSError:
            pass
    return n


# ----------------------------------------------------------------------------------
# Lazy pipeline (loaded only on cache miss)
# ----------------------------------------------------------------------------------
_model = None
_processor = None


def load_pipeline():
    global _model, _processor
    if _model is not None and _processor is not None:
        print("[INFO] Pipeline already loaded")
        return _processor, _model
    print(f"[INFO] Loading processor: {PROCESSOR_ID}")
    _processor = AutoProcessor.from_pretrained(PROCESSOR_ID)
    print(f"[INFO] Loading model: {MODEL_PATH}")
    _model = AutoModelForImageTextToText.from_pretrained(
        MODEL_PATH,
        torch_dtype=dtype,
        _attn_implementation="eager",
        low_cpu_mem_usage=True,
    ).to(device).eval()
    print("[INFO] Pipeline ready ✅")
    return _processor, _model
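
# Optional warm-up sketch (an assumption, not wired into the UI): calling
# load_pipeline() eagerly at startup trades a slower boot for a faster first
# inference. It is left out here so cache hits can skip model loading entirely.
# MORAQEB_WARMUP is a hypothetical env flag, not read anywhere else.
#
#   if os.environ.get("MORAQEB_WARMUP") == "1":
#       load_pipeline()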

def build_messages(video_path: str, question: str):
    q = (question or "").strip() or "Caption the video."
    return [{
        "role": "user",
        "content": [
            {"type": "video", "path": video_path},
            {"type": "text", "text": q},
        ],
    }]


# ----------------------------------------------------------------------------------
# Inference handlers
# ----------------------------------------------------------------------------------
def _maybe_clean_uploaded(video_path):
    # Compare absolute paths: os.path.commonpath raises ValueError if absolute
    # and relative paths are mixed.
    preloaded_root = os.path.abspath(PRELOADED_CLEAN_DIR)
    if video_path and os.path.commonpath([os.path.abspath(video_path), preloaded_root]) == preloaded_root:
        print("[INFO] Using preloaded cleaned video:", video_path)
        return video_path
    print(f"[INFO] Cleaning uploaded video {video_path} ...")
    return clean_single_video(video_path, save_dir=RUNTIME_CLEAN_DIR, overwrite=True)


def _infer_core(cleaned_path, question, gen_kwargs, use_cache=True):
    key = _make_cache_key(cleaned_path, question, gen_kwargs)
    if use_cache:
        preset = preset_cache_get(key)
        if preset:
            return (preset["answer"], preset.get("latency_s", 0.0),
                    preset.get("tokens_per_s", 0.0), preset.get("new_tokens", 0))
        cached = runtime_cache_get(key)
        if cached:
            return cached["answer"], cached["latency_s"], cached["tokens_per_s"], cached["new_tokens"]

    processor, model = load_pipeline()
    print("[INFO] Tokenizing ...")
    messages = build_messages(cleaned_path, question)
    inputs = processor.apply_chat_template(
        messages,
        add_generation_prompt=True,
        tokenize=True,
        return_dict=True,
        return_tensors="pt",
        video_load_backend="decord",
    ).to(device, dtype=dtype)

    print(f"[INFO] Generating with {gen_kwargs}")
    start = time.time()
    with torch.inference_mode():
        generated_ids = model.generate(**inputs, **gen_kwargs)
    elapsed = time.time() - start
    print(f"[INFO] Generation took {elapsed:.2f}s")

    raw_text = processor.batch_decode(generated_ids, skip_special_tokens=True)[0]
    out_text = raw_text.split("Assistant:", 1)[1].strip() if "Assistant:" in raw_text else raw_text.strip()
    new_tokens = int(generated_ids.shape[-1] - inputs["input_ids"].shape[-1])
    tps = (new_tokens / elapsed) if elapsed > 0 else 0.0
    print(f"[INFO] Decoded {new_tokens} tokens @ {tps:.2f} tok/s")

    if use_cache:
        runtime_cache_put(key, {
            "answer": out_text,
            "latency_s": round(elapsed, 3),
            "tokens_per_s": round(tps, 2),
            "new_tokens": new_tokens,
        })
    return out_text, round(elapsed, 3), round(tps, 2), new_tokens


# 1) For uploaded videos
def infer_upload(video, question, max_new_tokens, do_sample, temperature, top_p, use_cache):
    if video is None:
        return "Please upload a video.", 0.0, 0.0, 0
    cleaned_path = _maybe_clean_uploaded(video)
    if cleaned_path is None:
        return "Video cleaning failed.", 0.0, 0.0, 0
    gen_kwargs = dict(do_sample=bool(do_sample), max_new_tokens=int(max_new_tokens))
    if do_sample:
        gen_kwargs.update(temperature=float(temperature), top_p=float(top_p))
    return _infer_core(cleaned_path, question, gen_kwargs, use_cache=bool(use_cache))


# 2) For preloaded cleaned videos (no cleaning)
def infer_preloaded(preloaded_name, question, max_new_tokens, do_sample, temperature, top_p, use_cache):
    if not preloaded_name:
        return "Pick a preloaded video.", 0.0, 0.0, 0
    cleaned_path = os.path.join(PRELOADED_CLEAN_DIR, preloaded_name)
    if not os.path.exists(cleaned_path):
        return f"Video not found: {cleaned_path}", 0.0, 0.0, 0
    gen_kwargs = dict(do_sample=bool(do_sample), max_new_tokens=int(max_new_tokens))
    if do_sample:
        gen_kwargs.update(temperature=float(temperature), top_p=float(top_p))
    return _infer_core(cleaned_path, question, gen_kwargs, use_cache=bool(use_cache))


def _ui_clear_cache():
    removed = cache_clear()
    return gr.update(value=f"Cleared {removed} runtime cached items.")
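
# Illustrative local test (assumption: a file "example_clean.avi" exists under
# assets/videos_clean/). This mirrors what the "Run on Preloaded Video" button
# does, bypassing the UI:
#
#   answer, latency_s, tok_s, n_new = infer_preloaded(
#       "example_clean.avi",
#       "Can you describe the entire video in detail from start to finish?",
#       max_new_tokens=128, do_sample=False, temperature=0.7, top_p=0.9,
#       use_cache=True,
#   )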

# ----------------------------------------------------------------------------------
# UI helpers
# ----------------------------------------------------------------------------------
def _list_preloaded_clean():
    if not os.path.isdir(PRELOADED_CLEAN_DIR):
        return []
    exts = (".avi", ".mp4", ".mov", ".mkv")
    return [f for f in sorted(os.listdir(PRELOADED_CLEAN_DIR)) if f.lower().endswith(exts)]


def _resolve_preloaded_path(name):
    if not name:
        return None
    path = os.path.join(PRELOADED_CLEAN_DIR, name)
    return path if os.path.exists(path) else None


# ----------------------------------------------------------------------------------
# UI
# ----------------------------------------------------------------------------------
with gr.Blocks(fill_height=True, theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🧪 Moraqeb — Distilled VLM Demo")
    gr.Markdown(
        "Choose a **preloaded cleaned video** (fast, uses preset cache if available) or **upload your own**. "
        "On a cache hit, the app returns instantly **without loading the model**."
    )

    with gr.Tabs():
        # --- Preloaded cleaned videos tab ---
        with gr.Tab("Preloaded (recommended)"):
            with gr.Row():
                with gr.Column(scale=1):
                    preloaded = gr.Dropdown(
                        choices=_list_preloaded_clean(),
                        label="Pick a cleaned video (assets/videos_clean/)",
                        interactive=True,
                    )
                    # 👇 Video preview that updates when you pick a file
                    preview = gr.Video(label="Preview", autoplay=False)
                    # When the dropdown changes, point the preview at the actual file path
                    preloaded.change(
                        fn=_resolve_preloaded_path,
                        inputs=preloaded,
                        outputs=preview,
                    )
                with gr.Column(scale=1):
                    question_pre = gr.Textbox(
                        label="Question",
                        value="Can you describe the entire video in detail from start to finish?",
                        lines=3,
                    )
                    max_new_tokens_pre = gr.Slider(8, 512, value=128, step=8, label="Max new tokens")
                    do_sample_pre = gr.Checkbox(value=False, label="do_sample (enable sampling)")
                    temperature_pre = gr.Slider(0.1, 1.5, value=0.7, step=0.1, label="temperature")
                    top_p_pre = gr.Slider(0.1, 1.0, value=0.9, step=0.05, label="top_p")
                    use_cache_pre = gr.Checkbox(value=True, label="Use cache")
                    run_btn_pre = gr.Button("Run on Preloaded Video", variant="primary")
            with gr.Row():
                answer_pre = gr.Textbox(label="Answer", lines=8)
            with gr.Row():
                latency_pre = gr.Number(label="Latency (s)")
                tps_pre = gr.Number(label="Speed (tokens/sec)")
                ntoks_pre = gr.Number(label="New tokens")

        # --- Upload tab ---
        with gr.Tab("Upload your video"):
            with gr.Row():
                with gr.Column(scale=1):
                    video = gr.Video(label="Upload video", sources=["upload"], autoplay=False)
                with gr.Column(scale=1):
                    question = gr.Textbox(
                        label="Question",
                        value="Can you describe the entire video in detail from start to finish?",
                        lines=3,
                    )
                    max_new_tokens = gr.Slider(8, 512, value=128, step=8, label="Max new tokens")
                    do_sample = gr.Checkbox(value=False, label="do_sample (enable sampling)")
                    temperature = gr.Slider(0.1, 1.5, value=0.7, step=0.1, label="temperature")
                    top_p = gr.Slider(0.1, 1.0, value=0.9, step=0.05, label="top_p")
                    use_cache = gr.Checkbox(value=True, label="Use cache")
                    run_btn = gr.Button("Run Inference", variant="primary")
            with gr.Row():
                answer = gr.Textbox(label="Answer", lines=8)
            with gr.Row():
                latency = gr.Number(label="Latency (s)")
                tps = gr.Number(label="Speed (tokens/sec)")
                ntoks = gr.Number(label="New tokens")

    # Cache controls
    with gr.Row():
        clear_cache_btn = gr.Button("Clear RUNTIME Cache")
        clear_cache_status = gr.Textbox(label="Cache status", interactive=False)

    # Wiring
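    # Each click() binding below follows the same pattern: collect component
    # values in the declared order, call the handler, and fan the four return
    # values out to the answer/metrics components. api_name also exposes each
    # handler on the app's HTTP API.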
    run_btn_pre.click(
        fn=infer_preloaded,
        inputs=[preloaded, question_pre, max_new_tokens_pre,
                do_sample_pre, temperature_pre, top_p_pre, use_cache_pre],
        outputs=[answer_pre, latency_pre, tps_pre, ntoks_pre],
        api_name="infer_preloaded",
        queue=True,
    )
    run_btn.click(
        fn=infer_upload,
        inputs=[video, question, max_new_tokens, do_sample, temperature, top_p, use_cache],
        outputs=[answer, latency, tps, ntoks],
        api_name="infer_upload",
        queue=True,
    )
    clear_cache_btn.click(_ui_clear_cache, inputs=[], outputs=[clear_cache_status])

    gr.Markdown(f"Hardware: **{'CUDA' if has_cuda else 'CPU'}**, dtype **{dtype}**")

if __name__ == "__main__":
    demo.launch(server_port=7860, share=True)
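
# Illustrative remote call (assumption: the app is running and gradio_client is
# installed; the argument order matches the inputs wired to infer_preloaded):
#
#   from gradio_client import Client
#   client = Client("http://127.0.0.1:7860")
#   answer, latency_s, tok_s, n_new = client.predict(
#       "example_clean.avi", "Describe the video.",
#       128, False, 0.7, 0.9, True,
#       api_name="/infer_preloaded",
#   )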