Spaces:
Sleeping
Sleeping
import gradio as gr | |
import torch | |
import os | |
import numpy as np | |
from lib_layerdiffuse.pipeline_flux_img2img import FluxImg2ImgPipeline | |
from lib_layerdiffuse.vae import TransparentVAE, pad_rgb | |
from torchvision import transforms | |
from PIL import Image | |
import spaces | |
import gc | |
HF_TOKEN = os.getenv("HF_TOKEN") | |
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") | |
def seed_everything(seed: int) -> torch.Generator: | |
torch.manual_seed(seed) | |
torch.cuda.manual_seed_all(seed) | |
np.random.seed(seed) | |
generator = torch.Generator(device=device) | |
generator.manual_seed(seed) | |
return generator | |
# Initialize the pipeline | |
i2i_pipe = FluxImg2ImgPipeline.from_pretrained( | |
"black-forest-labs/FLUX.1-dev", | |
torch_dtype=torch.bfloat16, | |
use_auth_token=HF_TOKEN | |
).to(device) | |
# Load the LoRA weights | |
i2i_pipe.load_lora_weights("RedAIGC/Flux-version-LayerDiffuse", weight_name="layerlora.safetensors") | |
# Initialize the transparent VAE | |
trans_vae = TransparentVAE(i2i_pipe.vae, i2i_pipe.vae.dtype) | |
trans_vae.load_state_dict(torch.load("./models/TransparentVAE.pth"), strict=False) | |
trans_vae.to(device) | |
# Custom function to safely decode latents | |
def safe_decode(trans_vae, latents): | |
try: | |
# Standard decoding approach | |
original_x, x = trans_vae.decode(latents) | |
return original_x, x | |
except RuntimeError as e: | |
if "Expected size 16 but got size 15" in str(e): | |
print("Detected size mismatch, attempting alternative decoding approach...") | |
# Use the standard VAE decoder as fallback | |
x = i2i_pipe.vae.decode(latents).sample | |
# Create a dummy original_x with same shape as x | |
original_x = x.clone() | |
return original_x, x | |
else: | |
# If it's a different error, re-raise it | |
raise | |
def i2i_gen( | |
input_image, | |
prompt: str, | |
seed: int = 1111, | |
guidance_scale: float = 7.0, | |
num_inference_steps: int = 50, | |
strength: float = 0.8, | |
): | |
if input_image is None: | |
return None | |
# Clear CUDA cache before starting | |
torch.cuda.empty_cache() | |
gc.collect() | |
try: | |
# Process the input image | |
original_image = (transforms.ToTensor()(input_image)).unsqueeze(0) | |
# Print original image shape for debugging | |
print(f"Original image shape: {original_image.shape}") | |
# Get dimensions from the input image | |
height, width = original_image.shape[2], original_image.shape[3] | |
# Make absolutely sure dimensions are multiples of 32 (stricter than before) | |
height = (height // 32) * 32 | |
width = (width // 32) * 32 | |
# Ensure minimum dimensions | |
height = max(height, 64) # Increased minimum to 64 | |
width = max(width, 64) # Increased minimum to 64 | |
# Limit maximum dimensions to prevent memory issues | |
max_dim = 768 # Reduced from 1024 to be safer | |
if height > max_dim or width > max_dim: | |
# Scale down while preserving aspect ratio | |
if height > width: | |
new_height = max_dim | |
new_width = int((width / height) * max_dim) | |
new_width = (new_width // 32) * 32 # Ensure it's a multiple of 32 | |
new_width = max(new_width, 64) # Ensure minimum width of 64 | |
else: | |
new_width = max_dim | |
new_height = int((height / width) * max_dim) | |
new_height = (new_height // 32) * 32 # Ensure it's a multiple of 32 | |
new_height = max(new_height, 64) # Ensure minimum height of 64 | |
height, width = new_height, new_width | |
# Resize if needed | |
if height != original_image.shape[2] or width != original_image.shape[3]: | |
print(f"Resizing image from {original_image.shape[2]}x{original_image.shape[3]} to {height}x{width}") | |
original_image = transforms.functional.resize(original_image, (height, width)) | |
# Print resized image shape for debugging | |
print(f"Resized image shape: {original_image.shape}") | |
# Prepare the image for processing - EXACTLY as in demo_i2i.py | |
padding_feed = [x for x in original_image.movedim(1, -1).float().cpu().numpy()] | |
list_of_np_rgb_padded = [pad_rgb(x) for x in padding_feed] | |
rgb_padded_bchw_01 = torch.from_numpy(np.stack(list_of_np_rgb_padded, axis=0)).float().movedim(-1, 1).to(device) | |
# Clone the original image to avoid modifications to the original | |
original_image_feed = original_image.clone() | |
# Convert RGB channels to the range [-1, 1] | |
original_image_feed[:, :3, :, :] = original_image_feed[:, :3, :, :] * 2.0 - 1.0 | |
# Ensure the alpha channel exists with correct shape | |
if original_image_feed.shape[1] < 4: | |
# Add an alpha channel filled with ones | |
alpha = torch.ones((original_image_feed.shape[0], 1, height, width), device=original_image_feed.device) | |
original_image_feed = torch.cat([original_image_feed, alpha], dim=1) | |
# Apply alpha to RGB channels - EXACTLY as in demo_i2i.py | |
original_image_rgb = original_image_feed[:, :3, :, :] * original_image_feed[:, 3:4, :, :] | |
# Print shape information for debugging | |
print(f"RGB tensor shape: {original_image_feed[:, :3, :, :].shape}") | |
print(f"Alpha channel shape: {original_image_feed[:, 3:4, :, :].shape}") | |
print(f"RGB*alpha tensor shape: {original_image_rgb.shape}") | |
# Move tensors to device | |
original_image_feed = original_image_feed.to(device) | |
original_image_rgb = original_image_rgb.to(device) | |
rgb_padded_bchw_01 = rgb_padded_bchw_01.to(device) | |
# Verify tensor shapes before encoding | |
print(f"Before encoding - original_image_feed: {original_image_feed.shape}") | |
print(f"Before encoding - original_image_rgb: {original_image_rgb.shape}") | |
print(f"Before encoding - rgb_padded_bchw_01: {rgb_padded_bchw_01.shape}") | |
# Generate the initial latent with error handling | |
with torch.no_grad(): | |
try: | |
initial_latent = trans_vae.encode(original_image_feed, original_image_rgb, rgb_padded_bchw_01, use_offset=True) | |
print(f"Initial latent shape: {initial_latent.shape}") | |
except Exception as e: | |
print(f"Error during encoding: {str(e)}") | |
raise | |
# Free up memory immediately | |
del original_image_feed, original_image_rgb, rgb_padded_bchw_01 | |
torch.cuda.empty_cache() | |
# Generate the image | |
try: | |
latents = i2i_pipe( | |
latents=initial_latent, | |
image=original_image, | |
prompt=prompt, | |
height=height, | |
width=width, | |
num_inference_steps=num_inference_steps, | |
output_type="latent", | |
generator=seed_everything(seed), | |
guidance_scale=guidance_scale, | |
strength=strength, | |
).images | |
print(f"Pipeline output latents shape: {latents.shape}") | |
except Exception as e: | |
print(f"Error during pipeline: {str(e)}") | |
raise | |
# Free up memory | |
del initial_latent, original_image | |
torch.cuda.empty_cache() | |
# Process the latents | |
try: | |
latents = i2i_pipe._unpack_latents(latents, height, width, i2i_pipe.vae_scale_factor) | |
print(f"Unpacked latents shape: {latents.shape}") | |
latents = (latents / i2i_pipe.vae.config.scaling_factor) + i2i_pipe.vae.config.shift_factor | |
# Ensure latents have the correct shape for the decoder | |
# The VAE expects latents with shape [batch_size, latent_channels, height/8, width/8] | |
expected_h = height // 8 | |
expected_w = width // 8 | |
if latents.shape[2] != expected_h or latents.shape[3] != expected_w: | |
print(f"Reshaping latents from {latents.shape[2]}x{latents.shape[3]} to {expected_h}x{expected_w}") | |
latents = torch.nn.functional.interpolate( | |
latents, | |
size=(expected_h, expected_w), | |
mode='bilinear', | |
align_corners=False | |
) | |
except Exception as e: | |
print(f"Error during latent processing: {str(e)}") | |
raise | |
# Decode the latents | |
with torch.no_grad(): | |
try: | |
# Use our safe decode function | |
original_x, x = safe_decode(trans_vae, latents) | |
print(f"Decoded output shapes: original_x={original_x.shape}, x={x.shape}") | |
except Exception as e: | |
print(f"Error during decoding: {str(e)}") | |
raise | |
# Free up memory | |
del latents | |
torch.cuda.empty_cache() | |
# Convert to image - EXACTLY as in demo_i2i.py | |
x = x.clamp(0, 1) | |
x = x.permute(0, 2, 3, 1) | |
img = Image.fromarray((x*255).float().cpu().numpy().astype(np.uint8)[0]) | |
# Clean up | |
del original_x, x | |
torch.cuda.empty_cache() | |
gc.collect() | |
return img | |
except Exception as e: | |
print(f"Error in image generation: {str(e)}") | |
# Print stack trace for more details | |
import traceback | |
traceback.print_exc() | |
torch.cuda.empty_cache() | |
gc.collect() | |
return None |