flux-layer / utils /i2i.py
Vijish's picture
Update utils/i2i.py
d02e046 verified
raw
history blame
9.73 kB
import gradio as gr
import torch
import os
import numpy as np
from lib_layerdiffuse.pipeline_flux_img2img import FluxImg2ImgPipeline
from lib_layerdiffuse.vae import TransparentVAE, pad_rgb
from torchvision import transforms
from PIL import Image
import spaces
import gc
HF_TOKEN = os.getenv("HF_TOKEN")
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
def seed_everything(seed: int) -> torch.Generator:
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
np.random.seed(seed)
generator = torch.Generator(device=device)
generator.manual_seed(seed)
return generator
# Initialize the pipeline
i2i_pipe = FluxImg2ImgPipeline.from_pretrained(
"black-forest-labs/FLUX.1-dev",
torch_dtype=torch.bfloat16,
use_auth_token=HF_TOKEN
).to(device)
# Load the LoRA weights
i2i_pipe.load_lora_weights("RedAIGC/Flux-version-LayerDiffuse", weight_name="layerlora.safetensors")
# Initialize the transparent VAE
trans_vae = TransparentVAE(i2i_pipe.vae, i2i_pipe.vae.dtype)
trans_vae.load_state_dict(torch.load("./models/TransparentVAE.pth"), strict=False)
trans_vae.to(device)
# Custom function to safely decode latents
def safe_decode(trans_vae, latents):
try:
# Standard decoding approach
original_x, x = trans_vae.decode(latents)
return original_x, x
except RuntimeError as e:
if "Expected size 16 but got size 15" in str(e):
print("Detected size mismatch, attempting alternative decoding approach...")
# Use the standard VAE decoder as fallback
x = i2i_pipe.vae.decode(latents).sample
# Create a dummy original_x with same shape as x
original_x = x.clone()
return original_x, x
else:
# If it's a different error, re-raise it
raise
@spaces.GPU(duration=120)
def i2i_gen(
input_image,
prompt: str,
seed: int = 1111,
guidance_scale: float = 7.0,
num_inference_steps: int = 50,
strength: float = 0.8,
):
if input_image is None:
return None
# Clear CUDA cache before starting
torch.cuda.empty_cache()
gc.collect()
try:
# Process the input image
original_image = (transforms.ToTensor()(input_image)).unsqueeze(0)
# Print original image shape for debugging
print(f"Original image shape: {original_image.shape}")
# Get dimensions from the input image
height, width = original_image.shape[2], original_image.shape[3]
# Make absolutely sure dimensions are multiples of 32 (stricter than before)
height = (height // 32) * 32
width = (width // 32) * 32
# Ensure minimum dimensions
height = max(height, 64) # Increased minimum to 64
width = max(width, 64) # Increased minimum to 64
# Limit maximum dimensions to prevent memory issues
max_dim = 768 # Reduced from 1024 to be safer
if height > max_dim or width > max_dim:
# Scale down while preserving aspect ratio
if height > width:
new_height = max_dim
new_width = int((width / height) * max_dim)
new_width = (new_width // 32) * 32 # Ensure it's a multiple of 32
new_width = max(new_width, 64) # Ensure minimum width of 64
else:
new_width = max_dim
new_height = int((height / width) * max_dim)
new_height = (new_height // 32) * 32 # Ensure it's a multiple of 32
new_height = max(new_height, 64) # Ensure minimum height of 64
height, width = new_height, new_width
# Resize if needed
if height != original_image.shape[2] or width != original_image.shape[3]:
print(f"Resizing image from {original_image.shape[2]}x{original_image.shape[3]} to {height}x{width}")
original_image = transforms.functional.resize(original_image, (height, width))
# Print resized image shape for debugging
print(f"Resized image shape: {original_image.shape}")
# Prepare the image for processing - EXACTLY as in demo_i2i.py
padding_feed = [x for x in original_image.movedim(1, -1).float().cpu().numpy()]
list_of_np_rgb_padded = [pad_rgb(x) for x in padding_feed]
rgb_padded_bchw_01 = torch.from_numpy(np.stack(list_of_np_rgb_padded, axis=0)).float().movedim(-1, 1).to(device)
# Clone the original image to avoid modifications to the original
original_image_feed = original_image.clone()
# Convert RGB channels to the range [-1, 1]
original_image_feed[:, :3, :, :] = original_image_feed[:, :3, :, :] * 2.0 - 1.0
# Ensure the alpha channel exists with correct shape
if original_image_feed.shape[1] < 4:
# Add an alpha channel filled with ones
alpha = torch.ones((original_image_feed.shape[0], 1, height, width), device=original_image_feed.device)
original_image_feed = torch.cat([original_image_feed, alpha], dim=1)
# Apply alpha to RGB channels - EXACTLY as in demo_i2i.py
original_image_rgb = original_image_feed[:, :3, :, :] * original_image_feed[:, 3:4, :, :]
# Print shape information for debugging
print(f"RGB tensor shape: {original_image_feed[:, :3, :, :].shape}")
print(f"Alpha channel shape: {original_image_feed[:, 3:4, :, :].shape}")
print(f"RGB*alpha tensor shape: {original_image_rgb.shape}")
# Move tensors to device
original_image_feed = original_image_feed.to(device)
original_image_rgb = original_image_rgb.to(device)
rgb_padded_bchw_01 = rgb_padded_bchw_01.to(device)
# Verify tensor shapes before encoding
print(f"Before encoding - original_image_feed: {original_image_feed.shape}")
print(f"Before encoding - original_image_rgb: {original_image_rgb.shape}")
print(f"Before encoding - rgb_padded_bchw_01: {rgb_padded_bchw_01.shape}")
# Generate the initial latent with error handling
with torch.no_grad():
try:
initial_latent = trans_vae.encode(original_image_feed, original_image_rgb, rgb_padded_bchw_01, use_offset=True)
print(f"Initial latent shape: {initial_latent.shape}")
except Exception as e:
print(f"Error during encoding: {str(e)}")
raise
# Free up memory immediately
del original_image_feed, original_image_rgb, rgb_padded_bchw_01
torch.cuda.empty_cache()
# Generate the image
try:
latents = i2i_pipe(
latents=initial_latent,
image=original_image,
prompt=prompt,
height=height,
width=width,
num_inference_steps=num_inference_steps,
output_type="latent",
generator=seed_everything(seed),
guidance_scale=guidance_scale,
strength=strength,
).images
print(f"Pipeline output latents shape: {latents.shape}")
except Exception as e:
print(f"Error during pipeline: {str(e)}")
raise
# Free up memory
del initial_latent, original_image
torch.cuda.empty_cache()
# Process the latents
try:
latents = i2i_pipe._unpack_latents(latents, height, width, i2i_pipe.vae_scale_factor)
print(f"Unpacked latents shape: {latents.shape}")
latents = (latents / i2i_pipe.vae.config.scaling_factor) + i2i_pipe.vae.config.shift_factor
# Ensure latents have the correct shape for the decoder
# The VAE expects latents with shape [batch_size, latent_channels, height/8, width/8]
expected_h = height // 8
expected_w = width // 8
if latents.shape[2] != expected_h or latents.shape[3] != expected_w:
print(f"Reshaping latents from {latents.shape[2]}x{latents.shape[3]} to {expected_h}x{expected_w}")
latents = torch.nn.functional.interpolate(
latents,
size=(expected_h, expected_w),
mode='bilinear',
align_corners=False
)
except Exception as e:
print(f"Error during latent processing: {str(e)}")
raise
# Decode the latents
with torch.no_grad():
try:
# Use our safe decode function
original_x, x = safe_decode(trans_vae, latents)
print(f"Decoded output shapes: original_x={original_x.shape}, x={x.shape}")
except Exception as e:
print(f"Error during decoding: {str(e)}")
raise
# Free up memory
del latents
torch.cuda.empty_cache()
# Convert to image - EXACTLY as in demo_i2i.py
x = x.clamp(0, 1)
x = x.permute(0, 2, 3, 1)
img = Image.fromarray((x*255).float().cpu().numpy().astype(np.uint8)[0])
# Clean up
del original_x, x
torch.cuda.empty_cache()
gc.collect()
return img
except Exception as e:
print(f"Error in image generation: {str(e)}")
# Print stack trace for more details
import traceback
traceback.print_exc()
torch.cuda.empty_cache()
gc.collect()
return None