import pandas as pd
import torch
import numpy as np
from diffusers import StableDiffusionPipeline, DDIMScheduler


class ExperimentImageSet:
    """Holds every image generated for one target prompt: the original Stable
    Diffusion output, the erased (fine-tuned) model's eta=0 output, the attack
    images, and optionally the interference images and their prompts."""

    def __init__(self, stable_diffusion, eta_0_image, attack_images,
                 original_interference_images=None, interference_images=None,
                 prompt: str = None, interference_prompt1=None,
                 interference_prompt2=None, seed: int = None):
        self.stable_diffusion: np.ndarray = stable_diffusion
        self.eta_0_image: np.ndarray = eta_0_image
        self.attack_images: np.ndarray = attack_images
        self.original_interference_images: np.ndarray = original_interference_images
        self.interference_images: np.ndarray = interference_images
        self.target_prompt = prompt
        self.seed = seed
        self.interference_prompt1 = interference_prompt1
        self.interference_prompt2 = interference_prompt2
        self.clip_scores = None  # placeholder for CLIP scores computed after generation


def pipeline_erased_gen(target_csv_path, target_prompt, target_model_path, etas, num_prompts):
    """For the first `num_prompts` rows of the target CSV (which must provide
    'prompt' and 'evaluation_seed' columns), generate images from the original
    Stable Diffusion model and from the fine-tuned (erased) model, both with
    and without the eta-based attack."""

    target_data = pd.read_csv(target_csv_path)

    torch.cuda.empty_cache()
    variance_scales = [1.0]

    total_images = []
    total_experiment_sets = []
    ct = 0

    # Reference pipeline: unmodified Stable Diffusion v1.4 with a DDIM scheduler.
    original_pipeline = StableDiffusionPipeline.from_pretrained("CompVis/stable-diffusion-v1-4")
    original_pipeline.scheduler = DDIMScheduler.from_config(original_pipeline.scheduler.config)
    original_pipeline.safety_checker = None
    original_pipeline = original_pipeline.to("cuda")

    # Target pipeline: the fine-tuned model with the concept erased.
    pipeline = StableDiffusionPipeline.from_pretrained(target_model_path)
    pipeline.scheduler = DDIMScheduler.from_config(pipeline.scheduler.config)
    pipeline.safety_checker = None
    pipeline = pipeline.to("cuda")

    for index, row in target_data.head(num_prompts).iterrows():
        prompt = row['prompt']
        seed = int(row['evaluation_seed'])

        generator = torch.manual_seed(seed)

        # Baseline: original Stable Diffusion with deterministic DDIM (eta = 0).
        stable_diffusion = original_pipeline(prompt, num_inference_steps=50, generator=generator, eta=0.0).images[0]
        stable_diffusion = np.array(stable_diffusion)
        total_images.append(stable_diffusion)

        # Erased model without the attack (eta = 0).
        finetuned_no_attack = pipeline(prompt, num_inference_steps=50, generator=generator, eta=0.0).images[0]
        finetuned_no_attack = np.array(finetuned_no_attack)
        total_images.append(finetuned_no_attack)

        # Attack: sweep eta (and variance scale) so the DDIM sampler re-injects
        # noise at every step. `variance_scale` is not a standard diffusers
        # argument; it is assumed to be supported by the modified pipeline used
        # in this project.
        attack_images = []
        for eta in etas:
            for variance_scale in variance_scales:
                attacked_image = pipeline(
                    prompt,
                    num_inference_steps=50,
                    generator=generator,
                    eta=eta,
                    variance_scale=variance_scale
                ).images[0]
                attacked_image = np.array(attacked_image)
                attack_images.append(attacked_image)
        attack_images = np.array(attack_images)
        total_images.extend(attack_images)

        experiment_set = ExperimentImageSet(
            stable_diffusion=stable_diffusion,
            eta_0_image=finetuned_no_attack,
            attack_images=attack_images,
            original_interference_images=None,
            interference_images=None,
            prompt=target_prompt,
            seed=seed,
            interference_prompt1=None,
            interference_prompt2=None
        )
        total_experiment_sets.append(experiment_set)

        ct += 1 + len(etas) * len(variance_scales)
        print(f"diffusion-count {ct} for prompt: {prompt}")

    total_images = np.array(total_images)

    print("Image grid shape:", total_images.shape)

    return total_images, total_experiment_sets
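
# Example usage (a sketch, not part of the original experiments): the CSV path,
# model path, and eta values below are placeholders. The CSV must contain
# 'prompt' and 'evaluation_seed' columns, matching what the loop above reads.
#
#   images, experiment_sets = pipeline_erased_gen(
#       target_csv_path="prompts/target_prompts.csv",   # hypothetical path
#       target_prompt="art in the style of Van Gogh",
#       target_model_path="models/erased-model",        # hypothetical path
#       etas=[1.0, 2.0, 3.0],
#       num_prompts=10,
#   )
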
def interference_gen(target_csv_path, interference_path1, interference_path2, target_model_path, etas, num_prompts):
    """Same generation loop as `pipeline_erased_gen`, but additionally generates
    images for prompts drawn from two interference CSVs so the effect on
    unrelated concepts can be inspected."""

    target_data = pd.read_csv(target_csv_path)
    interference_data1 = pd.read_csv(interference_path1)
    interference_data2 = pd.read_csv(interference_path2)

    torch.cuda.empty_cache()
    variance_scales = [1.0]

    total_images = []
    total_experiment_sets = []
    ct = 0

    # Reference pipeline: unmodified Stable Diffusion v1.4 with a DDIM scheduler.
    original_pipeline = StableDiffusionPipeline.from_pretrained("CompVis/stable-diffusion-v1-4")
    original_pipeline.scheduler = DDIMScheduler.from_config(original_pipeline.scheduler.config)
    original_pipeline.safety_checker = None
    original_pipeline = original_pipeline.to("cuda")

    # Target pipeline: the fine-tuned model with the concept erased.
    pipeline = StableDiffusionPipeline.from_pretrained(target_model_path)
    pipeline.scheduler = DDIMScheduler.from_config(pipeline.scheduler.config)
    pipeline.safety_checker = None
    pipeline = pipeline.to("cuda")

    for (index, row), (index1, row1), (index2, row2) in zip(
        target_data.head(num_prompts).iterrows(),
        interference_data1.head(num_prompts).iterrows(),
        interference_data2.head(num_prompts).iterrows()
    ):
        prompt = row['prompt']
        seed = int(row['evaluation_seed'])

        interference_prompt1 = row1['prompt']
        interference_seed1 = int(row1['evaluation_seed'])

        interference_prompt2 = row2['prompt']
        interference_seed2 = int(row2['evaluation_seed'])

        generator = torch.manual_seed(seed)

        # Baseline: original Stable Diffusion with deterministic DDIM (eta = 0).
        stable_diffusion = original_pipeline(prompt, num_inference_steps=50, generator=generator, eta=0.0).images[0]
        stable_diffusion = np.array(stable_diffusion)
        total_images.append(stable_diffusion)

        # Erased model without the attack (eta = 0).
        finetuned_no_attack = pipeline(prompt, num_inference_steps=50, generator=generator, eta=0.0).images[0]
        finetuned_no_attack = np.array(finetuned_no_attack)
        total_images.append(finetuned_no_attack)

        # Attack: sweep eta (and variance scale) on the erased model.
        attack_images = []
        for eta in etas:
            for variance_scale in variance_scales:
                attacked_image = pipeline(
                    prompt,
                    num_inference_steps=50,
                    generator=generator,
                    eta=eta,
                    variance_scale=variance_scale
                ).images[0]
                attacked_image = np.array(attacked_image)
                attack_images.append(attacked_image)
        attack_images = np.array(attack_images)
        total_images.extend(attack_images)

        # Interference prompt 1: two deterministic (eta = 0) generations from the
        # fine-tuned pipeline; the second call continues from the advanced
        # generator state, so it starts from a different initial latent.
        generator1 = torch.manual_seed(interference_seed1)
        original_interference_image1 = pipeline(
            interference_prompt1,
            num_inference_steps=50,
            generator=generator1,
            eta=0.0,
            variance_scale=0.0
        ).images[0]
        original_interference_image1 = np.array(original_interference_image1)
        total_images.append(original_interference_image1)

        interference_image1 = pipeline(
            interference_prompt1,
            num_inference_steps=50,
            generator=generator1,
            eta=0.0,
            variance_scale=0.0
        ).images[0]
        interference_image1 = np.array(interference_image1)
        total_images.append(interference_image1)

        # Interference prompt 2: same procedure with its own seed.
        generator2 = torch.manual_seed(interference_seed2)
        original_interference_image2 = pipeline(
            interference_prompt2,
            num_inference_steps=50,
            generator=generator2,
            eta=0.0,
            variance_scale=0.0
        ).images[0]
        original_interference_image2 = np.array(original_interference_image2)
        total_images.append(original_interference_image2)

        interference_image2 = pipeline(
            interference_prompt2,
            num_inference_steps=50,
            generator=generator2,
            eta=0.0,
            variance_scale=0.0
        ).images[0]
        interference_image2 = np.array(interference_image2)
        total_images.append(interference_image2)

        experiment_set = ExperimentImageSet(
            stable_diffusion=stable_diffusion,
            eta_0_image=finetuned_no_attack,
            attack_images=attack_images,
            original_interference_images=[original_interference_image1, original_interference_image2],
            interference_images=[interference_image1, interference_image2],
            prompt=prompt,
            seed=seed,
            interference_prompt1=interference_prompt1,
            interference_prompt2=interference_prompt2
        )
        total_experiment_sets.append(experiment_set)

        ct += 1 + len(etas) * len(variance_scales)
        print(f"diffusion-count {ct} for prompt: {prompt}")

    total_images = np.array(total_images)

    print("Image grid shape:", total_images.shape)

    return total_images, total_experiment_sets
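
# A minimal driver sketch (not part of the original script): every path and the
# eta values are placeholders, and each CSV is assumed to have 'prompt' and
# 'evaluation_seed' columns, matching what the loops above read.
if __name__ == "__main__":
    images, sets = interference_gen(
        target_csv_path="prompts/target_prompts.csv",              # hypothetical path
        interference_path1="prompts/interference_prompts_1.csv",   # hypothetical path
        interference_path2="prompts/interference_prompts_2.csv",   # hypothetical path
        target_model_path="models/erased-model",                   # hypothetical path
        etas=[1.0, 2.0, 3.0],
        num_prompts=10,
    )
    print(f"Generated {images.shape[0]} images across {len(sets)} experiment sets")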