File size: 3,356 Bytes
4efc065
 
0129d6f
4efc065
 
 
 
 
 
 
 
 
 
 
 
 
 
0129d6f
4efc065
 
 
 
 
 
 
 
 
0129d6f
 
 
 
 
 
4efc065
 
 
 
 
 
 
 
 
 
79bb82a
 
4efc065
 
 
 
 
 
 
 
0129d6f
75d29d6
4efc065
 
79bb82a
0129d6f
a678ae0
0129d6f
75d29d6
0129d6f
9d60b6e
0129d6f
79bb82a
0129d6f
 
 
 
 
 
 
 
75d29d6
0129d6f
3f9c34e
0129d6f
 
 
 
 
 
3f9c34e
 
 
127301d
3f9c34e
4efc065
 
3f9c34e
4efc065
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
from typing import  Dict, List, Any
import torch
from diffusers import DPMSolverMultistepScheduler, StableDiffusionInpaintPipeline, AutoPipelineForInpainting, AutoPipelineForImage2Image
from PIL import Image
import base64
from io import BytesIO


# set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

if device.type != 'cuda':
    raise ValueError("need to run on GPU")

class EndpointHandler():
    def __init__(self, path=""):
        # load StableDiffusionInpaintPipeline pipeline
        self.pipe = AutoPipelineForInpainting.from_pretrained(
            "runwayml/stable-diffusion-inpainting",
            revision="fp16",
            torch_dtype=torch.float16,
        )
        # use DPMSolverMultistepScheduler
        self.pipe.scheduler = DPMSolverMultistepScheduler.from_config(self.pipe.scheduler.config)
        # move to device        
        self.pipe = self.pipe.to(device)

        self.pipe2 = AutoPipelineForInpainting.from_pretrained("stabilityai/stable-diffusion-xl-refiner-1.0", torch_dtype=torch.float16, variant="fp16", use_safetensors=True)
        self.pipe2.to("cuda")

        self.pipe3 = AutoPipelineForImage2Image.from_pipe(self.pipe2)
        


    def __call__(self, data: Any) -> List[List[Dict[str, float]]]:
        """
        :param data: A dictionary contains `inputs` and optional `image` field.
        :return: A dictionary with `image` field contains image in base64.
        """
        encoded_image = data.pop("image", None)
        encoded_mask_image = data.pop("mask_image", None)
        
        prompt = data.pop("prompt", "")

        negative_prompt = data.pop("negative_prompt", "")
        
        # process image
        if encoded_image is not None and encoded_mask_image is not None:
            image = self.decode_base64_image(encoded_image)
            mask_image = self.decode_base64_image(encoded_mask_image)
        else:
            image = None
            mask_image = None 

        self.pipe.enable_xformers_memory_efficient_attention()
        
        # run inference pipeline
        out = self.pipe(prompt=prompt, negative_prompt=negative_prompt, image=image, mask_image=mask_image)

        image = out.images[0].resize((1024, 1024))

        self.pipe2.enable_xformers_memory_efficient_attention()

        image = self.pipe2(
            prompt=prompt,
            negative_prompt=negative_prompt,
            image=image,
            mask_image=mask_image,
            guidance_scale=8.0,
            num_inference_steps=100,
            strength=0.2,
            output_type="latent",  # let's keep in latent to save some VRAM
        ).images[0]
        
        self.pipe3.enable_xformers_memory_efficient_attention()
        
        image2 = self.pipe3(
            prompt=prompt,
            image=image,
            guidance_scale=8.0,
            num_inference_steps=100,
            strength=0.2,
        ).images[0]

        result = {
            "final_image": image2,
            "pipe1_img": out.images[0],
        }
            
        # return first generate PIL image
        return result
    
    # helper to decode input image
    def decode_base64_image(self, image_string):
        base64_image = base64.b64decode(image_string)
        buffer = BytesIO(base64_image)
        image = Image.open(buffer)
        return image