from typing import cast, Union

import PIL.Image
import torch

from diffusers import AutoencoderKL
from diffusers.image_processor import VaeImageProcessor


class EndpointHandler:
    """Endpoint handler that decodes latents into images with an ``AutoencoderKL``.

    The handler accepts either unpacked 4-D latents or packed 3-D latents
    (which are unpacked first using the requested image ``height``/``width``),
    optionally undoes the VAE latent scaling/shift, runs the VAE decoder and
    post-processes the result according to the request parameters.
    """

    def __init__(self, path=""):
        """Load the VAE found at ``path`` and set up the image post-processor.

        Args:
            path: Location passed to ``AutoencoderKL.from_pretrained``.
        """
        self.device = "cuda"
        self.dtype = torch.bfloat16
        self.vae = cast(
            AutoencoderKL,
            AutoencoderKL.from_pretrained(path, torch_dtype=self.dtype)
            .to(self.device, self.dtype)
            .eval(),
        )

        # Derived from the number of VAE blocks: 2 ** (num_blocks - 1).
        self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
        self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)

    @staticmethod
    def _unpack_latents(latents, height, width, vae_scale_factor):
        """Unpack ``(batch, num_patches, channels)`` latents into a 4-D latent grid.

        Every patch carries a 2x2 spatial block for ``channels // 4`` latent
        channels; the result has shape
        ``(batch, channels // 4, latent_height, latent_width)``.

        Args:
            latents: Packed 3-D latent tensor.
            height: Target image height in pixels.
            width: Target image width in pixels.
            vae_scale_factor: Spatial compression factor of the VAE.

        Returns:
            The unpacked 4-D latent tensor.

        Raises:
            ValueError: If the latent shape is inconsistent with ``height`` and
                ``width`` or the channel count is not divisible by 4.
        """
        batch_size, num_patches, channels = latents.shape

        # VAE applies 8x compression on images but we must also account for packing which requires
        # latent height and width to be divisible by 2.
        latent_height = 2 * (int(height) // (vae_scale_factor * 2))
        latent_width = 2 * (int(width) // (vae_scale_factor * 2))

        # Fail early with a readable message instead of an opaque `view` shape error.
        expected_patches = (latent_height // 2) * (latent_width // 2)
        if num_patches != expected_patches or channels % 4 != 0:
            raise ValueError(
                f"Packed latents of shape {tuple(latents.shape)} do not match "
                f"height={height}, width={width}: expected {expected_patches} patches "
                "and a channel count divisible by 4."
            )

        latents = latents.view(
            batch_size, latent_height // 2, latent_width // 2, channels // 4, 2, 2
        )
        # Interleave the 2x2 patch axes with the patch-grid axes.
        latents = latents.permute(0, 3, 1, 4, 2, 5)

        return latents.reshape(batch_size, channels // (2 * 2), latent_height, latent_width)

    @torch.no_grad()
    def __call__(self, data) -> "Union[torch.Tensor, PIL.Image.Image]":
        """Decode the latents in ``data`` and return the resulting image.

        Args:
            data: Mapping with the latent tensor under ``"inputs"`` and an
                optional ``"parameters"`` mapping supporting:

                * ``height`` / ``width``: required when the input is a packed
                  3-D tensor; used to unpack it.
                * ``do_scaling`` (default ``True``): undo the VAE latent
                  scaling (and shift, when the VAE config defines one).
                * ``output_type`` (default ``"pil"``): ``"pil"`` returns the
                  first image of the batch as a PIL image; any other value
                  returns the decoded tensor unchanged.
                * ``partial_postprocess`` (default ``False``): return a
                  ``uint8`` tensor in the 0-255 range with the channel
                  dimension moved last; takes precedence over ``output_type``.

        Returns:
            A tensor or a PIL image, as selected by the parameters.

        Raises:
            ValueError: If a 3-D input is given without ``height`` and ``width``.
        """
        tensor = cast(torch.Tensor, data["inputs"])
        # An explicit `"parameters": None` is treated like a missing entry.
        parameters = cast(dict, data.get("parameters") or {})
        if tensor.ndim == 3 and ("height" not in parameters or "width" not in parameters):
            raise ValueError("Expected `height` and `width` in parameters.")
        height = cast(int, parameters.get("height", 0))
        width = cast(int, parameters.get("width", 0))
        do_scaling = cast(bool, parameters.get("do_scaling", True))
        output_type = cast(str, parameters.get("output_type", "pil"))
        partial_postprocess = cast(bool, parameters.get("partial_postprocess", False))
        if partial_postprocess:
            output_type = "pt"

        tensor = tensor.to(self.device, self.dtype)
        if tensor.ndim == 3:
            tensor = self._unpack_latents(tensor, height, width, self.vae_scale_factor)

        if do_scaling:
            tensor = tensor / self.vae.config.scaling_factor
            # `shift_factor` is optional in the VAE config; only apply it when set.
            shift_factor = getattr(self.vae.config, "shift_factor", None)
            if shift_factor is not None:
                tensor = tensor + shift_factor

        # Gradients are already disabled by the `torch.no_grad()` decorator.
        image = cast(torch.Tensor, self.vae.decode(tensor, return_dict=False)[0])

        if partial_postprocess:
            # Map [-1, 1] to [0, 255] uint8 with channels last.
            image = (image * 0.5 + 0.5).clamp(0, 1)
            image = image.permute(0, 2, 3, 1).contiguous().float()
            image = (image * 255).round().to(torch.uint8)
        elif output_type == "pil":
            image = cast(PIL.Image.Image, self.image_processor.postprocess(image, output_type="pil")[0])

        return image