# t2v_cpu_gradio.py
import os
import time
import uuid
from pathlib import Path
from typing import List, Optional

import torch
from diffusers import StableDiffusionPipeline, DDIMScheduler
from PIL import Image
import imageio
import gradio as gr

# -------------------------
# CONFIG (tune for speed)
# -------------------------
MODEL_ID = "runwayml/stable-diffusion-v1-5"  # or another small SD model
DEVICE = "cpu"
STEPS = 20                # fewer steps -> much faster but lower quality
WIDTH, HEIGHT = 512, 320  # smaller height helps speed; keep multiples of 8
NUM_FRAMES = 8            # base keyframes to generate
INTERPOLATION_FACTOR = 2  # output FPS multiplier (optional, via RIFE later)
SEED = None               # None = random
OUTPUT_DIR = Path("outputs")
OUTPUT_DIR.mkdir(exist_ok=True, parents=True)
# -------------------------

# Limit threads to avoid oversubscription on shared machines.
torch.set_num_threads(max(1, os.cpu_count() // 2))


def make_pipeline():
    # Use DDIMScheduler for faster sampling (few steps are usually OK).
    scheduler = DDIMScheduler.from_pretrained(MODEL_ID, subfolder="scheduler")
    pipe = StableDiffusionPipeline.from_pretrained(
        MODEL_ID, safety_checker=None, torch_dtype=torch.float32
    )
    pipe.scheduler = scheduler
    pipe = pipe.to(DEVICE)
    pipe.set_progress_bar_config(disable=True)  # keep diffusers' own tqdm bars quiet
    pipe.enable_attention_slicing()  # reduces peak memory (useful on CPU)
    return pipe


def seed_context(seed):
    if seed is None:
        seed = int.from_bytes(os.urandom(2), "big")  # small random seed
    return torch.Generator(device=DEVICE).manual_seed(seed)


def generate_frames(prompt: str, steps: int, width: int, height: int,
                    n_frames: int, seed: Optional[int], progress: gr.Progress):
    pipe = make_pipeline()
    # One generator reused across all frames: its state advances on every call,
    # so successive frames differ while the whole run stays reproducible per seed.
    gen = seed_context(seed)
    frame_paths = []
    for i in range(n_frames):
        progress(i / max(1, n_frames), desc=f"Generating frame {i+1}/{n_frames}")
        # Tiny prompt schedule example; could add per-frame motion cues here.
        prompt_i = f"{prompt} --frame:{i}"
        out = pipe(prompt_i, num_inference_steps=steps, width=width,
                   height=height, generator=gen)
        img = out.images[0].convert("RGB")
        fname = OUTPUT_DIR / f"frame_{i:03d}.png"
        img.save(fname)
        frame_paths.append(fname)
        time.sleep(0.05)  # tiny sleep so the UI shows updates smoothly on busy CPUs
    progress(1.0, desc="Done generating keyframes")
    return frame_paths
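# ---------------------------------------------------------------
# Optional sketch (an assumption, not wired into the UI): a cheap CPU
# upgrade over the plain crossfade used in simple_rife_interp below.
# Farneback optical flow motion-warps frame A toward frame B before
# blending. Extra deps: opencv-python and numpy; the helper name
# flow_interp_pair is ours, not from any library.
def flow_interp_pair(img_a: Image.Image, img_b: Image.Image, alpha: float) -> Image.Image:
    import cv2  # local imports so the main script runs without OpenCV installed
    import numpy as np

    a = np.asarray(img_a)
    a_gray = cv2.cvtColor(a, cv2.COLOR_RGB2GRAY)
    b_gray = cv2.cvtColor(np.asarray(img_b), cv2.COLOR_RGB2GRAY)
    # Dense flow from A to B, shape (H, W, 2), in pixels.
    flow = cv2.calcOpticalFlowFarneback(a_gray, b_gray, None,
                                        0.5, 3, 15, 3, 5, 1.2, 0)
    h, w = a_gray.shape
    grid_x, grid_y = np.meshgrid(np.arange(w), np.arange(h))
    # remap samples A at (grid - alpha*flow), which shifts content along the
    # motion direction by a fraction alpha of the estimated displacement.
    map_x = (grid_x - alpha * flow[..., 0]).astype(np.float32)
    map_y = (grid_y - alpha * flow[..., 1]).astype(np.float32)
    warped = cv2.remap(a, map_x, map_y, cv2.INTER_LINEAR,
                       borderMode=cv2.BORDER_REFLECT)
    # Blend the warped A with B to hide occlusion artifacts near alpha = 1.
    return Image.blend(Image.fromarray(warped), img_b, alpha)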
""" interp_frames = [] total_pairs = len(frame_paths)-1 for idx in range(total_pairs): progress((idx/total_pairs, f"Interpolating pair {idx+1}/{total_pairs}")) a = Image.open(frame_paths[idx]).convert("RGB") b = Image.open(frame_paths[idx+1]).convert("RGB") interp_frames.append(frame_paths[idx]) # keep first of pair # linear crossfade steps (very cheap) for t in range(1, factor): alpha = t / factor im = Image.blend(a, b, alpha) temp = OUTPUT_DIR / f"interp_{idx:03d}_{t:02d}.png" im.save(temp) interp_frames.append(temp) interp_frames.append(frame_paths[-1]) progress((1.0, "Done interpolation")) return interp_frames def assemble_video(frame_paths: List[Path], fps:int=8): out_vid = OUTPUT_DIR / f"video_{uuid.uuid4().hex[:8]}.mp4" frames = [imageio.imread(str(p)) for p in frame_paths] imageio.mimsave(out_vid, frames, fps=fps) return out_vid # ------------------------- # Gradio UI # ------------------------- def run_pipeline(prompt: str, steps: int, width: int, height: int, n_frames: int, interp_factor: int, seed_input: int, progress=gr.Progress()): start = time.time() seed = seed_input if seed_input>0 else None # generate keyframes frames = generate_frames(prompt, steps, width, height, n_frames, seed, progress) # interpolation (cheap fallback implemented) if interp_factor and interp_factor>1: frames_interp = simple_rife_interp(frames, interp_factor, progress) else: frames_interp = frames # assemble progress((0.95, "Assembling video...")) vid = assemble_video(frames_interp, fps=4*interp_factor if interp_factor>0 else 4) elapsed = time.time()-start progress((1.0, f"Finished in {elapsed:.1f}s -> {vid.name}")) return str(vid) with gr.Blocks() as demo: gr.Markdown("## CPU Text→Video (fast settings) — Stable Diffusion + Gradio progress") with gr.Row(): with gr.Column(): prompt = gr.Textbox(label="Prompt", value="A cinematic short looping scene, 3D lighting, minimal text") steps = gr.Slider(label="Steps (lower=faster)", minimum=5, maximum=50, value=STEPS, step=1) width_in = gr.Dropdown([256,384,512], value=WIDTH, label="Width") height_in = gr.Dropdown([192,256,320], value=HEIGHT, label="Height") n_frames = gr.Slider(label="Base frames", minimum=2, maximum=12, value=NUM_FRAMES, step=1) interp = gr.Slider(label="Interp factor (optional)", minimum=1, maximum=6, value=INTERPOLATION_FACTOR, step=1) seed_box = gr.Number(label="Seed (0=random)", value=0) run_btn = gr.Button("Generate") with gr.Column(): out_video = gr.Video(label="Result video") logs = gr.Textbox(label="Log (last message shown)", interactive=False) # attach function with progress param run_btn.click(fn=run_pipeline, inputs=[prompt, steps, width_in, height_in, n_frames, interp, seed_box], outputs=[out_video], api_name="generate") demo.launch()