|  | import glob | 
					
						
						|  | import os | 
					
						
						|  | import shutil | 
					
						
						|  | import time | 
					
						
						|  | from random import randint | 
					
						
						|  |  | 
					
						
						|  | import cv2 | 
					
						
						|  | import numpy as np | 
					
						
						|  | import torch | 
					
						
						|  | from densepose import add_densepose_config | 
					
						
						|  | from densepose.vis.base import CompoundVisualizer | 
					
						
						|  | from densepose.vis.densepose_results import DensePoseResultsFineSegmentationVisualizer | 
					
						
						|  | from densepose.vis.extractor import CompoundExtractor, create_extractor | 
					
						
						|  | from detectron2.config import get_cfg | 
					
						
						|  | from detectron2.data.detection_utils import read_image | 
					
						
						|  | from detectron2.engine.defaults import DefaultPredictor | 
					
						
						|  | from PIL import Image | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | class DensePose: | 
					
						
						|  | """ | 
					
						
						|  | DensePose used in this project is from Detectron2 (https://github.com/facebookresearch/detectron2). | 
					
						
						|  | These codes are modified from https://github.com/facebookresearch/detectron2/tree/main/projects/DensePose. | 
					
						
						|  | The checkpoint is downloaded from https://github.com/facebookresearch/detectron2/blob/main/projects/DensePose/doc/DENSEPOSE_IUV.md#ModelZoo. | 
					
						
						|  |  | 
					
						
						|  | We use the model R_50_FPN_s1x with id 165712039, but other models should also work. | 
					
						
						|  | The config file is downloaded from https://github.com/facebookresearch/detectron2/tree/main/projects/DensePose/configs. | 
					
						
						|  | Noted that the config file should match the model checkpoint and Base-DensePose-RCNN-FPN.yaml is also needed. | 
					
						
						|  | """ | 
					
						
						|  |  | 
					
						
						|  | def __init__(self, model_path="./checkpoints/densepose_", device="cuda"): | 
					
						
						|  | self.device = device | 
					
						
						|  | self.config_path = os.path.join(model_path, "densepose_rcnn_R_50_FPN_s1x.yaml") | 
					
						
						|  | self.model_path = os.path.join(model_path, "model_final_162be9.pkl") | 
					
						
						|  | self.visualizations = ["dp_segm"] | 
					
						
						|  | self.VISUALIZERS = {"dp_segm": DensePoseResultsFineSegmentationVisualizer} | 
					
						
						|  | self.min_score = 0.8 | 
					
						
						|  |  | 
					
						
						|  | self.cfg = self.setup_config() | 
					
						
						|  | self.predictor = DefaultPredictor(self.cfg) | 
					
						
						|  | self.predictor.model.to(self.device) | 
					
						
						|  |  | 
					
						
						|  | def setup_config(self): | 
					
						
						|  | opts = ["MODEL.ROI_HEADS.SCORE_THRESH_TEST", str(self.min_score)] | 
					
						
						|  | cfg = get_cfg() | 
					
						
						|  | add_densepose_config(cfg) | 
					
						
						|  | cfg.merge_from_file(self.config_path) | 
					
						
						|  | cfg.merge_from_list(opts) | 
					
						
						|  | cfg.MODEL.WEIGHTS = self.model_path | 
					
						
						|  | cfg.freeze() | 
					
						
						|  | return cfg | 
					
						
						|  |  | 
					
						
						|  | @staticmethod | 
					
						
						|  | def _get_input_file_list(input_spec: str): | 
					
						
						|  | if os.path.isdir(input_spec): | 
					
						
						|  | file_list = [ | 
					
						
						|  | os.path.join(input_spec, fname) | 
					
						
						|  | for fname in os.listdir(input_spec) | 
					
						
						|  | if os.path.isfile(os.path.join(input_spec, fname)) | 
					
						
						|  | ] | 
					
						
						|  | elif os.path.isfile(input_spec): | 
					
						
						|  | file_list = [input_spec] | 
					
						
						|  | else: | 
					
						
						|  | file_list = glob.glob(input_spec) | 
					
						
						|  | return file_list | 
					
						
						|  |  | 
					
						
						|  | def create_context(self, cfg, output_path): | 
					
						
						|  | vis_specs = self.visualizations | 
					
						
						|  | visualizers = [] | 
					
						
						|  | extractors = [] | 
					
						
						|  | for vis_spec in vis_specs: | 
					
						
						|  | texture_atlas = texture_atlases_dict = None | 
					
						
						|  | vis = self.VISUALIZERS[vis_spec]( | 
					
						
						|  | cfg=cfg, | 
					
						
						|  | texture_atlas=texture_atlas, | 
					
						
						|  | texture_atlases_dict=texture_atlases_dict, | 
					
						
						|  | alpha=1.0, | 
					
						
						|  | ) | 
					
						
						|  | visualizers.append(vis) | 
					
						
						|  | extractor = create_extractor(vis) | 
					
						
						|  | extractors.append(extractor) | 
					
						
						|  | visualizer = CompoundVisualizer(visualizers) | 
					
						
						|  | extractor = CompoundExtractor(extractors) | 
					
						
						|  | context = { | 
					
						
						|  | "extractor": extractor, | 
					
						
						|  | "visualizer": visualizer, | 
					
						
						|  | "out_fname": output_path, | 
					
						
						|  | "entry_idx": 0, | 
					
						
						|  | } | 
					
						
						|  | return context | 
					
						
						|  |  | 
					
						
						|  | def execute_on_outputs(self, context, entry, outputs): | 
					
						
						|  | extractor = context["extractor"] | 
					
						
						|  |  | 
					
						
						|  | data = extractor(outputs) | 
					
						
						|  |  | 
					
						
						|  | H, W, _ = entry["image"].shape | 
					
						
						|  | result = np.zeros((H, W), dtype=np.uint8) | 
					
						
						|  |  | 
					
						
						|  | data, box = data[0] | 
					
						
						|  | x, y, w, h = [int(_) for _ in box[0].cpu().numpy()] | 
					
						
						|  | i_array = data[0].labels[None].cpu().numpy()[0] | 
					
						
						|  | result[y : y + h, x : x + w] = i_array | 
					
						
						|  | result = Image.fromarray(result) | 
					
						
						|  | result.save(context["out_fname"]) | 
					
						
						|  |  | 
					
						
						|  | def __call__(self, image_or_path, resize=512) -> Image.Image: | 
					
						
						|  | """ | 
					
						
						|  | :param image_or_path: Path of the input image. | 
					
						
						|  | :param resize: Resize the input image if its max size is larger than this value. | 
					
						
						|  | :return: Dense pose image. | 
					
						
						|  | """ | 
					
						
						|  |  | 
					
						
						|  | tmp_path = f"./densepose_/tmp/" | 
					
						
						|  | if not os.path.exists(tmp_path): | 
					
						
						|  | os.makedirs(tmp_path) | 
					
						
						|  |  | 
					
						
						|  | image_path = os.path.join( | 
					
						
						|  | tmp_path, f"{int(time.time())}-{self.device}-{randint(0, 100000)}.png" | 
					
						
						|  | ) | 
					
						
						|  | if isinstance(image_or_path, str): | 
					
						
						|  | assert image_or_path.split(".")[-1] in [ | 
					
						
						|  | "jpg", | 
					
						
						|  | "png", | 
					
						
						|  | ], "Only support jpg and png images." | 
					
						
						|  | shutil.copy(image_or_path, image_path) | 
					
						
						|  | elif isinstance(image_or_path, Image.Image): | 
					
						
						|  | image_or_path.save(image_path) | 
					
						
						|  | else: | 
					
						
						|  | shutil.rmtree(tmp_path) | 
					
						
						|  | raise TypeError("image_path must be str or PIL.Image.Image") | 
					
						
						|  |  | 
					
						
						|  | output_path = image_path.replace(".png", "_dense.png").replace( | 
					
						
						|  | ".jpg", "_dense.png" | 
					
						
						|  | ) | 
					
						
						|  | w, h = Image.open(image_path).size | 
					
						
						|  |  | 
					
						
						|  | file_list = self._get_input_file_list(image_path) | 
					
						
						|  | assert len(file_list), "No input images found!" | 
					
						
						|  | context = self.create_context(self.cfg, output_path) | 
					
						
						|  | for file_name in file_list: | 
					
						
						|  | img = read_image(file_name, format="BGR") | 
					
						
						|  |  | 
					
						
						|  | if (_ := max(img.shape)) > resize: | 
					
						
						|  | scale = resize / _ | 
					
						
						|  | img = cv2.resize( | 
					
						
						|  | img, (int(img.shape[1] * scale), int(img.shape[0] * scale)) | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  | with torch.no_grad(): | 
					
						
						|  | outputs = self.predictor(img)["instances"] | 
					
						
						|  | try: | 
					
						
						|  | self.execute_on_outputs( | 
					
						
						|  | context, {"file_name": file_name, "image": img}, outputs | 
					
						
						|  | ) | 
					
						
						|  | except Exception as e: | 
					
						
						|  | null_gray = Image.new("L", (1, 1)) | 
					
						
						|  | null_gray.save(output_path) | 
					
						
						|  |  | 
					
						
						|  | dense_gray = Image.open(output_path).convert("L") | 
					
						
						|  | dense_gray = dense_gray.resize((w, h), Image.NEAREST) | 
					
						
						|  |  | 
					
						
						|  | os.remove(image_path) | 
					
						
						|  | os.remove(output_path) | 
					
						
						|  |  | 
					
						
						|  | return dense_gray | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | if __name__ == "__main__": | 
					
						
						|  | pass | 
					
						
						|  |  |