import requests |
import json |
import time |
import argparse |
import os |
import re |
def check_comfyui_started(url, timeout=5):
    """
    Checks if ComfyUI is running at the given URL.
    Args:
        url (str): URL to probe with an HTTP GET.
        timeout (float): Seconds to wait for the server to connect/respond
            before treating it as not started.
    Returns:
        bool: True if the server answered with HTTP 200; False if it answered
            with another status, refused the connection, or timed out.
    """
    try:
        # requests applies no timeout by default; without one, a server that
        # accepts the connection but never answers would block the caller's
        # polling loop indefinitely.
        response = requests.get(url, timeout=timeout)
    except (requests.ConnectionError, requests.Timeout):
        # Only "server not reachable yet" conditions are swallowed. Other
        # errors (e.g. a malformed URL) still propagate so the caller does
        # not poll forever on a problem that waiting cannot fix.
        return False
    return response.status_code == 200
def load_and_update_prompt(file_path, width, height, length, batch_size, shift_amount, guidance_amount, steps_amount, denoise_amount, frame_rate, prompt):
    """
    Reads a prompt payload from a JSON file and overrides selected node inputs.
    Args:
        file_path (str): Path to the JSON payload file.
        width (int): Stored as node '232' input 'width'.
        height (int): Stored as node '232' input 'height'.
        length (int): Stored as node '295' input 'int'.
        batch_size (int): Stored as node '232' input 'batch_size'.
        shift_amount (float): Stored as node '67' input 'shift'.
        guidance_amount (float): Stored as node '26' input 'guidance'.
        steps_amount (int): Stored as node '17' input 'steps'.
        denoise_amount (float): Stored as node '17' input 'denoise'.
        frame_rate (int): Stored as node '82' input 'frame_rate'.
        prompt (str): Stored as node '44' input 'text'.
    Returns:
        dict: The updated payload, or None if the file could not be read or
            parsed (the error is printed).
    Raises:
        KeyError: If the payload lacks 'prompt', one of the node ids above,
            or that node's 'inputs' mapping.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as payload_file:
            payload = json.load(payload_file)
    except Exception as e:
        print(f"Error loading JSON file: {e}")
        return None

    # (node id, input name, new value), applied in this order.
    overrides = (
        ('232', 'width', width),
        ('232', 'height', height),
        ('295', 'int', length),
        ('232', 'batch_size', batch_size),
        ('67', 'shift', shift_amount),
        ('26', 'guidance', guidance_amount),
        ('17', 'steps', steps_amount),
        ('17', 'denoise', denoise_amount),
        ('82', 'frame_rate', frame_rate),
        ('44', 'text', prompt),
    )
    for node_id, input_name, value in overrides:
        payload['prompt'][node_id]['inputs'][input_name] = value
    return payload
def post_prompt(url, payload, timeout=30):
    """
    Posts the prompt payload to the ComfyUI API.
    Args:
        url (str): ComfyUI API endpoint URL.
        payload (dict): Prompt payload as a dictionary; sent as the JSON body.
        timeout (float): Seconds to wait for the server before giving up.
    Returns:
        str: The 'prompt_id' field of the JSON response if the request
            succeeded, None otherwise (the error is printed).
    """
    try:
        # Explicit timeout: requests would otherwise wait forever on a
        # server that stops responding.
        response = requests.post(url, json=payload, timeout=timeout)
        response.raise_for_status()
        body = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose
        # JSON decode error is not a RequestException subclass.
        print(f"Error posting prompt: {e}")
        return None
    if not isinstance(body, dict):
        # A JSON array/scalar has no .get(); report it instead of crashing.
        print(f"Error posting prompt: unexpected response body {body!r}")
        return None
    return body.get('prompt_id')
def get_queue_progress(url, timeout=10):
    """
    Retrieves the queue progress from the ComfyUI API.
    Args:
        url (str): ComfyUI API queue endpoint URL.
        timeout (float): Seconds to wait for the server before giving up.
    Returns:
        tuple: (queue_pending, queue_running), taken from the response fields
            of the same names (each defaults to an empty list when absent),
            or (None, None) if an error occurred (the error is printed).
    """
    try:
        # Explicit timeout so one stalled request cannot freeze the caller's
        # polling loop; requests has no default timeout.
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        queue_data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose
        # JSON decode error is not a RequestException subclass.
        print(f"Error getting queue progress: {e}")
        return None, None
    if not isinstance(queue_data, dict):
        # A JSON array/scalar has no .get(); report it instead of crashing.
        print(f"Error getting queue progress: unexpected response body {queue_data!r}")
        return None, None
    queue_pending = queue_data.get('queue_pending', [])
    queue_running = queue_data.get('queue_running', [])
    return queue_pending, queue_running
def get_history(url, prompt_id, timeout=10):
    """
    Retrieves the history from the ComfyUI API and looks up one prompt's result.
    Args:
        url (str): ComfyUI API history endpoint URL. The response is expected
            to be a JSON object keyed by prompt ID.
        prompt_id (str): Prompt ID to look for in the history.
        timeout (float): Seconds to wait for the server before giving up.
    Returns:
        tuple: (status, video_path). status is "success" with the first
            'fullpath' found under any output's 'gifs' list, or "failure"
            with None when the prompt is missing, did not complete
            successfully, or has no video path. (None, None) is returned if
            the request itself failed. A message is printed in every
            non-success case.
    """
    try:
        # Explicit timeout: requests has no default and would wait forever.
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        history_data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose
        # JSON decode error is not a RequestException subclass.
        print(f"Error getting history: {e}")
        return None, None

    if not history_data:
        print("No history data found.")
        return "failure", None

    # Direct key lookup instead of scanning every entry for a matching ID.
    history_item = history_data.get(prompt_id) if isinstance(history_data, dict) else None
    if history_item is None:
        print("Prompt ID not found in history data.")
        return "failure", None

    try:
        status_info = history_item['status']
        succeeded = status_info['status_str'] == "success" and bool(status_info['completed'])
    except (KeyError, TypeError):
        # A malformed or missing status block cannot be counted as success.
        succeeded = False
    if not succeeded:
        print("Video generation failed according to history data.")
        return "failure", None

    try:
        for output in history_item['outputs'].values():
            for gif in output.get('gifs', []):
                # The first video entry found is the result.
                return "success", gif['fullpath']
    except (KeyError, TypeError, AttributeError):
        pass
    # Reached both when the structure is malformed and when no output lists
    # a video; previously the latter was misreported as an unknown prompt ID.
    print("Video path not found in history data.")
    return "failure", None
def extract_video_number(filename):
    """
    Pulls the digit run that sits between an underscore and ".mp4" out of a filename.
    Returns the digits as a string (leading zeros kept) for the first such
    occurrence, or None when the filename contains no such pattern.
    """
    found = re.search(r'_(\d+)\.mp4', filename)
    return found.group(1) if found else None
def _str_to_bool(value):
    """Converts a command-line token such as "true"/"false" into a bool for argparse."""
    if isinstance(value, bool):
        return value
    normalized = value.strip().lower()
    if normalized in ("1", "true", "t", "yes", "y", "on"):
        return True
    if normalized in ("0", "false", "f", "no", "n", "off", ""):
        return False
    raise argparse.ArgumentTypeError(f"Expected a boolean value, got {value!r}.")


def main():
    """
    Command-line entry point: submits one prompt to ComfyUI and reports the result.

    Steps: parse arguments, wait until the server answers, load the payload
    template that matches --upscale and apply the argument overrides, post it,
    poll the queue until it is empty, then read this prompt's history entry
    and print the generated video path and a derived output filename.
    Every failure is reported with a printed message and an early return.
    """
    parser = argparse.ArgumentParser(description="Run ComfyUI with specified parameters.")
    parser.add_argument("--width", type=int, default=320, help="Width of the generated image.")
    parser.add_argument("--height", type=int, default=480, help="Height of the generated image.")
    parser.add_argument("--length", type=int, default=45, help="Length (number of frames) of the video.")
    parser.add_argument("--batchSize", type=int, default=1, help="Batch size for video generation.")
    parser.add_argument("--shiftAmount", type=float, default=9.0, help="Shift amount for ModelSamplingSD3.")
    parser.add_argument("--guidanceAmount", type=float, default=7.5, help="Guidance amount for FluxGuidance.")
    parser.add_argument("--stepsAmount", type=int, default=12, help="Steps amount for BasicScheduler.")
    parser.add_argument("--denoiseAmount", type=float, default=1.0, help="Denoise amount for BasicScheduler.")
    parser.add_argument("--frameRate", type=int, default=24, help="Frame rate for video combine.")
    parser.add_argument("--prompt", type=str, required=True, help="Text prompt for video generation.")
    # type=bool would turn any non-empty string (including "False") into
    # True. Parse the text explicitly, and let a bare "--upscale" mean True.
    parser.add_argument("--upscale", type=_str_to_bool, nargs="?", const=True, default=False, help="Whether to use upscale or not.")
    parser.add_argument("--comfyui_url", type=str, default="http://localhost:8188", help="ComfyUI base URL.")
    args = parser.parse_args()

    # Drop trailing slashes so the joined endpoints do not contain "//".
    base_url = args.comfyui_url.rstrip("/")
    api_url = f"{base_url}/api/prompt"
    queue_url = f"{base_url}/api/queue"

    if args.upscale:
        prompt_file = "prompt-payload.json"
        filename_prefix = "Hunyuan_upscaled"
    else:
        prompt_file = "prompt-payload-no-upscale.json"
        filename_prefix = "Hunyuan_raw"

    print("Checking if ComfyUI is running...")
    while not check_comfyui_started(base_url):
        print("ComfyUI not yet running. Waiting...")
        time.sleep(5)
    print("ComfyUI is running.")

    print("Loading and updating prompt payload...")
    # The payload templates are looked up next to this script.
    prompt_file_path = os.path.join(os.path.dirname(__file__), prompt_file)
    try:
        prompt_payload = load_and_update_prompt(prompt_file_path, args.width, args.height, args.length, args.batchSize, args.shiftAmount, args.guidanceAmount, args.stepsAmount, args.denoiseAmount, args.frameRate, args.prompt)
    except Exception as e:
        print(f"Error loading and updating prompt: {e}")
        return
    if prompt_payload is None:
        # The loader reports read/parse errors by returning None; posting
        # that would send an empty request to the server.
        print("Failed to load prompt payload. Aborting.")
        return

    print("Posting prompt to ComfyUI...")
    prompt_id = post_prompt(api_url, prompt_payload)
    if not prompt_id:
        print("Failed to submit prompt.")
        return
    print(f"Prompt submitted successfully with ID: {prompt_id}")

    print("Monitoring queue...")
    while True:
        queue_pending, queue_running = get_queue_progress(queue_url)
        if queue_pending is None or queue_running is None:
            # Stop polling; the history lookup below is still attempted.
            print("Failed to get queue progress. Aborting.")
            break
        if not queue_pending and not queue_running:
            print("Queue is empty.")
            break
        print(f"Queue pending: {len(queue_pending)}, running: {len(queue_running)}. Waiting...")
        time.sleep(5)

    print("Getting history...")
    # Ask for this prompt's entry directly. Requesting only the single most
    # recent history item could return a different prompt's result when
    # other jobs finish on the same server.
    history_url = f"{base_url}/api/history/{prompt_id}"
    status, video_path = get_history(history_url, prompt_id)
    if status != "success":
        print("Video generation failed.")
        return

    print(f"Video generation complete! Video path: {video_path}")
    # Guard against a null path in the history entry before pattern matching.
    video_number = extract_video_number(video_path) if video_path else None
    if video_number:
        output_filename = f"{filename_prefix}_{video_number}.mp4"
    else:
        output_filename = f"{filename_prefix}_latest.mp4"
    print(output_filename)


if __name__ == "__main__":
    main()