Spaces:
Build error
Build error
import spaces | |
""" | |
Copyright NewGenAI | |
Code can't be included in commercial app used for monetary gain. No derivative code allowed. | |
""" | |
import gc | |
import json | |
import torch | |
import tqdm | |
import gradio as gr | |
import random | |
import time | |
from datetime import datetime | |
import os | |
from diffusers.utils import export_to_video | |
from diffusers import LTXPipeline | |
from transformers import T5EncoderModel, T5Tokenizer | |
from pathlib import Path | |
from datetime import datetime | |
from huggingface_hub import hf_hub_download | |
# JSON file in which the UI settings are persisted between sessions.
STATE_FILE: str = "LTX091_state.json"
# Pending batch tasks; each entry is a dict of generate_video keyword arguments.
queue: list = []
def load_state():
    """Read the persisted UI state from ``STATE_FILE``.

    Returns:
        The saved state as a dict. An empty dict is returned when the file
        does not exist, cannot be parsed as JSON, or does not contain a JSON
        object, so callers can always use ``.get`` on the result.
    """
    try:
        with open(STATE_FILE, "r", encoding="utf-8") as file:
            state = json.load(file)
    except FileNotFoundError:
        # First run: nothing has been saved yet.
        return {}
    except ValueError as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        # A damaged state file should not prevent the app from starting.
        print(f"Ignoring unreadable state file {STATE_FILE}: {exc}")
        return {}
    if not isinstance(state, dict):
        print(f"Ignoring state file {STATE_FILE}: expected a JSON object")
        return {}
    return state
def save_state(state):
    """Serialize ``state`` as JSON into ``STATE_FILE``, replacing any previous content."""
    with open(STATE_FILE, "w") as handle:
        json.dump(state, fp=handle)
# Settings persisted by an earlier session; they provide the default values
# of the widgets on the "Generate Video" tab.
initial_state = load_state()
def add_to_queue(prompt, negative_prompt, height, width, num_frames, num_inference_steps, fps, seed):
    """Append one generation request to the batch queue.

    The request is stored as a dict keyed by parameter name.

    Returns:
        A status message that includes the queue length after the insert.
    """
    queue.append(
        dict(
            prompt=prompt,
            negative_prompt=negative_prompt,
            height=height,
            width=width,
            num_frames=num_frames,
            num_inference_steps=num_inference_steps,
            fps=fps,
            seed=seed,
        )
    )
    pending = len(queue)
    return f"Task added to queue. Current queue length: {pending}"
def clear_queue():
    """Drop every pending task from the batch queue (in place) and report it."""
    del queue[:]
    return "Queue cleared."
def process_queue():
    """Run every queued task through ``generate_video`` and then empty the queue.

    Returns:
        ``"Queue is empty."`` when there is nothing to process, otherwise a
        confirmation that all queued tasks have been processed.
    """
    if not queue:
        return "Queue is empty."
    # ``tqdm`` is imported as a module in this file, so the progress-bar
    # class has to be referenced as ``tqdm.tqdm``. The bar closes itself once
    # the iteration is exhausted, so no explicit close call is needed.
    for task in tqdm.tqdm(queue, desc="Processing queue"):
        generate_video(**task)
    queue.clear()
    return "All tasks in the queue have been processed."
def save_ui_state(prompt, negative_prompt, height, width, num_frames, num_inference_steps, fps, seed):
    """Persist the current widget values through ``save_state``.

    Returns:
        The confirmation string shown in the UI.
    """
    field_names = (
        "prompt",
        "negative_prompt",
        "height",
        "width",
        "num_frames",
        "num_inference_steps",
        "fps",
        "seed",
    )
    field_values = (prompt, negative_prompt, height, width, num_frames, num_inference_steps, fps, seed)
    save_state(dict(zip(field_names, field_values)))
    return "State saved!"
