from PIL import Image
import requests
import os, glob
import pandas as pd
import numpy as np
import re
from transformers import CLIPProcessor, CLIPModel
import importlib
import torch

from eta_diffusion import FineTunedModel, StableDiffuser

class ExperimentImageSet:
    """Holds the images generated for one prompt: the original Stable Diffusion image,
    the erased-model image at eta = 0, the eta-attack images, and optional interference images."""

    def __init__(self, stable_diffusion, eta_0_image, attack_images, interference_images=None,
                 prompt: str = None, interference_prompt1=None, interference_prompt2=None,
                 seed: int = None):
        self.stable_diffusion: np.ndarray = stable_diffusion
        self.eta_0_image: np.ndarray = eta_0_image
        self.attack_images: np.ndarray = attack_images
        self.interference_images: np.ndarray = interference_images
        self.target_prompt = prompt
        self.seed = seed
        self.interference_prompt1 = interference_prompt1
        self.interference_prompt2 = interference_prompt2

def erased_gen(target_csv_path, target_model_path, train_method, etas, num_prompts):
    """Generates images from the original model, the erased model, and the erased model
    under the eta attack, for the first `num_prompts` rows of the target CSV."""
    target_data = pd.read_csv(target_csv_path)

    torch.cuda.empty_cache()
    variance_scales = [1.0]

    total_images = []
    total_experiment_sets = []
    ct = 0

    # Load the erased (fine-tuned) weights on top of a DDIM-scheduled diffuser.
    state_dict = torch.load(target_model_path)
    diffuser = StableDiffuser(scheduler='DDIM').to('cuda')
    finetuner = FineTunedModel(diffuser, train_method=train_method)
    finetuner.load_state_dict(state_dict)

    for index, row in target_data.head(num_prompts).iterrows():
        prompt = row['prompt']
        seed = int(row['evaluation_seed'])

        # Baseline: original Stable Diffusion with deterministic DDIM sampling (eta = 0).
        stable_diffusion, images_steps, decoded_latents, latents, noise_preds, output_steps = diffuser(
            prompt,
            n_steps=50,
            generator=torch.manual_seed(seed),
            eta=0.0,
            variance_scale=0.0
        )
        total_images.append(stable_diffusion)

        with finetuner:
            # Erased model without an attack (eta = 0).
            finetuned_no_attack, images_steps, decoded_latents, latents, noise_preds, output_steps = diffuser(
                prompt,
                n_steps=50,
                generator=torch.manual_seed(seed),
                eta=0.0,
                variance_scale=0.0
            )
            total_images.append(finetuned_no_attack)

            # Erased model under the eta attack: sweep eta (and variance scale).
            attack_images = []
            for eta in etas:
                for variance_scale in variance_scales:
                    eta_image, images_steps, decoded_latents, latents, noise_preds, output_steps = diffuser(
                        prompt,
                        n_steps=50,
                        generator=torch.manual_seed(seed),
                        eta=eta,
                        variance_scale=variance_scale
                    )
                    attack_images.append(eta_image)
            total_images.extend(attack_images)

        experiment_set = ExperimentImageSet(
            stable_diffusion=stable_diffusion,
            eta_0_image=finetuned_no_attack,
            attack_images=np.array(attack_images),
            interference_images=None,
            prompt=prompt,
            seed=seed,
            interference_prompt1=None,
            interference_prompt2=None
        )
        total_experiment_sets.append(experiment_set)

        ct += 1 + len(etas)
        print(f"diffusion-count {ct} for prompt: {prompt}")

    total_images = np.array(total_images)

    # Keep only the final denoising step (index 49 of the 50-step trajectory) for the grid.
    fixed_images = []
    for image in total_images:
        fixed_images.append(image[0][49])

    fixed_images = np.array(fixed_images)

    print("Image grid shape:", fixed_images.shape)

    return fixed_images, total_experiment_sets
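
# Example usage (a minimal sketch; the CSV, checkpoint path, and train_method value below
# are illustrative assumptions, not files provided with this code):
#
#   grid_images, experiments = erased_gen(
#       target_csv_path="data/target_prompts.csv",   # hypothetical CSV with 'prompt' and 'evaluation_seed' columns
#       target_model_path="models/erased_model.pt",  # hypothetical erased/fine-tuned checkpoint
#       train_method="xattn",                        # assumed value; must match how the checkpoint was trained
#       etas=[0.25, 0.5, 0.75, 1.0],
#       num_prompts=5,
#   )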

def process_images(model, processor, prompt: str, images: list):
    """Scores each image against the prompt with CLIP and returns the image-text logits."""
    images = np.array(images)
    images = images.squeeze()
    print(images.shape)
    # Each entry holds the full 50-step trajectory; keep only the final denoising step.
    images = [image[49] for image in images]
    inputs = processor(text=[prompt], images=images, return_tensors="pt", padding=True)
    outputs = model(**inputs)
    return [clip_score.item() for clip_score in outputs.logits_per_image]

def calculate_experiment_scores(experiment, model, processor):
    """Calculates CLIP scores for each image set in the experiment."""
    targeted_images = [experiment.stable_diffusion, experiment.eta_0_image]
    targeted_images.extend(experiment.attack_images)
    clip_scores = process_images(model, processor, experiment.target_prompt, targeted_images)

    scores = {
        'SD': clip_scores[0],
        'ETA_0': clip_scores[1],
        # Report the strongest attack image across the eta sweep.
        'ATTACK': max(clip_scores[2:]),
    }

    if experiment.interference_images is not None:
        interference_images = np.array(experiment.interference_images)
        interference_images = interference_images.squeeze()
        # Keep only the final denoising step of each interference image.
        interference_images = [interference_image[49] for interference_image in interference_images]

        inputs = processor(text=[experiment.interference_prompt1], images=interference_images[0],
                           return_tensors="pt", padding=True)
        outputs = model(**inputs)
        interference_1 = outputs.logits_per_image.item()

        inputs = processor(text=[experiment.interference_prompt2], images=interference_images[1],
                           return_tensors="pt", padding=True)
        outputs = model(**inputs)
        interference_2 = outputs.logits_per_image.item()

        scores['INTERFERENCE1'] = interference_1
        scores['INTERFERENCE2'] = interference_2

    return scores

def get_clip_scores(experiment_sets: list['ExperimentImageSet']):
    """Processes a list of experiments and returns mean CLIP scores per image category."""
    model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
    processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

    total_clip_scores = {'SD': 0, 'ETA_0': 0, 'ATTACK': 0, 'INTERFERENCE1': 0, 'INTERFERENCE2': 0}
    experiment_count = len(experiment_sets)

    for experiment in experiment_sets:
        experiment_scores = calculate_experiment_scores(experiment, model, processor)
        for key in total_clip_scores:
            total_clip_scores[key] += experiment_scores.get(key, 0)

    # Interference entries stay at 0 when the experiments carry no interference images.
    mean_clip_scores = {key: score / experiment_count for key, score in total_clip_scores.items()}
    return mean_clip_scores
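
# Example usage (a sketch, assuming `experiments` is the list of ExperimentImageSet objects
# returned by erased_gen or interference_gen above):
#
#   mean_scores = get_clip_scores(experiments)
#   print(mean_scores)   # {'SD': ..., 'ETA_0': ..., 'ATTACK': ..., 'INTERFERENCE1': ..., 'INTERFERENCE2': ...}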

def get_simple_clip_scores(images_list, prompts):
    """
    Scores each image set against its prompt with CLIP.

    Args:
        images_list (list of lists): List of image sets, where each sublist contains the images for one prompt.
        prompts (list of str): List of prompts corresponding to each image set.

    Returns:
        full_clip_set (list of float): CLIP score for every image-prompt pair, rounded to two decimals.
    """
    model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
    processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

    full_clip_set = []
    for images, prompt in zip(images_list, prompts):
        inputs = processor(text=[prompt], images=images, return_tensors="pt", padding=True)
        outputs = model(**inputs)
        clip_scores = [clip_score.item() for clip_score in outputs.logits_per_image]
        full_clip_set.extend(np.round(clip_scores, 2))

    return full_clip_set
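
# Example usage (a sketch, assuming `grid_images` from erased_gen is split so that each
# sublist holds the images generated for one prompt, in the same order as `prompt_list`):
#
#   per_prompt = np.array_split(grid_images, len(prompt_list))
#   scores = get_simple_clip_scores(per_prompt, prompt_list)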

import matplotlib.pyplot as plt

def show_image_grid_with_scores(img_files, subtitles=None, clip_scores=None, num_rows=3, num_cols=4, fig_size=(15, 10)):
    """
    Displays a grid of images with subtitles and optional CLIP scores.

    Args:
        img_files (list of np.ndarray): List of images to display.
        subtitles (list of str): List of labels for the images.
        clip_scores (list of float): List of CLIP scores for the images.
        num_rows (int): Number of rows in the grid.
        num_cols (int): Number of columns in the grid.
        fig_size (tuple): Size of the figure.
    """
    fig, axes = plt.subplots(num_rows, num_cols, figsize=fig_size)

    # Default labels assume each prompt contributes six images: SD, the fine-tuned model,
    # and four eta-attack images. Caller-provided subtitles are left untouched.
    if not subtitles:
        default_labels = ['SD', 'Finetuned', 'ETA', 'ETA', 'ETA', 'ETA']
        if clip_scores:
            subtitles = default_labels * (len(clip_scores) // 6)
        else:
            subtitles = default_labels

    for i, ax in enumerate(axes.flatten()):
        img_index = i
        if img_index < len(img_files):
            img = img_files[img_index]
            ax.imshow(img)

            if subtitles and img_index < len(subtitles):
                subtitle = subtitles[img_index]
                if clip_scores and img_index < len(clip_scores):
                    subtitle += f" CLIP: {clip_scores[img_index]:.3f}"
                ax.set_title(subtitle, fontsize=14)

        ax.axis('off')

    plt.tight_layout()
    plt.show()
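
# Example usage (a sketch; assumes `grid_images` comes from erased_gen above with six images
# per prompt, so the default labels line up, and `scores` from get_simple_clip_scores):
#
#   show_image_grid_with_scores(grid_images, clip_scores=scores,
#                               num_rows=len(grid_images) // 6, num_cols=6)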

def interference_gen(target_csv_path, interference_path1, interference_path2, target_model_path, train_method, etas, num_prompts):
    """Same as erased_gen, but additionally generates two interference images per prompt
    (unrelated prompts run at eta = 0) to check for collateral damage from erasure."""
    target_data = pd.read_csv(target_csv_path)
    interference_data1 = pd.read_csv(interference_path1)
    interference_data2 = pd.read_csv(interference_path2)

    torch.cuda.empty_cache()
    variance_scales = [1.0]

    total_images = []
    total_experiment_sets = []
    ct = 0

    # Load the erased (fine-tuned) weights on top of a DDIM-scheduled diffuser.
    state_dict = torch.load(target_model_path)
    diffuser = StableDiffuser(scheduler='DDIM').to('cuda')
    finetuner = FineTunedModel(diffuser, train_method=train_method)
    finetuner.load_state_dict(state_dict)

    for (index, row), (index1, row1), (index2, row2) in zip(
        target_data.head(num_prompts).iterrows(),
        interference_data1.head(num_prompts).iterrows(),
        interference_data2.head(num_prompts).iterrows()
    ):
        prompt = row['prompt']
        seed = int(row['evaluation_seed'])

        interference_prompt1 = row1['prompt']
        interference_seed1 = int(row1['evaluation_seed'])

        interference_prompt2 = row2['prompt']
        interference_seed2 = int(row2['evaluation_seed'])

        # Baseline: original Stable Diffusion with deterministic DDIM sampling (eta = 0).
        stable_diffusion, images_steps, decoded_latents, latents, noise_preds, output_steps = diffuser(
            prompt,
            n_steps=50,
            generator=torch.manual_seed(seed),
            eta=0.0,
            variance_scale=0.0
        )
        total_images.append(stable_diffusion)

        with finetuner:
            # Erased model without an attack (eta = 0).
            finetuned_no_attack, images_steps, decoded_latents, latents, noise_preds, output_steps = diffuser(
                prompt,
                n_steps=50,
                generator=torch.manual_seed(seed),
                eta=0.0,
                variance_scale=0.0
            )
            total_images.append(finetuned_no_attack)

            # Erased model under the eta attack: sweep eta (and variance scale).
            attack_images = []
            for eta in etas:
                for variance_scale in variance_scales:
                    eta_image, images_steps, decoded_latents, latents, noise_preds, output_steps = diffuser(
                        prompt,
                        n_steps=50,
                        generator=torch.manual_seed(seed),
                        eta=eta,
                        variance_scale=variance_scale
                    )
                    attack_images.append(eta_image)
            total_images.extend(attack_images)

            # Interference prompts, generated at eta = 0.
            interference_image1, images_steps, decoded_latents, latents, noise_preds, output_steps = diffuser(
                interference_prompt1,
                n_steps=50,
                generator=torch.manual_seed(interference_seed1),
                eta=0.0,
                variance_scale=0.0
            )
            total_images.append(interference_image1)

            interference_image2, images_steps, decoded_latents, latents, noise_preds, output_steps = diffuser(
                interference_prompt2,
                n_steps=50,
                generator=torch.manual_seed(interference_seed2),
                eta=0.0,
                variance_scale=0.0
            )
            total_images.append(interference_image2)

        experiment_set = ExperimentImageSet(
            stable_diffusion=stable_diffusion,
            eta_0_image=finetuned_no_attack,
            attack_images=np.array(attack_images),
            interference_images=[interference_image1, interference_image2],
            prompt=prompt,
            seed=seed,
            interference_prompt1=interference_prompt1,
            interference_prompt2=interference_prompt2
        )
        total_experiment_sets.append(experiment_set)

        ct += 1 + len(etas)
        print(f"diffusion-count {ct} for prompt: {prompt}")

    total_images = np.array(total_images)

    # Keep only the final denoising step (index 49 of the 50-step trajectory) for the grid.
    fixed_images = []
    for image in total_images:
        fixed_images.append(image[0][49])

    fixed_images = np.array(fixed_images)

    print("Image grid shape:", fixed_images.shape)

    return fixed_images, total_experiment_sets
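
# Example usage (a minimal sketch; the CSV paths, checkpoint, and train_method value below
# are illustrative assumptions):
#
#   grid_images, experiments = interference_gen(
#       target_csv_path="data/target_prompts.csv",            # hypothetical target-concept prompts
#       interference_path1="data/interference_prompts1.csv",  # hypothetical unrelated prompts, set 1
#       interference_path2="data/interference_prompts2.csv",  # hypothetical unrelated prompts, set 2
#       target_model_path="models/erased_model.pt",
#       train_method="xattn",
#       etas=[0.25, 0.5, 0.75, 1.0],
#       num_prompts=5,
#   )
#   mean_scores = get_clip_scores(experiments)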