import os, time, json, hashlib, glob
import cv2, numpy as np
import torch, gradio as gr
from transformers import AutoProcessor, AutoModelForImageTextToText
# ----------------------------------------------------------------------------------
# Config
# ----------------------------------------------------------------------------------
MODEL_PATH = "AnasKAN/SmolVLM2-500M-Video-Instruct-video-feedback"
PROCESSOR_ID = "HuggingFaceTB/SmolVLM2-500M-Video-Instruct"
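# The processor/tokenizer config is loaded from the base SmolVLM2 checkpoint,
# while the fine-tuned weights come from MODEL_PATH.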
# Directories (relative to repo root when running in HF Spaces)
REPO_ROOT = os.path.dirname(os.path.abspath(__file__))  # absolute, so path checks work however the app is launched
PRESET_CACHE_DIR = os.path.join(REPO_ROOT, "preset_cache")
PRELOADED_CLEAN_DIR = os.path.join(REPO_ROOT, "assets", "videos_clean")
RUNTIME_CACHE_DIR = "/tmp/moraqeb_cache"
RUNTIME_CLEAN_DIR = "/tmp/clean_videos"
os.makedirs(RUNTIME_CACHE_DIR, exist_ok=True)
os.makedirs(RUNTIME_CLEAN_DIR, exist_ok=True)
# Device / dtype
has_cuda = torch.cuda.is_available()
device = torch.device("cuda" if has_cuda else "cpu")
dtype = torch.bfloat16 if has_cuda else torch.float32
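# bfloat16 keeps GPU memory low; without CUDA we fall back to float32, the safer
# default for CPU inference.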
# Video cleaning config (used only for uploaded videos)
target_size = (640, 360) # (w, h)
target_frames = 128
output_fps = 24
fourcc = cv2.VideoWriter_fourcc(*"XVID")
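# Cleaning normalizes an upload before inference: sample up to target_frames
# frames uniformly across the clip, resize each to target_size, pad by repeating
# the last frame if the source is short, and re-encode as an XVID AVI at output_fps.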
def sample_indices(n_frames_src, n_target):
if n_frames_src <= 0: return []
idxs = np.linspace(0, n_frames_src - 1, min(n_target, n_frames_src), dtype=int)
return idxs.tolist()
def clean_single_video(video_path, save_dir=RUNTIME_CLEAN_DIR, overwrite=True):
os.makedirs(save_dir, exist_ok=True)
base = os.path.splitext(os.path.basename(video_path))[0]
dst_path = os.path.join(save_dir, f"{base}_clean.avi")
if os.path.exists(dst_path) and not overwrite:
print(f"[SKIP] {dst_path} exists")
return dst_path
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
print(f"[ERROR] Cannot open {video_path}")
return None
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
idxs = sample_indices(total_frames, target_frames)
frames, grabbed, cur = [], 0, 0
next_targets = set(idxs)
while True:
ret, frame = cap.read()
if not ret: break
if cur in next_targets:
frame = cv2.resize(frame, target_size, interpolation=cv2.INTER_AREA)
frames.append(frame); grabbed += 1
if grabbed == len(idxs): break
cur += 1
cap.release()
if not frames:
print(f"[ERROR] No frames from {video_path}")
return None
while len(frames) < target_frames:
frames.append(frames[-1])
out = cv2.VideoWriter(dst_path, fourcc, output_fps, target_size)
for f in frames: out.write(f)
out.release()
print(f"[OK] Cleaned {video_path} -> {dst_path} ({len(frames)} frames @ {output_fps} FPS)")
return dst_path
# ----------------------------------------------------------------------------------
# Caching helpers
# ----------------------------------------------------------------------------------
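# Two cache tiers share the same key scheme: preset_cache/ ships with the repo
# (read-only answers for the bundled demo videos), and /tmp/moraqeb_cache stores
# answers computed at runtime. A hit in either tier skips model loading entirely.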
def _sha1_file(path, chunk=1024*1024):
h = hashlib.sha1()
with open(path, "rb") as f:
while True:
b = f.read(chunk)
if not b: break
h.update(b)
return h.hexdigest()
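# The cache key hashes everything that can change the answer: model ID, a SHA-1
# of the cleaned video bytes, the question, and the generation settings
# (temperature/top_p only count when do_sample is enabled).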
def _make_cache_key(cleaned_path, question, gen_kwargs):
vid_hash = _sha1_file(cleaned_path)
payload = {
"model": MODEL_PATH,
"video_hash": vid_hash,
"question": (question or "").strip(),
"gen": {
"do_sample": bool(gen_kwargs.get("do_sample", False)),
"max_new_tokens": int(gen_kwargs.get("max_new_tokens", 128)),
"temperature": float(gen_kwargs.get("temperature", 0.7)) if gen_kwargs.get("do_sample") else None,
"top_p": float(gen_kwargs.get("top_p", 0.9)) if gen_kwargs.get("do_sample") else None,
},
}
raw = json.dumps(payload, sort_keys=True).encode("utf-8")
return hashlib.sha1(raw).hexdigest()
def _cache_path(dir_, key): return os.path.join(dir_, f"{key}.json")
def preset_cache_get(key):
p = _cache_path(PRESET_CACHE_DIR, key)
if os.path.exists(p):
try:
with open(p, "r", encoding="utf-8") as f:
print("[PRESET] HIT", os.path.basename(p))
return json.load(f)
except Exception as e:
print(f"[PRESET] Failed to read {p}: {e}")
return None
def runtime_cache_get(key):
p = _cache_path(RUNTIME_CACHE_DIR, key)
if os.path.exists(p):
try:
with open(p, "r", encoding="utf-8") as f:
print("[CACHE] HIT", os.path.basename(p))
return json.load(f)
except Exception as e:
print(f"[CACHE] Failed to read {p}: {e}")
return None
def runtime_cache_put(key, data):
p = _cache_path(RUNTIME_CACHE_DIR, key)
try:
with open(p, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False)
return True
except Exception as e:
print(f"[CACHE] Failed to write {p}: {e}")
return False
def cache_clear():
n = 0
for fp in glob.glob(os.path.join(RUNTIME_CACHE_DIR, "*.json")):
        try:
            os.remove(fp)
            n += 1
        except OSError:
            pass
return n
# ----------------------------------------------------------------------------------
# Lazy pipeline (loaded only on cache miss)
# ----------------------------------------------------------------------------------
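# Processor and model are module-level singletons created on first use, so cache
# hits never pay the download/load cost.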
_model = None
_processor = None
def load_pipeline():
global _model, _processor
if _model is not None and _processor is not None:
print("[INFO] Pipeline already loaded")
return _processor, _model
print(f"[INFO] Loading processor: {PROCESSOR_ID}")
_processor = AutoProcessor.from_pretrained(PROCESSOR_ID)
print(f"[INFO] Loading model: {MODEL_PATH}")
_model = AutoModelForImageTextToText.from_pretrained(
MODEL_PATH,
torch_dtype=dtype,
_attn_implementation="eager",
low_cpu_mem_usage=True,
).to(device).eval()
print("[INFO] Pipeline ready βœ…")
return _processor, _model
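# SmolVLM2 chat format: one user turn mixing a {"type": "video"} entry (local file
# path) with a {"type": "text"} question; the frames are actually decoded later,
# when processor.apply_chat_template runs in _infer_core.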
def build_messages(video_path: str, question: str):
q = (question or "").strip() or "Caption the video."
return [{
"role": "user",
"content": [
{"type": "video", "path": video_path},
{"type": "text", "text": q},
],
}]
# ----------------------------------------------------------------------------------
# Inference handlers
# ----------------------------------------------------------------------------------
def _maybe_clean_uploaded(video_path):
if video_path and os.path.commonpath([os.path.abspath(video_path), PRELOADED_CLEAN_DIR]) == PRELOADED_CLEAN_DIR:
print("[INFO] Using preloaded cleaned video:", video_path)
return video_path
print(f"[INFO] Cleaning uploaded video {video_path} ...")
return clean_single_video(video_path, save_dir=RUNTIME_CLEAN_DIR, overwrite=True)
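# Core flow: cache lookup (preset first, then runtime) -> lazy pipeline load ->
# chat-template tokenization with video decoding -> generate -> keep the text after
# "Assistant:" -> write the result to the runtime cache.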
def _infer_core(cleaned_path, question, gen_kwargs, use_cache=True):
key = _make_cache_key(cleaned_path, question, gen_kwargs)
if use_cache:
preset = preset_cache_get(key)
if preset:
return preset["answer"], preset.get("latency_s", 0.0), preset.get("tokens_per_s", 0.0), preset.get("new_tokens", 0)
cached = runtime_cache_get(key)
if cached:
return cached["answer"], cached["latency_s"], cached["tokens_per_s"], cached["new_tokens"]
processor, model = load_pipeline()
print("[INFO] Tokenizing ...")
messages = build_messages(cleaned_path, question)
inputs = processor.apply_chat_template(
messages,
add_generation_prompt=True,
tokenize=True,
return_dict=True,
return_tensors="pt",
video_load_backend="decord",
).to(device, dtype=dtype)
print(f"[INFO] Generating with {gen_kwargs}")
start = time.time()
with torch.inference_mode():
generated_ids = model.generate(**inputs, **gen_kwargs)
elapsed = time.time() - start
print(f"[INFO] Generation took {elapsed:.2f}s")
raw_text = processor.batch_decode(generated_ids, skip_special_tokens=True)[0]
out_text = raw_text.split("Assistant:", 1)[1].strip() if "Assistant:" in raw_text else raw_text.strip()
new_tokens = int(generated_ids.shape[-1] - inputs["input_ids"].shape[-1])
tps = (new_tokens / elapsed) if elapsed > 0 else 0.0
print(f"[INFO] Decoded {new_tokens} tokens @ {tps:.2f} tok/s")
if use_cache:
runtime_cache_put(key, {
"answer": out_text,
"latency_s": round(elapsed, 3),
"tokens_per_s": round(tps, 2),
"new_tokens": new_tokens,
})
return out_text, round(elapsed, 3), round(tps, 2), new_tokens
# 1) For uploaded videos
def infer_upload(video, question, max_new_tokens, do_sample, temperature, top_p, use_cache):
if video is None:
return "Please upload a video.", 0.0, 0.0, 0
cleaned_path = _maybe_clean_uploaded(video)
if cleaned_path is None:
return "Video cleaning failed.", 0.0, 0.0, 0
gen_kwargs = dict(do_sample=bool(do_sample), max_new_tokens=int(max_new_tokens))
if do_sample:
gen_kwargs.update(temperature=float(temperature), top_p=float(top_p))
return _infer_core(cleaned_path, question, gen_kwargs, use_cache=bool(use_cache))
# 2) For preloaded cleaned videos (no cleaning)
def infer_preloaded(preloaded_name, question, max_new_tokens, do_sample, temperature, top_p, use_cache):
if not preloaded_name:
return "Pick a preloaded video.", 0.0, 0.0, 0
cleaned_path = os.path.join(PRELOADED_CLEAN_DIR, preloaded_name)
if not os.path.exists(cleaned_path):
return f"Video not found: {cleaned_path}", 0.0, 0.0, 0
gen_kwargs = dict(do_sample=bool(do_sample), max_new_tokens=int(max_new_tokens))
if do_sample:
gen_kwargs.update(temperature=float(temperature), top_p=float(top_p))
return _infer_core(cleaned_path, question, gen_kwargs, use_cache=bool(use_cache))
def _ui_clear_cache():
removed = cache_clear()
return gr.update(value=f"Cleared {removed} runtime cached items.")
# ----------------------------------------------------------------------------------
# UI helpers
# ----------------------------------------------------------------------------------
def _list_preloaded_clean():
if not os.path.isdir(PRELOADED_CLEAN_DIR):
return []
exts = (".avi", ".mp4", ".mov", ".mkv")
return [f for f in sorted(os.listdir(PRELOADED_CLEAN_DIR)) if f.lower().endswith(exts)]
def _resolve_preloaded_path(name):
if not name: return None
path = os.path.join(PRELOADED_CLEAN_DIR, name)
return path if os.path.exists(path) else None
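# Two tabs expose the same controls (question, generation sliders, cache toggle):
# "Preloaded" runs on files bundled in assets/videos_clean/ and can hit the preset
# cache, while "Upload" cleans the user's video first and then follows the same path.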
# ----------------------------------------------------------------------------------
# UI
# ----------------------------------------------------------------------------------
with gr.Blocks(fill_height=True, theme=gr.themes.Soft()) as demo:
gr.Markdown("# πŸ§ͺ Moraqeb β€” Distilled VLM Demo")
gr.Markdown(
"Choose a **preloaded cleaned video** (fast, uses preset cache if available) or **upload your own**. "
"On a cache hit, the app returns instantly **without loading the model**."
)
with gr.Tabs():
# --- Preloaded cleaned videos tab ---
with gr.Tab("Preloaded (recommended)"):
with gr.Row():
with gr.Column(scale=1):
preloaded = gr.Dropdown(
choices=_list_preloaded_clean(),
label="Pick a cleaned video (assets/videos_clean/)",
interactive=True
)
                    # 👇 Video preview that updates when you pick a file
preview = gr.Video(label="Preview", autoplay=False)
# When dropdown changes, update the video preview source to the actual file path
preloaded.change(
fn=_resolve_preloaded_path,
inputs=preloaded,
outputs=preview,
)
with gr.Column(scale=1):
question_pre = gr.Textbox(
label="Question",
value="Can you describe the entire video in detail from start to finish?",
lines=3,
)
max_new_tokens_pre = gr.Slider(8, 512, value=128, step=8, label="Max new tokens")
do_sample_pre = gr.Checkbox(value=False, label="do_sample (enable sampling)")
temperature_pre = gr.Slider(0.1, 1.5, value=0.7, step=0.1, label="temperature")
top_p_pre = gr.Slider(0.1, 1.0, value=0.9, step=0.05, label="top_p")
use_cache_pre = gr.Checkbox(value=True, label="Use cache")
run_btn_pre = gr.Button("Run on Preloaded Video", variant="primary")
with gr.Row():
answer_pre = gr.Textbox(label="Answer", lines=8)
with gr.Row():
latency_pre = gr.Number(label="Latency (s)")
tps_pre = gr.Number(label="Speed (tokens/sec)")
ntoks_pre = gr.Number(label="New tokens")
# --- Upload tab ---
with gr.Tab("Upload your video"):
with gr.Row():
with gr.Column(scale=1):
video = gr.Video(label="Upload video", sources=["upload"], autoplay=False)
with gr.Column(scale=1):
question = gr.Textbox(
label="Question",
value="Can you describe the entire video in detail from start to finish?",
lines=3,
)
max_new_tokens = gr.Slider(8, 512, value=128, step=8, label="Max new tokens")
do_sample = gr.Checkbox(value=False, label="do_sample (enable sampling)")
temperature = gr.Slider(0.1, 1.5, value=0.7, step=0.1, label="temperature")
top_p = gr.Slider(0.1, 1.0, value=0.9, step=0.05, label="top_p")
use_cache = gr.Checkbox(value=True, label="Use cache")
run_btn = gr.Button("Run Inference", variant="primary")
with gr.Row():
answer = gr.Textbox(label="Answer", lines=8)
with gr.Row():
latency = gr.Number(label="Latency (s)")
tps = gr.Number(label="Speed (tokens/sec)")
ntoks = gr.Number(label="New tokens")
# Cache controls
with gr.Row():
clear_cache_btn = gr.Button("Clear RUNTIME Cache")
clear_cache_status = gr.Textbox(label="Cache status", interactive=False)
# Wiring
run_btn_pre.click(
fn=infer_preloaded,
inputs=[preloaded, question_pre, max_new_tokens_pre, do_sample_pre, temperature_pre, top_p_pre, use_cache_pre],
outputs=[answer_pre, latency_pre, tps_pre, ntoks_pre],
api_name="infer_preloaded",
queue=True,
)
run_btn.click(
fn=infer_upload,
inputs=[video, question, max_new_tokens, do_sample, temperature, top_p, use_cache],
outputs=[answer, latency, tps, ntoks],
api_name="infer_upload",
queue=True,
)
clear_cache_btn.click(_ui_clear_cache, inputs=[], outputs=[clear_cache_status])
gr.Markdown(f"Hardware: **{'CUDA' if has_cuda else 'CPU'}**, dtype **{dtype}**")
if __name__ == "__main__":
demo.launch(server_port=7860, share=True)