import glob
import hashlib
import json
import os
import time

import cv2
import gradio as gr
import numpy as np
import torch

from transformers import AutoProcessor, AutoModelForImageTextToText

# Model and processor identifiers
MODEL_PATH = "AnasKAN/SmolVLM2-500M-Video-Instruct-video-feedback"
PROCESSOR_ID = "HuggingFaceTB/SmolVLM2-500M-Video-Instruct"

# Read-only assets ship with the repo; runtime artifacts are written under /tmp.
REPO_ROOT = os.path.dirname(__file__)
PRESET_CACHE_DIR = os.path.join(REPO_ROOT, "preset_cache")
PRELOADED_CLEAN_DIR = os.path.join(REPO_ROOT, "assets", "videos_clean")
RUNTIME_CACHE_DIR = "/tmp/moraqeb_cache"
RUNTIME_CLEAN_DIR = "/tmp/clean_videos"

os.makedirs(RUNTIME_CACHE_DIR, exist_ok=True)
os.makedirs(RUNTIME_CLEAN_DIR, exist_ok=True)

# Device and precision
has_cuda = torch.cuda.is_available()
device = torch.device("cuda" if has_cuda else "cpu")
dtype = torch.bfloat16 if has_cuda else torch.float32

# Video preprocessing settings
target_size = (640, 360)   # (width, height) for cv2.resize and cv2.VideoWriter
target_frames = 128        # frames kept per cleaned clip
output_fps = 24
fourcc = cv2.VideoWriter_fourcc(*"XVID")


def sample_indices(n_frames_src, n_target):
    """Evenly sample up to n_target frame indices from a source with n_frames_src frames."""
    if n_frames_src <= 0:
        return []
    idxs = np.linspace(0, n_frames_src - 1, min(n_target, n_frames_src), dtype=int)
    return idxs.tolist()


def clean_single_video(video_path, save_dir=RUNTIME_CLEAN_DIR, overwrite=True):
    """Resample a video to target_frames frames at target_size and write it as an XVID .avi."""
    os.makedirs(save_dir, exist_ok=True)
    base = os.path.splitext(os.path.basename(video_path))[0]
    dst_path = os.path.join(save_dir, f"{base}_clean.avi")
    if os.path.exists(dst_path) and not overwrite:
        print(f"[SKIP] {dst_path} exists")
        return dst_path

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"[ERROR] Cannot open {video_path}")
        return None

    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    idxs = sample_indices(total_frames, target_frames)
    frames, grabbed, cur = [], 0, 0
    next_targets = set(idxs)
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        if cur in next_targets:
            frame = cv2.resize(frame, target_size, interpolation=cv2.INTER_AREA)
            frames.append(frame)
            grabbed += 1
            if grabbed == len(idxs):
                break
        cur += 1
    cap.release()

    if not frames:
        print(f"[ERROR] No frames from {video_path}")
        return None

    # Pad by repeating the last frame so every cleaned clip has exactly target_frames frames.
    while len(frames) < target_frames:
        frames.append(frames[-1])

    out = cv2.VideoWriter(dst_path, fourcc, output_fps, target_size)
    for f in frames:
        out.write(f)
    out.release()
    print(f"[OK] Cleaned {video_path} -> {dst_path} ({len(frames)} frames @ {output_fps} FPS)")
    return dst_path
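

# Optional helper, not called by the app: a minimal sketch of batch-cleaning every
# matching raw video in a directory into the runtime clean dir. The glob pattern and
# directory layout are assumptions; adjust them to your data.
def clean_directory(src_dir, save_dir=RUNTIME_CLEAN_DIR, pattern="*.mp4"):
    cleaned = []
    for path in sorted(glob.glob(os.path.join(src_dir, pattern))):
        dst = clean_single_video(path, save_dir=save_dir, overwrite=False)
        if dst:
            cleaned.append(dst)
    return cleaned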


def _sha1_file(path, chunk=1024 * 1024):
    """SHA-1 of a file's contents, read in 1 MiB chunks."""
    h = hashlib.sha1()
    with open(path, "rb") as f:
        while True:
            b = f.read(chunk)
            if not b:
                break
            h.update(b)
    return h.hexdigest()


def _make_cache_key(cleaned_path, question, gen_kwargs):
    """Deterministic cache key derived from the model id, video hash, question, and generation settings."""
    vid_hash = _sha1_file(cleaned_path)
    payload = {
        "model": MODEL_PATH,
        "video_hash": vid_hash,
        "question": (question or "").strip(),
        "gen": {
            "do_sample": bool(gen_kwargs.get("do_sample", False)),
            "max_new_tokens": int(gen_kwargs.get("max_new_tokens", 128)),
            "temperature": float(gen_kwargs.get("temperature", 0.7)) if gen_kwargs.get("do_sample") else None,
            "top_p": float(gen_kwargs.get("top_p", 0.9)) if gen_kwargs.get("do_sample") else None,
        },
    }
    raw = json.dumps(payload, sort_keys=True).encode("utf-8")
    return hashlib.sha1(raw).hexdigest()


def _cache_path(dir_, key):
    return os.path.join(dir_, f"{key}.json")


def preset_cache_get(key):
    """Look up a read-only cache entry shipped with the repo."""
    p = _cache_path(PRESET_CACHE_DIR, key)
    if os.path.exists(p):
        try:
            with open(p, "r", encoding="utf-8") as f:
                print("[PRESET] HIT", os.path.basename(p))
                return json.load(f)
        except Exception as e:
            print(f"[PRESET] Failed to read {p}: {e}")
    return None


def runtime_cache_get(key):
    """Look up a cache entry written during this session (under /tmp)."""
    p = _cache_path(RUNTIME_CACHE_DIR, key)
    if os.path.exists(p):
        try:
            with open(p, "r", encoding="utf-8") as f:
                print("[CACHE] HIT", os.path.basename(p))
                return json.load(f)
        except Exception as e:
            print(f"[CACHE] Failed to read {p}: {e}")
    return None


def runtime_cache_put(key, data):
    p = _cache_path(RUNTIME_CACHE_DIR, key)
    try:
        with open(p, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False)
        return True
    except Exception as e:
        print(f"[CACHE] Failed to write {p}: {e}")
        return False


def cache_clear():
    """Delete all runtime cache entries; returns the number of files removed."""
    n = 0
    for fp in glob.glob(os.path.join(RUNTIME_CACHE_DIR, "*.json")):
        try:
            os.remove(fp)
            n += 1
        except OSError:
            pass
    return n
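

# Optional sketch, not part of the app flow: precompute a preset cache entry offline so
# the demo can answer a known (video, question, settings) combination instantly without
# loading the model. The metric defaults below are placeholders, not measured values.
def write_preset_entry(cleaned_path, question, gen_kwargs, answer,
                       latency_s=0.0, tokens_per_s=0.0, new_tokens=0):
    os.makedirs(PRESET_CACHE_DIR, exist_ok=True)
    key = _make_cache_key(cleaned_path, question, gen_kwargs)
    entry = {
        "answer": answer,
        "latency_s": latency_s,
        "tokens_per_s": tokens_per_s,
        "new_tokens": new_tokens,
    }
    with open(_cache_path(PRESET_CACHE_DIR, key), "w", encoding="utf-8") as f:
        json.dump(entry, f, ensure_ascii=False)
    return key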


_model = None
_processor = None


def load_pipeline():
    """Lazily load the processor and model once, then reuse them for later requests."""
    global _model, _processor
    if _model is not None and _processor is not None:
        print("[INFO] Pipeline already loaded")
        return _processor, _model
    print(f"[INFO] Loading processor: {PROCESSOR_ID}")
    _processor = AutoProcessor.from_pretrained(PROCESSOR_ID)
    print(f"[INFO] Loading model: {MODEL_PATH}")
    _model = AutoModelForImageTextToText.from_pretrained(
        MODEL_PATH,
        torch_dtype=dtype,
        _attn_implementation="eager",
        low_cpu_mem_usage=True,
    ).to(device).eval()
    print("[INFO] Pipeline ready")
    return _processor, _model
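

# Optional counterpart (an assumption, not used by the UI): release the model if memory
# becomes tight, e.g. on a small Space.
def unload_pipeline():
    global _model, _processor
    _model, _processor = None, None
    if has_cuda:
        torch.cuda.empty_cache()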


def build_messages(video_path: str, question: str):
    """Build a single-turn chat message carrying the video plus the user's question."""
    q = (question or "").strip() or "Caption the video."
    return [{
        "role": "user",
        "content": [
            {"type": "video", "path": video_path},
            {"type": "text", "text": q},
        ],
    }]


def _maybe_clean_uploaded(video_path):
    """Use preloaded clips as-is; clean uploaded videos into RUNTIME_CLEAN_DIR first."""
    preloaded_root = os.path.abspath(PRELOADED_CLEAN_DIR)
    if video_path and os.path.commonpath([os.path.abspath(video_path), preloaded_root]) == preloaded_root:
        print("[INFO] Using preloaded cleaned video:", video_path)
        return video_path
    print(f"[INFO] Cleaning uploaded video {video_path} ...")
    return clean_single_video(video_path, save_dir=RUNTIME_CLEAN_DIR, overwrite=True)


def _infer_core(cleaned_path, question, gen_kwargs, use_cache=True):
    key = _make_cache_key(cleaned_path, question, gen_kwargs)

    # Cache lookups: the shipped preset cache first, then the per-session runtime cache.
    if use_cache:
        preset = preset_cache_get(key)
        if preset:
            return preset["answer"], preset.get("latency_s", 0.0), preset.get("tokens_per_s", 0.0), preset.get("new_tokens", 0)
        cached = runtime_cache_get(key)
        if cached:
            return cached["answer"], cached["latency_s"], cached["tokens_per_s"], cached["new_tokens"]

    processor, model = load_pipeline()
    print("[INFO] Tokenizing ...")
    messages = build_messages(cleaned_path, question)
    inputs = processor.apply_chat_template(
        messages,
        add_generation_prompt=True,
        tokenize=True,
        return_dict=True,
        return_tensors="pt",
        video_load_backend="decord",
    ).to(device, dtype=dtype)

    print(f"[INFO] Generating with {gen_kwargs}")
    start = time.time()
    with torch.inference_mode():
        generated_ids = model.generate(**inputs, **gen_kwargs)
    elapsed = time.time() - start
    print(f"[INFO] Generation took {elapsed:.2f}s")

    raw_text = processor.batch_decode(generated_ids, skip_special_tokens=True)[0]
    # The decoded text echoes the prompt; keep only the assistant's reply.
    out_text = raw_text.split("Assistant:", 1)[1].strip() if "Assistant:" in raw_text else raw_text.strip()

    new_tokens = int(generated_ids.shape[-1] - inputs["input_ids"].shape[-1])
    tps = (new_tokens / elapsed) if elapsed > 0 else 0.0
    print(f"[INFO] Decoded {new_tokens} tokens @ {tps:.2f} tok/s")

    if use_cache:
        runtime_cache_put(key, {
            "answer": out_text,
            "latency_s": round(elapsed, 3),
            "tokens_per_s": round(tps, 2),
            "new_tokens": new_tokens,
        })
    return out_text, round(elapsed, 3), round(tps, 2), new_tokens
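

# Optional smoke test (an assumption, not wired to the UI): call this from a Python
# shell after importing the module to exercise cleaning plus inference end to end.
def _selftest(video_path, question="Caption the video."):
    cleaned = clean_single_video(video_path)
    if cleaned is None:
        return None
    answer, latency_s, tok_per_s, n_tokens = _infer_core(
        cleaned,
        question,
        {"do_sample": False, "max_new_tokens": 64},
        use_cache=False,
    )
    print(f"[SELFTEST] {latency_s}s, {tok_per_s} tok/s, {n_tokens} new tokens\n{answer}")
    return answer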


def infer_upload(video, question, max_new_tokens, do_sample, temperature, top_p, use_cache):
    if video is None:
        return "Please upload a video.", 0.0, 0.0, 0
    cleaned_path = _maybe_clean_uploaded(video)
    if cleaned_path is None:
        return "Video cleaning failed.", 0.0, 0.0, 0
    gen_kwargs = dict(do_sample=bool(do_sample), max_new_tokens=int(max_new_tokens))
    if do_sample:
        gen_kwargs.update(temperature=float(temperature), top_p=float(top_p))
    return _infer_core(cleaned_path, question, gen_kwargs, use_cache=bool(use_cache))


def infer_preloaded(preloaded_name, question, max_new_tokens, do_sample, temperature, top_p, use_cache):
    if not preloaded_name:
        return "Pick a preloaded video.", 0.0, 0.0, 0
    cleaned_path = os.path.join(PRELOADED_CLEAN_DIR, preloaded_name)
    if not os.path.exists(cleaned_path):
        return f"Video not found: {cleaned_path}", 0.0, 0.0, 0
    gen_kwargs = dict(do_sample=bool(do_sample), max_new_tokens=int(max_new_tokens))
    if do_sample:
        gen_kwargs.update(temperature=float(temperature), top_p=float(top_p))
    return _infer_core(cleaned_path, question, gen_kwargs, use_cache=bool(use_cache))


def _ui_clear_cache():
    removed = cache_clear()
    return gr.update(value=f"Cleared {removed} runtime cached items.")


def _list_preloaded_clean():
    if not os.path.isdir(PRELOADED_CLEAN_DIR):
        return []
    exts = (".avi", ".mp4", ".mov", ".mkv")
    return [f for f in sorted(os.listdir(PRELOADED_CLEAN_DIR)) if f.lower().endswith(exts)]


def _resolve_preloaded_path(name):
    if not name:
        return None
    path = os.path.join(PRELOADED_CLEAN_DIR, name)
    return path if os.path.exists(path) else None


with gr.Blocks(fill_height=True, theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🧪 Moraqeb - Distilled VLM Demo")
    gr.Markdown(
        "Choose a **preloaded cleaned video** (fast, uses preset cache if available) or **upload your own**. "
        "On a cache hit, the app returns instantly **without loading the model**."
    )

    with gr.Tabs():

        with gr.Tab("Preloaded (recommended)"):
            with gr.Row():
                with gr.Column(scale=1):
                    preloaded = gr.Dropdown(
                        choices=_list_preloaded_clean(),
                        label="Pick a cleaned video (assets/videos_clean/)",
                        interactive=True,
                    )
                    preview = gr.Video(label="Preview", autoplay=False)

                    preloaded.change(
                        fn=_resolve_preloaded_path,
                        inputs=preloaded,
                        outputs=preview,
                    )
                with gr.Column(scale=1):
                    question_pre = gr.Textbox(
                        label="Question",
                        value="Can you describe the entire video in detail from start to finish?",
                        lines=3,
                    )
                    max_new_tokens_pre = gr.Slider(8, 512, value=128, step=8, label="Max new tokens")
                    do_sample_pre = gr.Checkbox(value=False, label="do_sample (enable sampling)")
                    temperature_pre = gr.Slider(0.1, 1.5, value=0.7, step=0.1, label="temperature")
                    top_p_pre = gr.Slider(0.1, 1.0, value=0.9, step=0.05, label="top_p")
                    use_cache_pre = gr.Checkbox(value=True, label="Use cache")
                    run_btn_pre = gr.Button("Run on Preloaded Video", variant="primary")

            with gr.Row():
                answer_pre = gr.Textbox(label="Answer", lines=8)
            with gr.Row():
                latency_pre = gr.Number(label="Latency (s)")
                tps_pre = gr.Number(label="Speed (tokens/sec)")
                ntoks_pre = gr.Number(label="New tokens")

        with gr.Tab("Upload your video"):
            with gr.Row():
                with gr.Column(scale=1):
                    video = gr.Video(label="Upload video", sources=["upload"], autoplay=False)
                with gr.Column(scale=1):
                    question = gr.Textbox(
                        label="Question",
                        value="Can you describe the entire video in detail from start to finish?",
                        lines=3,
                    )
                    max_new_tokens = gr.Slider(8, 512, value=128, step=8, label="Max new tokens")
                    do_sample = gr.Checkbox(value=False, label="do_sample (enable sampling)")
                    temperature = gr.Slider(0.1, 1.5, value=0.7, step=0.1, label="temperature")
                    top_p = gr.Slider(0.1, 1.0, value=0.9, step=0.05, label="top_p")
                    use_cache = gr.Checkbox(value=True, label="Use cache")
                    run_btn = gr.Button("Run Inference", variant="primary")

            with gr.Row():
                answer = gr.Textbox(label="Answer", lines=8)
            with gr.Row():
                latency = gr.Number(label="Latency (s)")
                tps = gr.Number(label="Speed (tokens/sec)")
                ntoks = gr.Number(label="New tokens")

    with gr.Row():
        clear_cache_btn = gr.Button("Clear RUNTIME Cache")
        clear_cache_status = gr.Textbox(label="Cache status", interactive=False)

    run_btn_pre.click(
        fn=infer_preloaded,
        inputs=[preloaded, question_pre, max_new_tokens_pre, do_sample_pre, temperature_pre, top_p_pre, use_cache_pre],
        outputs=[answer_pre, latency_pre, tps_pre, ntoks_pre],
        api_name="infer_preloaded",
        queue=True,
    )

    run_btn.click(
        fn=infer_upload,
        inputs=[video, question, max_new_tokens, do_sample, temperature, top_p, use_cache],
        outputs=[answer, latency, tps, ntoks],
        api_name="infer_upload",
        queue=True,
    )

    clear_cache_btn.click(_ui_clear_cache, inputs=[], outputs=[clear_cache_status])
    gr.Markdown(f"Hardware: **{'CUDA' if has_cuda else 'CPU'}**, dtype **{dtype}**")


if __name__ == "__main__":
    demo.launch(server_port=7860, share=True)
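

# Example client call (a sketch; assumes the gradio_client package and that the app is
# reachable at http://localhost:7860; adjust the URL and file path for your deployment):
#
#   from gradio_client import Client, handle_file
#   client = Client("http://localhost:7860")
#   answer, latency_s, tok_per_s, n_tokens = client.predict(
#       handle_file("my_clip.mp4"),   # video
#       "Describe the main action.",  # question
#       128,                          # max_new_tokens
#       False,                        # do_sample
#       0.7,                          # temperature
#       0.9,                          # top_p
#       True,                         # use_cache
#       api_name="/infer_upload",
#   )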