|
import requests |
|
import json |
|
import time |
|
import argparse |
|
import os |
|
import re |
|
|
|
def check_comfyui_started(url): |
|
""" |
|
Checks if ComfyUI is running at the given URL. |
|
""" |
|
try: |
|
response = requests.get(url) |
|
return response.status_code == 200 |
|
except requests.ConnectionError: |
|
return False |
|
|
|
def load_and_update_prompt(file_path, width, height, length, batch_size, shift_amount, guidance_amount, steps_amount, denoise_amount, frame_rate, prompt): |
|
""" |
|
Loads the prompt payload from a JSON file and updates the specified fields. |
|
|
|
Args: |
|
file_path (str): Path to the prompt-payload.json file. |
|
width (int): Width to set in the prompt. |
|
height (int): Height to set in the prompt. |
|
length (int): Length (frames) to set in the prompt. |
|
batch_size (int): Batch size to set in the prompt. |
|
shift_amount (float): Shift amount to set in the prompt. |
|
guidance_amount (float): Guidance amount to set in the prompt. |
|
steps_amount (int): Steps amount to set in the prompt. |
|
denoise_amount (float): Denoise amount to set in the prompt. |
|
frame_rate (int): Frame rate to set in the prompt. |
|
prompt (str): Prompt text to set in the prompt. |
|
|
|
Returns: |
|
dict: Updated prompt payload as a dictionary. |
|
""" |
|
try: |
|
with open(file_path, 'r', encoding='utf-8') as f: |
|
payload = json.load(f) |
|
except Exception as e: |
|
print(f"Error loading JSON file: {e}") |
|
return None |
|
|
|
|
|
payload['prompt']['232']['inputs']['width'] = width |
|
payload['prompt']['232']['inputs']['height'] = height |
|
|
|
|
|
payload['prompt']['295']['inputs']['int'] = length |
|
|
|
|
|
payload['prompt']['232']['inputs']['batch_size'] = batch_size |
|
|
|
|
|
payload['prompt']['67']['inputs']['shift'] = shift_amount |
|
|
|
|
|
payload['prompt']['26']['inputs']['guidance'] = guidance_amount |
|
|
|
|
|
payload['prompt']['17']['inputs']['steps'] = steps_amount |
|
|
|
|
|
payload['prompt']['17']['inputs']['denoise'] = denoise_amount |
|
|
|
|
|
payload['prompt']['82']['inputs']['frame_rate'] = frame_rate |
|
|
|
|
|
payload['prompt']['44']['inputs']['text'] = prompt |
|
|
|
return payload |
|
|
|
def post_prompt(url, payload): |
|
""" |
|
Posts the prompt payload to the ComfyUI API. |
|
|
|
Args: |
|
url (str): ComfyUI API endpoint URL. |
|
payload (dict): Prompt payload as a dictionary. |
|
|
|
Returns: |
|
str: Prompt ID if successful, None otherwise. |
|
""" |
|
try: |
|
response = requests.post(url, json=payload) |
|
response.raise_for_status() |
|
return response.json().get('prompt_id') |
|
except requests.exceptions.RequestException as e: |
|
print(f"Error posting prompt: {e}") |
|
return None |
|
|
|
def get_queue_progress(url): |
|
""" |
|
Retrieves the queue progress from the ComfyUI API. |
|
|
|
Args: |
|
url (str): ComfyUI API queue endpoint URL. |
|
|
|
Returns: |
|
tuple: (queue_pending, queue_running) where queue_pending and queue_running are lists, |
|
or (None, None) if an error occurred. |
|
""" |
|
try: |
|
response = requests.get(url) |
|
response.raise_for_status() |
|
queue_data = response.json() |
|
queue_pending = queue_data.get('queue_pending', []) |
|
queue_running = queue_data.get('queue_running', []) |
|
return queue_pending, queue_running |
|
except requests.exceptions.RequestException as e: |
|
print(f"Error getting queue progress: {e}") |
|
return None, None |
|
|
|
def get_history(url, prompt_id): |
|
""" |
|
Retrieves the history from the ComfyUI API. |
|
|
|
Args: |
|
url (str): ComfyUI API history endpoint URL. |
|
prompt_id (str): Prompt ID to look for in the history. |
|
|
|
Returns: |
|
tuple: (status, video_path) where status is a string ("success" or "failure") and video_path is the path to the video, |
|
or (None, None) if an error occurred. |
|
""" |
|
try: |
|
response = requests.get(url) |
|
response.raise_for_status() |
|
history_data = response.json() |
|
|
|
if history_data: |
|
|
|
for history_item_id, history_item in history_data.items(): |
|
if history_item_id == prompt_id: |
|
status = history_item['status']['status_str'] |
|
completed = history_item['status']['completed'] |
|
|
|
if status == "success" and completed: |
|
try: |
|
for output in history_item['outputs'].values(): |
|
for gif in output.get('gifs', []): |
|
video_path = gif['fullpath'] |
|
return "success", video_path |
|
except (KeyError, TypeError): |
|
print("Video path not found in history data.") |
|
return "failure", None |
|
else: |
|
print("Video generation failed according to history data.") |
|
return "failure", None |
|
print("Prompt ID not found in history data.") |
|
return "failure", None |
|
else: |
|
print("No history data found.") |
|
return "failure", None |
|
|
|
except requests.exceptions.RequestException as e: |
|
print(f"Error getting history: {e}") |
|
return None, None |
|
|
|
def extract_video_number(filename): |
|
""" |
|
Extracts the video number from the filename using a regular expression. |
|
""" |
|
match = re.search(r'_(\d+)\.mp4', filename) |
|
if match: |
|
return match.group(1) |
|
return None |
|
|
|
def main(): |
|
parser = argparse.ArgumentParser(description="Run ComfyUI with specified parameters.") |
|
parser.add_argument("--width", type=int, default=320, help="Width of the generated image.") |
|
parser.add_argument("--height", type=int, default=480, help="Height of the generated image.") |
|
parser.add_argument("--length", type=int, default=45, help="Length (number of frames) of the video.") |
|
parser.add_argument("--batchSize", type=int, default=1, help="Batch size for video generation.") |
|
parser.add_argument("--shiftAmount", type=float, default=9.0, help="Shift amount for ModelSamplingSD3.") |
|
parser.add_argument("--guidanceAmount", type=float, default=7.5, help="Guidance amount for FluxGuidance.") |
|
parser.add_argument("--stepsAmount", type=int, default=12, help="Steps amount for BasicScheduler.") |
|
parser.add_argument("--denoiseAmount", type=float, default=1.0, help="Denoise amount for BasicScheduler.") |
|
parser.add_argument("--frameRate", type=int, default=24, help="Frame rate for video combine.") |
|
parser.add_argument("--prompt", type=str, required=True, help="Text prompt for video generation.") |
|
parser.add_argument("--upscale", type=bool, default=False, help="Whether to use upscale or not.") |
|
parser.add_argument("--comfyui_url", type=str, default="http://localhost:8188", help="ComfyUI base URL.") |
|
|
|
args = parser.parse_args() |
|
|
|
API_URL = f"{args.comfyui_url}/api/prompt" |
|
QUEUE_URL = f"{args.comfyui_url}/api/queue" |
|
HISTORY_URL = f"{args.comfyui_url}/api/history?max_items=1" |
|
|
|
if args.upscale: |
|
prompt_file = "prompt-payload.json" |
|
filename_prefix = "Hunyuan_upscaled" |
|
else: |
|
prompt_file = "prompt-payload-no-upscale.json" |
|
filename_prefix = "Hunyuan_raw" |
|
|
|
|
|
print("Checking if ComfyUI is running...") |
|
while not check_comfyui_started(args.comfyui_url): |
|
print("ComfyUI not yet running. Waiting...") |
|
time.sleep(5) |
|
print("ComfyUI is running.") |
|
|
|
|
|
print("Loading and updating prompt payload...") |
|
prompt_file_path = os.path.join(os.path.dirname(__file__), prompt_file) |
|
try: |
|
prompt_payload = load_and_update_prompt(prompt_file_path, args.width, args.height, args.length, args.batchSize, args.shiftAmount, args.guidanceAmount, args.stepsAmount, args.denoiseAmount, args.frameRate, args.prompt) |
|
except Exception as e: |
|
print(f"Error loading and updating prompt: {e}") |
|
return |
|
|
|
|
|
print("Posting prompt to ComfyUI...") |
|
prompt_id = post_prompt(API_URL, prompt_payload) |
|
|
|
if prompt_id: |
|
print(f"Prompt submitted successfully with ID: {prompt_id}") |
|
|
|
|
|
print("Monitoring queue...") |
|
while True: |
|
queue_pending, queue_running = get_queue_progress(QUEUE_URL) |
|
if queue_pending is None or queue_running is None: |
|
print("Failed to get queue progress. Aborting.") |
|
break |
|
|
|
if not queue_pending and not queue_running: |
|
print("Queue is empty.") |
|
break |
|
else: |
|
print(f"Queue pending: {len(queue_pending)}, running: {len(queue_running)}. Waiting...") |
|
time.sleep(5) |
|
|
|
|
|
print("Getting history...") |
|
status, video_path = get_history(HISTORY_URL, prompt_id) |
|
|
|
if status == "success": |
|
print(f"Video generation complete! Video path: {video_path}") |
|
video_number = extract_video_number(video_path) |
|
if video_number: |
|
output_filename = f"{filename_prefix}_{video_number}.mp4" |
|
else: |
|
output_filename = f"{filename_prefix}_latest.mp4" |
|
print(output_filename) |
|
else: |
|
print("Video generation failed.") |
|
else: |
|
print("Failed to submit prompt.") |
|
|
|
if __name__ == "__main__": |
|
main() |