# Hub repository that provides the diffusers-format model components.
repo_id = "a-r-r-o-w/LTX-Video-0.9.1-diffusers"
# The files are mirrored into a local folder named after the repository.
base_path = repo_id
# Repository-relative paths of every file fetched at start-up.
files_to_download = [
    "model_index.json",
    "scheduler/scheduler_config.json",
    "text_encoder/config.json",
    # The text encoder weights are split into four numbered shards.
    *(f"text_encoder/model-{shard:05d}-of-00004.safetensors" for shard in range(1, 5)),
    "text_encoder/model.safetensors.index.json",
    "tokenizer/added_tokens.json",
    "tokenizer/special_tokens_map.json",
    "tokenizer/spiece.model",
    "tokenizer/tokenizer_config.json",
    "transformer/config.json",
    "transformer/diffusion_pytorch_model.safetensors",
    "vae/config.json",
    "vae/diffusion_pytorch_model.safetensors",
]
# Mirror each listed repository file into the local model folder. Any failure
# is reported and then re-raised, which stops start-up.
os.makedirs(base_path, exist_ok=True)
for file_path in files_to_download:
    # Local folder that will hold this file (the repository sub-folder, if any).
    full_dir = os.path.join(base_path, os.path.dirname(file_path))
    try:
        os.makedirs(full_dir, exist_ok=True)
        downloaded_path = hf_hub_download(repo_id=repo_id, filename=file_path, local_dir=base_path)
        print(f"Successfully downloaded: {file_path}")
    except Exception as e:
        print(f"Error downloading {file_path}: {str(e)}")
        raise
# The single-file 0.9.1 checkpoint lives in a different repository
# (Lightricks/LTX-Video); it is placed in the same local folder as the
# diffusers-format components.
try:
    # Ensure the target folder exists without depending on variables left
    # over from any earlier loop.
    os.makedirs(repo_id, exist_ok=True)
    downloaded_path = hf_hub_download(
        repo_id="Lightricks/LTX-Video",
        filename="ltx-video-2b-v0.9.1.safetensors",
        local_dir=repo_id,
    )
except Exception as e:
    print(f"Error downloading 0.9.1 model: {str(e)}")
    raise
# Local path of the single-file checkpoint (despite the name, this is a file
# path inside the model folder, not a URL).
single_file_url = os.path.join(repo_id, "ltx-video-2b-v0.9.1.safetensors")

# The text encoder and tokenizer are loaded from the diffusers-format folder
# and handed to the pipeline explicitly.
text_encoder = T5EncoderModel.from_pretrained(
    repo_id, subfolder="text_encoder", torch_dtype=torch.bfloat16
)
# A tokenizer holds no tensors, so no dtype argument is passed here.
tokenizer = T5Tokenizer.from_pretrained(repo_id, subfolder="tokenizer")

pipe = LTXPipeline.from_single_file(
    single_file_url,
    text_encoder=text_encoder,
    tokenizer=tokenizer,
    torch_dtype=torch.bfloat16,
)
pipe.vae.enable_tiling()
pipe.to("cuda")

# TODO: optional LoRA support (adapter name must match in both calls):
# pipe.load_lora_weights("TODO/TODO", adapter_name="ltx-lora")
# pipe.set_adapters(["ltx-lora"], adapter_weights=[1.0])
def generate_video(prompt, negative_prompt, height, width, num_frames, num_inference_steps, fps, seed, progress=gr.Progress(track_tqdm=True)):
    """Generate one video with the module-level pipeline and save it as an MP4.

    Args:
        prompt: Text prompt passed to the pipeline; its first ten characters
            are also used to build the output file name.
        negative_prompt: Negative prompt passed to the pipeline.
        height: Frame height passed to the pipeline.
        width: Frame width passed to the pipeline.
        num_frames: Number of frames passed to the pipeline.
        num_inference_steps: Number of inference steps passed to the pipeline.
        fps: Frame rate used when exporting the video file.
        seed: Seed for the CUDA generator. ``0`` (or an empty value) selects
            a random seed.
        progress: Gradio progress tracker; it is not used directly in the
            body (presumably declared so Gradio mirrors tqdm progress in the
            UI — confirm against the Gradio docs).

    Returns:
        Path of the written ``.mp4`` file inside ``./output_LTX091``.
    """
    # UI number fields may deliver floats or None, while manual_seed needs an int.
    seed = int(seed or 0)
    # Randomize seed if seed is 0
    if seed == 0:
        seed = randomize_seed()

    torch.cuda.empty_cache()
    torch.cuda.synchronize()

    # Generate the frames with a seeded CUDA generator.
    video = pipe(
        prompt=prompt,
        negative_prompt=negative_prompt,
        width=width,
        height=height,
        num_frames=num_frames,
        num_inference_steps=num_inference_steps,
        generator=torch.Generator(device='cuda').manual_seed(seed),
    ).frames[0]

    # Build the file name from the prompt prefix and a timestamp. Characters
    # that are unsafe in file names (for example "/") are replaced so the
    # file always lands directly inside the output folder.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    stem = "".join(ch if ch.isalnum() or ch in " -_" else "_" for ch in prompt[:10])
    filename = f"{stem}_{timestamp}.mp4"

    # Save the video to the output folder
    os.makedirs("output_LTX091", exist_ok=True)
    output_path = f"./output_LTX091/{filename}"
    export_to_video(video, output_path, fps=fps)

    # Release cached GPU memory and collect garbage before the next request.
    torch.cuda.empty_cache()
    gc.collect()
    return output_path
