# FastStableDifussion / src/backend/lcm_text_to_image.py
import gc
from math import ceil
from typing import Any, List
import random
import numpy as np
import torch
import logging
from backend.device import is_openvino_device
from backend.lora import load_lora_weight
from backend.controlnet import (
load_controlnet_adapters,
update_controlnet_arguments,
)
from backend.models.lcmdiffusion_setting import (
DiffusionTask,
LCMDiffusionSetting,
LCMLora,
)
from backend.openvino.pipelines import (
get_ov_image_to_image_pipeline,
get_ov_text_to_image_pipeline,
ov_load_taesd,
)
from backend.pipelines.lcm import (
get_image_to_image_pipeline,
get_lcm_model_pipeline,
load_taesd,
)
from backend.pipelines.lcm_lora import get_lcm_lora_pipeline
from constants import DEVICE, GGUF_THREADS
from diffusers import LCMScheduler
from image_ops import resize_pil_image
from backend.openvino.flux_pipeline import get_flux_pipeline
from backend.openvino.ov_hc_stablediffusion_pipeline import OvHcLatentConsistency
from backend.gguf.gguf_diffusion import (
GGUFDiffusion,
ModelConfig,
Txt2ImgConfig,
SampleMethod,
)
from paths import get_app_path
from pprint import pprint
try:
# support for token merging; keeping it optional for now
import tomesd
except ImportError:
print("tomesd library unavailable; disabling token merging support")
tomesd = None
class LCMTextToImage:
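    """Unified text-to-image / image-to-image wrapper around the LCM,
    LCM-LoRA, OpenVINO, and GGUF diffusion backends. The underlying
    pipeline is rebuilt only when a relevant setting changes."""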
def __init__(
self,
device: str = "cpu",
) -> None:
self.pipeline = None
self.use_openvino = False
self.device = ""
self.previous_model_id = None
self.previous_use_tae_sd = False
self.previous_use_lcm_lora = False
self.previous_ov_model_id = ""
self.previous_token_merging = 0.0
self.previous_safety_checker = False
self.previous_use_openvino = False
self.img_to_img_pipeline = None
self.is_openvino_init = False
self.previous_lora = None
self.task_type = DiffusionTask.text_to_image
self.previous_use_gguf_model = False
self.previous_gguf_model = None
        # Initialize the remaining "previous_*" attributes referenced by
        # init(); they are otherwise only created at the end of init()
        self.previous_lcm_lora_base_id = None
        self.previous_lcm_lora_id = None
        self.previous_task_type = None
self.torch_data_type = (
torch.float32 if is_openvino_device() or DEVICE == "mps" else torch.float16
)
self.ov_model_id = None
print(f"Torch datatype : {self.torch_data_type}")
def _pipeline_to_device(self):
print(f"Pipeline device : {DEVICE}")
print(f"Pipeline dtype : {self.torch_data_type}")
self.pipeline.to(
torch_device=DEVICE,
torch_dtype=self.torch_data_type,
)
def _add_freeu(self):
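        # FreeU re-weights the UNet's backbone and skip-connection features
        # to improve sample quality; the values below appear to follow the
        # diffusers enable_freeu() documentation examples for SD 1.5 and SDXL.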
pipeline_class = self.pipeline.__class__.__name__
if isinstance(self.pipeline.scheduler, LCMScheduler):
if pipeline_class == "StableDiffusionPipeline":
print("Add FreeU - SD")
self.pipeline.enable_freeu(
s1=0.9,
s2=0.2,
b1=1.2,
b2=1.4,
)
elif pipeline_class == "StableDiffusionXLPipeline":
print("Add FreeU - SDXL")
self.pipeline.enable_freeu(
s1=0.6,
s2=0.4,
b1=1.1,
b2=1.2,
)
def _enable_vae_tiling(self):
self.pipeline.vae.enable_tiling()
def _update_lcm_scheduler_params(self):
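        # Rebuild the LCM scheduler with this project's custom beta schedule,
        # which is narrower than the diffusers defaults.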
if isinstance(self.pipeline.scheduler, LCMScheduler):
self.pipeline.scheduler = LCMScheduler.from_config(
self.pipeline.scheduler.config,
beta_start=0.001,
beta_end=0.01,
)
def _is_hetero_pipeline(self) -> bool:
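        # Heuristic: models meant for the heterogeneous-compute pipeline are
        # identified by a "square" tag in their OpenVINO model id.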
return "square" in self.ov_model_id.lower()
def _load_ov_hetero_pipeline(self):
print("Loading Heterogeneous Compute pipeline")
        if DEVICE.upper() == "NPU":
            device = ["NPU", "NPU", "NPU"]
            self.pipeline = OvHcLatentConsistency(self.ov_model_id, device)
else:
self.pipeline = OvHcLatentConsistency(self.ov_model_id)
def _generate_images_hetero_compute(
self,
lcm_diffusion_setting: LCMDiffusionSetting,
):
print("Using OpenVINO ")
if lcm_diffusion_setting.diffusion_task == DiffusionTask.text_to_image.value:
return [
self.pipeline.generate(
prompt=lcm_diffusion_setting.prompt,
neg_prompt=lcm_diffusion_setting.negative_prompt,
init_image=None,
strength=1.0,
num_inference_steps=lcm_diffusion_setting.inference_steps,
)
]
else:
return [
self.pipeline.generate(
prompt=lcm_diffusion_setting.prompt,
neg_prompt=lcm_diffusion_setting.negative_prompt,
init_image=lcm_diffusion_setting.init_image,
strength=lcm_diffusion_setting.strength,
num_inference_steps=lcm_diffusion_setting.inference_steps,
)
]
def _is_valid_mode(
self,
modes: List,
) -> bool:
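        # Valid when exactly one mode is enabled, or none at all
        # (plain LCM model mode)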
return modes.count(True) == 1 or modes.count(False) == 3
def _validate_mode(
self,
modes: List,
) -> None:
if not self._is_valid_mode(modes):
raise ValueError("Invalid mode,delete configs/settings.yaml and retry!")
def init(
self,
device: str = "cpu",
lcm_diffusion_setting: LCMDiffusionSetting = LCMDiffusionSetting(),
) -> None:
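        """Build (or rebuild) the generation pipeline for the given settings.

        The pipeline is recreated only when a setting that affects it has
        changed since the previous call; otherwise the cached pipeline is
        reused.
        """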
        # Mode validation: at most one of GGUF, OpenVINO, or LCM-LoRA may be enabled
modes = [
lcm_diffusion_setting.use_gguf_model,
lcm_diffusion_setting.use_openvino,
lcm_diffusion_setting.use_lcm_lora,
]
self._validate_mode(modes)
self.device = device
self.use_openvino = lcm_diffusion_setting.use_openvino
model_id = lcm_diffusion_setting.lcm_model_id
use_local_model = lcm_diffusion_setting.use_offline_model
use_tiny_auto_encoder = lcm_diffusion_setting.use_tiny_auto_encoder
use_lora = lcm_diffusion_setting.use_lcm_lora
lcm_lora: LCMLora = lcm_diffusion_setting.lcm_lora
token_merging = lcm_diffusion_setting.token_merging
self.ov_model_id = lcm_diffusion_setting.openvino_lcm_model_id
if lcm_diffusion_setting.diffusion_task == DiffusionTask.image_to_image.value:
lcm_diffusion_setting.init_image = resize_pil_image(
lcm_diffusion_setting.init_image,
lcm_diffusion_setting.image_width,
lcm_diffusion_setting.image_height,
)
if (
self.pipeline is None
or self.previous_model_id != model_id
or self.previous_use_tae_sd != use_tiny_auto_encoder
or self.previous_lcm_lora_base_id != lcm_lora.base_model_id
or self.previous_lcm_lora_id != lcm_lora.lcm_lora_id
or self.previous_use_lcm_lora != use_lora
or self.previous_ov_model_id != self.ov_model_id
or self.previous_token_merging != token_merging
or self.previous_safety_checker != lcm_diffusion_setting.use_safety_checker
or self.previous_use_openvino != lcm_diffusion_setting.use_openvino
or self.previous_use_gguf_model != lcm_diffusion_setting.use_gguf_model
or self.previous_gguf_model != lcm_diffusion_setting.gguf_model
or (
self.use_openvino
and (
self.previous_task_type != lcm_diffusion_setting.diffusion_task
or self.previous_lora != lcm_diffusion_setting.lora
)
)
or lcm_diffusion_setting.rebuild_pipeline
):
if self.use_openvino and is_openvino_device():
if self.pipeline:
del self.pipeline
self.pipeline = None
gc.collect()
self.is_openvino_init = True
if (
lcm_diffusion_setting.diffusion_task
== DiffusionTask.text_to_image.value
):
print(
f"***** Init Text to image (OpenVINO) - {self.ov_model_id} *****"
)
if "flux" in self.ov_model_id.lower():
print("Loading OpenVINO Flux pipeline")
self.pipeline = get_flux_pipeline(
self.ov_model_id,
lcm_diffusion_setting.use_tiny_auto_encoder,
)
elif self._is_hetero_pipeline():
self._load_ov_hetero_pipeline()
else:
self.pipeline = get_ov_text_to_image_pipeline(
self.ov_model_id,
use_local_model,
)
elif (
lcm_diffusion_setting.diffusion_task
== DiffusionTask.image_to_image.value
):
if not self.pipeline and self._is_hetero_pipeline():
self._load_ov_hetero_pipeline()
else:
print(
f"***** Image to image (OpenVINO) - {self.ov_model_id} *****"
)
self.pipeline = get_ov_image_to_image_pipeline(
self.ov_model_id,
use_local_model,
)
elif lcm_diffusion_setting.use_gguf_model:
model = lcm_diffusion_setting.gguf_model.diffusion_path
print(f"***** Init Text to image (GGUF) - {model} *****")
# if self.pipeline:
# self.pipeline.terminate()
# del self.pipeline
# self.pipeline = None
self._init_gguf_diffusion(lcm_diffusion_setting)
else:
if self.pipeline or self.img_to_img_pipeline:
self.pipeline = None
self.img_to_img_pipeline = None
gc.collect()
controlnet_args = load_controlnet_adapters(lcm_diffusion_setting)
if use_lora:
print(
f"***** Init LCM-LoRA pipeline - {lcm_lora.base_model_id} *****"
)
self.pipeline = get_lcm_lora_pipeline(
lcm_lora.base_model_id,
lcm_lora.lcm_lora_id,
use_local_model,
torch_data_type=self.torch_data_type,
pipeline_args=controlnet_args,
)
else:
print(f"***** Init LCM Model pipeline - {model_id} *****")
self.pipeline = get_lcm_model_pipeline(
model_id,
use_local_model,
controlnet_args,
)
self.img_to_img_pipeline = get_image_to_image_pipeline(self.pipeline)
if tomesd and token_merging > 0.001:
print(f"***** Token Merging: {token_merging} *****")
tomesd.apply_patch(self.pipeline, ratio=token_merging)
tomesd.apply_patch(self.img_to_img_pipeline, ratio=token_merging)
if use_tiny_auto_encoder:
if self.use_openvino and is_openvino_device():
if self.pipeline.__class__.__name__ != "OVFluxPipeline":
print("Using Tiny Auto Encoder (OpenVINO)")
ov_load_taesd(
self.pipeline,
use_local_model,
)
else:
print("Using Tiny Auto Encoder")
load_taesd(
self.pipeline,
use_local_model,
self.torch_data_type,
)
load_taesd(
self.img_to_img_pipeline,
use_local_model,
self.torch_data_type,
)
if not self.use_openvino and not is_openvino_device():
self._pipeline_to_device()
if not self._is_hetero_pipeline():
if (
lcm_diffusion_setting.diffusion_task
== DiffusionTask.image_to_image.value
and lcm_diffusion_setting.use_openvino
):
self.pipeline.scheduler = LCMScheduler.from_config(
self.pipeline.scheduler.config,
)
else:
if not lcm_diffusion_setting.use_gguf_model:
self._update_lcm_scheduler_params()
if use_lora:
self._add_freeu()
self.previous_model_id = model_id
self.previous_ov_model_id = self.ov_model_id
self.previous_use_tae_sd = use_tiny_auto_encoder
self.previous_lcm_lora_base_id = lcm_lora.base_model_id
self.previous_lcm_lora_id = lcm_lora.lcm_lora_id
self.previous_use_lcm_lora = use_lora
self.previous_token_merging = lcm_diffusion_setting.token_merging
self.previous_safety_checker = lcm_diffusion_setting.use_safety_checker
self.previous_use_openvino = lcm_diffusion_setting.use_openvino
self.previous_task_type = lcm_diffusion_setting.diffusion_task
self.previous_lora = lcm_diffusion_setting.lora.model_copy(deep=True)
self.previous_use_gguf_model = lcm_diffusion_setting.use_gguf_model
self.previous_gguf_model = lcm_diffusion_setting.gguf_model.model_copy(
deep=True
)
lcm_diffusion_setting.rebuild_pipeline = False
if (
lcm_diffusion_setting.diffusion_task
== DiffusionTask.text_to_image.value
):
print(f"Pipeline : {self.pipeline}")
elif (
lcm_diffusion_setting.diffusion_task
== DiffusionTask.image_to_image.value
):
if self.use_openvino and is_openvino_device():
print(f"Pipeline : {self.pipeline}")
else:
print(f"Pipeline : {self.img_to_img_pipeline}")
if self.use_openvino:
if lcm_diffusion_setting.lora.enabled:
print("Warning: Lora models not supported on OpenVINO mode")
elif not lcm_diffusion_setting.use_gguf_model:
adapters = self.pipeline.get_active_adapters()
print(f"Active adapters : {adapters}")
def _get_timesteps(self):
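        # Some scheduler configs pin an explicit "timesteps" value; forward it
        # to the pipeline when present, otherwise let the scheduler decide.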
time_steps = self.pipeline.scheduler.config.get("timesteps")
time_steps_value = [int(time_steps)] if time_steps else None
return time_steps_value
def generate(
self,
lcm_diffusion_setting: LCMDiffusionSetting,
reshape: bool = False,
) -> Any:
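        """Run text-to-image or image-to-image generation and return a list
        of images, dispatching to the OpenVINO, heterogeneous-compute, GGUF,
        or plain diffusers pipeline selected in init()."""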
guidance_scale = lcm_diffusion_setting.guidance_scale
img_to_img_inference_steps = lcm_diffusion_setting.inference_steps
check_step_value = int(
lcm_diffusion_setting.inference_steps * lcm_diffusion_setting.strength
)
if (
lcm_diffusion_setting.diffusion_task == DiffusionTask.image_to_image.value
and check_step_value < 1
):
img_to_img_inference_steps = ceil(1 / lcm_diffusion_setting.strength)
        print(
            f"Strength: {lcm_diffusion_setting.strength}, "
            f"Image-to-image inference steps: {img_to_img_inference_steps}"
        )
pipeline_extra_args = {}
if lcm_diffusion_setting.use_seed:
            cur_seed = lcm_diffusion_setting.seed
            # for multiple images with a fixed seed, use sequential seeds
            seeds = [
                cur_seed + i for i in range(lcm_diffusion_setting.number_of_images)
            ]
        else:
            seeds = [
                random.randint(0, 999999999)
                for _ in range(lcm_diffusion_setting.number_of_images)
            ]
if self.use_openvino:
# no support for generators; try at least to ensure reproducible results for single images
np.random.seed(seeds[0])
if self._is_hetero_pipeline():
torch.manual_seed(seeds[0])
                lcm_diffusion_setting.seed = seeds[0]
else:
            pipeline_extra_args["generator"] = [
                torch.Generator(device=self.device).manual_seed(s) for s in seeds
            ]
is_openvino_pipe = lcm_diffusion_setting.use_openvino and is_openvino_device()
if is_openvino_pipe and not self._is_hetero_pipeline():
print("Using OpenVINO")
if reshape and not self.is_openvino_init:
print("Reshape and compile")
self.pipeline.reshape(
batch_size=-1,
height=lcm_diffusion_setting.image_height,
width=lcm_diffusion_setting.image_width,
num_images_per_prompt=lcm_diffusion_setting.number_of_images,
)
self.pipeline.compile()
if self.is_openvino_init:
self.is_openvino_init = False
if is_openvino_pipe and self._is_hetero_pipeline():
return self._generate_images_hetero_compute(lcm_diffusion_setting)
elif lcm_diffusion_setting.use_gguf_model:
return self._generate_images_gguf(lcm_diffusion_setting)
if lcm_diffusion_setting.clip_skip > 1:
# We follow the convention that "CLIP Skip == 2" means "skip
# the last layer", so "CLIP Skip == 1" means "no skipping"
pipeline_extra_args["clip_skip"] = lcm_diffusion_setting.clip_skip - 1
if not lcm_diffusion_setting.use_safety_checker:
self.pipeline.safety_checker = None
if (
lcm_diffusion_setting.diffusion_task
== DiffusionTask.image_to_image.value
and not is_openvino_pipe
):
self.img_to_img_pipeline.safety_checker = None
if (
not lcm_diffusion_setting.use_lcm_lora
and not lcm_diffusion_setting.use_openvino
and lcm_diffusion_setting.guidance_scale != 1.0
):
print("Not using LCM-LoRA so setting guidance_scale 1.0")
guidance_scale = 1.0
controlnet_args = update_controlnet_arguments(lcm_diffusion_setting)
if lcm_diffusion_setting.use_openvino:
if (
lcm_diffusion_setting.diffusion_task
== DiffusionTask.text_to_image.value
):
result_images = self.pipeline(
prompt=lcm_diffusion_setting.prompt,
negative_prompt=lcm_diffusion_setting.negative_prompt,
num_inference_steps=lcm_diffusion_setting.inference_steps,
guidance_scale=guidance_scale,
width=lcm_diffusion_setting.image_width,
height=lcm_diffusion_setting.image_height,
num_images_per_prompt=lcm_diffusion_setting.number_of_images,
).images
elif (
lcm_diffusion_setting.diffusion_task
== DiffusionTask.image_to_image.value
):
result_images = self.pipeline(
image=lcm_diffusion_setting.init_image,
strength=lcm_diffusion_setting.strength,
prompt=lcm_diffusion_setting.prompt,
negative_prompt=lcm_diffusion_setting.negative_prompt,
num_inference_steps=img_to_img_inference_steps * 3,
guidance_scale=guidance_scale,
num_images_per_prompt=lcm_diffusion_setting.number_of_images,
).images
else:
if (
lcm_diffusion_setting.diffusion_task
== DiffusionTask.text_to_image.value
):
result_images = self.pipeline(
prompt=lcm_diffusion_setting.prompt,
negative_prompt=lcm_diffusion_setting.negative_prompt,
num_inference_steps=lcm_diffusion_setting.inference_steps,
guidance_scale=guidance_scale,
width=lcm_diffusion_setting.image_width,
height=lcm_diffusion_setting.image_height,
num_images_per_prompt=lcm_diffusion_setting.number_of_images,
timesteps=self._get_timesteps(),
**pipeline_extra_args,
**controlnet_args,
).images
elif (
lcm_diffusion_setting.diffusion_task
== DiffusionTask.image_to_image.value
):
result_images = self.img_to_img_pipeline(
image=lcm_diffusion_setting.init_image,
strength=lcm_diffusion_setting.strength,
prompt=lcm_diffusion_setting.prompt,
negative_prompt=lcm_diffusion_setting.negative_prompt,
num_inference_steps=img_to_img_inference_steps,
guidance_scale=guidance_scale,
width=lcm_diffusion_setting.image_width,
height=lcm_diffusion_setting.image_height,
num_images_per_prompt=lcm_diffusion_setting.number_of_images,
**pipeline_extra_args,
**controlnet_args,
).images
        for i, seed in enumerate(seeds):
            result_images[i].info["image_seed"] = seed
return result_images
def _init_gguf_diffusion(
self,
lcm_diffusion_setting: LCMDiffusionSetting,
):
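        # Map the GGUF model paths from the settings onto the ModelConfig
        # consumed by GGUFDiffusion (a stable-diffusion.cpp style backend).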
config = ModelConfig()
config.model_path = lcm_diffusion_setting.gguf_model.diffusion_path
config.diffusion_model_path = lcm_diffusion_setting.gguf_model.diffusion_path
config.clip_l_path = lcm_diffusion_setting.gguf_model.clip_path
config.t5xxl_path = lcm_diffusion_setting.gguf_model.t5xxl_path
config.vae_path = lcm_diffusion_setting.gguf_model.vae_path
config.n_threads = GGUF_THREADS
print(f"GGUF Threads : {GGUF_THREADS} ")
print("GGUF - Model config")
pprint(lcm_diffusion_setting.gguf_model.model_dump())
self.pipeline = GGUFDiffusion(
get_app_path(), # Place DLL in fastsdcpu folder
config,
True,
)
def _generate_images_gguf(
self,
lcm_diffusion_setting: LCMDiffusionSetting,
):
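        # Only the text-to-image task is wired up for GGUF models; the
        # settings are translated into a Txt2ImgConfig, with seed = -1
        # requesting a random seed from the backend.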
if lcm_diffusion_setting.diffusion_task == DiffusionTask.text_to_image.value:
t2iconfig = Txt2ImgConfig()
t2iconfig.prompt = lcm_diffusion_setting.prompt
t2iconfig.batch_count = lcm_diffusion_setting.number_of_images
t2iconfig.cfg_scale = lcm_diffusion_setting.guidance_scale
t2iconfig.height = lcm_diffusion_setting.image_height
t2iconfig.width = lcm_diffusion_setting.image_width
t2iconfig.sample_steps = lcm_diffusion_setting.inference_steps
t2iconfig.sample_method = SampleMethod.EULER
if lcm_diffusion_setting.use_seed:
t2iconfig.seed = lcm_diffusion_setting.seed
else:
t2iconfig.seed = -1
return self.pipeline.generate_text2mg(t2iconfig)
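

# A minimal usage sketch (illustrative only, not part of the original module).
# It assumes the default LCMDiffusionSetting values point at a valid LCM model
# and that DEVICE from constants matches the local hardware.
if __name__ == "__main__":
    settings = LCMDiffusionSetting()
    settings.prompt = "a cat wearing sunglasses"
    generator = LCMTextToImage(device=DEVICE)
    generator.init(device=DEVICE, lcm_diffusion_setting=settings)
    images = generator.generate(settings)
    images[0].save("output.png")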