"""Image-to-image generation with FLUX.1-dev, a LayerDiffuse LoRA and a transparent VAE.

Importing this module builds the pipeline, loads the LoRA weights and the
TransparentVAE checkpoint, and moves everything to ``device``; the import is
therefore slow and requires the model files to be reachable.
"""

import gc
import os
import traceback
from typing import Optional, Tuple

import gradio as gr
import numpy as np
import spaces
import torch
from PIL import Image
from torchvision import transforms

from lib_layerdiffuse.pipeline_flux_img2img import FluxImg2ImgPipeline
from lib_layerdiffuse.vae import TransparentVAE, pad_rgb

HF_TOKEN = os.getenv("HF_TOKEN")
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Geometry limits applied to every input image before it is encoded.
_SIZE_MULTIPLE = 32  # both sides are floored to a multiple of this
_MIN_DIM = 64        # smallest side length ever produced
_MAX_DIM = 768       # longest side length ever produced (bounds memory use)


def seed_everything(seed: int) -> torch.Generator:
    """Seed torch (CPU and CUDA) and NumPy, and return a seeded generator.

    Args:
        seed: Seed value. It is coerced with ``int()`` because
            ``np.random.seed`` and ``Generator.manual_seed`` reject floats,
            which numeric UI widgets may deliver.

    Returns:
        A ``torch.Generator`` created on the module-level ``device`` and
        seeded with ``seed``.
    """
    seed = int(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    generator = torch.Generator(device=device)
    generator.manual_seed(seed)
    return generator


# Initialize the pipeline
# NOTE(review): recent diffusers releases name this keyword `token`;
# confirm `use_auth_token` is still honoured by the installed version.
i2i_pipe = FluxImg2ImgPipeline.from_pretrained(
    "black-forest-labs/FLUX.1-dev",
    torch_dtype=torch.bfloat16,
    use_auth_token=HF_TOKEN,
).to(device)

# Load the LoRA weights
i2i_pipe.load_lora_weights(
    "RedAIGC/Flux-version-LayerDiffuse",
    weight_name="layerlora.safetensors",
)

# Initialize the transparent VAE
trans_vae = TransparentVAE(i2i_pipe.vae, i2i_pipe.vae.dtype)
# Deserialize onto the CPU so a checkpoint saved from CUDA tensors also loads
# on a CPU-only host; load_state_dict copies the values into the module and
# the module is moved to `device` right after.
# NOTE: torch.load unpickles the file - only load checkpoints you trust.
trans_vae.load_state_dict(
    torch.load("./models/TransparentVAE.pth", map_location="cpu"),
    strict=False,
)
trans_vae.to(device)


def safe_decode(trans_vae, latents):
    """Decode ``latents`` with ``trans_vae``, falling back to the plain VAE.

    The fallback is taken only for the specific ``RuntimeError`` whose message
    contains "Expected size 16 but got size 15"; any other error propagates.

    Args:
        trans_vae: Object exposing ``decode(latents)`` that returns a pair
            ``(original_x, x)``.
        latents: Latent tensor to decode.

    Returns:
        A tuple ``(original_x, x)``. On the fallback path ``x`` is the output
        of ``i2i_pipe.vae.decode(latents).sample`` and ``original_x`` is a
        clone of it.

    Raises:
        RuntimeError: Any decoding error other than the size mismatch above.
    """
    try:
        # Standard decoding approach
        original_x, x = trans_vae.decode(latents)
        return original_x, x
    except RuntimeError as e:
        if "Expected size 16 but got size 15" not in str(e):
            # A different error: let the caller see it.
            raise
        print("Detected size mismatch, attempting alternative decoding approach...")
        # Use the standard VAE decoder as fallback.
        # NOTE(review): the caller clamps the result to [0, 1]; confirm the
        # plain VAE output uses the same value range as trans_vae.decode.
        x = i2i_pipe.vae.decode(latents).sample
        # There is no separate "original" output here, so mirror x.
        original_x = x.clone()
        return original_x, x


def _fit_dimensions(
    height: int,
    width: int,
    multiple: int = _SIZE_MULTIPLE,
    min_dim: int = _MIN_DIM,
    max_dim: int = _MAX_DIM,
) -> Tuple[int, int]:
    """Return the (height, width) an input image is resized to before encoding.

    Each side is floored to a multiple of ``multiple`` and raised to at least
    ``min_dim``. If either side then exceeds ``max_dim``, the longer side is
    set to ``max_dim`` and the other side is scaled by the same ratio, then
    floored and raised in the same way.
    """

    def snap(value: int) -> int:
        return max((value // multiple) * multiple, min_dim)

    height, width = snap(height), snap(width)
    if height <= max_dim and width <= max_dim:
        return height, width

    # Scale down while preserving aspect ratio
    if height > width:
        return max_dim, snap(int((width / height) * max_dim))
    return snap(int((height / width) * max_dim)), max_dim


@spaces.GPU(duration=120)
def i2i_gen(
    input_image,
    prompt: str,
    seed: int = 1111,
    guidance_scale: float = 7.0,
    num_inference_steps: int = 50,
    strength: float = 0.8,
) -> Optional[Image.Image]:
    """Run image-to-image generation and return the decoded image.

    The input is converted to a tensor, resized to the dimensions chosen by
    ``_fit_dimensions``, encoded with the transparent VAE, passed through the
    FLUX img2img pipeline in latent mode, and decoded back to an image.

    Args:
        input_image: Image accepted by ``torchvision.transforms.ToTensor``
            (for example a PIL image), or ``None``.
        prompt: Text prompt forwarded to the pipeline.
        seed: Seed used for all random number generators.
        guidance_scale: Guidance scale forwarded to the pipeline.
        num_inference_steps: Number of denoising steps.
        strength: Img2img strength forwarded to the pipeline.

    Returns:
        The generated ``PIL.Image.Image``, or ``None`` when ``input_image`` is
        ``None`` or when any step fails (the error and its traceback are
        printed instead of being raised).
    """
    if input_image is None:
        return None

    # Clear CUDA cache before starting
    torch.cuda.empty_cache()
    gc.collect()

    try:
        # Process the input image
        original_image = (transforms.ToTensor()(input_image)).unsqueeze(0)

        # Print original image shape for debugging
        print(f"Original image shape: {original_image.shape}")

        # Pick the working resolution from the input image size.
        src_height, src_width = original_image.shape[2], original_image.shape[3]
        height, width = _fit_dimensions(src_height, src_width)

        # Resize if needed
        if (height, width) != (src_height, src_width):
            print(f"Resizing image from {src_height}x{src_width} to {height}x{width}")
            original_image = transforms.functional.resize(original_image, (height, width))

        # Print resized image shape for debugging
        print(f"Resized image shape: {original_image.shape}")

        # Prepare the image for processing - EXACTLY as in demo_i2i.py
        # NOTE(review): pad_rgb receives the image before the alpha channel is
        # appended below; confirm it copes with inputs that have no alpha.
        padding_feed = list(original_image.movedim(1, -1).float().cpu().numpy())
        list_of_np_rgb_padded = [pad_rgb(x) for x in padding_feed]
        rgb_padded_bchw_01 = (
            torch.from_numpy(np.stack(list_of_np_rgb_padded, axis=0))
            .float()
            .movedim(-1, 1)
            .to(device)
        )

        # Clone the original image to avoid modifications to the original
        original_image_feed = original_image.clone()

        # Convert RGB channels to the range [-1, 1]
        original_image_feed[:, :3, :, :] = original_image_feed[:, :3, :, :] * 2.0 - 1.0

        # Ensure the alpha channel exists with correct shape
        if original_image_feed.shape[1] < 4:
            # Add an alpha channel filled with ones (fully opaque)
            alpha = torch.ones(
                (original_image_feed.shape[0], 1, height, width),
                device=original_image_feed.device,
            )
            original_image_feed = torch.cat([original_image_feed, alpha], dim=1)

        # Apply alpha to RGB channels - EXACTLY as in demo_i2i.py
        original_image_rgb = original_image_feed[:, :3, :, :] * original_image_feed[:, 3:4, :, :]

        # Print shape information for debugging
        print(f"RGB tensor shape: {original_image_feed[:, :3, :, :].shape}")
        print(f"Alpha channel shape: {original_image_feed[:, 3:4, :, :].shape}")
        print(f"RGB*alpha tensor shape: {original_image_rgb.shape}")

        # Move tensors to device
        original_image_feed = original_image_feed.to(device)
        original_image_rgb = original_image_rgb.to(device)
        rgb_padded_bchw_01 = rgb_padded_bchw_01.to(device)

        # Verify tensor shapes before encoding
        print(f"Before encoding - original_image_feed: {original_image_feed.shape}")
        print(f"Before encoding - original_image_rgb: {original_image_rgb.shape}")
        print(f"Before encoding - rgb_padded_bchw_01: {rgb_padded_bchw_01.shape}")

        # Generate the initial latent; the inner handler only labels the
        # failing stage, the outer handler does the actual recovery.
        with torch.no_grad():
            try:
                initial_latent = trans_vae.encode(
                    original_image_feed,
                    original_image_rgb,
                    rgb_padded_bchw_01,
                    use_offset=True,
                )
                print(f"Initial latent shape: {initial_latent.shape}")
            except Exception as e:
                print(f"Error during encoding: {str(e)}")
                raise

        # Free up memory immediately
        del original_image_feed, original_image_rgb, rgb_padded_bchw_01
        torch.cuda.empty_cache()

        # Generate the image
        try:
            latents = i2i_pipe(
                latents=initial_latent,
                image=original_image,
                prompt=prompt,
                height=height,
                width=width,
                num_inference_steps=num_inference_steps,
                output_type="latent",
                generator=seed_everything(seed),
                guidance_scale=guidance_scale,
                strength=strength,
            ).images
            print(f"Pipeline output latents shape: {latents.shape}")
        except Exception as e:
            print(f"Error during pipeline: {str(e)}")
            raise

        # Free up memory
        del initial_latent, original_image
        torch.cuda.empty_cache()

        # Process the latents
        try:
            latents = i2i_pipe._unpack_latents(latents, height, width, i2i_pipe.vae_scale_factor)
            print(f"Unpacked latents shape: {latents.shape}")

            # Undo the scaling/shift stored in the VAE config.
            latents = (latents / i2i_pipe.vae.config.scaling_factor) + i2i_pipe.vae.config.shift_factor

            # Ensure latents have the correct shape for the decoder
            # The VAE expects latents with shape [batch_size, latent_channels, height/8, width/8]
            expected_h = height // 8
            expected_w = width // 8

            if latents.shape[2] != expected_h or latents.shape[3] != expected_w:
                print(f"Reshaping latents from {latents.shape[2]}x{latents.shape[3]} to {expected_h}x{expected_w}")
                latents = torch.nn.functional.interpolate(
                    latents,
                    size=(expected_h, expected_w),
                    mode='bilinear',
                    align_corners=False,
                )
        except Exception as e:
            print(f"Error during latent processing: {str(e)}")
            raise

        # Decode the latents
        with torch.no_grad():
            try:
                # Use our safe decode function
                original_x, x = safe_decode(trans_vae, latents)
                print(f"Decoded output shapes: original_x={original_x.shape}, x={x.shape}")
            except Exception as e:
                print(f"Error during decoding: {str(e)}")
                raise

        # Free up memory
        del latents
        torch.cuda.empty_cache()

        # Convert to image - EXACTLY as in demo_i2i.py
        x = x.clamp(0, 1)
        x = x.permute(0, 2, 3, 1)  # channels-first -> channels-last for PIL
        img = Image.fromarray((x * 255).float().cpu().numpy().astype(np.uint8)[0])

        # Clean up
        del original_x, x
        torch.cuda.empty_cache()
        gc.collect()

        return img

    except Exception as e:
        # Best-effort boundary: report the failure and return None to the
        # caller rather than propagating the exception.
        print(f"Error in image generation: {str(e)}")
        # Print stack trace for more details
        traceback.print_exc()
        torch.cuda.empty_cache()
        gc.collect()
        return None