# Gradio UI setup | |
def randomize_seed():
    """Return a random integer seed between 0 and 999999, both inclusive."""
    upper_bound_exclusive = 1_000_000
    return random.randrange(upper_bound_exclusive)
# Two-tab interface: single video generation and batch (queue) processing.
# NOTE(review): the container nesting below is reconstructed, because the
# original indentation was not available — confirm the intended layout.
with gr.Blocks() as demo:
    with gr.Tabs():
        with gr.Tab("Generate Video"):
            # Widget defaults come from the persisted state when present.
            with gr.Row():
                prompt = gr.Textbox(label="Prompt", lines=3, value=initial_state.get("prompt", "A dramatic view of the pyramids at Giza during sunset."))
                negative_prompt = gr.Textbox(label="Negative Prompt", lines=3, value=initial_state.get("negative_prompt", "worst quality, blurry, distorted"))
            with gr.Row():
                height = gr.Slider(label="Height", minimum=224, maximum=768, step=32, value=initial_state.get("height", 384))
                width = gr.Slider(label="Width", minimum=320, maximum=1280, step=32, value=initial_state.get("width", 640))
            with gr.Row():
                num_frames = gr.Slider(label="Number of Frames", minimum=1, maximum=121, step=1, value=initial_state.get("num_frames", 49))
                num_inference_steps = gr.Slider(label="Number of Inference Steps", minimum=1, maximum=30, step=1, value=initial_state.get("num_inference_steps", 20))
            with gr.Row():
                fps = gr.Slider(label="FPS", minimum=1, maximum=30, step=1, value=initial_state.get("fps", 16))
                # precision=0 keeps the seed an integer, as the generator requires.
                seed = gr.Number(label="Seed", value=initial_state.get("seed", 0), precision=0)
                random_seed_button = gr.Button("Randomize Seed")
            output_video = gr.Video(label="Generated Video", show_label=True)
            generate_button = gr.Button("Generate Video")
            save_state_button = gr.Button("Save State")
            state_status = gr.Text(label="State Status")

            random_seed_button.click(randomize_seed, outputs=seed)
            generate_button.click(
                generate_video,
                inputs=[prompt, negative_prompt, height, width, num_frames, num_inference_steps, fps, seed],
                outputs=output_video,
            )
            save_state_button.click(
                save_ui_state,
                inputs=[prompt, negative_prompt, height, width, num_frames, num_inference_steps, fps, seed],
                outputs=state_status,
            )
        with gr.Tab("Batch Processing"):
            with gr.Row():
                batch_prompt = gr.Textbox(label="Prompt", lines=3, value="A batch of videos depicting different landscapes.")
                batch_negative_prompt = gr.Textbox(label="Negative Prompt", lines=3, value="low quality, inconsistent, jittery")
            with gr.Row():
                batch_height = gr.Slider(label="Height", minimum=224, maximum=768, step=32, value=384)
                batch_width = gr.Slider(label="Width", minimum=320, maximum=1280, step=32, value=640)
            with gr.Row():
                batch_num_frames = gr.Slider(label="Number of Frames", minimum=1, maximum=121, step=1, value=49)
                batch_num_inference_steps = gr.Slider(label="Number of Inference Steps", minimum=1, maximum=30, step=1, value=20)
            with gr.Row():
                batch_fps = gr.Slider(label="FPS", minimum=1, maximum=30, step=1, value=16)
                # precision=0 keeps the seed an integer, as the generator requires.
                batch_seed = gr.Number(label="Seed", value=0, precision=0)
                random_seed_batch_button = gr.Button("Randomize Seed")
            add_to_queue_button = gr.Button("Add to Queue")
            clear_queue_button = gr.Button("Clear Queue")
            process_queue_button = gr.Button("Process Queue")
            queue_status = gr.Text(label="Queue Status")

            random_seed_batch_button.click(randomize_seed, outputs=batch_seed)
            add_to_queue_button.click(
                add_to_queue,
                inputs=[batch_prompt, batch_negative_prompt, batch_height, batch_width, batch_num_frames, batch_num_inference_steps, batch_fps, batch_seed],
                outputs=queue_status,
            )
            clear_queue_button.click(clear_queue, outputs=queue_status)
            process_queue_button.click(process_queue, outputs=queue_status)

demo.launch()