Spaces:
Build error
Build error
import cv2 | |
import os | |
import numpy as np | |
from utils.commons.multiprocess_utils import multiprocess_run_tqdm | |
from scipy.ndimage import binary_erosion, binary_dilation | |
from tasks.eg3ds.loss_utils.segment_loss.mp_segmenter import MediapipeSegmenter | |
seg_model = MediapipeSegmenter() | |
def inpaint_torso_job(video_name, idx=None, total=None): | |
raw_img_dir = video_name.replace(".mp4", "").replace("/video/","/gt_imgs/") | |
img_names = glob.glob(os.path.join(raw_img_dir, "*.jpg")) | |
for image_path in tqdm.tqdm(img_names): | |
# read ori image | |
ori_image = cv2.imread(image_path, cv2.IMREAD_UNCHANGED) # [H, W, 3] | |
segmap = seg_model._cal_seg_map(cv2.cvtColor(ori_image, cv2.COLOR_BGR2RGB)) | |
head_part = (segmap[1] + segmap[3] + segmap[5]).astype(np.bool) | |
torso_part = (segmap[4]).astype(np.bool) | |
neck_part = (segmap[2]).astype(np.bool) | |
bg_part = segmap[0].astype(np.bool) | |
head_image = cv2.imread(image_path.replace("/gt_imgs/", "/head_imgs/"), cv2.IMREAD_UNCHANGED) # [H, W, 3] | |
torso_image = cv2.imread(image_path.replace("/gt_imgs/", "/torso_imgs/"), cv2.IMREAD_UNCHANGED) # [H, W, 3] | |
bg_image = cv2.imread(image_path.replace("/gt_imgs/", "/bg_imgs/"), cv2.IMREAD_UNCHANGED) # [H, W, 3] | |
# head_part = (head_image[...,0] != 0) & (head_image[...,1] != 0) & (head_image[...,2] != 0) | |
# torso_part = (torso_image[...,0] != 0) & (torso_image[...,1] != 0) & (torso_image[...,2] != 0) | |
# bg_part = (bg_image[...,0] != 0) & (bg_image[...,1] != 0) & (bg_image[...,2] != 0) | |
# get gt image | |
gt_image = ori_image.copy() | |
gt_image[bg_part] = bg_image[bg_part] | |
cv2.imwrite(image_path.replace('ori_imgs', 'gt_imgs'), gt_image) | |
# get torso image | |
torso_image = gt_image.copy() # rgb | |
torso_image[head_part] = 0 | |
torso_alpha = 255 * np.ones((gt_image.shape[0], gt_image.shape[1], 1), dtype=np.uint8) # alpha | |
# torso part "vertical" in-painting... | |
L = 8 + 1 | |
torso_coords = np.stack(np.nonzero(torso_part), axis=-1) # [M, 2] | |
# lexsort: sort 2D coords first by y then by x, | |
# ref: https://stackoverflow.com/questions/2706605/sorting-a-2d-numpy-array-by-multiple-axes | |
inds = np.lexsort((torso_coords[:, 0], torso_coords[:, 1])) | |
torso_coords = torso_coords[inds] | |
# choose the top pixel for each column | |
u, uid, ucnt = np.unique(torso_coords[:, 1], return_index=True, return_counts=True) | |
top_torso_coords = torso_coords[uid] # [m, 2] | |
# only keep top-is-head pixels | |
top_torso_coords_up = top_torso_coords.copy() - np.array([1, 0]) # [N, 2] | |
mask = head_part[tuple(top_torso_coords_up.T)] | |
if mask.any(): | |
top_torso_coords = top_torso_coords[mask] | |
# get the color | |
top_torso_colors = gt_image[tuple(top_torso_coords.T)] # [m, 3] | |
# construct inpaint coords (vertically up, or minus in x) | |
inpaint_torso_coords = top_torso_coords[None].repeat(L, 0) # [L, m, 2] | |
inpaint_offsets = np.stack([-np.arange(L), np.zeros(L, dtype=np.int32)], axis=-1)[:, None] # [L, 1, 2] | |
inpaint_torso_coords += inpaint_offsets | |
inpaint_torso_coords = inpaint_torso_coords.reshape(-1, 2) # [Lm, 2] | |
inpaint_torso_colors = top_torso_colors[None].repeat(L, 0) # [L, m, 3] | |
darken_scaler = 0.98 ** np.arange(L).reshape(L, 1, 1) # [L, 1, 1] | |
inpaint_torso_colors = (inpaint_torso_colors * darken_scaler).reshape(-1, 3) # [Lm, 3] | |
# set color | |
torso_image[tuple(inpaint_torso_coords.T)] = inpaint_torso_colors | |
inpaint_torso_mask = np.zeros_like(torso_image[..., 0]).astype(bool) | |
inpaint_torso_mask[tuple(inpaint_torso_coords.T)] = True | |
else: | |
inpaint_torso_mask = None | |
# neck part "vertical" in-painting... | |
push_down = 4 | |
L = 48 + push_down + 1 | |
neck_part = binary_dilation(neck_part, structure=np.array([[0, 1, 0], [0, 1, 0], [0, 1, 0]], dtype=bool), iterations=3) | |
neck_coords = np.stack(np.nonzero(neck_part), axis=-1) # [M, 2] | |
# lexsort: sort 2D coords first by y then by x, | |
# ref: https://stackoverflow.com/questions/2706605/sorting-a-2d-numpy-array-by-multiple-axes | |
inds = np.lexsort((neck_coords[:, 0], neck_coords[:, 1])) | |
neck_coords = neck_coords[inds] | |
# choose the top pixel for each column | |
u, uid, ucnt = np.unique(neck_coords[:, 1], return_index=True, return_counts=True) | |
top_neck_coords = neck_coords[uid] # [m, 2] | |
# only keep top-is-head pixels | |
top_neck_coords_up = top_neck_coords.copy() - np.array([1, 0]) | |
mask = head_part[tuple(top_neck_coords_up.T)] | |
top_neck_coords = top_neck_coords[mask] | |
# push these top down for 4 pixels to make the neck inpainting more natural... | |
offset_down = np.minimum(ucnt[mask] - 1, push_down) | |
top_neck_coords += np.stack([offset_down, np.zeros_like(offset_down)], axis=-1) | |
# get the color | |
top_neck_colors = gt_image[tuple(top_neck_coords.T)] # [m, 3] | |
# construct inpaint coords (vertically up, or minus in x) | |
inpaint_neck_coords = top_neck_coords[None].repeat(L, 0) # [L, m, 2] | |
inpaint_offsets = np.stack([-np.arange(L), np.zeros(L, dtype=np.int32)], axis=-1)[:, None] # [L, 1, 2] | |
inpaint_neck_coords += inpaint_offsets | |
inpaint_neck_coords = inpaint_neck_coords.reshape(-1, 2) # [Lm, 2] | |
inpaint_neck_colors = top_neck_colors[None].repeat(L, 0) # [L, m, 3] | |
darken_scaler = 0.98 ** np.arange(L).reshape(L, 1, 1) # [L, 1, 1] | |
inpaint_neck_colors = (inpaint_neck_colors * darken_scaler).reshape(-1, 3) # [Lm, 3] | |
# set color | |
torso_image[tuple(inpaint_neck_coords.T)] = inpaint_neck_colors | |
# apply blurring to the inpaint area to avoid vertical-line artifects... | |
inpaint_mask = np.zeros_like(torso_image[..., 0]).astype(bool) | |
inpaint_mask[tuple(inpaint_neck_coords.T)] = True | |
blur_img = torso_image.copy() | |
blur_img = cv2.GaussianBlur(blur_img, (5, 5), cv2.BORDER_DEFAULT) | |
torso_image[inpaint_mask] = blur_img[inpaint_mask] | |
# set mask | |
mask = (neck_part | torso_part | inpaint_mask) | |
if inpaint_torso_mask is not None: | |
mask = mask | inpaint_torso_mask | |
torso_image[~mask] = 0 | |
torso_alpha[~mask] = 0 | |
cv2.imwrite("0.png", np.concatenate([torso_image, torso_alpha], axis=-1)) | |
print(f'[INFO] ===== extracted torso and gt images =====') | |
def out_exist_job(vid_name): | |
out_dir1 = vid_name.replace("/video/", "/inpaint_torso_imgs/").replace(".mp4","") | |
out_dir2 = vid_name.replace("/video/", "/inpaint_torso_with_bg_imgs/").replace(".mp4","") | |
out_dir3 = vid_name.replace("/video/", "/torso_imgs/").replace(".mp4","") | |
out_dir4 = vid_name.replace("/video/", "/torso_with_bg_imgs/").replace(".mp4","") | |
if os.path.exists(out_dir1) and os.path.exists(out_dir1) and os.path.exists(out_dir2) and os.path.exists(out_dir3) and os.path.exists(out_dir4): | |
num_frames = len(os.listdir(out_dir1)) | |
if len(os.listdir(out_dir1)) == num_frames and len(os.listdir(out_dir2)) == num_frames and len(os.listdir(out_dir3)) == num_frames and len(os.listdir(out_dir4)) == num_frames: | |
return None | |
else: | |
return vid_name | |
else: | |
return vid_name | |
def get_todo_vid_names(vid_names): | |
todo_vid_names = [] | |
for i, res in multiprocess_run_tqdm(out_exist_job, vid_names, num_workers=16): | |
if res is not None: | |
todo_vid_names.append(res) | |
return todo_vid_names | |
if __name__ == '__main__': | |
import argparse, glob, tqdm, random | |
parser = argparse.ArgumentParser() | |
parser.add_argument("--vid_dir", default='/home/tiger/datasets/raw/CelebV-HQ/video') | |
parser.add_argument("--ds_name", default='CelebV-HQ') | |
parser.add_argument("--num_workers", default=48, type=int) | |
parser.add_argument("--seed", default=0, type=int) | |
parser.add_argument("--process_id", default=0, type=int) | |
parser.add_argument("--total_process", default=1, type=int) | |
parser.add_argument("--reset", action='store_true') | |
inpaint_torso_job('/home/tiger/datasets/raw/CelebV-HQ/video/dgdEr-mXQT4_8.mp4') | |
# args = parser.parse_args() | |
# vid_dir = args.vid_dir | |
# ds_name = args.ds_name | |
# if ds_name in ['lrs3_trainval']: | |
# mp4_name_pattern = os.path.join(vid_dir, "*/*.mp4") | |
# if ds_name in ['TH1KH_512', 'CelebV-HQ']: | |
# vid_names = glob.glob(os.path.join(vid_dir, "*.mp4")) | |
# elif ds_name in ['lrs2', 'lrs3', 'voxceleb2']: | |
# vid_name_pattern = os.path.join(vid_dir, "*/*/*.mp4") | |
# vid_names = glob.glob(vid_name_pattern) | |
# vid_names = sorted(vid_names) | |
# random.seed(args.seed) | |
# random.shuffle(vid_names) | |
# process_id = args.process_id | |
# total_process = args.total_process | |
# if total_process > 1: | |
# assert process_id <= total_process -1 | |
# num_samples_per_process = len(vid_names) // total_process | |
# if process_id == total_process: | |
# vid_names = vid_names[process_id * num_samples_per_process : ] | |
# else: | |
# vid_names = vid_names[process_id * num_samples_per_process : (process_id+1) * num_samples_per_process] | |
# if not args.reset: | |
# vid_names = get_todo_vid_names(vid_names) | |
# print(f"todo videos number: {len(vid_names)}") | |
# fn_args = [(vid_name,i,len(vid_names)) for i, vid_name in enumerate(vid_names)] | |
# for vid_name in multiprocess_run_tqdm(inpaint_torso_job ,fn_args, desc=f"Root process {args.process_id}: extracting segment images", num_workers=args.num_workers): | |
# pass |