# utils/ai_generator_diffusers_flux.py
import os
import utils.constants as constants
#import spaces
import gradio as gr
from torch import __version__ as torch_version_, version, cuda, bfloat16, float32, Generator, no_grad, backends
from diffusers import FluxPipeline, FluxImg2ImgPipeline, FluxControlPipeline
import accelerate
from transformers import AutoTokenizer
import safetensors
#import xformers
#from diffusers.utils import load_image
#from huggingface_hub import hf_hub_download
from PIL import Image
from tempfile import NamedTemporaryFile
from utils.image_utils import (
    crop_and_resize_image,
)
from utils.version_info import (
    get_torch_info,
    # get_diffusers_version,
    # get_transformers_version,
    # get_xformers_version,
    initialize_cuda,
    release_torch_resources
)
import gc
from utils.lora_details import get_trigger_words, approximate_token_count, split_prompt_precisely
#from utils.color_utils import detect_color_format
#import utils.misc as misc
#from pathlib import Path
import warnings
warnings.filterwarnings("ignore", message=".*Torch was not compiled with flash attention.*")
#print(torch_version_)       # Ensure it's 2.0 or newer
#print(cuda.is_available())  # Ensure CUDA is available

PIPELINE_CLASSES = {
    "FluxPipeline": FluxPipeline,
    "FluxImg2ImgPipeline": FluxImg2ImgPipeline,
    "FluxControlPipeline": FluxControlPipeline
}
#@spaces.GPU()
def generate_image_from_text(
    text,
    model_name="black-forest-labs/FLUX.1-dev",
    lora_weights=None,
    conditioned_image=None,
    image_width=1344,
    image_height=848,
    guidance_scale=3.5,
    num_inference_steps=50,
    seed=0,
    additional_parameters=None,
    progress=gr.Progress(track_tqdm=True)
):
    from src.condition import Condition
    device = "cuda" if cuda.is_available() else "cpu"
    print(f"device:{device}\nmodel_name:{model_name}\n")
    # Initialize the pipeline
    pipe = FluxPipeline.from_pretrained(
        model_name,
        torch_dtype=bfloat16 if device == "cuda" else float32
    ).to(device)
    pipe.enable_model_cpu_offload()
    # Access the tokenizer from the pipeline
    tokenizer = pipe.tokenizer
    # Handle add_prefix_space attribute
    if getattr(tokenizer, 'add_prefix_space', False):
        tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=False)
        # Update the pipeline's tokenizer
        pipe.tokenizer = tokenizer
    # Load and apply LoRA weights
    if lora_weights:
        for lora_weight in lora_weights:
            lora_configs = constants.LORA_DETAILS.get(lora_weight, [])
            if lora_configs:
                for config in lora_configs:
                    weight_name = config.get("weight_name")
                    adapter_name = config.get("adapter_name")
                    pipe.load_lora_weights(
                        lora_weight,
                        weight_name=weight_name,
                        adapter_name=adapter_name,
                        use_auth_token=constants.HF_API_TOKEN
                    )
            else:
                pipe.load_lora_weights(lora_weight, use_auth_token=constants.HF_API_TOKEN)
    # Set the random seed for reproducibility
    generator = Generator(device=device).manual_seed(seed)
    conditions = []
    # Handle conditioned image if provided
    if conditioned_image is not None:
        conditioned_image = crop_and_resize_image(conditioned_image, 1024, 1024)
        condition = Condition("subject", conditioned_image)
        conditions.append(condition)
    # Prepare parameters for image generation
    generate_params = {
        "prompt": text,
        "height": image_height,
        "width": image_width,
        "guidance_scale": guidance_scale,
        "num_inference_steps": num_inference_steps,
        "generator": generator,
        "conditions": conditions if conditions else None
    }
    if additional_parameters:
        generate_params.update(additional_parameters)
    generate_params = {k: v for k, v in generate_params.items() if v is not None}
    # Generate the image
    result = pipe(**generate_params)
    image = result.images[0]
    pipe.unload_lora_weights()
    # Clean up
    del result
    del conditions
    del generator
    del pipe
    cuda.empty_cache()
    cuda.ipc_collect()
    return image
#@spaces.GPU(progress=gr.Progress(track_tqdm=True))
def generate_image_lowmem(
    text,
    neg_prompt=None,
    model_name="black-forest-labs/FLUX.1-dev",
    lora_weights=None,
    conditioned_image=None,
    image_width=1368,
    image_height=848,
    guidance_scale=3.5,
    num_inference_steps=30,
    seed=0,
    true_cfg_scale=1.0,
    pipeline_name="FluxPipeline",
    strength=0.75,
    additional_parameters=None,
    progress=gr.Progress(track_tqdm=True)
):
    # Retrieve the pipeline class from the mapping
    pipeline_class = PIPELINE_CLASSES.get(pipeline_name)
    if not pipeline_class:
        raise ValueError(f"Unsupported pipeline type '{pipeline_name}'. "
                         f"Available options: {list(PIPELINE_CLASSES.keys())}")
    initialize_cuda()
    device = "cuda" if cuda.is_available() else "cpu"
    from src.condition import Condition
    print(f"device:{device}\nmodel_name:{model_name}\nlora_weights:{lora_weights}\n")
    print(f"\n {get_torch_info()}\n")
    # Disable gradient calculations
    with no_grad():
        # Initialize the pipeline inside the context manager
        pipe = pipeline_class.from_pretrained(
            model_name,
            torch_dtype=bfloat16 if device == "cuda" else float32
        ).to(device)
        # Optionally, don't use CPU offload if not necessary
        # alternative version that may be more efficient
        # pipe.enable_sequential_cpu_offload()
        if pipeline_name == "FluxPipeline":
            pipe.enable_model_cpu_offload()
            pipe.vae.enable_slicing()
            pipe.vae.enable_tiling()
        else:
            pipe.enable_model_cpu_offload()
        # Access the tokenizer from the pipeline
        tokenizer = pipe.tokenizer
        # Check if add_prefix_space is set and convert to slow tokenizer if necessary
        if getattr(tokenizer, 'add_prefix_space', False):
            tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=False, device_map='cpu')
            # Update the pipeline's tokenizer
            pipe.tokenizer = tokenizer
        pipe.to(device)
        flash_attention_enabled = backends.cuda.flash_sdp_enabled()
        if not flash_attention_enabled:
            # Fallback path: xFormers memory-efficient attention could be enabled here (currently left disabled)
            # pipe.enable_xformers_memory_efficient_attention()
            print("\nFlash attention not available; xFormers memory-efficient attention left disabled.\n")
        else:
            pipe.attn_implementation = "flash_attention_2"
            print("\nEnabled flash_attention_2.\n")
        condition_type = "subject"
        # Load LoRA weights
        # note: does not yet handle multiple LoRA weights with different names, needs .set_adapters(["depth", "hyper-sd"], adapter_weights=[0.85, 0.125])
        if lora_weights:
            for lora_weight in lora_weights:
                lora_configs = constants.LORA_DETAILS.get(lora_weight, [])
                lora_weight_set = False
                if lora_configs:
                    for config in lora_configs:
                        # Load LoRA weights with optional weight_name and adapter_name
                        if 'weight_name' in config:
                            weight_name = config.get("weight_name")
                            adapter_name = config.get("adapter_name")
                            lora_collection = config.get("lora_collection")
                            if weight_name and adapter_name and lora_collection and not lora_weight_set:
                                pipe.load_lora_weights(
                                    lora_collection,
                                    weight_name=weight_name,
                                    adapter_name=adapter_name,
                                    token=constants.HF_API_TOKEN
                                )
                                lora_weight_set = True
                                print(f"\npipe.load_lora_weights({lora_collection}, weight_name={weight_name}, adapter_name={adapter_name})\n")
                            elif weight_name and adapter_name is None and lora_collection and not lora_weight_set:
                                pipe.load_lora_weights(
                                    lora_collection,
                                    weight_name=weight_name,
                                    token=constants.HF_API_TOKEN
                                )
                                lora_weight_set = True
                                print(f"\npipe.load_lora_weights({lora_collection}, weight_name={weight_name})\n")
                            elif weight_name and adapter_name and not lora_weight_set:
                                pipe.load_lora_weights(
                                    lora_weight,
                                    weight_name=weight_name,
                                    adapter_name=adapter_name,
                                    token=constants.HF_API_TOKEN
                                )
                                lora_weight_set = True
                                print(f"\npipe.load_lora_weights({lora_weight}, weight_name={weight_name}, adapter_name={adapter_name})\n")
                            elif weight_name and adapter_name is None and not lora_weight_set:
                                pipe.load_lora_weights(
                                    lora_weight,
                                    weight_name=weight_name,
                                    token=constants.HF_API_TOKEN
                                )
                                lora_weight_set = True
                                print(f"\npipe.load_lora_weights({lora_weight}, weight_name={weight_name})\n")
                            elif not lora_weight_set:
                                pipe.load_lora_weights(
                                    lora_weight,
                                    token=constants.HF_API_TOKEN
                                )
                                lora_weight_set = True
                                print(f"\npipe.load_lora_weights({lora_weight})\n")
                        # Apply 'pipe' configurations if present
                        if 'pipe' in config:
                            pipe_config = config['pipe']
                            for method_name, params in pipe_config.items():
                                method = getattr(pipe, method_name, None)
                                if method:
                                    print(f"Applying pipe method: {method_name} with params: {params}")
                                    method(**params)
                                else:
                                    print(f"Method {method_name} not found in pipe.")
                        if 'condition_type' in config:
                            condition_type = config['condition_type']
                            if condition_type == "coloring":
                                #pipe.enable_coloring()
                                print("\nEnabled coloring.\n")
                            elif condition_type == "deblurring":
                                #pipe.enable_deblurring()
                                print("\nEnabled deblurring.\n")
                            elif condition_type == "fill":
                                #pipe.enable_fill()
                                print("\nEnabled fill.\n")
                            elif condition_type == "depth":
                                #pipe.enable_depth()
                                print("\nEnabled depth.\n")
                            elif condition_type == "canny":
                                #pipe.enable_canny()
                                print("\nEnabled canny.\n")
                            elif condition_type == "subject":
                                #pipe.enable_subject()
                                print("\nEnabled subject.\n")
                            else:
                                print(f"Condition type {condition_type} not implemented.")
                else:
                    pipe.load_lora_weights(lora_weight, use_auth_token=constants.HF_API_TOKEN)
        # Set the random seed for reproducibility
        generator = Generator(device=device).manual_seed(seed)
        conditions = []
        # Merge extra call parameters rather than overwriting them, so caller-supplied
        # additional_parameters (e.g. from LORA_DETAILS) are preserved
        if additional_parameters is None:
            additional_parameters = {}
        if conditioned_image is not None:
            conditioned_image = crop_and_resize_image(conditioned_image, image_width, image_height)
            condition = Condition(condition_type, conditioned_image)
            conditions.append(condition)
            print(f"\nAdded conditioned image.\n {conditioned_image.size}")
            # Parameters for img2img/control-style generation
            additional_parameters.update({
                "strength": strength,
                "image": conditioned_image,
            })
        else:
            print("\nNo conditioned image provided.")
        if neg_prompt:  # only enable true CFG when a non-empty negative prompt is supplied
            true_cfg_scale = 1.1
            additional_parameters.update({
                "negative_prompt": neg_prompt,
                "true_cfg_scale": true_cfg_scale,
            })
        # Handle long prompts by splitting them across prompt (CLIP, ~77-token limit) and prompt_2 (T5)
        if approximate_token_count(text) > 76:
            prompt, prompt2 = split_prompt_precisely(text)
            prompt_parameters = {
                "prompt": prompt,
                "prompt_2": prompt2
            }
        else:
            prompt_parameters = {
                "prompt": text
            }
        additional_parameters.update(prompt_parameters)
        # Combine all parameters
        generate_params = {
            "height": image_height,
            "width": image_width,
            "guidance_scale": guidance_scale,
            "num_inference_steps": num_inference_steps,
            "generator": generator,
        }
        if additional_parameters:
            generate_params.update(additional_parameters)
        generate_params = {k: v for k, v in generate_params.items() if v is not None}
        print(f"generate_params: {generate_params}")
        # Generate the image
        result = pipe(**generate_params)
        image = result.images[0]
        # Clean up
        del result
        del conditions
        del generator
        # Delete the pipeline and clear cache
        del pipe
        cuda.empty_cache()
        cuda.ipc_collect()
        if cuda.is_available():
            print(cuda.memory_summary(device=None, abbreviated=False))
        return image
def generate_ai_image_local(
    map_option,
    prompt_textbox_value,
    neg_prompt_textbox_value,
    model="black-forest-labs/FLUX.1-dev",
    lora_weights=None,
    conditioned_image=None,
    height=512,
    width=912,
    num_inference_steps=30,
    guidance_scale=3.5,
    seed=777,
    pipeline_name="FluxPipeline",
    strength=0.75,
    progress=gr.Progress(track_tqdm=True)
):
    release_torch_resources()
    print("Generating image with lowmem")
    try:
        if map_option != "Prompt":
            prompt = constants.PROMPTS[map_option]
            negative_prompt = constants.NEGATIVE_PROMPTS.get(map_option, "")
        else:
            prompt = prompt_textbox_value
            negative_prompt = neg_prompt_textbox_value or ""
        #full_prompt = f"{prompt} {negative_prompt}"
        additional_parameters = {}
        # Collect per-LoRA parameters and prepend trigger words to the prompt
        if lora_weights:
            for lora_weight in lora_weights:
                lora_configs = constants.LORA_DETAILS.get(lora_weight, [])
                for config in lora_configs:
                    if 'parameters' in config:
                        additional_parameters.update(config['parameters'])
                    elif 'trigger_words' in config:
                        trigger_words = get_trigger_words(lora_weight)
                        prompt = f"{trigger_words} {prompt}"
        for key, value in additional_parameters.items():
            if key in ['height', 'width', 'num_inference_steps', 'max_sequence_length']:
                additional_parameters[key] = int(value)
            elif key in ['guidance_scale', 'true_cfg_scale']:
                additional_parameters[key] = float(value)
        height = additional_parameters.pop('height', height)
        width = additional_parameters.pop('width', width)
        num_inference_steps = additional_parameters.pop('num_inference_steps', num_inference_steps)
        guidance_scale = additional_parameters.pop('guidance_scale', guidance_scale)
        print("Generating image with the following parameters:")
        print(f"Model: {model}")
        print(f"LoRA Weights: {lora_weights}")
        print(f"Prompt: {prompt}")
        print(f"Neg Prompt: {negative_prompt}")
        print(f"Height: {height}")
        print(f"Width: {width}")
        print(f"Number of Inference Steps: {num_inference_steps}")
        print(f"Guidance Scale: {guidance_scale}")
        print(f"Seed: {seed}")
        print(f"Additional Parameters: {additional_parameters}")
        print(f"Conditioned Image: {conditioned_image}")
        print(f"Conditioned Image Strength: {strength}")
        print(f"pipeline: {pipeline_name}")
        image = generate_image_lowmem(
            text=prompt,
            model_name=model,
            neg_prompt=negative_prompt,
            lora_weights=lora_weights,
            conditioned_image=conditioned_image,
            image_width=width,
            image_height=height,
            guidance_scale=guidance_scale,
            num_inference_steps=num_inference_steps,
            seed=seed,
            pipeline_name=pipeline_name,
            strength=strength,
            additional_parameters=additional_parameters
        )
        # Save the result to a temporary PNG and track it for later cleanup
        with NamedTemporaryFile(delete=False, suffix=".png") as tmp:
            image.save(tmp.name, format="PNG")
            constants.temp_files.append(tmp.name)
            print(f"Image saved to {tmp.name}")
        gc.collect()
        return tmp.name
    except Exception as e:
        print(f"Error generating AI image: {e}")
        gc.collect()
        return None
# does not work
def merge_LoRA_weights(model="black-forest-labs/FLUX.1-dev",
                       lora_weights="Borcherding/FLUX.1-dev-LoRA-FractalLand-v0.1"):
    model_suffix = model.split("/")[-1]
    if model_suffix not in lora_weights:
        raise ValueError(f"The model suffix '{model_suffix}' must be in the lora_weights string '{lora_weights}' to proceed.")
    pipe = FluxPipeline.from_pretrained(model, torch_dtype=bfloat16)
    pipe.load_lora_weights(lora_weights)
    # note: actually merging the LoRA into the base weights would likely require pipe.fuse_lora()
    # before save_pretrained(); left unchanged since this helper is marked as not working
    pipe.save_lora_weights(os.getenv("TMPDIR"))
    lora_name = lora_weights.split("/")[-1] + "-merged"
    pipe.save_pretrained(lora_name)
    pipe.unload_lora_weights()
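
# ---------------------------------------------------------------------------
# Minimal local usage sketch, not part of the app's Gradio flow: it shows how
# generate_ai_image_local() is called end to end. Assumptions: run from the
# repo root with utils/constants.py configured (HF_API_TOKEN, PROMPTS, etc.)
# and ideally a CUDA GPU; the prompt strings below are placeholders chosen for
# illustration, not values defined by this module.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    output_path = generate_ai_image_local(
        map_option="Prompt",  # use the free-form prompt branch instead of a preset map prompt
        prompt_textbox_value="a hexagonal tile map of a mountain village",  # hypothetical prompt
        neg_prompt_textbox_value="blurry, low quality",  # hypothetical negative prompt
        model="black-forest-labs/FLUX.1-dev",
        height=512,
        width=912,
        num_inference_steps=30,
        guidance_scale=3.5,
        seed=777,
        pipeline_name="FluxPipeline",
    )
    # On success this is the path of a temporary PNG tracked in constants.temp_files; None on failure
    print(f"Generated image file: {output_path}")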