---
license: mit
language:
- en
library_name: diffusers
tags:
- Class conditioned Diffusion
- CIFAR10 Diffusion
---
This is a custom pipeline for a class-conditioned diffusion model trained on CIFAR-10. For the training script, pipeline, tutorial notebook, and sampling code, see my GitHub repo: https://github.com/KetanMann/Class_Conditioned_Diffusion_Training_Script

Below is the class-conditional diffusion pipeline with sampling code.
<div align="center">
    <img src="grid_images.gif" alt="Class Conditioned Diffusion GIF">
</div>

First, install the latest Diffusers from source:
```bash
pip install git+https://github.com/huggingface/diffusers
```
Then log in to your Hugging Face account:
```python
from huggingface_hub import notebook_login
notebook_login()
```
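If you are running a plain Python script rather than a notebook, `login` from `huggingface_hub` works the same way:

```python
from huggingface_hub import login

# Prompts for an access token; you can also pass one directly,
# e.g. login(token="hf_...")
login()
```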
Finally, run the following code to sample from the model and test it:
```python
import os
from typing import List, Optional, Tuple, Union

import matplotlib.pyplot as plt
import torch
from PIL import Image

from diffusers import UNet2DModel, DDPMScheduler
from diffusers.pipelines.pipeline_utils import DiffusionPipeline, ImagePipelineOutput
from diffusers.utils.torch_utils import randn_tensor

class DDPMPipelinenew(DiffusionPipeline):
    def __init__(self, unet, scheduler, num_classes: int):
        super().__init__()
        self.register_modules(unet=unet, scheduler=scheduler)
        self.num_classes = num_classes
        self._device = unet.device  # Ensure the pipeline knows the device

    @torch.no_grad()
    def __call__(
        self,
        batch_size: int = 64,
        class_labels: Optional[torch.Tensor] = None,
        generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
        num_inference_steps: int = 1000,
        output_type: Optional[str] = "pil",
        return_dict: bool = True,
    ) -> Union[ImagePipelineOutput, Tuple]:
        
        # Fall back to random class labels if none are given; otherwise move
        # the provided labels to the model's device and broadcast to batch_size
        if class_labels is None:
            class_labels = torch.randint(0, self.num_classes, (batch_size,), device=self._device)
        elif class_labels.ndim == 0:
            class_labels = class_labels.to(self._device).unsqueeze(0).expand(batch_size)
        else:
            class_labels = class_labels.to(self._device).expand(batch_size)

        # Sample gaussian noise to begin loop
        if isinstance(self.unet.config.sample_size, int):
            image_shape = (
                batch_size,
                self.unet.config.in_channels,
                self.unet.config.sample_size,
                self.unet.config.sample_size,
            )
        else:
            image_shape = (batch_size, self.unet.config.in_channels, *self.unet.config.sample_size)

        image = randn_tensor(image_shape, generator=generator, device=self._device)

        # Set step values
        self.scheduler.set_timesteps(num_inference_steps)

        for t in self.progress_bar(self.scheduler.timesteps):
            # Predict the noise residual, conditioned on the class labels
            model_output = self.unet(image, t, class_labels).sample

            # Compute the previous noisy sample x_t -> x_{t-1}
            image = self.scheduler.step(model_output, t, image, generator=generator).prev_sample

        # Map images from [-1, 1] back to [0, 1] and convert to NHWC numpy arrays
        image = (image / 2 + 0.5).clamp(0, 1)
        image = image.cpu().permute(0, 2, 3, 1).numpy()
        if output_type == "pil":
            image = self.numpy_to_pil(image)

        if not return_dict:
            return (image,)

        return ImagePipelineOutput(images=image)

    def to(self, device: torch.device):
        # Keep the cached device in sync whenever the pipeline is moved
        self._device = device
        self.unet.to(device)
        return self

def load_pipeline(repo_id, num_classes, device):
    unet = UNet2DModel.from_pretrained(repo_id, subfolder="unet").to(device)
    scheduler = DDPMScheduler.from_pretrained(repo_id, subfolder="scheduler")
    pipeline = DDPMPipelinenew(unet=unet, scheduler=scheduler, num_classes=num_classes)
    return pipeline.to(device)  # Move the entire pipeline to the device

def save_images_locally(images, save_dir, epoch, class_label):
    os.makedirs(save_dir, exist_ok=True)
    for i, image in enumerate(images):
        image_path = os.path.join(save_dir, f"image_epoch{epoch}_class{class_label}_idx{i}.png")
        image.save(image_path)

def generate_images(pipeline, class_label, batch_size, num_inference_steps, save_dir, epoch):
    generator = torch.Generator(device=pipeline._device).manual_seed(0)
    class_labels = torch.tensor([class_label] * batch_size).to(pipeline._device)
    images = pipeline(
        generator=generator,
        batch_size=batch_size,
        num_inference_steps=num_inference_steps,
        class_labels=class_labels,
        output_type="pil",
    ).images
    save_images_locally(images, save_dir, epoch, class_label)
    return images

def create_image_grid(images, grid_size, save_path):
    assert len(images) == grid_size ** 2, "Number of images must be equal to grid_size squared"
    width, height = images[0].size
    grid_img = Image.new('RGB', (grid_size * width, grid_size * height))
    
    for i, image in enumerate(images):
        x = i % grid_size * width
        y = i // grid_size * height
        grid_img.paste(image, (x, y))
    
    grid_img.save(save_path)
    return grid_img

if __name__ == "__main__":
    repo_id = "Ketansomewhere/cifar10_conditional_diffusion1"
    num_classes = 10  # Adjust to your number of classes
    batch_size = 64
    num_inference_steps = 1000  # Can be as low as 50 for faster generation
    save_dir = "generated_images"
    epoch = 0
    grid_size = 8  # 8x8 grid

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    pipeline = load_pipeline(repo_id, num_classes, device)
    
    for class_label in range(num_classes):
        images = generate_images(pipeline, class_label, batch_size, num_inference_steps, save_dir, epoch)
        
        # Create and save the raw grid image
        grid_img_path = os.path.join(save_dir, f"grid_image_class{class_label}.png")
        grid_img = create_image_grid(images, grid_size, grid_img_path)

        # Plot the grid; save the matplotlib version under a different name
        # so it does not overwrite the raw grid saved above
        plt.figure(figsize=(10, 10))
        plt.imshow(grid_img)
        plt.axis('off')
        plt.title(f'Class {class_label}')
        plt.savefig(os.path.join(save_dir, f"grid_plot_class{class_label}.png"))
        plt.show()
```
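As noted in the script, `num_inference_steps` can be lowered for faster generation. A further speed-up (a sketch, not part of the original repo) is to swap the DDPM scheduler for DDIM at inference time, reusing the trained scheduler's config; this assumes `pipeline` was created with `load_pipeline` as above:

```python
import torch
from diffusers import DDIMScheduler

# Reuse the trained scheduler's config so the noise schedule matches training
pipeline.scheduler = DDIMScheduler.from_config(pipeline.scheduler.config)

# DDIM gives reasonable samples with far fewer steps (e.g. 50)
images = pipeline(
    batch_size=16,
    class_labels=torch.tensor([3] * 16),  # CIFAR-10 class 3 ("cat")
    num_inference_steps=50,
    output_type="pil",
).images
```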

Also, see *testing.ipynb* for a notebook version of the above implementation.
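
The animated grid at the top of this card can be reproduced by stitching the saved per-class grids into a GIF. A minimal sketch with Pillow, assuming the script above has already written the `grid_image_class*.png` files to `generated_images/`:

```python
import os
from PIL import Image

# Load the per-class grid images saved by the sampling script
frames = [
    Image.open(os.path.join("generated_images", f"grid_image_class{c}.png"))
    for c in range(10)
]

# Write an animated GIF, one frame per class
frames[0].save(
    "grid_images.gif",
    save_all=True,
    append_images=frames[1:],
    duration=500,  # milliseconds per frame
    loop=0,        # loop forever
)
```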